When connecting to a data source, you specify how the data is formatted and encoded using the FORMAT and ENCODE keywords in your CREATE SOURCE or CREATE TABLE statement. This page explains the supported combinations and how to configure them.

Understanding FORMAT and ENCODE

  • FORMAT: Specifies the high-level structure of the data. Options:

    • PLAIN: Standard data with no CDC or upsert semantics; pairs with encodings such as JSON, Avro, Protobuf, CSV, Bytes, or Parquet.
    • UPSERT: A stream of updates and deletes (key-value pairs), typically from Kafka.
    • DEBEZIUM: The Debezium change data capture (CDC) format.
    • MAXWELL: The Maxwell CDC format.
    • CANAL: The Canal CDC format (TiCDC dialect).
    • DEBEZIUM_MONGO: The Debezium CDC format for MongoDB.
  • ENCODE: Specifies the specific data encoding. Options:

    • JSON: JavaScript Object Notation.
    • AVRO: Apache Avro.
    • PROTOBUF: Protocol Buffers.
    • CSV: Comma-Separated Values.
    • BYTES: Raw byte stream (no decoding).
    • PARQUET: Apache Parquet.

FORMAT and ENCODE work together to define how RisingWave interprets incoming data.

Supported combinations

| FORMAT | ENCODE | Description |
| --- | --- | --- |
| PLAIN | JSON | Standard JSON data. |
| UPSERT | JSON | JSON for updates/deletes (key-value pairs). |
| PLAIN | AVRO | Standard Avro data (requires schema registry). |
| UPSERT | AVRO | Avro for updates/deletes (requires schema registry). |
| DEBEZIUM | AVRO | Debezium-formatted Avro (requires schema registry). |
| PLAIN | PROTOBUF | Standard Protobuf data (requires schema location/registry). |
| DEBEZIUM | JSON | Debezium-formatted JSON data. |
| MAXWELL | JSON | Maxwell-formatted JSON data. |
| CANAL | JSON | Canal-formatted JSON data. |
| DEBEZIUM_MONGO | JSON | Debezium-formatted JSON data for MongoDB. |
| PLAIN | CSV | Comma-Separated Values. |
| PLAIN | BYTES | Raw byte stream. |
| PLAIN | PARQUET | Apache Parquet. |

General considerations

  • Schema Registry: For Avro and Protobuf, you often need a schema registry (Confluent Schema Registry or AWS Glue Schema Registry). The schema.registry parameter (where applicable) can accept multiple addresses; RisingWave tries each until it finds the schema.
  • TopicNameStrategy: For Avro, RisingWave uses the TopicNameStrategy by default for the schema registry, looking for a schema with the subject name {topic name}-value.

JSON handling

RisingWave directly decodes JSON data. Different FORMAT options add specific behaviors:

PLAIN JSON

Standard JSON ingestion. Define the schema in your CREATE SOURCE/CREATE TABLE statement or use a schema registry.

FORMAT PLAIN
ENCODE JSON [ (
   schema.registry = 'schema_registry_url [, ...]',
   [schema.registry.username = 'username'],
   [schema.registry.password = 'password']
) ]
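As a minimal sketch, a Kafka source using plain JSON might look like the following; the topic, broker address, and column names are placeholders, not values from this document:

```sql
-- Hypothetical Kafka source ingesting plain JSON;
-- the schema is declared inline instead of via a registry.
CREATE SOURCE user_events (
    user_id INT,
    event_type VARCHAR,
    occurred_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'user_events',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;
```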

UPSERT JSON

For Kafka topics with updates/deletes (key-value pairs). RisingWave inserts, updates, or deletes rows based on the key and value.

FORMAT UPSERT
ENCODE JSON [ (
   schema.registry = 'schema_registry_url [, ...]',
   [schema.registry.username = 'username'],
   [schema.registry.password = 'password']
) ]
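For illustration, an upsert table might be declared as follows; the table, topic, and broker names are placeholders. The primary key identifies which row a given key-value pair inserts, updates, or deletes:

```sql
-- Hypothetical upsert table; rows are keyed on user_id,
-- so later messages with the same key overwrite earlier rows.
CREATE TABLE user_profiles (
    user_id INT PRIMARY KEY,
    name VARCHAR,
    email VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'user_profiles',
    properties.bootstrap.server = 'broker:9092'
) FORMAT UPSERT ENCODE JSON;
```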

DEBEZIUM JSON

For Debezium CDC data. You can usually define the schema directly. Set the ignore_key option (default: 'false') to 'true' to ignore the message key and consume only the payload.

FORMAT DEBEZIUM
ENCODE JSON [ (
   [ ignore_key = 'true | false ' ]
) ]
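As a sketch, a table fed by a Debezium topic might look like this; the topic name (here following Debezium's server.schema.table convention) and broker address are placeholders:

```sql
-- Hypothetical table consuming Debezium change events;
-- inserts, updates, and deletes from the upstream database are replayed.
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    status VARCHAR,
    amount DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'pg.public.orders',
    properties.bootstrap.server = 'broker:9092'
) FORMAT DEBEZIUM ENCODE JSON;
```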

DEBEZIUM_MONGO JSON

When loading data from MongoDB via Kafka topics in Debezium Mongo JSON format, the source table schema has a few limitations. The table schema must have the columns _id and payload, where _id comes from the MongoDB document's _id field and serves as the primary key, and payload is of type jsonb and contains the rest of the document. If the document's _id is of type ObjectID, specify the type of _id as varchar when creating the column in RisingWave. If the document's _id is of type int32 or int64, specify the type of _id as int or bigint, respectively.

FORMAT DEBEZIUM_MONGO
ENCODE JSON
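Following the schema limitations above, a sketch for a collection whose _id is an ObjectID might look like this; the topic and broker names are placeholders:

```sql
-- Hypothetical table for Debezium Mongo JSON: exactly the
-- required _id (primary key) and payload (jsonb) columns.
CREATE TABLE mongo_orders (
    _id VARCHAR PRIMARY KEY,
    payload JSONB
) WITH (
    connector = 'kafka',
    topic = 'dbserver1.inventory.orders',
    properties.bootstrap.server = 'broker:9092'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;
```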

MAXWELL JSON

For Maxwell CDC data. Define the schema directly.

FORMAT MAXWELL
ENCODE JSON

CANAL JSON

For Canal CDC data (TiCDC dialect). Define the schema directly.

FORMAT CANAL
ENCODE JSON

AVRO handling

RisingWave supports Apache Avro. You must use a schema registry with Avro.

PLAIN AVRO

Standard Avro ingestion.

FORMAT PLAIN
ENCODE AVRO (
    schema.registry = 'schema_registry_url [, ...]',
)

You can ingest Avro map type into RisingWave map type or jsonb:

FORMAT [ DEBEZIUM | UPSERT | PLAIN ] ENCODE AVRO (
    map.handling.mode = 'map' | 'jsonb'
)
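As a sketch, because the schema comes from the registry, no column list is needed; the topic, broker address, and registry URL below are placeholders:

```sql
-- Hypothetical Avro source; columns are derived from the
-- schema fetched from the schema registry.
CREATE SOURCE avro_events WITH (
    connector = 'kafka',
    topic = 'avro_events',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://registry:8081'
);
```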

UPSERT AVRO

Handles Avro updates/deletes (key-value pairs) from Kafka.

FORMAT UPSERT
ENCODE AVRO (
   schema.registry = 'schema_registry_url [, ...]',
)

DEBEZIUM AVRO

For Debezium-formatted Avro. Specify the message and schema.registry.

FORMAT DEBEZIUM
ENCODE AVRO (
    message = 'main_message',
    schema.registry = 'schema_registry_url [, ...]',
    [ignore_key = 'true | false']
)
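For illustration, a Debezium Avro table might be declared as follows; the topic, message name (Debezium envelope names commonly follow the topic name), and registry URL are placeholders:

```sql
-- Hypothetical table consuming Debezium change events in Avro;
-- the value schema is resolved via the schema registry.
CREATE TABLE orders_cdc WITH (
    connector = 'kafka',
    topic = 'pg.public.orders',
    properties.bootstrap.server = 'broker:9092'
) FORMAT DEBEZIUM ENCODE AVRO (
    message = 'pg.public.orders.Envelope',
    schema.registry = 'http://registry:8081'
);
```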

Protobuf handling

PLAIN PROTOBUF

For data in Protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be a Web location in http://..., https://..., or s3://... format. For Kafka data in Protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry from which RisingWave can fetch the schema. For more details about using Schema Registry for Kafka data, see Kafka source configurations.

If you provide a file location, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with a command like this:

protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto

FORMAT PLAIN
ENCODE PROTOBUF (
   message = 'com.example.MyMessage',
   schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
)
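Putting it together, a sketch using a hosted FileDescriptorSet might look like this; the message name, schema URL, topic, and broker are placeholders:

```sql
-- Hypothetical Protobuf source; the schema is loaded from a
-- compiled FileDescriptorSet at a Web location.
CREATE SOURCE pb_events WITH (
    connector = 'kafka',
    topic = 'pb_events',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE PROTOBUF (
    message = 'com.example.MyMessage',
    schema.location = 'https://example.com/schema.pb'
);
```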

CSV handling

PLAIN CSV

To consume data in CSV format, use FORMAT PLAIN ENCODE CSV with options. Configurable options include delimiter and without_header.

FORMAT PLAIN
ENCODE CSV (
    delimiter = 'delimiter',
    without_header = 'false' | 'true'
)
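As a sketch, a headerless comma-delimited source might look like this; the column names, topic, and broker are placeholders:

```sql
-- Hypothetical CSV source; without_header = 'true' indicates
-- the rows carry no header line, so columns map by position.
CREATE SOURCE csv_rows (
    name VARCHAR,
    age INT
) WITH (
    connector = 'kafka',
    topic = 'csv_rows',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE CSV (
    delimiter = ',',
    without_header = 'true'
);
```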

Bytes handling

PLAIN BYTES

RisingWave allows you to read data streams without decoding the data by using the BYTES row format. However, the table or source must have exactly one field, of type bytea.

FORMAT PLAIN
ENCODE BYTES
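For example, a raw-bytes source with its single bytea column (the column, topic, and broker names are placeholders):

```sql
-- Hypothetical raw-bytes source; each message arrives
-- undecoded in the single bytea column.
CREATE SOURCE raw_stream (
    payload BYTEA
) WITH (
    connector = 'kafka',
    topic = 'raw_stream',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE BYTES;
```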

Parquet handling

PLAIN PARQUET

Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.

FORMAT PLAIN
ENCODE PARQUET
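As a sketch for reading Parquet from S3 (the connector parameter names here are assumptions; check the object storage connector documentation for the exact set, and the bucket, region, and columns are placeholders):

```sql
-- Hypothetical S3 source reading Parquet files matching a pattern.
CREATE SOURCE parquet_files (
    user_id INT,
    name VARCHAR
) WITH (
    connector = 's3',
    s3.bucket_name = 'my-bucket',
    s3.region_name = 'us-east-1',
    match_pattern = '*.parquet'
) FORMAT PLAIN ENCODE PARQUET;
```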

Parameter reference

| FORMAT | ENCODE | Parameter | Description | Required |
| --- | --- | --- | --- | --- |
| PLAIN | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| UPSERT | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| DEBEZIUM | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| DEBEZIUM | AVRO | message | The main message name. | Yes |
| DEBEZIUM | AVRO | ignore_key | Whether to ignore the message key (default: false). | No |
| PLAIN | JSON | schema.registry | URL(s) of the schema registry. | No |
| PLAIN | JSON | schema.registry.username | Username for the schema registry. | No |
| PLAIN | JSON | schema.registry.password | Password for the schema registry. | No |
| UPSERT | JSON | schema.registry | URL(s) of the schema registry. | No |
| UPSERT | JSON | schema.registry.username | Username for the schema registry. | No |
| UPSERT | JSON | schema.registry.password | Password for the schema registry. | No |
| PLAIN | PROTOBUF | message | Fully qualified name of the Protobuf message. | Yes |
| PLAIN | PROTOBUF | schema.location | URL of the schema file (alternative to schema.registry). | Yes (one of the two) |
| PLAIN | PROTOBUF | schema.registry | URL(s) of the schema registry (alternative to schema.location). | Yes (one of the two) |
| DEBEZIUM | JSON | ignore_key | Whether to ignore the message key (default: false). | No |
| PLAIN | CSV | delimiter | Delimiter character. | Yes |
| PLAIN | CSV | without_header | Whether the CSV data has no header row (default: false). | No |
| PLAIN | AVRO | map.handling.mode | How to ingest the Avro map type. Available values: 'map' (default) and 'jsonb'. | No |
| UPSERT | AVRO | map.handling.mode | How to ingest the Avro map type. Available values: 'map' (default) and 'jsonb'. | No |
| DEBEZIUM | AVRO | map.handling.mode | How to ingest the Avro map type. Available values: 'map' (default) and 'jsonb'. | No |

General parameters

timestamptz.handling.mode

The timestamptz.handling.mode parameter controls the input format for timestamptz values. It accepts the following values:

  • micro: The input number will be interpreted as the number of microseconds since 1970-01-01T00:00:00Z in UTC.

  • milli: The input number will be interpreted as the number of milliseconds since 1970-01-01T00:00:00Z in UTC.

  • guess_number_unit: The default setting. It restricts the range of timestamptz values to [1973-03-03 09:46:40, 5138-11-16 09:46:40) in UTC.

  • utc_string: This format is the least ambiguous and can usually be correctly inferred without needing explicit specification.

  • utc_without_suffix: Allows the user to indicate that a naive timestamp is in UTC, rather than local time.

You can set this parameter for these three combinations:

  • FORMAT PLAIN ENCODE JSON
  • FORMAT UPSERT ENCODE JSON
  • FORMAT DEBEZIUM ENCODE JSON

You cannot set this parameter for these three combinations:

  • FORMAT DEBEZIUM_MONGO ENCODE JSON
  • FORMAT MAXWELL ENCODE JSON
  • FORMAT CANAL ENCODE JSON
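For example, to interpret numeric JSON timestamps as milliseconds since the epoch, the parameter can be set in the ENCODE clause; the source name, columns, topic, and broker below are placeholders:

```sql
-- Hypothetical source where created_at arrives as a number of
-- milliseconds since 1970-01-01T00:00:00Z.
CREATE SOURCE ts_events (
    id INT,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'ts_events',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON (
    timestamptz.handling.mode = 'milli'
);
```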