This guide shows you how to connect RisingWave to your Apache Kafka broker using either CREATE SOURCE (for a connection without storing data) or CREATE TABLE (for a connection that stores data in RisingWave).

Prerequisites

  • A running Apache Kafka cluster.
  • The broker address(es) (ip:port).
  • The topic name.
  • Network access from RisingWave to your Kafka brokers.

Connecting to Kafka

RisingWave supports two primary methods for connecting to Kafka:

  • CREATE SOURCE — connects to a topic without persisting the ingested data in RisingWave.
  • CREATE TABLE — connects to a topic and stores the ingested data in RisingWave, so it can be queried directly.

Choose the method that best suits your needs.

Basic connection examples

Using CREATE SOURCE (data not stored)

CREATE SOURCE my_kafka_source (
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector='kafka',
    topic='user_activity',
    properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
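Because a source does not store data, you typically consume it through a materialized view, which continuously ingests from the topic and materializes the result. A minimal sketch (the view name and aggregation are illustrative):

```sql
-- Continuously count events per user as they arrive from the topic
CREATE MATERIALIZED VIEW user_activity_counts AS
SELECT user_id, COUNT(*) AS event_count
FROM my_kafka_source
GROUP BY user_id;

-- The materialized view can then be queried like a regular table
SELECT * FROM user_activity_counts;
```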

Using CREATE TABLE (data stored)

CREATE TABLE my_kafka_table (
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP,
    PRIMARY KEY (user_id)
) WITH (
    connector='kafka',
    topic='user_activity',
    properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
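Since CREATE TABLE persists the ingested rows, the table can be queried directly with ad-hoc batch queries, without creating a materialized view first. A hypothetical query (the filter value is illustrative):

```sql
-- Ad-hoc query over data already ingested from the topic
SELECT user_id, product_id, timestamp
FROM my_kafka_table
WHERE user_id = 42;
```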

Extract metadata from Kafka sources

RisingWave supports the INCLUDE clause, which allows you to extract fields not part of the main message payload (like Kafka metadata) as separate columns in your source or table.

For the general syntax and explanation of the INCLUDE clause, see Extracting metadata from sources.

Supported fields for Kafka

| Field | Default Type | Note |
|---|---|---|
| key | BYTEA | Can be overwritten by ENCODE and KEY ENCODE. |
| timestamp | TIMESTAMP WITH TIME ZONE | Refers to CreateTime (not LogAppendTime). |
| partition | VARCHAR | The partition the message is from. |
| offset | VARCHAR | The offset in the partition. |
| headers | STRUCT<VARCHAR, BYTEA>[] | Key-value pairs (headers) associated with the message. |
| payload | JSON | The actual content or data of the message. Only supports JSON format. |

Examples

-- Example: Include key, partition, offset, and timestamp
CREATE SOURCE kafka_source_with_metadata (
    user_id INT,
    product_id VARCHAR
    -- metadata fields such as timestamp are extracted via INCLUDE below,
    -- so they are not declared in the schema
)
INCLUDE key AS kafka_key
INCLUDE partition AS kafka_partition
INCLUDE offset AS kafka_offset
INCLUDE timestamp AS kafka_timestamp
WITH (
    connector='kafka',
    topic='user_activity',
    properties.bootstrap.server='broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
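The columns declared with INCLUDE can then be referenced like any other column. A sketch that materializes the payload fields alongside each message's origin (the view name is illustrative):

```sql
-- Track which partition and offset each event came from
CREATE MATERIALIZED VIEW activity_with_metadata AS
SELECT kafka_partition, kafka_offset, kafka_timestamp, user_id, product_id
FROM kafka_source_with_metadata;
```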

Data format examples

This section shows examples for different data formats. The following example reads Avro-encoded messages, resolving the schema from a schema registry.

CREATE SOURCE IF NOT EXISTS source_abc
WITH (
   connector='kafka',
   topic='demo_topic',
   properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
   scan.startup.mode='latest',
   scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
   message = 'message_name',
   schema.registry = 'http://127.0.0.1:8081',
   schema.registry.username='your_schema_registry_username',
   schema.registry.password='your_schema_registry_password'
);
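For topics that carry changelog-style data, where a later message with the same key should overwrite an earlier one, an upsert format can be used with a table. A sketch, assuming hypothetical columns and a topic whose messages are keyed by id:

```sql
-- Later messages with the same primary key overwrite earlier rows
CREATE TABLE IF NOT EXISTS upsert_demo (
   id INT,
   value VARCHAR,
   PRIMARY KEY (id)
)
WITH (
   connector='kafka',
   topic='demo_topic',
   properties.bootstrap.server='172.10.1.1:9090'
) FORMAT UPSERT ENCODE JSON;
```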

Security examples

To read data encrypted with SSL without SASL authentication, specify the SSL parameters in the WITH clause of your CREATE SOURCE or CREATE TABLE statement.

Here is an example of creating a table that connects over SSL without using SASL authentication.

CREATE TABLE IF NOT EXISTS table_1 (
   column1 varchar,
   column2 integer
)
WITH (
   connector='kafka',
   topic='quickstart-events',
   properties.bootstrap.server='localhost:9093',
   scan.startup.mode='earliest',
   properties.security.protocol='SSL',
   properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
   properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
   properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
   properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
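Clusters that also require SASL authentication over SSL can be reached by combining the security parameters. A sketch using the PLAIN SASL mechanism (the table name, broker address, and credentials are placeholders):

```sql
CREATE TABLE IF NOT EXISTS table_2 (
   column1 varchar,
   column2 integer
)
WITH (
   connector='kafka',
   topic='quickstart-events',
   properties.bootstrap.server='localhost:9093',
   scan.startup.mode='earliest',
   -- authenticate with SASL/PLAIN over an SSL-encrypted connection
   properties.security.protocol='SASL_SSL',
   properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
   properties.sasl.mechanism='PLAIN',
   properties.sasl.username='your_sasl_username',
   properties.sasl.password='your_sasl_password'
) FORMAT PLAIN ENCODE JSON;
```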

What’s next?