Connecting with CREATE TABLE
Connect to a data source and store the ingested data in RisingWave.
The CREATE TABLE
command, when used with connector settings, connects to an external data source and continuously ingests the data into RisingWave’s internal storage. This makes the data readily available for querying and analysis, even after your session ends.
Using CREATE TABLE
with a connector provides several key benefits:
- Data storage: Data is stored within RisingWave, so it’s always available, even if the external source becomes unavailable.
- Improved query performance: Queries against the table are generally faster because the data is stored locally within RisingWave.
- Primary key support: You can define primary keys on tables, which ensures data consistency and enables efficient handling of updates and deletes (especially important for CDC sources).
- Historical analysis: You can analyze historical data trends, not just the current stream of data.
- Indexes: You can create indexes on tables to further optimize query performance.
- DML statements: You can use standard SQL commands like INSERT, UPDATE, and DELETE to modify the data stored in the table.
Syntax
Parameters
-
CREATE TABLE [IF NOT EXISTS] table_name
: This is the command to create a table and connect it to a data source.IF NOT EXISTS
: This part is optional. If you includeIF NOT EXISTS
, RisingWave will not return an error if a table with the same name already exists.table_name
: This is a user-defined name for your table (e.g.,my_kafka_data
). Choose a descriptive name.
-
(column_name data_type [AS source_column_name] [NOT NULL], ...)
: This section defines the schema of the data you are ingesting. It’s a comma-separated list of column definitions.column_name
: The name you want to use for the column within RisingWave (e.g.,user_id
).data_type
: The RisingWave data type of the column (e.g.,INT
,VARCHAR
,TIMESTAMP
). See Data Types for a complete list of supported types.AS source_column_name
: This part is optional. UseAS
if the column name in the source is different from the name you want to use in RisingWave. For example:user_id INT AS external_id
.NOT NULL
: This part is optional. Use it to specify that a column cannot containNULL
values.
-
[, PRIMARY KEY (column_name, ...)]
: This part is optional in general but required when usingUPSERT
formats (to handle updates and deletes). The primary key uniquely identifies each row in the table. -
WITH (connector='connector_name', connector_property='value', ...)
: This section specifies the connector to use and its connection properties.connector
: This is required. It specifies the name of the connector to use (e.g.,'kafka'
,'pulsar'
,'s3'
). See Sources for a list of available connectors.connector_property='value'
: These are connector-specific settings. The available properties and their required values depend on the connector you are using. See the documentation for each individual connector for details (e.g., for Kafka, you’ll need to specifytopic
andproperties.bootstrap.server
).
-
FORMAT format_type ENCODE encode_type (...)
: This section specifies how the data is formatted and encoded.FORMAT
: This is required. It specifies the high-level data format (e.g.,PLAIN
,UPSERT
,DEBEZIUM
).ENCODE
: This is required. It specifies the specific data encoding (e.g.,JSON
,AVRO
,PROTOBUF
).(...)
: These are format- and encoding-specific options. The available options depend on the chosenFORMAT
andENCODE
. See Data formats and encoding options for details.
Example (Kafka)
This example creates a table named my_kafka_table
that connects to a Kafka topic named user_activity
. The data from the Kafka topic will be continuously ingested and stored in RisingWave. A primary key is defined on the user_id
column.
Next steps
- See all Sources.
- Learn about Data formats and encoding options.
- See the documentation for specific connectors (e.g., Connect to Kafka).
Was this page helpful?