Extract metadata from sources
Extract metadata from sources during ingestion.
When creating a table or source with an external connector, you can use the INCLUDE
clause to extract fields not included in the main message payload (the data defined by your schema) as separate columns. This is useful for accessing metadata like Kafka message offsets, timestamps, or keys.
Syntax
-
{ header | key | offset | partition | timestamp | payload | subject | file | database_name | collection_name }
: The field you want to include. The available fields depend on the connector (see Supported Connectors below). -
[AS column_name]
: (Optional) The name you want to give to the new column in RisingWave. If you omit AS column_name, RisingWave uses a default name in the format rw_, where:{connector}
is the connector name (e.g., kafka, pulsar).{col}
is the type of column (e.g., key, offset, timestamp).- Example: For an offset column from Kafka, the default name would be _rw_kafka_offset.
You include this clause after the schema definition in your CREATE SOURCE or CREATE TABLE statement:
Supported connectors
The INCLUDE clause is supported by several RisingWave connectors. For the specific fields supported by each connector, and their default data types, see the documentation for the individual connector. For example:
- Connect to Kafka
- Connect to Kinesis
- Connect to Pulsar
- Connect to MongoDB CDC
- Connect to NATS JetStream
- Connect to MQTT
- Connect to S3, GCS, or Azure Blob
Examples
Include all supported fields
Here we create a table, additional_columns, that ingests data from a Kafka broker. Aside from the a column, which is part of the message payload, the additional fields key, partition, offset, timestamp, header, and payload are also added to the table.
Include a timestamp column
Include header columns
There are two ways to define header columns.
You can choose to generate headers with type List[Struct<Varchar, Bytea>]
.
Or you can generate a type bytea header, where the column content will be specified as the value associated with the specified key, header_col. The header_col
field can only be defined when including a header.
In this case, the generated column name will have the format _rw_kafka_header_{header col name}_{col type}
, where col type
is the data type of the header column.
Include payload when upstream schema is unknown
Use the payload keyword to ingest JSON data when you are unsure of the exact schema beforehand. Instead of defining specific column names and types at the very beginning, you can load all JSON data first and then prune and filter the data during runtime.
Was this page helpful?