This topic describes how to sink data from RisingWave to a Kafka broker and how to specify security (encryption and authentication) settings.
Use the CREATE SINK statement to create a sink. You can create a sink with data from a materialized view or a table. RisingWave only supports writing messages in non-transactional mode.
All WITH options are required unless explicitly mentioned as optional.
Parameter or clause | Description |
---|---|
sink_name | Name of the sink to be created. |
sink_from | A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified. |
AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command. |
connector | Sink connector type must be kafka for Kafka sink. |
properties.bootstrap.server | Address of the Kafka broker. Format: ip:port. If there are multiple brokers, separate them with commas. |
topic | Address of the Kafka topic. One sink can only correspond to one topic. |
primary_key | Conditional. The primary keys of the sink. Use , to delimit the primary key columns. This field is optional if creating a PLAIN sink, but required if creating a DEBEZIUM or UPSERT sink. |
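When creating a Kafka sink, you can set the following Kafka parameters by adding the RisingWave equivalent of each parameter as a WITH option. For additional details on these parameters, see the Configuration properties.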
Kafka parameter name | RisingWave parameter name | Type |
---|---|---|
allow.auto.create.topics | properties.allow.auto.create.topics | bool |
batch.num.messages | properties.batch.num.messages | int |
batch.size | properties.batch.size | int |
client.id | properties.client.id | string |
enable.idempotence | properties.enable.idempotence | bool |
enable.ssl.certificate.verification | properties.enable.ssl.certificate.verification | bool |
max.in.flight.requests.per.connection | properties.max.in.flight.requests.per.connection | int |
message.max.bytes | properties.message.max.bytes | int |
message.send.max.retries | properties.message.send.max.retries | int |
message.timeout.ms | properties.message.timeout.ms | int |
queue.buffering.max.kbytes | properties.queue.buffering.max.kbytes | int |
queue.buffering.max.messages | properties.queue.buffering.max.messages | int |
queue.buffering.max.ms | properties.queue.buffering.max.ms | float |
request.required.acks | properties.request.required.acks | int |
retry.backoff.ms | properties.retry.backoff.ms | int |
receive.message.max.bytes | properties.receive.message.max.bytes | int |
ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | string |
statistics.interval.ms | properties.statistics.interval.ms | int |
Set properties.ssl.endpoint.identification.algorithm to none to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either https or none. By default, it is https.
To monitor Kafka metrics in Grafana, set properties.statistics.interval.ms to a non-zero value. The granularity is 1000 ms.
Starting with version 2.0, the default value for properties.message.timeout.ms has changed from 5 seconds to 5 minutes, aligning with the default setting in the official Kafka library.
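As a rough illustration of the naming rule in the table above, the following Python sketch adds the properties. prefix to Kafka parameter names and rejects names that the table does not list. The function to_risingwave_names and the SUPPORTED set are hypothetical helpers written for this example; they are not part of RisingWave.

```python
# Kafka parameter names listed in the table above.
SUPPORTED = {
    "allow.auto.create.topics", "batch.num.messages", "batch.size",
    "client.id", "enable.idempotence", "enable.ssl.certificate.verification",
    "max.in.flight.requests.per.connection", "message.max.bytes",
    "message.send.max.retries", "message.timeout.ms",
    "queue.buffering.max.kbytes", "queue.buffering.max.messages",
    "queue.buffering.max.ms", "request.required.acks", "retry.backoff.ms",
    "receive.message.max.bytes", "ssl.endpoint.identification.algorithm",
    "statistics.interval.ms",
}


def to_risingwave_names(kafka_params):
    """Map Kafka parameter names to their RisingWave WITH option names."""
    unknown = sorted(set(kafka_params) - SUPPORTED)
    if unknown:
        raise ValueError(f"not listed in the table: {unknown}")
    # The RisingWave name is the Kafka name with a "properties." prefix.
    return {f"properties.{name}": value for name, value in kafka_params.items()}


options = to_risingwave_names(
    {"message.max.bytes": 2000, "statistics.interval.ms": 1000}
)
for name, value in options.items():
    print(name, "=", value)
```

The values are passed through unchanged; how each value is written inside the WITH clause is left to the statement you build.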
Set the following options in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause.
Field | Notes |
---|---|
data_format | Data format. Allowed formats: PLAIN, UPSERT, and DEBEZIUM. |
data_encode | Data encode. Allowed encodes include JSON, AVRO, and PROTOBUF. For UPSERT PROTOBUF sinks, you must specify key encode text, while it remains optional for other format/encode combinations. |
force_append_only | If true, forces the sink to be PLAIN (also known as append-only), even if it cannot be. |
timestamptz.handling.mode | Controls the timestamptz output format. This parameter specifically applies to append-only or upsert sinks using JSON encoding. |
schemas.enable | Only configurable for upsert JSON sinks. By default, this value is false for upsert JSON sinks and true for debezium JSON sinks. If true, RisingWave will sink the data with the schema to the Kafka sink. This is not referring to a schema registry containing a JSON schema, but rather schema formats defined using Kafka Connect. |
key_encode | Optional. When specified, the key encode can only be TEXT or BYTES. If set to TEXT, the primary key should be one and only one of the following types: varchar, bool, smallint, int, and bigint. If set to BYTES, the primary key should be one and only one of type bytea. When absent, both key and value will use the same setting of ENCODE data_encode ( ... ). |
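The key_encode rule in the last row can be written as a small check. This is a minimal Python sketch under one assumption: "one and only one" is read as "the primary key is a single column of one of the listed types". The function key_encode_is_valid is a hypothetical helper, and it does not cover the separate rule that UPSERT PROTOBUF sinks must specify key encode text.

```python
# Primary key column types accepted for each key encode, per the table above.
TEXT_KEY_TYPES = {"varchar", "bool", "smallint", "int", "bigint"}
BYTES_KEY_TYPES = {"bytea"}


def key_encode_is_valid(key_encode, primary_key_types):
    """Check a key_encode setting against the primary key column types."""
    if key_encode is None:
        # When absent, the key reuses the ENCODE data_encode setting.
        return True
    allowed = {"TEXT": TEXT_KEY_TYPES, "BYTES": BYTES_KEY_TYPES}.get(
        key_encode.upper()
    )
    if allowed is None:
        # Only TEXT and BYTES are accepted.
        return False
    # Assumption: exactly one primary key column, of an allowed type.
    return len(primary_key_types) == 1 and primary_key_types[0] in allowed


print(key_encode_is_valid("TEXT", ["varchar"]))
print(key_encode_is_valid("BYTES", ["varchar"]))
```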
The following options apply to Avro sinks and are specified after FORMAT UPSERT ENCODE AVRO or FORMAT PLAIN ENCODE AVRO.
Field | Notes |
---|---|
schema.registry | Required. The address of the schema registry. |
schema.registry.username | Optional. The user name used to access the schema registry. |
schema.registry.password | Optional. The password associated with the user name. |
schema.registry.name.strategy | Optional. Accepted options include topic_name_strategy (default), record_name_strategy, and topic_record_name_strategy. |
key.message | Required if schema.registry.name.strategy is set to record_name_strategy or topic_record_name_strategy. |
message | Required if schema.registry.name.strategy is set to record_name_strategy or topic_record_name_strategy. |
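The "Required if" conditions in the table above can be checked mechanically. The Python sketch below reports which options are still missing from a set of Avro options. It only reflects the rows of this table; missing_avro_options is a hypothetical helper, and the registry address in the example is a placeholder.

```python
NAME_STRATEGIES = {
    "topic_name_strategy",
    "record_name_strategy",
    "topic_record_name_strategy",
}


def missing_avro_options(options):
    """Return the required Avro options that are absent from `options`."""
    missing = []
    if "schema.registry" not in options:
        missing.append("schema.registry")
    # topic_name_strategy is the default when no strategy is given.
    strategy = options.get("schema.registry.name.strategy", "topic_name_strategy")
    if strategy not in NAME_STRATEGIES:
        raise ValueError(f"unknown name strategy: {strategy}")
    if strategy != "topic_name_strategy":
        # Both record-based strategies require key.message and message.
        for name in ("key.message", "message"):
            if name not in options:
                missing.append(name)
    return missing


print(missing_avro_options({
    "schema.registry": "http://localhost:8081",  # placeholder address
    "schema.registry.name.strategy": "record_name_strategy",
}))
```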
To use AWS Glue Schema Registry, specify the following parameters in the FORMAT PLAIN ENCODE AVRO (...) or FORMAT UPSERT ENCODE AVRO (...) clause. This allows RisingWave to load schemas from and encode metadata for AWS Glue Schema Registry, in addition to Confluent Schema Registry.
Auth-related configurations:
Parameter | Description |
---|---|
aws.region | The region of the AWS Glue Schema Registry. For example, us-west-2. |
aws.credentials.access_key_id | Your AWS access key ID. |
aws.credentials.secret_access_key | Your AWS secret access key. |
aws.credentials.role.arn | The Amazon Resource Name (ARN) of the role to assume. For example, arn:aws:iam::123456123456:role/MyGlueRole. This IAM role must be granted permission for the glue:GetSchemaVersion action. |
Parameter | Description |
---|---|
aws.glue.schema_arn | The ARN of the schema in AWS Glue Schema Registry. For example, 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'. |
The following options apply to Protobuf sinks and are specified after FORMAT PLAIN ENCODE PROTOBUF or FORMAT UPSERT ENCODE PROTOBUF.
Field | Notes |
---|---|
message | Required. Package qualified message name of the main Message in the schema definition. |
schema.location | Required if schema.registry is not specified. Only one of schema.location or schema.registry can be defined. The schema location, given as a file://, http://, or https:// URL. |
schema.registry | Required if schema.location is not specified. Only one of schema.location or schema.registry can be defined. The address of the schema registry. |
schema.registry.username | Optional. The user name used to access the schema registry. |
schema.registry.password | Optional. The password associated with the user name. |
schema.registry.name.strategy | Optional. Accepted options include topic_name_strategy (default), record_name_strategy, and topic_record_name_strategy. |
The file:// format is not recommended for production use. If it is used, the schema file needs to be available for both meta and compute nodes.
jsonb.handling.mode
jsonb.handling.mode determines how jsonb data types are encoded. This parameter has two possible values:
string: Encodes the jsonb type to a string. For example, if you set this parameter, {"k": 2} will be converted to "{\"k\": 2}".
dynamic: Dynamically encodes a jsonb type value to a JSON type value. For example, if you set this parameter, {"k": 2} will be converted to {"k": 2}. Here the jsonb value is encoded to a JSON object type value.
WITH clause of ENCODE JSON.
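The difference between the string and dynamic values of jsonb.handling.mode can be imitated with Python's json module. This is only a sketch of the two output shapes described above, not RisingWave's encoder; encode_jsonb and the column name payload are hypothetical.

```python
import json


def encode_jsonb(value, mode):
    """Return the JSON value that stands in for a jsonb column."""
    if mode == "string":
        # The jsonb value is serialized and carried as a JSON string.
        return json.dumps(value)
    if mode == "dynamic":
        # The jsonb value is embedded as a regular JSON value.
        return value
    raise ValueError(f"unknown jsonb.handling.mode: {mode}")


jsonb_value = {"k": 2}
as_string = json.dumps({"payload": encode_jsonb(jsonb_value, "string")})
as_dynamic = json.dumps({"payload": encode_jsonb(jsonb_value, "dynamic")})
print(as_string)   # {"payload": "{\"k\": 2}"}
print(as_dynamic)  # {"payload": {"k": 2}}
```

A consumer reading the string form has to parse the field a second time to get the object back, while the dynamic form is usable directly.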
"0x05fb93d677c4e000"
instead of a JSON number 431100738685689856. This string form avoids JSON number precision issues with large int64 values, and you can still order by the fixed-length hexadecimal string to obtain the same order as the serial number (whereas variable-length string "12" sorts before "7").
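The ordering and precision claims above can be checked with a few lines of Python. The sketch assumes the string form is "0x" followed by 16 zero-padded lowercase hexadecimal digits, which is inferred from the single example value, and it only considers non-negative values. serial_to_hex is a hypothetical helper.

```python
def serial_to_hex(value: int) -> str:
    # Assumed format: "0x" plus 16 zero-padded lowercase hex digits.
    return f"0x{value:016x}"


print(serial_to_hex(431100738685689856))  # 0x05fb93d677c4e000

serials = [7, 12, 431100738685689856]
by_number = sorted(serials)
by_hex = sorted(serials, key=serial_to_hex)  # fixed-length strings
by_decimal_text = sorted(serials, key=str)   # variable-length strings

print(by_hex == by_number)  # True
print(by_decimal_text)      # [12, 431100738685689856, 7]

# A large int64 can lose precision once a consumer reads it as a double.
print(int(float(2**53 + 1)) == 2**53 + 1)  # False
```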
For example, to create a sink with the Kafka configuration message.max.bytes set at 2000, set properties.message.max.bytes to 2000.
distance and duration from taxi_trips.
The schema of taxi_trips is like this:
In your CREATE SINK statement, specify the following parameters.
Parameter | Notes |
---|---|
privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. |
privatelink.endpoint | The DNS name of the VPC endpoint. If you’re using RisingWave Cloud, you can find the auto-generated endpoint after you create a connection. See details in Create a VPC connection. |
For example, {"port": 8001} corresponds to the broker ip1:9092, and {"port": 8002} corresponds to the broker ip2:9092.
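Because the targets are matched to brokers by position, it can help to pair them up before creating the sink. The Python sketch below assumes that privatelink.targets is written as a JSON array with one object per broker, in the same order as properties.bootstrap.server; the array form is an assumption based on the example values above, and pair_targets is a hypothetical helper.

```python
import json


def pair_targets(bootstrap_server, privatelink_targets):
    """Pair each broker with the PrivateLink target at the same position."""
    brokers = [b.strip() for b in bootstrap_server.split(",")]
    targets = json.loads(privatelink_targets)  # assumed to be a JSON array
    if len(brokers) != len(targets):
        raise ValueError(
            f"{len(brokers)} brokers but {len(targets)} PrivateLink targets"
        )
    return list(zip(brokers, targets))


pairs = pair_targets("ip1:9092,ip2:9092", '[{"port": 8001}, {"port": 8002}]')
for broker, target in pairs:
    print(broker, "->", target)
```

The check only catches a count mismatch. A wrong order with the right count still has to be verified by hand against the endpoint configuration.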
SSL is used in configuration and code instead of TLS.
Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.
RisingWave supports these SASL authentication mechanisms:
SASL/PLAIN
SASL/SCRAM
SASL/GSSAPI
SASL/OAUTHBEARER
To sink data encrypted with SSL, specify the following parameters in your CREATE SINK statement.
Parameter | Notes |
---|---|
properties.security.protocol | Set to SSL. |
properties.ssl.ca.location | |
properties.ssl.certificate.location | |
properties.ssl.key.location | |
properties.ssl.key.password | |
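For the definitions of these parameters, see the Configuration properties. The parameters in that list are assumed to start with properties. and therefore do not include this prefix.
RisingWave Data Type | Schema Type in JSON | Schema Name in JSON |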
---|---|---|
boolean | boolean | n/a |
smallint | int16 | n/a |
integer | int32 | n/a |
bigint | int64 | n/a |
real | float | n/a |
double precision | double | n/a |
character varying | string | n/a |
bytea | bytes | n/a |
numeric | string | n/a |
date | int32 | org.apache.kafka.connect.data.Date |
time without time zone | int64 | org.apache.kafka.connect.data.Time |
timestamp | int64 | org.apache.kafka.connect.data.Timestamp |
timestamptz | string | io.debezium.time.ZonedTimestamp |
interval | string | io.debezium.time.Interval |
jsonb | string | io.debezium.data.Json |
struct | string | n/a |
array | string | n/a |