Data formats and encoding options
Configure RisingWave to handle various data formats and encodings from your sources.
When connecting to a data source, you specify how the data is formatted and encoded using the `FORMAT` and `ENCODE` keywords in your `CREATE SOURCE` or `CREATE TABLE` statement. This page explains the supported combinations and how to configure them.
Understanding FORMAT and ENCODE
- `FORMAT`: Specifies the high-level structure of the data. Options:
  - `PLAIN`: Standard formats like JSON, Avro, Protobuf, CSV, Bytes, or Parquet.
  - `UPSERT`: A stream of updates and deletes (key-value pairs), typically from Kafka.
  - `DEBEZIUM`: The Debezium change data capture (CDC) format.
  - `MAXWELL`: The Maxwell CDC format.
  - `CANAL`: The Canal CDC format (TiCDC dialect).
  - `DEBEZIUM_MONGO`: The Debezium CDC format for MongoDB.
- `ENCODE`: Specifies the specific data encoding. Options:
  - `JSON`: JavaScript Object Notation.
  - `AVRO`: Apache Avro.
  - `PROTOBUF`: Protocol Buffers.
  - `CSV`: Comma-Separated Values.
  - `BYTES`: Raw byte stream (no decoding).
  - `PARQUET`: Apache Parquet.
`FORMAT` and `ENCODE` work together to define how RisingWave interprets incoming data.
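For example, a source that reads JSON records from Kafka combines the two keywords at the end of the statement. The connector properties below (topic name, broker address) are illustrative placeholders:

```sql
CREATE SOURCE my_source (
    id INT,
    name VARCHAR
)
WITH (
    connector = 'kafka',                           -- where the data comes from
    topic = 'my_topic',                            -- placeholder topic name
    properties.bootstrap.server = 'broker:9092'    -- placeholder broker address
)
FORMAT PLAIN ENCODE JSON;                          -- how the data is structured and encoded
```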
Supported combinations
| FORMAT | ENCODE | Description |
|---|---|---|
| PLAIN | JSON | Standard JSON data. |
| UPSERT | JSON | JSON for updates/deletes (key-value pairs). |
| PLAIN | AVRO | Standard Avro data (requires schema registry). |
| UPSERT | AVRO | Avro for updates/deletes (requires schema registry). |
| DEBEZIUM | AVRO | Debezium-formatted Avro (requires schema registry). |
| PLAIN | PROTOBUF | Standard Protobuf data (requires schema location/registry). |
| DEBEZIUM | JSON | Debezium-formatted JSON data. |
| MAXWELL | JSON | Maxwell-formatted JSON data. |
| CANAL | JSON | Canal-formatted JSON data. |
| DEBEZIUM_MONGO | JSON | Debezium-formatted JSON data for MongoDB. |
| PLAIN | CSV | Comma-Separated Values. |
| PLAIN | BYTES | Raw byte stream. |
| PLAIN | PARQUET | Apache Parquet. |
General considerations
- Schema registry: For Avro and Protobuf, you often need a schema registry (Confluent Schema Registry or AWS Glue Schema Registry). The `schema.registry` parameter (where applicable) can accept multiple addresses; RisingWave tries each until it finds the schema.
- `TopicNameStrategy`: For Avro, RisingWave uses the `TopicNameStrategy` by default when looking up schemas in the registry, searching for a schema with the subject name `{topic name}-value`.
JSON handling
RisingWave decodes JSON data directly. Different `FORMAT` options add specific behaviors:
PLAIN JSON
Standard JSON ingestion. Define the schema in your `CREATE SOURCE`/`CREATE TABLE` statement or use a schema registry.
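A minimal sketch of the schema-registry variant, assuming the topic's JSON schema is registered under the default subject name (connector properties and URLs are placeholders):

```sql
CREATE SOURCE json_from_registry
WITH (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON (
    schema.registry = 'http://registry:8081'   -- columns are derived from the registered schema
);
```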
DEBEZIUM JSON
For Debezium CDC data. You can usually define the schema directly. The `ignore_key` option (default: `false`) lets you consume only the payload.
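A sketch of a Debezium JSON table with the schema defined inline; the topic and column names are placeholders:

```sql
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    order_date TIMESTAMP,
    customer_name VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'mydb.public.orders',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT DEBEZIUM ENCODE JSON;
```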
DEBEZIUM_MONGO JSON
When loading data from MongoDB via Kafka topics in the Debezium Mongo JSON format, the source table schema has a few limitations. The table must contain the columns `_id` and `payload`, where `_id` comes from the MongoDB document's `_id` and serves as the primary key, and `payload` is of type `jsonb` and contains the rest of the document. If the document's `_id` is of type ObjectID, specify the type of `_id` as `varchar` when creating the column in RisingWave. If the document's `_id` is of type int32 or int64, specify the type of `_id` as `int` or `bigint`, respectively.
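A sketch of the required two-column schema, assuming the MongoDB documents use ObjectID keys (so `_id` is declared as `varchar`); the topic name is a placeholder:

```sql
CREATE TABLE mongo_users (
    _id VARCHAR PRIMARY KEY,   -- from the MongoDB document's _id
    payload JSONB              -- the rest of the document
)
WITH (
    connector = 'kafka',
    topic = 'dbserver1.inventory.users',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT DEBEZIUM_MONGO ENCODE JSON;
```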
MAXWELL JSON
For Maxwell CDC data. Define the schema directly.
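A minimal sketch (topic and columns are placeholders):

```sql
CREATE TABLE maxwell_events (
    id INT PRIMARY KEY,
    name VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'maxwell',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT MAXWELL ENCODE JSON;
```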
CANAL JSON
For Canal CDC data (TiCDC dialect). Define the schema directly.
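A minimal sketch (topic and columns are placeholders):

```sql
CREATE TABLE canal_events (
    id INT PRIMARY KEY,
    name VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'canal',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT CANAL ENCODE JSON;
```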
Avro handling
RisingWave supports Apache Avro. You must use a schema registry with Avro.
PLAIN AVRO
Standard Avro ingestion.
You can ingest the Avro map type into either the RisingWave map type or jsonb, controlled by the `map.handling.mode` option:
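For example, a sketch that maps Avro map fields to jsonb; the registry URL and topic are placeholders, and the columns are derived from the registered Avro schema:

```sql
CREATE SOURCE avro_source
WITH (
    connector = 'kafka',
    topic = 'avro_topic',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://registry:8081',
    map.handling.mode = 'jsonb'   -- 'map' (the default) keeps the native map type
);
```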
UPSERT AVRO
Handles Avro updates/deletes (key-value pairs) from Kafka.
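A minimal sketch, assuming the key and value schemas are registered in the schema registry and the Kafka message key is included as the primary key (names and URLs are placeholders):

```sql
CREATE TABLE upsert_avro_table (
    PRIMARY KEY (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
    connector = 'kafka',
    topic = 'avro_upsert_topic',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT UPSERT ENCODE AVRO (
    schema.registry = 'http://registry:8081'
);
```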
DEBEZIUM AVRO
For Debezium-formatted Avro. Specify the `message` and `schema.registry` parameters.
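A minimal sketch, assuming the Debezium Avro schemas live in a Confluent-compatible registry (topic, key column, and URL are placeholders); see the parameter reference below for the full list of options:

```sql
CREATE TABLE debezium_avro_orders (
    PRIMARY KEY (order_id)
)
WITH (
    connector = 'kafka',
    topic = 'mydb.public.orders',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT DEBEZIUM ENCODE AVRO (
    schema.registry = 'http://registry:8081'
);
```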
Protobuf handling
PLAIN PROTOBUF
For data in Protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be a web location in `http://...`, `https://...`, or `s3://...` format. For Kafka data in Protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry from which RisingWave can fetch the schema.
For more details about using Schema Registry for Kafka data, see Kafka source configurations.
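A minimal sketch using a schema file served over HTTPS; the message name, URL, and connector properties are placeholders:

```sql
CREATE SOURCE proto_source
WITH (
    connector = 'kafka',
    topic = 'proto_topic',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE PROTOBUF (
    message = 'my_package.MyMessage',                              -- fully qualified message name
    schema.location = 'https://example.com/schema_descriptor.pb'   -- compiled FileDescriptorSet
);
```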
If you provide a file location, the schema file must be a `FileDescriptorSet`, which can be compiled from a `.proto` file with a command like this:
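For instance (the include path and file names are placeholders):

```sh
protoc -I /path/to/includes --include_imports --descriptor_set_out=schema_descriptor.pb schema.proto
```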
CSV handling
PLAIN CSV
To consume data in CSV format, use `FORMAT PLAIN ENCODE CSV` with options. Configurable options include `delimiter` and `without_header`.
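A minimal sketch with both options set (topic and columns are placeholders):

```sql
CREATE TABLE csv_table (
    id INT,
    name VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'csv_topic',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE CSV (
    delimiter = ',',
    without_header = 'true'   -- the rows carry no header line
);
```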
Bytes handling
PLAIN BYTES
RisingWave allows you to read data streams without decoding them by using the `BYTES` encoding. Note that the table or source must have exactly one field, of the `BYTEA` data type.
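A minimal sketch (topic and column name are placeholders):

```sql
CREATE TABLE raw_stream (
    payload BYTEA   -- exactly one BYTEA column is allowed
)
WITH (
    connector = 'kafka',
    topic = 'raw_topic',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE BYTES;
```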
Parquet handling
PLAIN PARQUET
Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage.
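A sketch of reading Parquet files from S3; the bucket, region, credentials, pattern, and columns below are placeholders, and the parameter names follow the S3 source connector, so check that connector's page for the exact options:

```sql
CREATE SOURCE parquet_source (
    id BIGINT,
    name VARCHAR
)
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    match_pattern = '*.parquet',            -- read all matching Parquet files
    s3.credentials.access = 'xxxxxxxxxx',   -- placeholder credentials
    s3.credentials.secret = 'yyyyyyyyyy'
)
FORMAT PLAIN ENCODE PARQUET;
```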
Parameter reference
| FORMAT | ENCODE | Parameter | Description | Required |
|---|---|---|---|---|
| PLAIN | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| UPSERT | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| DEBEZIUM | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
| DEBEZIUM | AVRO | message | The main message name. | Yes |
| DEBEZIUM | AVRO | ignore_key | Whether to ignore the key (default: false). | No |
| PLAIN | JSON | schema.registry | URL(s) of the schema registry. | No |
| PLAIN | JSON | schema.registry.username | The username for the schema registry. | No |
| PLAIN | JSON | schema.registry.password | The password for the schema registry. | No |
| UPSERT | JSON | schema.registry | URL(s) of the schema registry. | No |
| UPSERT | JSON | schema.registry.username | The username for the schema registry. | No |
| UPSERT | JSON | schema.registry.password | The password for the schema registry. | No |
| PLAIN | PROTOBUF | message | Fully qualified name of the Protobuf message. | Yes |
| PLAIN | PROTOBUF | schema.location | URL of the schema file (alternative to schema.registry). | Yes |
| PLAIN | PROTOBUF | schema.registry | URL(s) of the schema registry (alternative to schema.location). | Yes |
| DEBEZIUM | JSON | ignore_key | Whether to ignore the key (default: false). | No |
| PLAIN | CSV | delimiter | Delimiter character. | Yes |
| PLAIN | CSV | without_header | Set to true if the CSV data has no header row (default: false). | No |
| PLAIN | AVRO | map.handling.mode | How to ingest the Avro map type. Accepted values: 'map' (default) and 'jsonb'. | No |
| UPSERT | AVRO | map.handling.mode | How to ingest the Avro map type. Accepted values: 'map' (default) and 'jsonb'. | No |
| DEBEZIUM | AVRO | map.handling.mode | How to ingest the Avro map type. Accepted values: 'map' (default) and 'jsonb'. | No |
General parameters
timestamptz.handling.mode
The `timestamptz.handling.mode` parameter controls the input format for timestamptz values. It accepts the following values:
- `micro`: The input number is interpreted as the number of microseconds since 1970-01-01T00:00:00Z in UTC.
- `milli`: The input number is interpreted as the number of milliseconds since 1970-01-01T00:00:00Z in UTC.
- `guess_number_unit`: The default setting; it restricts the range of timestamptz values to [1973-03-03 09:46:40, 5138-11-16 09:46:40) in UTC.
- `utc_string`: The least ambiguous format; it can usually be inferred correctly without needing explicit specification.
- `utc_without_suffix`: Indicates that a naive timestamp is in UTC rather than local time.
You can set this parameter for these three combinations (see the example below):
- FORMAT PLAIN ENCODE JSON
- FORMAT UPSERT ENCODE JSON
- FORMAT DEBEZIUM ENCODE JSON

You cannot set this parameter for these three combinations:
- FORMAT DEBEZIUM_MONGO ENCODE JSON
- FORMAT MAXWELL ENCODE JSON
- FORMAT CANAL ENCODE JSON
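A minimal sketch of setting the parameter on a plain JSON source (connector properties and columns are placeholders):

```sql
CREATE SOURCE ts_source (
    event_id INT,
    event_time TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON (
    timestamptz.handling.mode = 'utc_string'   -- expect RFC 3339-style UTC strings
);
```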