Establish a non-persistent connection to an external data source in RisingWave.
The CREATE SOURCE command establishes a connection to an external data source without storing the ingested data within RisingWave’s internal storage. This is useful for: … INSERT INTO ... SELECT.
Data ingested through CREATE SOURCE is typically available only while your RisingWave session is active. The exact behavior (continuous streaming vs. one-time snapshot) depends on the nature of the incoming data (continuous stream vs. static data).
CREATE SOURCE [IF NOT EXISTS] source_name
: This is the command to create a connection to a data source.
IF NOT EXISTS
: This part is optional. If you include IF NOT EXISTS, RisingWave will not return an error if a source with the same name already exists.
source_name
: This is a user-defined name for your source (e.g., my_kafka_source). Choose a descriptive name.
(column_name data_type [AS source_column_name] [NOT NULL], ...)
: This section defines the schema of the data you are ingesting. It’s a comma-separated list of column definitions.
column_name
: The name you want to use for the column within RisingWave (e.g., user_id).
data_type
: The RisingWave data type of the column (e.g., INT, VARCHAR, TIMESTAMP). See Data Types for a complete list of supported types.
AS source_column_name
: This part is optional. Use AS if the column name in the source differs from the name you want to use in RisingWave. For example: user_id INT AS external_user_id.
NOT NULL
: This part is optional. Use it to specify that a column cannot contain NULL values.
[, PRIMARY KEY (column_name, ...)]
: This part is optional. A PRIMARY KEY defined on a source only carries semantic meaning; RisingWave does not enforce it as a constraint.
WITH (connector='connector_name', connector_property='value', ...)
: This section specifies the connector to use and its connection properties.
connector
: This is required. It specifies the name of the connector to use (e.g., 'kafka', 'pulsar', 's3'). See Sources for a list of available connectors.
connector_property='value'
: These are connector-specific settings. The available properties and their required values depend on the connector you are using. See the documentation for each individual connector for details (e.g., for Kafka, you’ll need to specify topic and properties.bootstrap.server).
FORMAT format_type ENCODE encode_type (...)
: This section specifies how the data is formatted and encoded.
FORMAT
: This is required. It specifies the high-level data format (e.g., PLAIN, UPSERT, DEBEZIUM).
ENCODE
: This is required. It specifies the data encoding (e.g., JSON, AVRO, PROTOBUF).
(...)
: These are format- and encoding-specific options. The available options depend on the chosen FORMAT and ENCODE. See Data formats and encoding options for details.
The examples on this page use a source named user_activity.
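As a minimal sketch, the statement below creates a user_activity source that reads JSON messages from Kafka. The broker address (localhost:9092), the topic name, and the columns user_id, event_type, and event_time are hypothetical placeholders; substitute the values for your own deployment. The scan.startup.mode = 'earliest' setting asks the Kafka connector to begin reading from the earliest available offset.

```sql
-- Hypothetical example: adjust the topic, broker address, and columns to your setup.
CREATE SOURCE IF NOT EXISTS user_activity (
    user_id INT,            -- hypothetical column names and types
    event_type VARCHAR,
    event_time TIMESTAMP
)
WITH (
    connector = 'kafka',                              -- which connector to use
    topic = 'user_activity',                          -- hypothetical Kafka topic
    properties.bootstrap.server = 'localhost:9092',   -- hypothetical broker address
    scan.startup.mode = 'earliest'                    -- start from the earliest offset
)
FORMAT PLAIN ENCODE JSON;                             -- append-only JSON messages
```

FORMAT PLAIN treats every message as a new row appended to the source; formats such as UPSERT or DEBEZIUM are the ones to reach for when messages carry updates or deletes.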
To read data from the source, query it with SELECT:
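A short sketch of an ad-hoc query, assuming user_activity has the hypothetical columns user_id, event_type, and event_time; the LIMIT clause keeps the result small while you inspect the data.

```sql
-- Ad-hoc read from the source; the column names are hypothetical.
SELECT user_id, event_type, event_time
FROM user_activity
LIMIT 10;
```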
Kafka sources also expose a virtual column, _rw_kafka_timestamp, which holds the timestamp of each Kafka message. You can use this column to filter messages based on their timestamp:
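For example, assuming user_activity is backed by the Kafka connector, the query below keeps only the messages whose Kafka timestamp falls within the last ten minutes. The cutoff is computed with now() at the moment the query runs.

```sql
-- Keep only messages produced in the last 10 minutes, based on the Kafka message timestamp.
SELECT *
FROM user_activity
WHERE _rw_kafka_timestamp > now() - INTERVAL '10 minutes';
```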