
Ingest data from Kafka

This topic describes how to connect RisingWave to a Kafka broker that you want to receive data from, and how to specify data formats, schemas, and security (encryption and authentication) settings.

A source is a resource that RisingWave can read data from. You can create a source in RisingWave using the CREATE SOURCE command. If you want to persist the data from the source in RisingWave, use the CREATE TABLE command instead, and specify the connection settings and data format in the same way.

Regardless of whether the data is persisted in RisingWave, you can create materialized views to perform analysis or data transformations.

RisingWave supports exactly-once semantics by reading transactional messages only when the associated transaction has been committed. This behavior is fixed and cannot be configured.

Guided setup

RisingWave Cloud provides an intuitive guided setup for creating a Kafka source. For more information, see Create a source using guided setup in the RisingWave Cloud documentation.


Syntax

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
connector='kafka',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)

info

For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE statement. Instead, provide the schema through either a Web location (schema.location) or a Confluent Schema Registry URL (schema.registry) in the ENCODE clause.

note

RisingWave performs primary key constraint checks on tables but not on sources. If you need the checks to be performed, please create a table.

For tables with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
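
As a minimal sketch of the behavior described in this note, the following statement creates a table with a primary key on top of a Kafka topic. The table name, column names, topic, and broker address are hypothetical placeholders, not values from a real deployment.

```sql
-- Hypothetical table, topic, and broker address; adjust to your environment.
CREATE TABLE IF NOT EXISTS user_profiles (
    user_id INTEGER PRIMARY KEY,  -- primary key checks apply to tables, not sources
    display_name VARCHAR
)
WITH (
    connector='kafka',
    topic='user_profiles',
    properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE JSON;
```

With a definition like this, a new record whose user_id already exists in the table overwrites the existing record, so the table keeps one row per key.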

Connector parameters

Field | Notes
topic | Required. Name of the Kafka topic. One source can correspond to only one topic.
properties.bootstrap.server | Required. Address of the Kafka broker. Format: 'ip:port,ip:port'.
scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (earliest offset) and latest (latest offset). If not specified, the default value earliest will be used.
scan.startup.timestamp_millis | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). If this field is specified, the value for scan.startup.mode will be ignored.
properties.sync.call.timeout | Optional. Specify the timeout. By default, the timeout is 5 seconds.
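
To illustrate the startup parameters, here is a sketch of a source that begins consuming at a specific point in time. The source name, columns, topic, and broker address are assumptions made for the example. Because scan.startup.timestamp_millis is set, scan.startup.mode is left out, since it would be ignored anyway.

```sql
-- Hypothetical source, topic, and broker address.
CREATE SOURCE IF NOT EXISTS orders_from_timestamp (
    order_id INTEGER,
    amount NUMERIC
)
WITH (
    connector='kafka',
    topic='orders',
    properties.bootstrap.server='localhost:9092',
    -- 1700000000000 ms since the UNIX epoch is 2023-11-14 22:13:20 UTC
    scan.startup.timestamp_millis='1700000000000'
) FORMAT PLAIN ENCODE JSON;
```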

Other parameters

Field | Notes
data_format | Data format. Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN.
data_encode | Data encode. Supported encodes: JSON, AVRO, PROTOBUF, CSV.
message | Message name of the main Message in the schema definition. Required for Protobuf.
schema.location | Web location of the schema file in http://..., https://..., or S3://... format. For Avro and Protobuf data, you must specify either a schema location or a schema registry, but not both.
schema.registry | Confluent Schema Registry URL. Example: http://127.0.0.1:8081. For Avro and Protobuf data, you must specify either a schema location or a Confluent Schema Registry, but not both.
schema.registry.username | Conditional. User name for the schema registry. It must be specified with schema.registry.password.
schema.registry.password | Conditional. Password for the schema registry. It must be specified with schema.registry.username.

Additional Kafka parameters

When creating a source in RisingWave, you can specify the following Kafka parameters. To set a parameter, add its RisingWave equivalent to the WITH options. For an example of the usage of these parameters, see the JSON example. For additional details on these parameters, see the Configuration properties.

Kafka parameter name | RisingWave parameter name | Type
enable.auto.commit | properties.enable.auto.commit | boolean
fetch.max.bytes | properties.fetch.max.bytes | int
fetch.queue.backoff.ms | properties.fetch.queue.backoff.ms | int
fetch.wait.max.ms | properties.fetch.wait.max.ms | int
message.max.bytes | properties.message.max.bytes | int
queued.max.messages.kbytes | properties.queued.max.messages.kbytes | int
queued.min.messages | properties.queued.min.messages | int
receive.message.max.bytes | properties.receive.message.max.bytes | int
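
The following sketch shows how two of these parameters might be passed in the WITH options of a JSON source. The source name, columns, topic, broker address, and the chosen values are illustrative assumptions. Quoting the numeric values as strings is also an assumption, made to match how scan.startup.timestamp_millis is written elsewhere in this topic.

```sql
-- Hypothetical source, topic, broker address, and tuning values.
CREATE SOURCE IF NOT EXISTS tuned_source (
    column1 VARCHAR,
    column2 INTEGER
)
WITH (
    connector='kafka',
    topic='demo_topic',
    properties.bootstrap.server='localhost:9092',
    -- RisingWave equivalents of Kafka's fetch.max.bytes and fetch.wait.max.ms
    properties.fetch.max.bytes='52428800',
    properties.fetch.wait.max.ms='500'
) FORMAT PLAIN ENCODE JSON;
```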

Examples

Here are examples of connecting RisingWave to a Kafka broker to read data from individual topics.

note

RisingWave supports reading messages that have been compressed by zstd. Additional configurations are not required.

CREATE SOURCE IF NOT EXISTS source_abc
WITH (
    connector='kafka',
    topic='demo_topic',
    properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
    -- scan.startup.mode is ignored here because scan.startup.timestamp_millis is also set.
    scan.startup.mode='latest',
    scan.startup.timestamp_millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
    message = 'message_name',
    schema.registry = 'http://127.0.0.1:8081'
);

Query Kafka timestamp

For each Kafka source you create, RisingWave also provides a virtual column, _rw_kafka_timestamp, which contains the timestamp of the Kafka message.

You can include this column in your views or materialized views to display the Kafka timestamp. Here is an example.

CREATE MATERIALIZED VIEW v1 AS
SELECT _rw_kafka_timestamp, col1
FROM source_name;

If directly querying from the source, you can use _rw_kafka_timestamp to filter messages sent within a specific time period. For example, the following query only selects messages sent in the past 10 minutes.

SELECT * FROM source_name
WHERE _rw_kafka_timestamp > now() - interval '10 minute';

Read schemas from locations

RisingWave supports reading schemas from a Web location in http://..., https://..., or S3://... format, or a Confluent Schema Registry for Kafka data in Avro or Protobuf format.

For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with a command like this:

protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto

To specify a schema location, add this clause to a CREATE SOURCE statement.

ENCODE data_encode (
schema.location = 'location'
)
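
For context, here is a sketch of a complete statement that uses this clause with Protobuf data. The source name, topic, broker address, message name, and schema URL are hypothetical. No schema_definition is given, because the columns come from the schema file.

```sql
-- Hypothetical names and URL; schema.pb is a compiled FileDescriptorSet.
CREATE SOURCE IF NOT EXISTS events_pb
WITH (
    connector='kafka',
    topic='events',
    properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE PROTOBUF (
    message = 'events.Event',  -- main message in the schema definition
    schema.location = 'https://example.com/schemas/schema.pb'
);
```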

If a primary key also needs to be defined, use the table constraint syntax.

CREATE TABLE table1 (PRIMARY KEY(id))

Read schemas from Schema Registry

Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving your schemas.

RisingWave supports reading schemas from a Schema Registry. When the CREATE SOURCE statement is issued, the latest schema is retrieved from the specified Schema Registry using the TopicNameStrategy. The schema parser in RisingWave then automatically determines the columns and data types to use in the source.

To specify the Schema Registry, add this clause to a CREATE SOURCE statement.

ENCODE data_encode (
schema.registry = 'schema_registry_url'
)

To learn more about Confluent Schema Registry and how to set up a Schema Registry, refer to the Confluent Schema Registry documentation.

If a primary key also needs to be defined, use the table constraint syntax.

CREATE TABLE table1 (PRIMARY KEY(id))
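
Put together, a statement that combines the table constraint syntax with a Schema Registry might look like the sketch below. The topic and broker address are hypothetical, and the example assumes that the schema registered for the topic contains a field named id.

```sql
-- Assumes the registered Avro schema has a field named id.
CREATE TABLE IF NOT EXISTS table1 (PRIMARY KEY(id))
WITH (
    connector='kafka',
    topic='demo_topic',
    properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://127.0.0.1:8081'
);
```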

Schema evolution

Based on the compatibility type that is configured for the schema registry, some changes are allowed without changing the schema to a different version. In this case, RisingWave will continue using the original schema definition. To use a newer version of the writer schema in RisingWave, you need to drop and recreate the source.

To learn about compatibility types for Schema Registry and the changes allowed, see Compatibility Types.
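
As a sketch of the drop-and-recreate step, assuming an Avro source named source_abc that reads its schema from a Schema Registry (the topic and addresses are placeholders):

```sql
-- Drop the source that still uses the original schema definition.
DROP SOURCE source_abc;

-- Recreate it; the latest schema is fetched when this statement is issued.
CREATE SOURCE source_abc
WITH (
    connector='kafka',
    topic='demo_topic',
    properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://127.0.0.1:8081'
);
```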

Create source with VPC connection

If your Kafka source service is located in a different VPC from RisingWave, use AWS PrivateLink to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see Create an AWS PrivateLink connection.

To create a Kafka source with a VPC connection, in the WITH section of your CREATE SOURCE or CREATE TABLE statement, specify the following parameters.

Parameter | Notes
privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers, in JSON format. The targets must be listed in the same order as the brokers specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues.
privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you create a connection. See details in Create a VPC connection.
connection.name | The name of the connection. Include this parameter only if you are using a connection created with the CREATE CONNECTION statement. Omit this parameter if you have provisioned a VPC endpoint using privatelink.endpoint (recommended).

Here is an example of creating a Kafka source using a PrivateLink connection. Notice that {"port": 9094} corresponds to the broker broker1-endpoint, {"port": 9095} corresponds to the broker broker2-endpoint, and {"port": 9096} corresponds to the broker broker3-endpoint.

CREATE TABLE IF NOT EXISTS crypto_source (
    product_id VARCHAR,
    price NUMERIC,
    open_24h NUMERIC,
    volume_24h NUMERIC,
    low_24h NUMERIC,
    high_24h NUMERIC,
    volume_30d NUMERIC,
    best_bid NUMERIC,
    best_ask NUMERIC,
    side VARCHAR,
    time timestamp,
    trade_id bigint
)
WITH (
    connector='kafka',
    topic='crypto',
    privatelink.endpoint='10.148.0.4',
    privatelink.targets='[{"port": 9094}, {"port": 9095}, {"port": 9096}]',
    properties.bootstrap.server='broker1-endpoint,broker2-endpoint,broker3-endpoint',
    scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;

TLS/SSL encryption and SASL authentication

RisingWave can read Kafka data that is encrypted with Transport Layer Security (TLS) and/or authenticated with SASL.

Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, SSL is used in configuration and code instead of TLS.

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.

RisingWave supports these SASL authentication mechanisms:

  • SASL/PLAIN
  • SASL/SCRAM

SSL encryption can be used concurrently with SASL authentication mechanisms.

To learn about how to enable SSL encryption and SASL authentication in Kafka, including how to generate the keys and certificates, see the Security Tutorial from Confluent.

You need to specify encryption and authentication parameters in the WITH section of a CREATE SOURCE statement.

SSL without SASL

To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SOURCE statement.

Parameter | Notes
properties.security.protocol | Set to SSL.
properties.ssl.ca.location |
properties.ssl.certificate.location |
properties.ssl.key.location |
properties.ssl.key.password |

note

For the definitions of the parameters, see the librdkafka properties list. Note that the parameter names in that list do not include the properties. prefix, which you must add when specifying them in RisingWave.

Here is an example of creating a table that reads data encrypted with SSL, without using SASL authentication.

CREATE TABLE IF NOT EXISTS table_1 (
    column1 varchar,
    column2 integer
)
WITH (
    connector='kafka',
    topic='quickstart-events',
    properties.bootstrap.server='localhost:9093',
    scan.startup.mode='earliest',
    properties.security.protocol='SSL',
    properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
    properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
    properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
    properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;

SASL/PLAIN

Parameter | Notes
properties.security.protocol | For SASL/PLAIN without SSL, set to SASL_PLAINTEXT. For SASL/PLAIN with SSL, set to SASL_SSL.
properties.sasl.mechanism | Set to PLAIN.
properties.sasl.username |
properties.sasl.password |

note

For the definitions of the parameters, see the librdkafka properties list. Note that the parameter names in that list do not include the properties. prefix, which you must add when specifying them in RisingWave.

For SASL/PLAIN with SSL, you need to include these SSL parameters:

  • properties.ssl.ca.location
  • properties.ssl.certificate.location
  • properties.ssl.key.location
  • properties.ssl.key.password

Here is an example of creating a source authenticated with SASL/PLAIN without SSL encryption.

CREATE SOURCE IF NOT EXISTS source_2 (
    column1 varchar,
    column2 integer
)
WITH (
    connector='kafka',
    topic='quickstart-events',
    properties.bootstrap.server='localhost:9093',
    scan.startup.mode='earliest',
    properties.sasl.mechanism='PLAIN',
    properties.security.protocol='SASL_PLAINTEXT',
    properties.sasl.username='admin',
    properties.sasl.password='admin-secret'
) FORMAT PLAIN ENCODE JSON;

This is an example of creating a source authenticated with SASL/PLAIN with SSL encryption.

CREATE SOURCE IF NOT EXISTS source_3 (
    column1 varchar,
    column2 integer
)
WITH (
    connector='kafka',
    topic='quickstart-events',
    properties.bootstrap.server='localhost:9093',
    scan.startup.mode='earliest',
    properties.sasl.mechanism='PLAIN',
    properties.security.protocol='SASL_SSL',
    properties.sasl.username='admin',
    properties.sasl.password='admin-secret',
    properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
    properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
    properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
    properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;

SASL/SCRAM

Parameter | Notes
properties.security.protocol | For SASL/SCRAM without SSL, set to SASL_PLAINTEXT. For SASL/SCRAM with SSL, set to SASL_SSL.
properties.sasl.mechanism | Set to SCRAM-SHA-256 or SCRAM-SHA-512, depending on the SCRAM mechanism used.
properties.sasl.username |
properties.sasl.password |

note

For the definitions of the parameters, see the librdkafka properties list. Note that the parameter names in that list do not include the properties. prefix, which you must add when specifying them in RisingWave.

For SASL/SCRAM with SSL, you also need to include these SSL parameters:

  • properties.ssl.ca.location
  • properties.ssl.certificate.location
  • properties.ssl.key.location
  • properties.ssl.key.password

Here is an example of creating a table authenticated with SASL/SCRAM without SSL encryption.

CREATE TABLE IF NOT EXISTS table_4 (
    column1 varchar,
    column2 integer
)
WITH (
    connector='kafka',
    topic='quickstart-events',
    properties.bootstrap.server='localhost:9093',
    scan.startup.mode='earliest',
    properties.sasl.mechanism='SCRAM-SHA-256',
    properties.security.protocol='SASL_PLAINTEXT',
    properties.sasl.username='admin',
    properties.sasl.password='admin-secret'
) FORMAT PLAIN ENCODE JSON;
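
For SASL/SCRAM with SSL encryption, the SASL and SSL parameters listed above are combined. The following is a sketch only: the table name table_5 is hypothetical, and the credentials and file paths are reused from the earlier examples as placeholders.

```sql
-- Hypothetical table name; credentials and paths are placeholders.
CREATE TABLE IF NOT EXISTS table_5 (
    column1 varchar,
    column2 integer
)
WITH (
    connector='kafka',
    topic='quickstart-events',
    properties.bootstrap.server='localhost:9093',
    scan.startup.mode='earliest',
    properties.sasl.mechanism='SCRAM-SHA-512',
    properties.security.protocol='SASL_SSL',  -- SASL authentication over SSL
    properties.sasl.username='admin',
    properties.sasl.password='admin-secret',
    properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
    properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
    properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
    properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
```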
