For the full list of the sources we support, see Supported sources.

If you choose to persist the data from the source in RisingWave, use the CREATE TABLE command with connector settings instead. You also need to use CREATE TABLE if the data format requires a primary key (for example, FORMAT UPSERT or FORMAT DEBEZIUM), as shown in the sketch below. For more details about the differences between sources and tables, see here.
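For instance, here is a minimal sketch of persisting an upsert-encoded Kafka topic into a table; the table name, columns, topic, and broker address are placeholders.

CREATE TABLE user_events (
  -- FORMAT UPSERT requires a primary key that corresponds to the message key.
  user_id INT PRIMARY KEY,
  event VARCHAR
) WITH (
  connector = 'kafka',
  topic = 'user_events',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT UPSERT ENCODE JSON;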

Regardless of whether the data is persisted in RisingWave, you can create materialized views to perform analysis or data transformations.

Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name (
    col_name data_type [ AS generation_expression ],
    ...
   [ watermark_clause ]
)
[INCLUDE { header | key | offset | partition | timestamp } [AS <column_name>]]
[ WITH (
    connector='connector_name',
    connector_parameter='value', ...)]
[FORMAT data_format ENCODE data_encode [ (
    message='message',
    schema.location='location', ...) ]
];

Notes

A generated column may be defined with a non-deterministic function. When the data is ingested, the function is evaluated to generate the value of this field.

Names and unquoted identifiers are case-insensitive. You must double-quote an identifier for it to be treated as case-sensitive. See also Identifiers.

To record when a data record is loaded into RisingWave, you can define a column generated from the processing time (<column_name> timestamptz AS proctime()) when creating the table or source. See also proctime().
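As a minimal sketch (the source name, columns, topic, and broker address are placeholders), a processing-time column can be defined like this:

CREATE SOURCE events_with_proctime (
  event_id INT,
  -- Evaluated when each record is ingested into RisingWave.
  ingest_time timestamptz AS proctime()
) WITH (
  connector = 'kafka',
  topic = 'events',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;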

For a source whose schema comes from an external connector, put * first to represent all columns from the external connector; you can then define generated columns on such a source after it. See the example below.

CREATE SOURCE from_kafka (
  *,
  gen_i32_field INT AS int32_field + 2
)
INCLUDE KEY AS some_key
WITH (
  connector = 'kafka',
  topic = 'test-rw-sink-upsert-avro',
  properties.bootstrap.server = 'message_queue:29092'
)
FORMAT upsert ENCODE AVRO (
  schema.registry = 'http://message_queue:8081'
);

The generated column is created in RisingWave and is not read through the external connector. Therefore, if the upstream external system has a schema, that schema does not need to include the generated column.

Parameters

Parameter | Description
source_name | The name of the source. If a schema name is given (for example, CREATE SOURCE <schema>.<source> …), the source is created in the specified schema. Otherwise it is created in the current schema.
col_name | The name of a column.
data_type | The data type of a column. With the struct data type, you can create a nested table. Elements in a nested table need to be enclosed with angle brackets (<>).
generation_expression | The expression for the generated column. For details about generated columns, see Generated columns.
watermark_clause | A clause that defines the watermark for a timestamp column. The syntax is WATERMARK FOR column_name AS expr. For details about watermarks, refer to Watermarks.
INCLUDE clause | Extracts fields not included in the payload as separate columns. For more details on its usage, see INCLUDE clause.
WITH clause | Specifies the connector settings. See Supported sources for the full list of supported sources, as well as links to specific connector pages detailing the syntax for each source.
FORMAT and ENCODE options | Specify the data format and the encoding format of the source data. To learn about the supported data formats, see Supported formats.

Note that the parameters set in the FORMAT and ENCODE options are distinct from those set in the WITH clause. Make sure you place each parameter in the correct clause.

Watermarks

RisingWave supports generating watermarks when creating a source. Watermarks are like markers or signals that track the progress of event time, allowing you to process events within their corresponding time windows. The WATERMARK clause should be used within the schema_definition. For more information on how to create a watermark, see Watermarks.
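For illustration, here is a minimal sketch of defining a watermark on an event-time column when creating a source; the source name, columns, topic, and broker address are placeholders.

CREATE SOURCE clicks (
  user_id INT,
  click_time TIMESTAMP,
  -- Events arriving more than 5 seconds late are considered beyond the watermark.
  WATERMARK FOR click_time AS click_time - INTERVAL '5 seconds'
) WITH (
  connector = 'kafka',
  topic = 'clicks',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;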

Change Data Capture (CDC)

Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, and then delivering the changes to a downstream service in real-time.

RisingWave provides native MySQL and PostgreSQL CDC connectors. With these CDC connectors, you can ingest CDC data from these databases directly, without setting up additional services like Kafka.
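As a rough sketch of the native approach (the hostname, credentials, database, and table names below are placeholders; see the MySQL CDC connector page for the full set of parameters), you create a CDC source once and then create tables that ingest individual upstream tables from it:

CREATE SOURCE mysql_cdc_source WITH (
  connector = 'mysql-cdc',
  hostname = 'mysql-host',
  port = '3306',
  username = 'rw_user',
  password = 'secret',
  database.name = 'mydb'
);

CREATE TABLE orders (
  order_id INT PRIMARY KEY,
  order_status VARCHAR
) FROM mysql_cdc_source TABLE 'mydb.orders';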

If Kafka is part of your technical stack, you can also use the Kafka connector in RisingWave to ingest CDC data in the form of Kafka topics from databases into RisingWave. You need to use a CDC tool such as the Debezium connector for MySQL or Maxwell's daemon to convert CDC data into Kafka topics, as sketched below.
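For example, here is a sketch of ingesting Debezium-formatted CDC events from a Kafka topic; the topic, broker address, and columns are placeholders, and the primary key is required by FORMAT DEBEZIUM.

CREATE TABLE orders_from_kafka (
  order_id INT PRIMARY KEY,
  order_status VARCHAR
) WITH (
  connector = 'kafka',
  topic = 'mysql.mydb.orders',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT DEBEZIUM ENCODE JSON;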

For complete step-by-step guides on ingesting MySQL and PostgreSQL data using both approaches, see Ingest data from MySQL and Ingest data from PostgreSQL.

Shared source

Shared source improves resource utilization and data consistency when working with Kafka sources in RisingWave. It only affects Kafka sources created after upgrading to a version that supports this feature; existing Kafka sources are not affected.

PUBLIC PREVIEW

This feature is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.

Configure

Shared source is enabled by default. You can also set the session variable streaming_use_shared_source to control whether to enable it.

-- change the config in the current session
SET streaming_use_shared_source=[true|false];

-- change the default value of the session variable in the cluster
-- (the current session is not affected)
ALTER SYSTEM SET streaming_use_shared_source=[true|false];

To disable it completely at the cluster level, edit the risingwave.toml configuration file and set stream_enable_shared_source to false.
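As an illustrative sketch only (the section under which this key lives in risingwave.toml is an assumption and may differ between versions; only the key name comes from the documentation above):

# risingwave.toml
# The section name below is an assumption; check your version's configuration reference.
[streaming.developer]
stream_enable_shared_source = false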

Compared with non-shared source

With non-shared sources, when using the CREATE SOURCE statement:

  • No streaming job is instantiated. The source is just a set of metadata stored in the catalog.
  • A SourceExecutor is created to start ingesting data only when a materialized view or sink references the source.

This leads to increased resource usage and potential inconsistencies:

  • Each SourceExecutor consumes Kafka resources independently, adding pressure to both the Kafka broker and RisingWave.
  • Independent SourceExecutor instances can reach different consumption progress, causing temporary inconsistencies when joining materialized views.

With shared sources, when using the CREATE SOURCE statement:

  • A single SourceExecutor is instantiated immediately.
  • All materialized views referencing the same source share this SourceExecutor.
  • Downstream materialized views only forward data from the upstream source, instead of consuming from Kafka independently.

This improves resource utilization and consistency.
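To make the sharing concrete, here is a sketch (the source definition, topic, and broker address are placeholders): with shared source enabled, both materialized views below read from the same SourceExecutor instead of opening separate Kafka consumers.

CREATE SOURCE clickstream (
  user_id INT,
  url VARCHAR,
  event_time TIMESTAMP
) WITH (
  connector = 'kafka',
  topic = 'clickstream',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;

-- Both materialized views share the single SourceExecutor created for clickstream.
CREATE MATERIALIZED VIEW pv_per_user AS
SELECT user_id, COUNT(*) AS pv FROM clickstream GROUP BY user_id;

CREATE MATERIALIZED VIEW pv_per_url AS
SELECT url, COUNT(*) AS pv FROM clickstream GROUP BY url;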

When creating a materialized view, RisingWave backfills historical data from Kafka. The process blocks the DDL statement until backfill completes.

  • To configure this behavior, use the SET BACKGROUND_DDL command. This is similar to the backfilling procedure when creating a materialized view on tables and materialized views.

  • To monitor backfill progress, use the SHOW JOBS command or check Kafka Consumer Lag Size in the Grafana dashboard (under Streaming), as shown in the sketch below.
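Below is a minimal sketch of creating a materialized view as a background job and checking its progress; the view definition and source name are illustrative placeholders.

SET BACKGROUND_DDL = true;

CREATE MATERIALIZED VIEW mv_example AS
SELECT user_id, COUNT(*) AS cnt
FROM my_kafka_source
GROUP BY user_id;

-- List streaming jobs that are still in progress, including their backfill progress.
SHOW JOBS;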

If you have set up a retention policy, or the external system can only be read once (as with some message queues), and the data is no longer available, newly created materialized views won't be able to backfill the complete historical data. This can lead to inconsistencies with earlier materialized views.

Compared with table

A CREATE TABLE statement can provide similar benefits to shared sources, except that it needs to persist all consumed data.

For a table with a connector, downstream materialized views backfill historical data from the table instead of from the external source, which can be more efficient and puts less pressure on the external system. This also gives tables a stronger consistency guarantee, as the historical data is guaranteed to be present.

Tables offer other features that enhance their utility in data ingestion workflows. See Table with connectors.

LIMITATION

Currently, shared source only applies to Kafka sources; other sources are unaffected. We plan to gradually make other sources shared as well in the future.

Shared sources do not support ALTER SOURCE. Use non-shared sources if you require this functionality.

See also