A source is a resource that RisingWave can read data from. You can create a source in RisingWave using the CREATE SOURCE command. To persist the data from the source in RisingWave, use the CREATE TABLE command instead and specify the connection settings and data format.

Regardless of whether the data is persisted in RisingWave, you can create materialized views to perform analysis or data transformations.

RisingWave supports exactly-once semantics by reading transactional messages only when the associated transaction has been committed. This behavior is fixed and cannot be configured.

GUIDED SETUP

RisingWave Cloud provides an intuitive guided setup for creating a Kafka source. For more information, see Create a source using guided setup in the RisingWave Cloud documentation.

Syntax

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
[INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]]
WITH (
   connector='kafka',
   connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
   message = 'message',
   schema.location = 'location' | schema.registry = 'schema_registry_url'
);

schema_definition:

(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)

For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE statement.

RisingWave performs primary key constraint checks on tables but not on sources. If you need these checks, create a table. For tables with primary key constraints, a new record with an existing key overwrites the existing record.
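
As a minimal sketch of the table-versus-source distinction, the following statement creates a table so that primary key checks apply. The table name, column names, topic, and broker address are hypothetical placeholders, not values from this guide.

```sql
-- Hypothetical names: user_profiles, user_id, email, and the broker address.
-- Because this is a table with a primary key, a later Kafka record with
-- the same user_id overwrites the earlier row.
CREATE TABLE IF NOT EXISTS user_profiles (
   user_id INT PRIMARY KEY,
   email VARCHAR
)
WITH (
   connector='kafka',
   topic='user_profiles',
   properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE JSON;
```

Replacing CREATE TABLE with CREATE SOURCE in a statement like this would require removing the primary key, since sources do not enforce it.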

Parameters

Connector parameters

Field | Notes
topic | Required. Name of the Kafka topic. One source can only correspond to one topic.
properties.bootstrap.server | Required. Address of the Kafka broker. Format: ip:port,ip:port.
scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (read from low watermark) and latest (read from high watermark). If not specified, the default value earliest will be used.
scan.startup.timestamp.millis | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). If this field is specified, the value for scan.startup.mode will be ignored.
group.id.prefix | Optional. Specify a custom group ID prefix for the source. The default prefix is rw-consumer. Each job (materialized view) will have a separate consumer group with a generated suffix in the group ID, so the format of the consumer group is {group_id_prefix}-{fragment_id}. This is used to monitor progress in external Kafka tools and for authorization purposes. RisingWave does not rely on committed offsets or join the consumer group. It only reports offsets to the group.
properties.sync.call.timeout | Optional. Timeout for synchronous calls. By default, the timeout is 5 seconds.
properties.client.id | Optional. Client ID associated with the Kafka client.
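
The following sketch shows how several of these connector parameters might be combined in one statement. The source name, columns, topic, broker addresses, prefix, client ID, and timestamp are all illustrative assumptions.

```sql
-- Hypothetical names and values throughout; adjust to your environment.
CREATE SOURCE IF NOT EXISTS orders_src (
   order_id INT,
   amount DOUBLE PRECISION
)
WITH (
   connector='kafka',
   topic='orders',
   properties.bootstrap.server='broker1:9092,broker2:9092',
   -- Start from a UNIX timestamp in milliseconds.
   -- scan.startup.mode is omitted because it would be ignored here.
   scan.startup.timestamp.millis='1700000000000',
   -- Consumer groups are then named my-team-{fragment_id}.
   group.id.prefix='my-team',
   properties.client.id='orders-reader'
) FORMAT PLAIN ENCODE JSON;
```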

FAQ: GroupAuthorizationFailed

If you encounter GroupAuthorizationFailed(Broker: Group authorization failed) while consuming from Kafka topics, consider adding all consumer groups with the group.id.prefix (default: rw-consumer) to the ACL whitelist. You can try the following command to set up the ACL:

./kafka-acls.sh --bootstrap-server <broker-addr> \
  --add --allow-principal User:<user or service account> \
  --group rw-consumer- --resource-pattern-type prefixed \
  --operation READ

Other parameters

Field | Notes
data_format | Data format. Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN.
data_encode | Data encode. Supported encodes: JSON, AVRO, PROTOBUF, CSV.
message | Message name of the main Message in schema definition. Required for Protobuf.
location | Web location of the schema file in http://..., https://..., or S3://... format.
  • This option is not supported for Avro data.
  • For Protobuf data, you must specify either a schema location or a schema registry but not both.
schema.registry | Confluent Schema Registry URL. Example: http://127.0.0.1:8081.
  • For Avro data, you must specify a Confluent Schema Registry or an AWS Glue Schema Registry.
  • For Protobuf data, you must specify either a schema location or a Confluent Schema Registry but not both.
schema.registry.username | Conditional. User name for the schema registry. It must be specified with schema.registry.password.
schema.registry.password | Conditional. Password for the schema registry. It must be specified with schema.registry.username.
access_key | Required if loading descriptors from S3. The access key ID of AWS.
secret_key | Required if loading descriptors from S3. The secret access key of AWS.
region | Required if loading descriptors from S3. The AWS service region.
arn | Optional. The Amazon Resource Name (ARN) of the role to assume.
external_id | Optional. The external id used to authorize access to third-party resources.

Additional Kafka parameters

When creating a source in RisingWave, you can specify the following Kafka parameters. To set a parameter, add its RisingWave equivalent to the WITH options. For an example of how these parameters are used, see the JSON example. For additional details on these parameters, see Configuration properties.

Kafka parameter name | RisingWave parameter name | Type
enable.auto.commit | properties.enable.auto.commit | bool
enable.ssl.certificate.verification | properties.enable.ssl.certificate.verification | bool
fetch.max.bytes | properties.fetch.max.bytes | int
fetch.queue.backoff.ms | properties.fetch.queue.backoff.ms | int
fetch.wait.max.ms | properties.fetch.wait.max.ms | int
message.max.bytes | properties.message.max.bytes | int
queued.max.messages.kbytes | properties.queued.max.messages.kbytes | int
queued.min.messages | properties.queued.min.messages | int
receive.message.max.bytes | properties.receive.message.max.bytes | int
ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str
statistics.interval.ms | properties.statistics.interval.ms | int

To monitor Kafka metrics in Grafana, set properties.statistics.interval.ms to a non-zero value. The granularity is 1000ms.

The additional Kafka parameters queued.min.messages and queued.max.messages.kbytes are specified with properties.queued.min.messages and properties.queued.max.messages.kbytes, respectively, when creating the source.

CREATE SOURCE s1 (v1 int, v2 varchar) WITH (
  connector = 'kafka',
  topic = 'kafka_1_partition_topic',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest',
  properties.queued.min.messages = 10000,
  properties.queued.max.messages.kbytes = 65536
) FORMAT PLAIN ENCODE JSON;

Set properties.ssl.endpoint.identification.algorithm to none to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either https or none. By default, it is https.
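
As a rough sketch, the parameter could be set alongside the SSL options like this. The source name, column, topic, broker address, and certificate path are hypothetical.

```sql
-- Hypothetical names and paths; only the parameter names come from this guide.
CREATE SOURCE IF NOT EXISTS ssl_src (v1 INT)
WITH (
   connector='kafka',
   topic='secure_topic',
   properties.bootstrap.server='localhost:9093',
   properties.security.protocol='SSL',
   properties.ssl.ca.location='/path/to/ca-cert',
   -- Accepts 'https' (default) or 'none'.
   properties.ssl.endpoint.identification.algorithm='none'
) FORMAT PLAIN ENCODE JSON;
```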

Specific parameters for Amazon MSK

Amazon Managed Streaming for Apache Kafka (MSK) has some specific parameters. For details, see Access MSK in RisingWave.

Examples

Here are examples of connecting RisingWave to a Kafka broker to read data from individual topics.

RisingWave supports reading messages that have been compressed by zstd. Additional configurations are not required.

CREATE SOURCE IF NOT EXISTS source_abc
WITH (
   connector='kafka',
   topic='demo_topic',
   properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
   scan.startup.mode='latest',
   scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
   message = 'message_name',
   schema.registry = 'http://127.0.0.1:8081',
   schema.registry.username='your_schema_registry_username',
   schema.registry.password='your_schema_registry_password'
);

Query Kafka timestamp

For each Kafka source, RisingWave also provides a virtual column, _rw_kafka_timestamp, which contains the timestamp of the Kafka message.

You can include this column in your views or materialized views to display the Kafka timestamp. Here is an example.

CREATE MATERIALIZED VIEW v1 AS
SELECT _rw_kafka_timestamp, col1
FROM source_name;

If directly querying from the source, you can use _rw_kafka_timestamp to filter messages sent within a specific time period. For example, the following query only selects messages sent in the past 10 minutes.

SELECT * FROM source_name
WHERE _rw_kafka_timestamp > now() - interval '10 minute';
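
The same column can presumably bound a query on both sides. The sketch below assumes the column compares against timestamptz values; source_name and the dates are placeholders.

```sql
-- Select only messages whose Kafka timestamp falls on 2024-01-01 (UTC).
-- source_name and the date range are placeholders.
SELECT * FROM source_name
WHERE _rw_kafka_timestamp >= '2024-01-01 00:00:00+00:00'::timestamptz
  AND _rw_kafka_timestamp <  '2024-01-02 00:00:00+00:00'::timestamptz;
```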

Read schemas from locations

RisingWave supports reading schemas from a Web location in http://..., https://..., or S3://... format, or a Confluent Schema Registry for Kafka data in Protobuf format. For Avro, Confluent Schema Registry and AWS Glue Schema Registry are supported for reading schemas.

For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with a command like this:

protoc -I=$include_path --include_imports --descriptor_set_out=schema_descriptor.pb schema.proto

To specify a schema location, add this clause to a CREATE SOURCE statement.

ENCODE data_encode (
   schema.location = 'file://...' -- path to schema_descriptor.pb
)
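
Put together, a complete statement for Protobuf data with a schema location might look like the sketch below. The source name, topic, broker address, message name, and URL are hypothetical, and the descriptor file is assumed to have been produced by a protoc command like the one above.

```sql
-- Hypothetical names and URL. No schema_definition is given because
-- the columns are derived from the Protobuf descriptor.
CREATE SOURCE IF NOT EXISTS proto_events
WITH (
   connector='kafka',
   topic='proto_events',
   properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE PROTOBUF (
   message = 'demo.Event',
   schema.location = 'https://example.com/schemas/schema_descriptor.pb'
);
```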

If a primary key also needs to be defined, use the table constraint syntax.

CREATE TABLE table1 (PRIMARY KEY(id))

Read schemas from Confluent Schema Registry

Confluent and Karapace Schema Registry provide a serving layer for your metadata. They provide a RESTful interface for storing and retrieving your schemas.

RisingWave supports reading schemas from a Confluent Schema Registry. The latest schema will be retrieved from the specified Confluent Schema Registry using the TopicNameStrategy strategy when the CREATE SOURCE statement is issued. Then the schema parser in RisingWave will automatically determine the columns and data types to use in the source.

To specify the Confluent Schema Registry, add this clause to a CREATE SOURCE statement.

ENCODE data_encode (
   schema.registry = 'schema_registry_url'
)
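
For instance, a Protobuf source that reads its schema from a registry could be sketched as follows. The source name, topic, broker address, and message name are hypothetical; note that schema.location is not set, since only one of the two may be specified for Protobuf.

```sql
-- Hypothetical names; the registry URL reuses the example address
-- from the parameter table.
CREATE SOURCE IF NOT EXISTS proto_registry_src
WITH (
   connector='kafka',
   topic='proto_events',
   properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE PROTOBUF (
   message = 'demo.Event',
   schema.registry = 'http://127.0.0.1:8081'
);
```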

To learn more about Confluent Schema Registry and how to set up a Schema Registry, refer to the Confluent Schema Registry documentation.

To learn more about Karapace Schema Registry and how to get started, see Get started with Karapace.

If a primary key also needs to be defined, use the table constraint syntax.

CREATE TABLE table1 (PRIMARY KEY(id));

Schema evolution

Based on the compatibility type that is configured for the schema registry, some changes are allowed without changing the schema to a different version. In this case, RisingWave will continue using the original schema definition. To use a newer version of the writer schema in RisingWave, you need to drop and recreate the source.
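
A drop-and-recreate cycle might be sketched as follows, reusing the source_abc example from earlier on this page. This assumes no other objects depend on the source; if materialized views do, they would likely need to be dropped first and recreated afterwards.

```sql
-- Drop the source that still uses the old schema version.
DROP SOURCE IF EXISTS source_abc;

-- Recreate it; the latest schema is fetched from the registry
-- when this statement is issued.
CREATE SOURCE source_abc
WITH (
   connector='kafka',
   topic='demo_topic',
   properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090'
) FORMAT PLAIN ENCODE AVRO (
   schema.registry = 'http://127.0.0.1:8081'
);
```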

To learn about compatibility types for Schema Registry and the changes allowed, see Compatibility Types.

Read schemas from AWS Glue Schema Registry

PREMIUM EDITION FEATURE

This is a Premium Edition feature. All Premium Edition features are available out of the box without additional cost on RisingWave Cloud. For self-hosted deployments, you need to purchase a license key to access this feature. To purchase a license key, contact the sales team at sales@risingwave-labs.com.

For a full list of Premium Edition features, see RisingWave Premium Edition.

AWS Glue Schema Registry is a serverless feature of AWS Glue that allows you to centrally manage and enforce schemas for data streams, enabling data validation and compatibility checks. It helps in improving the quality of data streams by providing a central repository for managing and enforcing schemas across various AWS services and custom applications.

You can specify the following configurations in the ENCODE AVRO (...) clause to read schemas from Glue.

Auth-related configurations:

  • aws.region: The region of the AWS Glue Schema Registry. For example, us-west-2.
  • aws.credentials.access_key_id: Your AWS access key ID.
  • aws.credentials.secret_access_key: Your AWS secret access key.
  • aws.credentials.role.arn: The Amazon Resource Name (ARN) of the role to assume. For example, arn:aws:iam::123456123456:role/MyGlueRole.
    • This IAM role must be granted permission for the glue:GetSchemaVersion action.

ARN to the schema:

  • aws.glue.schema_arn: The ARN of the schema in AWS Glue Schema Registry. For example, 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'.

Examples

ENCODE AVRO (
  aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
  aws.region = 'us-west-2',
  aws.credentials.access_key_id = 'your_access_key_id',
  aws.credentials.secret_access_key = 'your_secret_access_key',
  aws.credentials.role.arn = 'arn:aws:iam::123456123456:role/MyGlueRole'
  ...
);
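
In context, the clause would sit at the end of a full statement. The sketch below assumes a hypothetical source name, topic, and broker address, uses access-key authentication only, and sets aws.region to match the region in the example schema ARN.

```sql
-- Hypothetical source name, topic, and broker address. The credentials
-- are placeholders. No schema_definition is given for Avro data.
CREATE SOURCE IF NOT EXISTS glue_src
WITH (
   connector='kafka',
   topic='my_event',
   properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
   aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
   aws.region = 'ap-southeast-1',
   aws.credentials.access_key_id = 'your_access_key_id',
   aws.credentials.secret_access_key = 'your_secret_access_key'
);
```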

Create source with PrivateLink connection

If your Kafka source service is located in a different VPC from RisingWave, use AWS PrivateLink to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see Create an AWS PrivateLink connection.

To create a Kafka source with a PrivateLink connection, in the WITH section of your CREATE SOURCE or CREATE TABLE statement, specify the following parameters.

Parameter | Notes
privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. The targets must be listed in the same order as the brokers specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues.
privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you create a connection. See details in Create a PrivateLink connection.

Here is an example of creating a Kafka source using a PrivateLink connection. Notice that {"port": 9094} corresponds to the broker broker1-endpoint, {"port": 9095} corresponds to the broker broker2-endpoint, and {"port": 9096} corresponds to the broker broker3-endpoint.

CREATE TABLE IF NOT EXISTS crypto_source (
  product_id VARCHAR,
  price NUMERIC,
  open_24h NUMERIC,
  volume_24h NUMERIC,
  low_24h NUMERIC,
  high_24h NUMERIC,
  volume_30d NUMERIC,
  best_bid  NUMERIC,
  best_ask  NUMERIC,
  side VARCHAR,
  time timestamp,
  trade_id bigint
)
WITH (
  connector='kafka',
  topic='crypto',
  privatelink.endpoint='10.148.0.4',
  privatelink.targets='[{"port": 9094}, {"port": 9095}, {"port": 9096}]',
  properties.bootstrap.server='broker1-endpoint,broker2-endpoint,broker3-endpoint',
  scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;

TLS/SSL encryption and SASL authentication

RisingWave can read Kafka data that is encrypted with Transport Layer Security (TLS) and/or authenticated with SASL.

Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, SSL is used in configuration and code instead of TLS.

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.

RisingWave supports these SASL authentication mechanisms:

  • SASL/PLAIN
  • SASL/SCRAM
  • SASL/GSSAPI
  • SASL/OAUTHBEARER

SSL encryption can be used concurrently with SASL authentication mechanisms.

To learn about how to enable SSL encryption and SASL authentication in Kafka, including how to generate the keys and certificates, see the Security Tutorial from Confluent.

You need to specify encryption and authentication parameters in the WITH section of a CREATE SOURCE statement.

To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SOURCE statement.

Parameter | Notes
properties.security.protocol | Set to SSL.
properties.ssl.ca.location |
properties.ssl.certificate.location |
properties.ssl.key.location |
properties.ssl.key.password |

For the definitions of the parameters, see the librdkafka properties list. Note that the parameter names in that list do not include the properties. prefix that RisingWave requires.

Here is an example of creating a table that reads data encrypted with SSL, without SASL authentication.

CREATE TABLE IF NOT EXISTS table_1 (
   column1 varchar,
   column2 integer
)
WITH (
   connector='kafka',
   topic='quickstart-events',
   properties.bootstrap.server='localhost:9093',
   scan.startup.mode='earliest',
   properties.security.protocol='SSL',
   properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
   properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
   properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
   properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;