Skip to main content

Sink to Kafka

This topic describes how to sink data from RisingWave to a Kafka broker and how to specify security (encryption and authentication) settings.

A sink is an external target that you can send data to. To stream data out of RisingWave, you need to create a sink. Use the CREATE SINK statement to create a sink. You can create a sink with data from a materialized view or a table. RisingWave only supports writing messages in non-transactional mode.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='kafka',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
key = 'value'
) ]
;
note

Names and unquoted identifiers are case-insensitive. Therefore, you must double-quote any of these fields for them to be case-sensitive.

Basic parameters

All WITH options are required unless explicitly mentioned as optional.

Parameter or clauseDescription
sink_nameName of the sink to be created.
sink_fromA clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified.
AS select_queryA SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command.
connectorSink connector type must be 'kafka' for Kafka sink.
properties.bootstrap.serverAddress of the Kafka broker. Format: ‘ip:port’. If there are multiple brokers, separate them with commas.
topicAddress of the Kafka topic. One sink can only correspond to one topic.
primary_keyConditional. The primary keys of the sink. Use ',' to delimit the primary key columns. This field is optional if creating a PLAIN sink, but required if creating a DEBEZIUM or UPSERT sink.

Additional Kafka parameters

When creating a Kafka sink in RisingWave, you can specify the following Kafka-specific parameters. To set the parameter, add the RisingWave equivalent of the Kafka parameter as a WITH option. For additional details on these parameters, see the Configuration properties.

Kafka parameter nameRisingWave parameter nameType
allow.auto.create.topicsproperties.allow.auto.create.topicsbool
batch.num.messagesproperties.batch.num.messagesint
batch.sizeproperties.batch.sizeint
client.idproperties.client.idstring
enable.idempotenceproperties.enable.idempotencebool
enable.ssl.certificate.verificationproperties.enable.ssl.certificate.verificationbool
max.in.flight.requests.per.connectionproperties.max.in.flight.requests.per.connectionint
message.max.bytesproperties.message.max.bytesint
message.send.max.retriesproperties.message.send.max.retriesint
message.timeout.msproperties.message.timeout.msint
queue.buffering.max.kbytesproperties.queue.buffering.max.kbytesint
queue.buffering.max.messagesproperties.queue.buffering.max.messagesint
queue.buffering.max.msproperties.queue.buffering.max.msfloat
retry.backoff.msproperties.retry.backoff.msint
receive.message.max.bytesproperties.receive.message.max.bytesint
ssl.endpoint.identification.algorithmproperties.ssl.endpoint.identification.algorithmstr
note

Set properties.ssl.endpoint.identification.algorithm to none to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either https or none. By default, it is https.

FORMAT and ENCODE options

note

These options should be set in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause

FieldNotes
data_formatData format. Allowed formats:
  • PLAIN: Output data with insert operations.
  • DEBEZIUM: Output change data capture (CDC) log in Debezium format.
  • UPSERT: Output data as a changelog stream. primary_key must be specified in this case.
To learn about when to define the primary key if creating an UPSERT sink, see the Overview.
data_encodeData encode. Supported encodes: JSON, AVRO, and PROTOBUF. For AVRO encode, only UPSERT AVRO sinks are supported. For PROTOBUF encode, only PLAIN PROTOBUF sinks are supported.
force_append_onlyIf true, forces the sink to be PLAIN (also known as append-only), even if it cannot be.
timestamptz.handling.modeControls the timestamptz output format. This parameter specifically applies to append-only or upsert sinks using JSON encoding.
- If omitted, the output format of timestamptz is 2023-11-11T18:30:09.453000Z which includes the UTC suffix Z.
- When utc_without_suffix is specified, the format is changed to 2023-11-11 18:30:09.453000.
schemas.enableOnly configurable for upsert JSON sinks. By default, this value is false for upsert JSON sinks and true for debezium JSON sinks. If true, RisingWave will sink the data with the schema to the Kafka sink. Note that this is not referring to a schema registry containing a JSON schema, but rather schema formats defined using Kafka Connect.

