When creating a table or source with an external connector, you can use the INCLUDE clause to extract fields not included in the main message payload (the data defined by your schema) as separate columns. This is useful for accessing metadata like Kafka message offsets, timestamps, or keys.

Syntax

INCLUDE { header | key | offset | partition | timestamp | payload | subject | file | database_name | collection_name } [AS column_name]
  • { header | key | offset | partition | timestamp | payload | subject | file | database_name | collection_name }: The field you want to include. The available fields depend on the connector (see Supported Connectors below).

  • [AS column_name]: (Optional) The name you want to give to the new column in RisingWave. If you omit AS column_name, RisingWave uses a default name in the format _rw_{connector}_{col}, where:

    • {connector} is the connector name (e.g., kafka, pulsar).
    • {col} is the type of column (e.g., key, offset, timestamp).
    • Example: For an offset column from Kafka, the default name would be _rw_kafka_offset, as in the sketch after this list.
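
For example, a minimal Kafka source sketch (the topic and broker shown are placeholders) that omits AS exposes the included fields under their default names:

CREATE SOURCE source_default_names (
  a int
)
INCLUDE offset     -- exposed as _rw_kafka_offset
INCLUDE timestamp  -- exposed as _rw_kafka_timestamp
WITH (
  connector = 'kafka',
  topic = 'my_topic',
  properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON;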

You include this clause after the schema definition in your CREATE SOURCE or CREATE TABLE statement:

CREATE { SOURCE | TABLE } name (
    column_name data_type, ...  -- Your regular schema definition
)
[INCLUDE ... [AS ...]]
WITH (
    connector='...',
    ...
)
FORMAT ... ENCODE ...;

When using an UPSERT format (which handles updates and deletes based on a key), you must use INCLUDE KEY, and the included key column becomes the primary key; you cannot define a multi-column primary key in this case.
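
For example, a minimal upsert table sketch (the topic and broker are placeholders) includes the key and declares it as the primary key:

CREATE TABLE upsert_example (
  v1 int,
  PRIMARY KEY (key_col)
)
INCLUDE key AS key_col
WITH (
  connector = 'kafka',
  topic = 'my_upsert_topic',
  properties.bootstrap.server = 'localhost:9092'
) FORMAT UPSERT ENCODE JSON;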

Supported connectors

The INCLUDE clause is supported by several RisingWave connectors. For the specific fields supported by each connector, and their default data types, see the documentation for the individual connector. For example:

  • Connect to Kafka
  • Connect to Kinesis
  • Connect to Pulsar
  • Connect to MongoDB CDC
  • Connect to NATS JetStream
  • Connect to MQTT
  • Connect to S3, GCS, or Azure Blob

Examples

Include all supported fields

Here we create a table, additional_columns, that ingests data from a Kafka broker. Aside from column a, which is part of the message payload, the fields key, partition, offset, timestamp, header, and payload are included as additional columns.

CREATE TABLE additional_columns (
  a int,
  PRIMARY KEY (key_col)
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
  connector = 'kafka',
  properties.bootstrap.server = 'message_queue:29092',
  topic = 'kafka_additional_columns'
)
FORMAT UPSERT ENCODE JSON;
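
Once the table is created, the included fields can be queried like any other column. For example:

SELECT key_col, partition_col, offset_col, timestamp_col
FROM additional_columns
LIMIT 10;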

Include a timestamp column

CREATE TABLE table_include_timestamp (v1 int, v2 varchar)
INCLUDE timestamp AS event_timestamp
WITH (
  connector = 'kafka',
  topic = 'kafka_1_partition_topic',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
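
The included timestamp behaves like a regular column and can be used for filtering. For example, to keep only messages from the last hour (the window is just an illustration):

SELECT v1, v2, event_timestamp
FROM table_include_timestamp
WHERE event_timestamp > NOW() - INTERVAL '1 hour';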

Include header columns

There are two ways to define header columns.

You can generate a single column containing all message headers, with type List[Struct<Varchar, Bytea>].

INCLUDE header [AS kafka_header]

Alternatively, you can generate a bytea column that contains only the value of a single header, identified by the key you specify (header_col below). A header key can only be specified when including a header. In this case, the generated default column name has the format _rw_kafka_header_{header col name}_{col type}, where {col type} is the data type of the header column.

INCLUDE header 'header_col' [AS kafka_header]
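
As a sketch, assuming a header key named 'trace_id', a table that extracts only that header value could be defined as follows:

CREATE TABLE table_include_one_header (v1 int, v2 varchar)
INCLUDE header 'trace_id' AS trace_id_header
WITH (
  connector = 'kafka',
  topic = 'kafka_1_partition_topic',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;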

Include payload when upstream schema is unknown

Use the payload keyword to ingest JSON data when you are unsure of the exact schema beforehand. Instead of defining specific column names and types up front, you can load all of the JSON data first and then prune and filter it at query time.

CREATE TABLE table_include_payload (v1 int, v2 varchar)
INCLUDE payload
WITH (
    connector = 'kafka',
    topic = 'kafka_1_partition_topic',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
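
With ENCODE JSON, the payload is exposed as a JSONB column (named _rw_kafka_payload by default here), so individual fields can be extracted and filtered at query time with JSONB operators. A minimal sketch, assuming the messages contain v1 and v2 fields:

SELECT
  (_rw_kafka_payload ->> 'v1')::int AS v1_from_payload,
  _rw_kafka_payload ->> 'v2' AS v2_from_payload
FROM table_include_payload
WHERE _rw_kafka_payload ->> 'v2' IS NOT NULL;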