Configure RisingWave to handle various data formats and encodings from your sources. Specify them with the `FORMAT` and `ENCODE` keywords in your `CREATE SOURCE` or `CREATE TABLE` statement. This page explains the supported combinations and how to configure them.

`FORMAT` and `ENCODE`

- `FORMAT`: Specifies the high-level structure of the data. Options:
  - `PLAIN`: Standard formats like JSON, Avro, Protobuf, CSV, Bytes, or Parquet.
  - `UPSERT`: A stream of updates and deletes (key-value pairs), typically from Kafka.
  - `DEBEZIUM`: The Debezium change data capture (CDC) format.
  - `MAXWELL`: The Maxwell CDC format.
  - `CANAL`: The Canal CDC format (TiCDC dialect).
  - `DEBEZIUM_MONGO`: The Debezium CDC format for MongoDB.
- `ENCODE`: Specifies the specific data encoding. Options:
  - `JSON`: JavaScript Object Notation.
  - `AVRO`: Apache Avro.
  - `PROTOBUF`: Protocol Buffers.
  - `CSV`: Comma-Separated Values.
  - `BYTES`: Raw byte stream (no decoding).
  - `PARQUET`: Apache Parquet.

`FORMAT` and `ENCODE` work together to define how RisingWave interprets incoming data.

FORMAT | ENCODE | Description |
---|---|---|
PLAIN | JSON | Standard JSON data. |
UPSERT | JSON | JSON for updates/deletes (key-value pairs). |
PLAIN | AVRO | Standard Avro data (requires schema registry). |
UPSERT | AVRO | Avro for updates/deletes (requires schema registry). |
DEBEZIUM | AVRO | Debezium-formatted Avro (requires schema registry). |
PLAIN | PROTOBUF | Standard Protobuf data (requires schema location/registry). |
DEBEZIUM | JSON | Debezium-formatted JSON data. |
MAXWELL | JSON | Maxwell-formatted JSON data. |
CANAL | JSON | Canal-formatted JSON data. |
DEBEZIUM_MONGO | JSON | Debezium-formatted JSON data for MongoDB. |
PLAIN | CSV | Comma-Separated Values. |
PLAIN | BYTES | Raw byte stream. |
PLAIN | PARQUET | Apache Parquet. |

- The `schema.registry` parameter (where applicable) can accept multiple addresses; RisingWave tries each until it finds the schema.
- `TopicNameStrategy`: For Avro, RisingWave uses the `TopicNameStrategy` by default for the schema registry, looking for a schema with the subject name `{topic name}-value`.
- `FORMAT` options add specific behaviors:

PLAIN JSON

Define the schema in the `CREATE SOURCE`/`CREATE TABLE` statement or use a schema registry.

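A minimal sketch of the first option, with the schema declared inline. The source name, column names, topic, and broker address are placeholders, and the Kafka connector options reflect an assumed environment rather than anything `PLAIN JSON` itself requires:

```sql
-- Hypothetical source: all names and addresses below are placeholders.
CREATE SOURCE IF NOT EXISTS user_events (
    user_id  INT,
    event    VARCHAR,
    event_ts TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'user_events',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```

The `FORMAT ... ENCODE ...` clause comes after the connector options; the other combinations on this page follow the same shape.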
DEBEZIUM JSON

The `ignore_key` option (default: `false`) lets you consume only the payload.

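As an illustration, the sketch below sets `ignore_key` inside the parentheses that follow the `FORMAT ... ENCODE ...` clause. The table name, columns, topic, and broker address are hypothetical, and it assumes the CDC stream is ingested into a table with a primary key:

```sql
-- Hypothetical table fed by a Debezium topic; names and addresses are placeholders.
CREATE TABLE IF NOT EXISTS orders_cdc (
    order_id INT PRIMARY KEY,
    status   VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'dbserver1.public.orders',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT DEBEZIUM ENCODE JSON (
    ignore_key = 'true'  -- consume only the payload; the default is false
);
```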
DEBEZIUM_MONGO JSON
MAXWELL JSON
CANAL JSON
PLAIN AVRO
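A hedged sketch of a `PLAIN AVRO` source that reads its schema from a schema registry. The source name, topic, broker, and registry addresses are placeholders, and writing several registry addresses as one comma-separated string is an assumption about the expected syntax:

```sql
-- Hypothetical Avro source; no column list is given because the schema
-- is expected to come from the registry.
CREATE SOURCE IF NOT EXISTS avro_events
WITH (
    connector = 'kafka',
    topic = 'avro_events',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE AVRO (
    -- Assumption: multiple addresses are comma-separated in a single string.
    schema.registry = 'http://registry-1:8081,http://registry-2:8081'
);
```

With the default `TopicNameStrategy`, the subject looked up for this topic would be `avro_events-value`.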
UPSERT AVRO
DEBEZIUM AVRO
PLAIN PROTOBUF

The schema location can be given in `http://...`, `https://...`, or `S3://...` format. For Protobuf data from Kafka, you can provide a Confluent Schema Registry instead of a schema location, and RisingWave retrieves the schema from it.

For more details about using Schema Registry for Kafka data, see Kafka source configurations.

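A minimal sketch of a Protobuf source that uses a schema location. The message name, schema URL, topic, and broker address are all hypothetical placeholders:

```sql
-- Hypothetical Protobuf source; replace every name and URL with your own.
CREATE SOURCE IF NOT EXISTS proto_events
WITH (
    connector = 'kafka',
    topic = 'proto_events',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE PROTOBUF (
    message = 'demo.events.UserEvent',                         -- fully qualified message name
    schema.location = 'https://example.com/schemas/events.pb'  -- placeholder schema file URL
);
```

To use a schema registry instead, `schema.location` would be replaced by `schema.registry`.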
If you provide a file location, the schema file must be a `FileDescriptorSet`, which can be compiled from a `.proto` file with `protoc`, for example by using its `--include_imports` and `--descriptor_set_out` options.

PLAIN CSV

Use `FORMAT PLAIN ENCODE CSV` with options. Configurable options include `delimiter` and `without_header`.

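The two options might be set as in the sketch below. The table name, columns, topic, and broker address are placeholders chosen for illustration:

```sql
-- Hypothetical CSV-backed table; names and addresses are placeholders.
CREATE TABLE IF NOT EXISTS csv_readings (
    sensor_id INT,
    reading   DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'csv_readings',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE CSV (
    delimiter = ',',         -- field separator
    without_header = 'true'  -- the incoming data carries no header row
);
```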
PLAIN BYTES

Use the `BYTES` row format to read a data stream without decoding it. However, the table or source must have exactly one field, of `BYTEA` type.

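A short sketch of a table that satisfies the single-column constraint. The table name, column name, topic, and broker address are hypothetical:

```sql
-- Hypothetical raw-bytes table: exactly one column, of type BYTEA.
CREATE TABLE IF NOT EXISTS raw_messages (
    payload BYTEA
)
WITH (
    connector = 'kafka',
    topic = 'raw_messages',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE BYTES;
```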
PLAIN PARQUET
Parquet data type | RisingWave file source data type |
---|---|
boolean | boolean |
int16 | smallint |
int32 | int |
int64 | bigint |
float | real |
double | double precision |
string | varchar |
date | date |
decimal | decimal |
int8 | smallint |
uint8 | smallint |
uint16 | int |
uint32 | bigint |
uint64 | decimal |
float16 | double precision |
timestamp(_, Some(_)) | timestamptz |
timestamp(_, None) | timestamp |
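
To show how the mapping above translates into a table definition, the sketch below assumes a Parquet file with columns `id` (int64), `name` (string), `price` (decimal), and `created_at` (timestamp with a time zone). The table name, bucket, region, and credentials are placeholders, and the S3 connector options shown are assumptions about a typical file source setup:

```sql
-- Hypothetical Parquet file source; column types follow the mapping table above.
CREATE TABLE IF NOT EXISTS parquet_products (
    id         BIGINT,      -- Parquet int64
    name       VARCHAR,     -- Parquet string
    price      DECIMAL,     -- Parquet decimal
    created_at TIMESTAMPTZ  -- Parquet timestamp(_, Some(_))
)
WITH (
    connector = 's3',
    match_pattern = '*.parquet',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx'
)
FORMAT PLAIN ENCODE PARQUET;
```
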
FORMAT | ENCODE | Parameter | Description | Required |
---|---|---|---|---|
PLAIN | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
UPSERT | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
DEBEZIUM | AVRO | schema.registry | URL(s) of the schema registry. | Yes |
DEBEZIUM | AVRO | message | The main message name. | Yes |
DEBEZIUM | AVRO | ignore_key | Whether to ignore the key (default: false). | No |
PLAIN | JSON | schema.registry | URL(s) of the schema registry (optional). | No |
PLAIN | JSON | schema.registry.username | The username for schema registry (optional). | No |
PLAIN | JSON | schema.registry.password | The password for schema registry (optional). | No |
UPSERT | JSON | schema.registry | URL(s) of the schema registry (optional). | No |
UPSERT | JSON | schema.registry.username | The username for schema registry (optional). | No |
UPSERT | JSON | schema.registry.password | The password for schema registry (optional). | No |
PLAIN | PROTOBUF | message | Fully qualified name of the Protobuf message. | Yes |
PLAIN | PROTOBUF | schema.location | URL of the schema file (or schema.registry). | Yes |
PLAIN | PROTOBUF | schema.registry | URL(s) of the schema registry (alternative to schema.location). | Yes |
DEBEZIUM | JSON | ignore_key | Whether to ignore the key (default: false). | No |
PLAIN | CSV | delimiter | Delimiter character. | Yes |
PLAIN | CSV | without_header | Whether the CSV data has no header row (default: false). | No |
PLAIN | AVRO | map.handling.mode | How to ingest Avro map type. Available values: 'map' (default) and 'jsonb'. | No |
UPSERT | AVRO | map.handling.mode | How to ingest Avro map type. Available values: 'map' (default) and 'jsonb'. | No |
DEBEZIUM | AVRO | map.handling.mode | How to ingest Avro map type. Available values: 'map' (default) and 'jsonb'. | No |

timestamptz.handling.mode

The `timestamptz.handling.mode` parameter controls the input format for timestamptz values. It accepts the following values:

- `micro`: The input number is interpreted as the number of microseconds since 1970-01-01T00:00:00Z.
- `milli`: The input number is interpreted as the number of milliseconds since 1970-01-01T00:00:00Z.
- `guess_number_unit`: The default setting. It restricts the range of timestamptz values to [1973-03-03 09:46:40, 5138-11-16 09:46:40) in UTC.
- `utc_string`: This format is the least ambiguous and can usually be correctly inferred without needing explicit specification.
- `utc_without_suffix`: Allows the user to indicate that a naive timestamp is in UTC, rather than local time.

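As a sketch of how one of these values might be set, the example below uses `milli` on a `PLAIN JSON` source. The source name, columns, topic, and broker address are hypothetical:

```sql
-- Hypothetical source whose JSON messages carry epoch milliseconds in clicked_at.
CREATE SOURCE IF NOT EXISTS clicks (
    click_id   INT,
    clicked_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'clicks',
    properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON (
    -- With 'milli', the number 1700000000000 means 2023-11-14T22:13:20Z.
    timestamptz.handling.mode = 'milli'
);
```
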
FORMAT PLAIN ENCODE JSON
FORMAT UPSERT ENCODE JSON
FORMAT DEBEZIUM ENCODE JSON
FORMAT DEBEZIUM_MONGO ENCODE JSON
FORMAT MAXWELL ENCODE JSON
FORMAT CANAL ENCODE JSON