Avro specific parameters

When creating an upsert Avro sink, the following options can be used following FORMAT UPSERT ENCODE AVRO.

FieldNotes
schema.registryRequired. The address of the schema registry.
schema.registry.usernameOptional. The user name used to access the schema registry.
schema.registry.passwordOptional. The password associated with the user name.
schema.registry.name.strategyOptional. Accepted options include topic_name_strategy (default), record_name_strategy, and topic_record_name_strategy.
key.messageRequired if schema.registry.name.strategy is set to record_name_strategy or topic_record_name_strategy.
messageRequired if schema.registry.name.strategy is set to record_name_strategy or topic_record_name_strategy.

Syntax:

FORMAT UPSERT
ENCODE AVRO (
schema.registry = 'schema_registry_url',
[schema.registry.username = 'username'],
[schema.registry.password = 'password'],
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key'],
[message = 'main_message',]
)

Protobuf specific parameters

When creating an append-only Protobuf sink, the following options can be used following FORMAT PLAIN ENCODE PROTOBUF.

FieldNotes
messageRequired. Message name of the main Message in the schema definition. .
schema.locationRequired if schema.registry is not specified. Only one of schema.location or schema.registry can be defined. The schema location. This can be in either file://, http://, https:// format.
schema.registryRequired if schema.location is not specified. Only one of schema.location or schema.registry can be defined. The address of the schema registry.
schema.registry.usernameOptional. The user name used to access the schema registry.
schema.registry.passwordOptional. The password associated with the user name.
schema.registry.name.strategyOptional. Accepted options include topic_name_strategy (default), record_name_strategy, and topic_record_name_strategy.
note

The file:// format is not recommended for production use. If it is used, it needs to be available for both meta and compute nodes.

Syntax:

FORMAT PLAIN
ENCODE PROTOBUF (
message = 'main_message',
schema.location = 'location'
)

Examples

Create a sink by selecting an entire materialized view.

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;

Create a sink with the Kafka configuration message.max.bytes set at 2000 by setting properties.message.max.bytes to 2000.

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test',
properties.message.max.bytes = 2000
)
FORMAT PLAIN ENCODE JSON;

Create a sink by selecting the average distance and duration from taxi_trips.

The schema of taxi_trips is like this:

{
"id": VARCHAR,
"distance": DOUBLE PRECISION,
"duration": DOUBLE PRECISION,
"fare": DOUBLE PRECISION
}

The table may look like this:

 id | distance | duration |   city
----+----------+----------+----------
1 | 16 | 23 | Dallas
2 | 23 | 9 | New York
3 | 6 | 15 | Chicago
4 | 9 | 35 | New York
CREATE SINK sink2 AS
SELECT
avg(distance) as avg_distance,
avg(duration) as avg_duration
FROM taxi_trips
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;

Create sink with VPC connection

If your Kafka sink service is located in a different VPC from RisingWave, use AWS PrivateLink or GCP Private Service Connect to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see Create an AWS PrivateLink connection.

To create a Kafka sink with a VPC connection, in the WITH section of your CREATE SINK statement, specify the following parameters.

ParameterNotes
privatelink.targetsThe PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues.
privatelink.endpointThe DNS name of the VPC endpoint.
If you're using RisingWave Cloud, you can find the auto-generated endpoint after you created a connection. See details in Create a VPC connection.
connection.nameThe name of the connection, which comes from the connection created using the CREATE CONNECTION statement. Omit this parameter if you have provisioned a VPC endpoint using privatelink.endpoint (recommended).

Here is an example of creating a Kafka sink using a PrivateLink connection. Notice that {"port": 8001} corresponds to the broker ip1:9092, and {"port": 8002} corresponds to the broker ip2:9092.

CREATE SINK sink2 FROM mv2
WITH (
connector='kafka',
properties.bootstrap.server='b-1.xxx.amazonaws.com:9092,b-2.test.xxx.amazonaws.com:9092',
topic='msk_topic',
privatelink.endpoint='10.148.0.4',
privatelink.targets = '[{"port": 8001}, {"port": 8002}]'
)
FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

