Connecting with CREATE SOURCE
Establish a non-persistent connection to an external data source in RisingWave.
The CREATE SOURCE command establishes a connection to an external data source without storing the ingested data within RisingWave’s internal storage. This is useful for:
- Quick data exploration: Inspecting data from a source without the overhead of persistent storage.
- Streaming-only use cases: Building real-time applications where you only need to process data as it arrives, without needing historical data.
- Ad-hoc queries: Running one-off queries directly against the source.
- Manual data copying: Creating a temporary connection and then manually copying data to a table using INSERT INTO ... SELECT.
Data ingested via CREATE SOURCE is typically available only while your RisingWave session is active. The exact behavior (continuous streaming vs. one-time snapshot) depends on the nature of the incoming data (continuous stream vs. static data).
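The manual-copy use case listed above can be sketched as follows. This is a minimal illustration, not a prescribed workflow: the source name my_kafka_source, the table name user_activity_copy, and the two columns are hypothetical and assume a source with matching columns already exists.

```sql
-- Hypothetical names: my_kafka_source is an existing source;
-- user_activity_copy is a table created here to hold the copied rows.
CREATE TABLE user_activity_copy (
    user_id INT,
    activity_type VARCHAR
);

-- Copy whatever the source currently returns into the table.
INSERT INTO user_activity_copy
SELECT user_id, activity_type
FROM my_kafka_source;
```

Unlike the source itself, the table stores the copied rows in RisingWave.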
Syntax
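Assembled from the clauses described under Parameters, the general shape of the statement looks roughly like this. Treat it as a sketch of how the clauses fit together rather than the complete grammar; square brackets mark optional parts.

```sql
CREATE SOURCE [IF NOT EXISTS] source_name (
    column_name data_type [AS source_column_name] [NOT NULL], ...
    [, PRIMARY KEY (column_name, ...)]
)
WITH (
    connector='connector_name',
    connector_property='value', ...
)
FORMAT format_type ENCODE encode_type (
    ...
);
```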
Parameters
- CREATE SOURCE [IF NOT EXISTS] source_name: This is the command to create a connection to a data source.
  - IF NOT EXISTS: This part is optional. If you include IF NOT EXISTS, RisingWave will not return an error if a source with the same name already exists.
  - source_name: This is a user-defined name for your source (e.g., my_kafka_source). Choose a descriptive name.
- (column_name data_type [AS source_column_name] [NOT NULL], ...): This section defines the schema of the data you are ingesting. It’s a comma-separated list of column definitions.
  - column_name: The name you want to use for the column within RisingWave (e.g., user_id).
  - data_type: The RisingWave data type of the column (e.g., INT, VARCHAR, TIMESTAMP). See Data Types for a complete list of supported types.
  - AS source_column_name: This part is optional. Use AS if the column name in the source is different from the name you want to use in RisingWave. For example: user_id INT AS external_user_id.
  - NOT NULL: This part is optional. Use it to specify that a column cannot contain NULL values.
- [, PRIMARY KEY (column_name, ...)]: This part is optional. If PRIMARY KEY is defined, it carries semantic meaning only; it is not enforced as a real constraint.
- WITH (connector='connector_name', connector_property='value', ...): This section specifies the connector to use and its connection properties.
  - connector: This is required. It specifies the name of the connector to use (e.g., 'kafka', 'pulsar', 's3'). See Sources for a list of available connectors.
  - connector_property='value': These are connector-specific settings. The available properties and their required values depend on the connector you are using. See the documentation for each individual connector for details (e.g., for Kafka, you’ll need to specify topic and properties.bootstrap.server).
- FORMAT format_type ENCODE encode_type (...): This section specifies how the data is formatted and encoded.
  - FORMAT: This is required. It specifies the high-level data format (e.g., PLAIN, UPSERT, DEBEZIUM).
  - ENCODE: This is required. It specifies the specific data encoding (e.g., JSON, AVRO, PROTOBUF).
  - (...): These are format- and encoding-specific options. The available options depend on the chosen FORMAT and ENCODE. See Data formats and encoding options for details.
Example (Kafka)
This example creates a connection to a Kafka topic named user_activity.
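A minimal sketch of such a statement, built only from the clauses described under Parameters, might look like the following. The source name, the column list, and the broker address localhost:9092 are assumed values for illustration; the topic name user_activity and the topic and properties.bootstrap.server properties come from the text above.

```sql
-- Sketch only: source name, columns, and broker address are assumptions.
CREATE SOURCE IF NOT EXISTS my_kafka_source (
    user_id INT,
    activity_type VARCHAR,
    event_time TIMESTAMP
)
WITH (
    connector = 'kafka',
    topic = 'user_activity',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```

PLAIN with JSON is chosen here only because it is the simplest pairing among the listed examples; use the FORMAT and ENCODE that match how messages are actually written to the topic.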
Querying a source
Once you’ve created a source, you can query it directly using SELECT:
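For instance, assuming a source named my_kafka_source with user_id and activity_type columns already exists (both names are hypothetical), an ad-hoc query could be sketched as:

```sql
-- Assumes my_kafka_source exists with these two columns.
SELECT user_id, activity_type
FROM my_kafka_source
LIMIT 10;
```

The LIMIT clause keeps an exploratory query from reading more of the source than needed.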
Next steps
- See all Sources.
- Learn about Data formats and encoding options.
- See the documentation for specific connectors (e.g., Connect to Kafka).