The CREATE SOURCE command establishes a connection to an external data source without storing the ingested data within RisingWave’s internal storage. This is useful for:

  • Quick data exploration: Inspecting data from a source without the overhead of persistent storage.
  • Streaming-only use cases: Building real-time applications where you only need to process data as it arrives, without needing historical data.
  • Ad-hoc queries: Running one-off queries directly against the source.
  • Manual data copying: Creating a temporary connection and then manually copying data into a table using INSERT INTO ... SELECT (see the sketch below).

Data ingested via CREATE SOURCE is typically available only while your RisingWave session is active. Whether ingestion behaves as continuous streaming or as a one-time snapshot depends on the nature of the incoming data: a continuous stream versus static data.
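
For the manual data copying use case, a typical pattern is to create a regular table and copy whatever the source currently exposes into it. A minimal sketch, assuming the my_kafka_source source from the Kafka example below and a hypothetical target table named user_activity_copy:

-- Hypothetical target table; the columns mirror the source's schema.
CREATE TABLE user_activity_copy (
    user_id INT,
    product_id VARCHAR,
    event_time TIMESTAMP
);

-- One-off copy of the data currently available from the source.
INSERT INTO user_activity_copy
SELECT * FROM my_kafka_source;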

Syntax

CREATE SOURCE [IF NOT EXISTS] source_name (
    column_name data_type [AS source_column_name] [NOT NULL],
    ...
    [, PRIMARY KEY (column_name, ...)]
)
WITH (
    connector='connector_name',
    connector_property='value',
    ...
)
FORMAT format_type ENCODE encode_type (
    ... -- Format-specific options
);

Parameters

  • CREATE SOURCE [IF NOT EXISTS] source_name: This is the command to create a connection to a data source.

    • IF NOT EXISTS: This part is optional. If you include IF NOT EXISTS, RisingWave will not return an error if a source with the same name already exists.
    • source_name: This is a user-defined name for your source (e.g., my_kafka_source). Choose a descriptive name.
  • (column_name data_type [AS source_column_name] [NOT NULL], ...): This section defines the schema of the data you are ingesting. It’s a comma-separated list of column definitions.

    • column_name: The name you want to use for the column within RisingWave (e.g., user_id).
    • data_type: The RisingWave data type of the column (e.g., INT, VARCHAR, TIMESTAMP). See Data Types for a complete list of supported types.
    • AS source_column_name: This part is optional. Use AS if the column name in the source is different from the name you want to use in RisingWave. For example: user_id INT AS external_user_id.
    • NOT NULL: This part is optional. Use it to specify that a column cannot contain NULL values.
  • [, PRIMARY KEY (column_name, ...)]: This part is optional. If PRIMARY KEY is defined, it only conveys the semantic key of the data; it is not enforced as a constraint (see the sketch after this list).

  • WITH (connector='connector_name', connector_property='value', ...): This section specifies the connector to use and its connection properties.

    • connector: This is required. It specifies the name of the connector to use (e.g., 'kafka', 'pulsar', 's3'). See Sources for a list of available connectors.
    • connector_property='value': These are connector-specific settings. The available properties and their required values depend on the connector you are using. See the documentation for each individual connector for details (e.g., for Kafka, you’ll need to specify topic and properties.bootstrap.server).
  • FORMAT format_type ENCODE encode_type (...): This section specifies how the data is formatted and encoded.

    • FORMAT: This is required. It specifies the high-level data format (e.g., PLAIN, UPSERT, DEBEZIUM).
    • ENCODE: This is required. It specifies the specific data encoding (e.g., JSON, AVRO, PROTOBUF).
    • (...): These are format- and encoding-specific options. The available options depend on the chosen FORMAT and ENCODE. See Data formats and encoding options for details.
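
To show the column-level options together, here is a minimal sketch that renames a source field with AS, marks a column NOT NULL, and declares an informational primary key. The source name (orders_source), topic (orders), broker address, and field names are illustrative placeholders:

CREATE SOURCE IF NOT EXISTS orders_source (
    order_id INT NOT NULL,            -- every record must carry this field
    customer_id INT AS cust_id,       -- named cust_id in the upstream data
    amount DOUBLE PRECISION,
    PRIMARY KEY (order_id)            -- semantic meaning only; not enforced
) WITH (
    connector='kafka',
    topic='orders',
    properties.bootstrap.server='broker1:9092'
) FORMAT PLAIN ENCODE JSON;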

Example (Kafka)

This example creates a connection to a Kafka topic named user_activity.

CREATE SOURCE my_kafka_source (
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector='kafka',
    topic='user_activity',
    properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
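
Connector-specific properties can be added in the same WITH clause. For example, the Kafka connector's scan.startup.mode property controls where reading begins; below is a sketch of the same source configured to start from the earliest available offset (check the Kafka connector documentation for the exact property names and accepted values):

CREATE SOURCE my_kafka_source_from_start (
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector='kafka',
    topic='user_activity',
    properties.bootstrap.server='broker1:9092,broker2:9092',
    scan.startup.mode='earliest'   -- read from the earliest available offset
) FORMAT PLAIN ENCODE JSON;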

Querying a source

Once you’ve created a source, you can query it directly using SELECT:

SELECT * FROM my_kafka_source;
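
Because the source itself does not persist data, a common next step is to define a materialized view over it, so that RisingWave continuously processes incoming records and stores the results. A minimal sketch using the source above; the view name and aggregation are illustrative:

-- Continuously maintained count of events per user.
CREATE MATERIALIZED VIEW user_activity_counts AS
SELECT user_id, COUNT(*) AS activity_count
FROM my_kafka_source
GROUP BY user_id;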

Next steps