The CREATE TABLE command, when used with connector settings, connects to an external data source and continuously ingests the data into RisingWave’s internal storage. This makes the data readily available for querying and analysis, even after your session ends.

Using CREATE TABLE with a connector provides several key benefits:

  • Data storage: Data is stored within RisingWave, so it’s always available, even if the external source becomes unavailable.
  • Improved query performance: Queries against the table are generally faster than querying the external system directly, because the data is stored locally within RisingWave.
  • Primary key support: You can define primary keys on tables, which ensures data consistency and enables efficient handling of updates and deletes (especially important for CDC sources).
  • Historical analysis: You can analyze historical data trends, not just the current stream of data.
  • Indexes: You can create indexes on tables to further optimize query performance.
  • DML statements: You can use standard SQL commands like INSERT, UPDATE, and DELETE to modify the data stored in the table (see the sketch after this list).
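
For instance, once a connector-backed table exists, it behaves like any other RisingWave table. A minimal sketch, assuming the my_kafka_table example defined later on this page:

-- Speed up lookups on product_id with an index.
CREATE INDEX idx_my_kafka_table_product ON my_kafka_table (product_id);

-- Standard DML runs alongside the continuous ingestion.
INSERT INTO my_kafka_table (user_id, product_id) VALUES (42, 'p-100');
DELETE FROM my_kafka_table WHERE user_id = 42;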

Syntax

CREATE TABLE [IF NOT EXISTS] table_name (
    column_name data_type [AS source_column_name] [NOT NULL],
    ...
    [, PRIMARY KEY (column_name, ...)]
)
WITH (
    connector='connector_name',
    connector_property='value',
    ...
)
FORMAT format_type ENCODE encode_type [ (
    ... -- Format- and encoding-specific options
) ];

Parameters

  • CREATE TABLE [IF NOT EXISTS] table_name: This is the command to create a table and connect it to a data source.

    • IF NOT EXISTS: This part is optional. If you include IF NOT EXISTS, RisingWave will not return an error if a table with the same name already exists.
    • table_name: This is a user-defined name for your table (e.g., my_kafka_data). Choose a descriptive name.
  • (column_name data_type [AS source_column_name] [NOT NULL], ...): This section defines the schema of the data you are ingesting. It’s a comma-separated list of column definitions.

    • column_name: The name you want to use for the column within RisingWave (e.g., user_id).
    • data_type: The RisingWave data type of the column (e.g., INT, VARCHAR, TIMESTAMP). See Data Types for a complete list of supported types.
    • AS source_column_name: This part is optional. Use AS if the column name in the source is different from the name you want to use in RisingWave. For example: user_id INT AS external_id.
    • NOT NULL: This part is optional. Use it to specify that a column cannot contain NULL values.
  • [, PRIMARY KEY (column_name, ...)]: This part is optional in general, but it is required when using UPSERT formats, because updates and deletes must be matched to existing rows. The primary key uniquely identifies each row in the table. A sketch combining these column options appears after this list.

  • WITH (connector='connector_name', connector_property='value', ...): This section specifies the connector to use and its connection properties.

    • connector: This is required. It specifies the name of the connector to use (e.g., 'kafka', 'pulsar', 's3'). See Sources for a list of available connectors.
    • connector_property='value': These are connector-specific settings. Which properties are available, and which of them are required, depend on the connector you are using. See the documentation for each individual connector for details (e.g., for Kafka, you’ll need to specify topic and properties.bootstrap.server).
  • FORMAT format_type ENCODE encode_type (...): This section specifies how the data is formatted and encoded.

    • FORMAT: This is required. It specifies the high-level data format (e.g., PLAIN, UPSERT, DEBEZIUM).
    • ENCODE: This is required. It specifies the specific data encoding (e.g., JSON, AVRO, PROTOBUF).
    • (...): These are format- and encoding-specific options; the available options depend on the chosen FORMAT and ENCODE, and the parentheses are omitted entirely when no options are needed (as in the JSON example below). See Data formats and encoding options for details.
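
Putting these pieces together, the following sketch (using a hypothetical Kafka topic named events) renames the source field external_id to user_id with AS, marks payload as NOT NULL, and defines a primary key:

CREATE TABLE user_events (
    user_id INT AS external_id,  -- RisingWave column user_id reads the source field external_id
    payload VARCHAR NOT NULL,    -- this column cannot contain NULL values
    PRIMARY KEY (user_id)
) WITH (
    connector='kafka',
    topic='events',
    properties.bootstrap.server='broker1:9092'
) FORMAT PLAIN ENCODE JSON;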

Example (Kafka)

This example creates a table named my_kafka_table that connects to a Kafka topic named user_activity. The data from the Kafka topic will be continuously ingested and stored in RisingWave. A primary key is defined on the user_id column.

CREATE TABLE my_kafka_table (
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP,
    PRIMARY KEY (user_id)
) WITH (
    connector='kafka',
    topic='user_activity',
    properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
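
Once created, the table can be queried like any other RisingWave table, for example:

SELECT product_id, COUNT(*) AS views
FROM my_kafka_table
GROUP BY product_id;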

Next steps