FORMAT and ENCODE keywords in your CREATE SOURCE or CREATE TABLE statement. This page explains the supported combinations and how to configure them.
Understanding FORMAT and ENCODE
-
FORMAT: Specifies the high-level structure of the data. Options:PLAIN: Standard formats like JSON, Avro, Protobuf, CSV, Bytes, or Parquet.UPSERT: A stream of updates and deletes (key-value pairs), typically from Kafka.DEBEZIUM: The Debezium change data capture (CDC) format.MAXWELL: The Maxwell CDC format.CANAL: The Canal CDC format (TiCDC dialect).DEBEZIUM_MONGO: The Debezium CDC format for MongoDB.
-
ENCODE: Specifies the specific data encoding. Options:JSON: JavaScript Object Notation.AVRO: Apache Avro.PROTOBUF: Protocol Buffers.CSV: Comma-Separated Values.BYTES: Raw byte stream (no decoding).PARQUET: Apache Parquet
FORMAT and ENCODE work together to define how RisingWave interprets incoming data.
Supported combinations
| FORMAT | ENCODE | Description |
|---|---|---|
PLAIN | JSON | Standard JSON data. |
UPSERT | JSON | JSON for updates/deletes (key-value pairs). |
PLAIN | AVRO | Standard Avro data (requires schema registry). |
UPSERT | AVRO | Avro for updates/deletes (requires schema registry). |
DEBEZIUM | AVRO | Debezium-formatted Avro (requires schema registry). |
PLAIN | PROTOBUF | Standard Protobuf data (requires schema location/registry). |
DEBEZIUM | JSON | Debezium-formatted JSON data. |
MAXWELL | JSON | Maxwell-formatted JSON data. |
CANAL | JSON | Canal-formatted JSON data. |
DEBEZIUM_MONGO | JSON | Debezium-formatted JSON data for MongoDB. |
PLAIN | CSV | Comma-Separated Values. |
PLAIN | BYTES | Raw byte stream. |
PLAIN | PARQUET | Apache Parquet |
General considerations
- Schema Registry: For Avro and Protobuf, you often need a schema registry (Confluent Schema Registry or AWS Glue Schema Registry). The
schema.registryparameter (where applicable) can accept multiple addresses; RisingWave tries each until it finds the schema. TopicNameStrategy: For Avro, RisingWave uses theTopicNameStrategyby default for the schema registry, looking for a schema with the subject name{topic name}-value.
JSON handling
RisingWave directly decodes JSON data. DifferentFORMAT options add specific behaviors:
PLAIN JSON
Standard JSON ingestion. Define the schema in your CREATE SOURCE/CREATE TABLE statement or use a schema registry.
UPSERT JSON
For Kafka topics with updates/deletes (key-value pairs). RisingWave inserts, updates, or deletes rows based on the key and value.
When using FORMAT UPSERT, you must include the INCLUDE KEY clause and define a primary key. This format can be used with both CREATE SOURCE and CREATE TABLE.
Example
When using
FORMAT UPSERT with CREATE SOURCE, be aware that upsert sources cannot be used directly with stateful operations like JOIN, aggregations, or window functions. For these use cases, either materialize the source as a view first, or use CREATE TABLE instead. See Upsert sources for details.DEBEZIUM JSON
For Debezium CDC data. You can usually define the schema directly. The ignore_key option (default: false) lets you consume only the payload.
DEBEZIUM_MONGO JSON
When loading data from MongoDB via Kafka topics in Debezium Mongo JSON format, the source table schema has a few limitations. The table schema must have the columns _id and payload, where _id comes from the MongoDB document’s id and is the primary key, and payload is type jsonb and contains the rest of the document. If the document’s _id is type ObjectID, then when creating the column in RisingWave, specify the type of _id as varchar. If the document’s _id is of type int32 or int64, specify the type of _id as int or bigint in RisingWave.
MAXWELL JSON
For Maxwell CDC data. Define the schema directly.
CANAL JSON
For Canal CDC data (TiCDC dialect). Define the schema directly.
AVRO handling
RisingWave supports Apache Avro. You must use a schema registry with Avro.PLAIN AVRO
Standard Avro ingestion.
UPSERT AVRO
Handles Avro updates/deletes (key-value pairs) from Kafka.
DEBEZIUM AVRO
For Debezium-formatted Avro. Specify the message and schema.registry.
Protobuf handling
PLAIN PROTOBUF
For data in protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be an actual Web location that is in http://..., https://..., or S3://... format. For Kafka data in protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry that RisingWave can get the schema from.
For more details about using Schema Registry for Kafka data, see Kafka source configurations.
If you provide a file location, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with a command like this:
CSV handling
PLAIN CSV
To consume data in CSV format, you can use ENCODE PLAIN FORMAT CSV with options. Configurable options include delimiter and without_header.
Bytes handling
PLAIN BYTES
RisingWave allows you to read data streams without decoding the data by using the BYTES row format. However, the table or source can have exactly one field of BYTEA data.
Parquet handling
PLAIN PARQUET
Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.
| Parquet data type | RisingWave file source data type |
|---|---|
| boolean | boolean |
| int16 | smallint |
| int32 | int |
| int64 | bigint |
| float | real |
| double | double precision |
| string | varchar |
| date | date |
| decimal | decimal |
| int8 | smallint |
| uint8 | smallint |
| uint16 | int |
| uint32 | bigint |
| uint64 | decimal |
| float16 | double precision |
timestamp(_, Some(_)) | timestamptz |
timestamp(_, None) | timestamp |
Parquet sources require case-sensitive column names. However, PostgreSQL converts unquoted column names to lowercase by default. To preserve case sensitivity when defining the schema, use double quotes around column names.
Parameter reference
| FORMAT | ENCODE | Parameter | Description | Required |
|---|---|---|---|---|
| PLAIN | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| UPSERT | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| DEBEZIUM | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| DEBEZIUM | AVRO | message | The main message name. | Yes |
| DEBEZIUM | AVRO | ignore_key | Whether to ignore the key (default: false). | No |
| PLAIN | JSON | schema.registry | URL(s) of the schema registry (optional). | No |
| PLAIN | JSON | schema.registry.username | The username for schema registry (optional). | No |
| PLAIN | JSON | schema.registry.password | The password for schema registry (optional). | No |
| UPSERT | JSON | schema.registry | URL(s) of the schema registry (optional). | No |
| UPSERT | JSON | schema.registry.username | The username for schema registry (optional). | No |
| UPSERT | JSON | schema.registry.password | The password for schema registry (optional). | No |
| PLAIN | PROTOBUF | message | Fully qualified name of the Protobuf message. | Yes |
| PLAIN | PROTOBUF | schema.location | URL of the schema file (or schema.registry). | Yes |
| PLAIN | PROTOBUF | schema.registry | URL(s) of the schema registry (alternative to schema.location). | Yes |
| DEBEZIUM | JSON | ignore_key | Whether to ignore the key (default: false). | No |
| PLAIN | CSV | delimiter | Delimiter character. | Yes |
| PLAIN | CSV | without_header | Whether the CSV data has a header row (default: false). | No |
| PLAIN | AVRO | map.handling.mode | How to ingest Avro map type. Available values: ‘map’(default) and ‘jsonb’. | No |
| UPSERT | AVRO | map.handling.mode | How to ingest Avro map type. Available values: ‘map’(default) and ‘jsonb’. | No |
| DEBEZIUM | AVRO | map.handling.mode | How to ingest Avro map type. Available values: ‘map’(default) and ‘jsonb’. | No |
General parameters
timestamptz.handling.mode
The timestamptz.handling.mode parameter controls the input format for timestamptz values. It accepts the following values:
-
micro: The input number will be interpreted as the number of microseconds since 1970-01-01T00:00:00Z in UTC. -
milli: The input number will be interpreted as the number of milliseconds since 1970-01-01T00:00:00Z in UTC. -
guess_number_unit: This has been the default setting and restricts the range of timestamptz values to [1973-03-03 09:46:40, 5138-11-16 09:46:40) in UTC. -
utc_string: This format is the least ambiguous and can usually be correctly inferred without needing explicit specification. -
utc_without_suffix: Allows the user to indicate that a naive timestamp is in UTC, rather than local time.
FORMAT PLAIN ENCODE JSONFORMAT UPSERT ENCODE JSONFORMAT DEBEZIUM ENCODE JSON
FORMAT DEBEZIUM_MONGO ENCODE JSONFORMAT MAXWELL ENCODE JSONFORMAT CANAL ENCODE JSON