This guide shows you how to connect RisingWave to your Apache Kafka broker using either CREATE SOURCE
(for a connection without storing data) or CREATE TABLE
(for a connection that stores data in RisingWave).
Prerequisites
A running Apache Kafka cluster.
The broker address(es) (ip:port
).
The topic name.
Network access from RisingWave to your Kafka brokers.
Connecting to Kafka
RisingWave supports two primary methods for connecting to Kafka:
Choose the method that best suits your needs.
Basic connection examples
Using CREATE SOURCE
(data not stored)
CREATE SOURCE my_kafka_source (
user_id INT,
product_id VARCHAR,
timestamp TIMESTAMP
) WITH (
connector='kafka',
topic='user_activity',
properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
Using CREATE TABLE (data stored)
CREATE TABLE my_kafka_table (
user_id INT,
product_id VARCHAR,
timestamp TIMESTAMP,
PRIMARY KEY (user_id)
) WITH (
connector='kafka',
topic='user_activity',
properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
RisingWave supports the INCLUDE clause, which allows you to extract fields not part of the main message payload (like Kafka metadata) as separate columns in your source or table.
For the general syntax and explanation of the INCLUDE clause, see Extracting metadata from sources .
Supported fields for Kafka
Field Default Type Note key
BYTEA
Can be overwritten by ENCODE and KEY ENCODE. timestamp
TIMESTAMP WITH TIME ZONE
Refers to CreateTime (not LogAppendTime). partition
VARCHAR
The partition the message is from. offset
VARCHAR
The offset in the partition. headers
STRUCT<VARCHAR, BYTEA>[]
Key-value pairs (headers) associated with the message. payload
JSON
The actual content or data of the message. Only supports JSON format.
Examples
-- Example: Include key, partition, offset, and timestamp
CREATE SOURCE kafka_source_with_metadata (
user_id INT,
product_id VARCHAR
-- timestamp and payload are usually not included if you defined in the schema already
)
INCLUDE key AS kafka_key
INCLUDE partition AS kafka_partition
INCLUDE offset AS kafka_offset
INCLUDE timestamp AS kafka_timestamp
WITH (
connector='kafka',
topic='user_activity',
properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
This section shows examples for different data formats.
Avro Upsert Avro JSON Upsert JSON Protobuf CSV Bytes CREATE SOURCE IF NOT EXISTS source_abc
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message_name',
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
);
CREATE SOURCE IF NOT EXISTS source_abc
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message_name',
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
);
CREATE TABLE IF NOT EXISTS source_abc (
primary key (rw_key)
)
INCLUDE key AS rw_key
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_topic'
)
FORMAT UPSERT ENCODE AVRO (
message = 'message_name',
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
);
CREATE SOURCE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE JSON;
CREATE TABLE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
primary key (rw_key)
)
INCLUDE key AS rw_key
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='t1'
) FORMAT UPSERT ENCODE JSON;
CREATE SOURCE IF NOT EXISTS source_abc
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE PROTOBUF (
message = 'package.message_name',
access_key = 'your_access_key',
secret_key = 'your secret_key',
-- compiled from protoc
location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/schema_descriptor.pb'
);
CREATE TABLE s0 (v1 int, v2 varchar)
WITH (
connector = 'kafka',
topic = 'kafka_csv_topic',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ','
);
CSV header is not supported when creating a table with Kafka connector. Add the without_header
option to the encode parameters.
The delimiter
option specifies the delimiter character used in the CSV data, and it can be one of ,
, ;
, E'\t'
.
CREATE SOURCE t1 (id bytea)
WITH (
connector='kafka',
topic='topic1',
properties.bootstrap.server='localhost:9093',
) FORMAT PLAIN ENCODE BYTES;
Security examples
SSL without SASL SASL/PLAIN SASL/SCRAM SASL/GSSAPI SASL/OAUTHBEARER To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SOURCE statement.
Here is an example of creating a table encrypted with SSL without using SASL authentication.
CREATE TABLE IF NOT EXISTS table_1 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.security.protocol='SSL',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SOURCE statement.
Here is an example of creating a table encrypted with SSL without using SASL authentication.
CREATE TABLE IF NOT EXISTS table_1 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.security.protocol='SSL',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
Here is an example of creating a source authenticated with SASL/PLAIN without SSL encryption.
CREATE SOURCE IF NOT EXISTS source_2 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
) FORMAT PLAIN ENCODE JSON;
This is an example of creating a source authenticated with SASL/PLAIN with SSL encryption.
CREATE SOURCE IF NOT EXISTS source_3 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_SSL',
properties.sasl.username='admin',
properties.sasl.password='admin-secret',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
Here is an example of creating a source authenticated with SASL/SCRAM without SSL encryption.
CREATE TABLE IF NOT EXISTS table_4 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
) FORMAT PLAIN ENCODE JSON;
Here is an example of creating a source authenticated with SASL/GSSAPI without SSL encryption.
CREATE SOURCE IF NOT EXISTS source_5 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='GSSAPI',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.kerberos.service.name='kafka',
properties.sasl.kerberos.keytab='/etc/krb5kdc/kafka.client.keytab',
properties.sasl.kerberos.principal='kafkaclient4@AP-SOUTHEAST-1.COMPUTE.INTERNAL',
properties.sasl.kerberos.kinit.cmd='sudo kinit -R -kt "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal} || sudo kinit -kt "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}',
properties.sasl.kerberos.min.time.before.relogin='10000'
) FORMAT PLAIN ENCODE JSON;
Here is an example of creating a source authenticated with SASL/OAUTHBEARER without SSL encryption.
CREATE TABLE IF NOT EXISTS source_6 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='OAUTHBEARER',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.oauthbearer.config='principal=bob'
) FORMAT PLAIN ENCODE JSON;
What’s next?