Ingest additional fields with INCLUDE clause
Extract fields not in the payload as separate columns using the INCLUDE clause.
When creating a table or source with an external connector, you can use the INCLUDE clause to extract fields that are not part of the main message payload (the data defined by your schema) as separate columns. This is useful for accessing metadata such as Kafka message offsets, timestamps, or keys.
Syntax
INCLUDE { header | key | offset | partition | timestamp | payload | subject | file | database_name | collection_name } [AS column_name]
- { header | key | offset | partition | timestamp | payload | subject | file | database_name | collection_name }: The field you want to include. The available fields depend on the connector (see Supported connectors below).
- [AS column_name]: (Optional) The name you want to give to the new column in RisingWave. If you omit AS column_name, RisingWave uses a default name in the format _rw_{connector}_{col}, where:
  - {connector} is the connector name (e.g., kafka, pulsar).
  - {col} is the type of column (e.g., key, offset, timestamp).
  - Example: For an offset column from Kafka, the default name would be _rw_kafka_offset.
You include this clause after the schema definition in your CREATE SOURCE or CREATE TABLE statement, before the WITH options.
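As a minimal sketch of where the clause sits, the statement below places a single INCLUDE clause between the column list and the WITH options. The source name, column, topic, and broker address are placeholders chosen for illustration, not values from this page.

```sql
-- Sketch only: orders_src, order_id, the topic, and the broker address are hypothetical.
CREATE SOURCE orders_src (
    order_id int                       -- column taken from the message payload
)
INCLUDE offset AS kafka_offset         -- metadata column, placed after the schema definition
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```

Omitting AS kafka_offset here would leave the column with its default name, _rw_kafka_offset.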
Important Note for UPSERT Formats: When using UPSERT formats (which handle updates and deletes based on a key), you must use INCLUDE KEY. You cannot define a multi-column primary key in this case; the included key becomes the primary key.
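The following sketch illustrates the rule above for an UPSERT table: the key is included under an alias, and that alias is the only column in the primary key. The table name, alias, topic, and broker address are assumptions made for the example.

```sql
-- Sketch only: user_profiles, msg_key, the topic, and the broker address are hypothetical.
CREATE TABLE user_profiles (
    display_name varchar,
    PRIMARY KEY (msg_key)              -- single-column primary key: the included key
)
INCLUDE KEY AS msg_key                 -- required for UPSERT formats
WITH (
    connector = 'kafka',
    topic = 'user_profiles',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT UPSERT ENCODE JSON;
```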
Supported connectors
The INCLUDE clause is supported by several RisingWave connectors. For the specific fields supported by each connector, and their default data types, see the documentation for the individual connector. For example:
- Connect to Kafka
- Connect to Kinesis
- Connect to Pulsar
- Connect to MongoDB CDC
- Connect to NATS JetStream
- Connect to MQTT
- Connect to S3, GCS, or Azure Blob
Examples
Include all supported fields
Here we create a table, additional_columns, that ingests data from a Kafka broker. Aside from the a column, which is part of the message payload, the additional fields key, partition, offset, timestamp, header, and payload are also added to the table.
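A statement matching this description might look like the sketch below. The column aliases, topic name, and broker address are illustrative placeholders; only the table name additional_columns and the column a come from the text above.

```sql
-- Sketch only: aliases, topic, and broker address are hypothetical.
CREATE TABLE additional_columns (
    a int                              -- part of the message payload
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
    connector = 'kafka',
    topic = 'kafka_additional_columns',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```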
Include a timestamp column
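As a hedged illustration, the sketch below adds the Kafka message timestamp as a column named event_ts and then reads it back. The table name, payload columns, alias, topic, and broker address are all assumptions for the example.

```sql
-- Sketch only: all names and connection settings below are hypothetical.
CREATE TABLE table_include_timestamp (
    v1 int,
    v2 varchar
)
INCLUDE timestamp AS event_ts          -- message timestamp exposed as a regular column
WITH (
    connector = 'kafka',
    topic = 'sensor_readings',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;

-- The included column can be queried like any other column.
SELECT v1, v2, event_ts
FROM table_include_timestamp
ORDER BY event_ts DESC
LIMIT 10;
```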
Include header columns
There are two ways to define header columns.
You can choose to generate headers with type List[Struct<Varchar, Bytea>].
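A sketch of this first form, assuming a Kafka source: including header without naming a key collects all headers of each message into one column. The table name, alias, topic, and broker address are placeholders.

```sql
-- Sketch only: all names and connection settings below are hypothetical.
CREATE TABLE table_include_headers (
    v1 int
)
INCLUDE header AS kafka_headers        -- all headers of the message as key/value pairs
WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```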
Or you can generate a header column of type bytea, whose content is the value associated with the specified key, header_col. The header_col field can only be defined when including a header.
In this case, the generated column name will have the format _rw_kafka_header_{header col name}_{col type}, where col type is the data type of the header column.
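A sketch of this second form, assuming the header key is written as a quoted string after the header keyword. The key trace_id, the alias, the table name, and the connection settings are hypothetical.

```sql
-- Sketch only: the header key 'trace_id' and all names below are hypothetical.
CREATE TABLE table_include_one_header (
    v1 int
)
INCLUDE header 'trace_id' AS trace_id_raw   -- bytea value of the header named trace_id
WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```

If the AS alias is left out, the column falls back to the generated name format described above.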
Include payload when upstream schema is unknown
Use the payload keyword to ingest JSON data when you are unsure of the exact schema beforehand. Instead of defining specific column names and types at the very beginning, you can load all JSON data first and then prune and filter the data during runtime.
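The sketch below shows one way this could look: the whole message is kept in a column named raw_payload and fields are picked out later with JSON operators. The table name, alias, JSON field names (user_id, event_type), and connection settings are assumptions for illustration.

```sql
-- Sketch only: all names, JSON fields, and connection settings are hypothetical.
CREATE TABLE table_include_payload (
    v1 int
)
INCLUDE payload AS raw_payload         -- full JSON message kept alongside the typed column
WITH (
    connector = 'kafka',
    topic = 'clickstream',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;

-- Prune and filter later, once the fields of interest are known.
SELECT raw_payload ->> 'user_id' AS user_id
FROM table_include_payload
WHERE raw_payload ->> 'event_type' = 'click';
```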