Connect to Kafka
Quickly connect RisingWave to your Apache Kafka broker to start ingesting data.
This guide shows you how to connect RisingWave to your Apache Kafka broker using either CREATE SOURCE (for a connection without storing data) or CREATE TABLE (for a connection that stores data in RisingWave).
Prerequisites
- A running Apache Kafka cluster.
- The broker address(es) (ip:port).
- The topic name.
- Network access from RisingWave to your Kafka brokers.
Connecting to Kafka
RisingWave supports two primary methods for connecting to Kafka:
- CREATE SOURCE: Creates a connection without storing data in RisingWave. See Connecting with CREATE SOURCE for details.
- CREATE TABLE: Creates a connection and stores the ingested data in RisingWave. See Connecting with CREATE TABLE for details.
Choose the method that best suits your needs.
Basic connection examples
Using CREATE SOURCE (data not stored)
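The following is a minimal sketch of a CREATE SOURCE statement for a JSON-encoded topic. The source name, column list, topic name, and broker addresses are placeholders chosen for illustration; replace them with values from your own cluster.

```sql
-- Sketch only: my_kafka_source, the columns, the topic, and the broker
-- addresses are hypothetical placeholders.
CREATE SOURCE IF NOT EXISTS my_kafka_source (
  user_id INT,
  product_id VARCHAR,
  event_time TIMESTAMP
)
WITH (
  connector = 'kafka',
  topic = 'user_activity',
  -- Comma-separated list of ip:port broker addresses.
  properties.bootstrap.server = 'broker1:9092,broker2:9092',
  -- Start reading from the earliest available offset.
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```

Because a source does not persist data, RisingWave reads from Kafka only when something consumes the source, for example a materialized view defined on top of it.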
Using CREATE TABLE (data stored)
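A comparable sketch using CREATE TABLE, which ingests the topic continuously and stores the rows in RisingWave. As above, the table name, columns, topic, and broker addresses are illustrative assumptions rather than required values.

```sql
-- Sketch only: my_kafka_table, the columns, the topic, and the broker
-- addresses are hypothetical placeholders.
CREATE TABLE IF NOT EXISTS my_kafka_table (
  user_id INT,
  product_id VARCHAR,
  event_time TIMESTAMP
)
WITH (
  connector = 'kafka',
  topic = 'user_activity',
  properties.bootstrap.server = 'broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
```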
Extract metadata from Kafka sources
RisingWave supports the INCLUDE clause, which allows you to extract fields not part of the main message payload (like Kafka metadata) as separate columns in your source or table.
For the general syntax and explanation of the INCLUDE clause, see Extracting metadata from sources.
Supported fields for Kafka
Field | Default Type | Note |
---|---|---|
key | BYTEA | Can be overwritten by ENCODE and KEY ENCODE. |
timestamp | TIMESTAMP WITH TIME ZONE | Refers to CreateTime (not LogAppendTime). |
partition | VARCHAR | The partition the message is from. |
offset | VARCHAR | The offset in the partition. |
headers | STRUCT<VARCHAR, BYTEA>[] | Key-value pairs (headers) associated with the message. |
payload | JSON | The actual content or data of the message. Only supports JSON format. |
Examples
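As an illustration of the INCLUDE clause, the sketch below exposes the Kafka key, partition, offset, and timestamp as extra columns next to the payload columns. The source name, the payload columns, the alias names after AS, the topic, and the broker address are all hypothetical.

```sql
-- Sketch only: names, topic, and broker address are placeholders.
CREATE SOURCE kafka_events_with_meta (
  user_id INT,
  action VARCHAR
)
-- Each INCLUDE adds one metadata column; the alias after AS is your choice.
INCLUDE key AS kafka_key
INCLUDE partition AS kafka_partition
INCLUDE offset AS kafka_offset
INCLUDE timestamp AS kafka_timestamp
WITH (
  connector = 'kafka',
  topic = 'user_activity',
  properties.bootstrap.server = 'broker1:9092'
) FORMAT PLAIN ENCODE JSON;
```

The metadata columns take the default types listed in the table above unless an encoding option overrides them.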
Data format examples
This section shows examples for different data formats.
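For instance, a topic carrying Avro messages registered in a schema registry might be connected roughly as follows. This is a sketch under assumptions: the source name, topic, broker address, and registry URL are placeholders, and the columns are expected to be derived from the registered schema rather than declared by hand.

```sql
-- Sketch only: the source name, topic, broker address, and schema
-- registry URL are hypothetical placeholders.
CREATE SOURCE IF NOT EXISTS my_avro_source
WITH (
  connector = 'kafka',
  topic = 'demo_topic',
  properties.bootstrap.server = 'broker1:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE AVRO (
  -- The schema is fetched from the registry instead of a column list.
  schema.registry = 'http://127.0.0.1:8081'
);
```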
Security examples
To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SOURCE statement.
Here is an example of creating a table that reads from Kafka over an SSL-encrypted connection without using SASL authentication.
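A sketch of such a statement is shown here. The table name, columns, topic, broker address, file paths, and key password are placeholders; the certificate and key files are assumed to be readable at those paths from the RisingWave nodes.

```sql
-- Sketch only: table name, columns, topic, broker address, file paths,
-- and the key password are hypothetical placeholders.
CREATE TABLE IF NOT EXISTS ssl_kafka_table (
  column1 VARCHAR,
  column2 INTEGER
)
WITH (
  connector = 'kafka',
  topic = 'quickstart-events',
  properties.bootstrap.server = 'localhost:9093',
  scan.startup.mode = 'earliest',
  -- SSL encryption without SASL authentication.
  properties.security.protocol = 'SSL',
  properties.ssl.ca.location = '/path/to/ca-cert',
  properties.ssl.certificate.location = '/path/to/client.pem',
  properties.ssl.key.location = '/path/to/client.key',
  properties.ssl.key.password = 'your-key-password'
) FORMAT PLAIN ENCODE JSON;
```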
What’s next?
- All configuration options: Kafka Config
- Supported data formats: Data formats and encoding options