TLS/SSL encryption and SASL authentication

RisingWave can sink data to Kafka that is encrypted with Transport Layer Security (TLS) and/or authenticated with SASL.

Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, SSL is used in configuration and code instead of TLS.

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.

RisingWave supports these SASL authentication mechanisms:

  • SASL/PLAIN
  • SASL/SCRAM

SSL encryption can be used concurrently with SASL authentication mechanisms.

To learn about how to enable SSL encryption and SASL authentication in Kafka, including how to generate the keys and certificates, see the Security Tutorial from Confluent.

You need to specify encryption and authentication parameters in the WITH section of a CREATE SINK statement.

SSL without SASL

To sink data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SINK statement.

ParameterNotes
properties.security.protocolSet to SSL.
properties.ssl.ca.location
properties.ssl.certificate.location
properties.ssl.key.location
properties.ssl.key.password
note

For the definitions of the parameters, see the librdkafka properties list. Note that the parameters in the list assumes all parameters start with properties. and therefore do not include this prefix.

Here is an example of creating a sink encrypted with SSL without using SASL authentication.

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.security.protocol='SSL',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
)
FORMAT PLAIN ENCODE JSON;

SASL/PLAIN

ParameterNotes
properties.security.protocolFor SASL/PLAIN without SSL, set to SASL_PLAINTEXT. For SASL/PLAIN with SSL, set to SASL_SSL.
properties.sasl.mechanismSet to PLAIN.
properties.sasl.username
properties.sasl.password
note

For the definitions of the parameters, see the librdkafka properties list. Note that the parameters in the list assumes all parameters start with properties. and therefore do not include this prefix.

For SASL/PLAIN with SSL, you need to include these SSL parameters:

  • properties.ssl.ca.location
  • properties.ssl.certificate.location
  • properties.ssl.key.location
  • properties.ssl.key.password

Here is an example of creating a sink authenticated with SASL/PLAIN without SSL encryption.

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
)
FORMAT PLAIN ENCODE JSON;

This is an example of creating a sink authenticated with SASL/PLAIN with SSL encryption.

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_SSL',
properties.sasl.username='admin',
properties.sasl.password='admin-secret',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
)
FORMAT PLAIN ENCODE JSON;

SASL/SCRAM

ParameterNotes
properties.security.protocolFor SASL/SCRAM without SSL, set to SASL_PLAINTEXT. For SASL/SCRAM with SSL, set to SASL_SSL.
properties.sasl.mechanismSet to SCRAM-SHA-256 or SCRAM-SHA-512 depending on the encryption method used.
properties.sasl.username
properties.sasl.password
note

For the definitions of the parameters, see the librdkafka properties list. Note that the parameters in the list assumes all parameters start with properties. and therefore do not include this prefix.

For SASL/SCRAM with SSL, you also need to include these SSL parameters:

  • properties.ssl.ca.location
  • properties.ssl.certificate.location
  • properties.ssl.key.location
  • properties.ssl.key.password

Here is an example of creating a sink authenticated with SASL/SCRAM without SSL encryption.

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
)
FORMAT PLAIN ENCODE JSON;

Data type mapping - RisingWave and Debezium JSON

RisingWave Data TypeSchema Type in JSONSchema Name in JSON
booleanbooleann/a
smallintint16n/a
integerint32n/a
bigintint64n/a
realfloatn/a
double precisiondoublen/a
character varyingstringn/a
byteabytesn/a
numericstringn/a
dateint32org.apache.kafka.connect.data.Date
time without time zoneint64org.apache.kafka.connect.data.Time
timestampint64org.apache.kafka.connect.data.Timestamp
timestamptzstringio.debezium.time.ZonedTimestamp
intervalstringio.debezium.time.Interval
JSONBstringio.debezium.data.Json
structstringn/a
arraystringn/a

Help us make this doc better!