Reference for all configuration parameters when connecting RisingWave to Apache Kafka.
Specify these parameters in the WITH clause of a CREATE SOURCE or CREATE TABLE statement.
Parameter | Description | Required |
---|---|---|
connector | Must be set to 'kafka' . | Yes |
topic | The name of the Kafka topic to consume from. RisingWave can only consume from a single topic per source/table definition. | Yes |
properties.bootstrap.server | The address(es) of your Kafka broker(s). Use a comma-separated list for multiple brokers. Format: host1:port1,host2:port2 . Example: 'localhost:9092' or 'broker1:9092,broker2:9092' | Yes |
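
As an illustration of the three required parameters, here is a minimal sketch of a source definition. The source name, column list, topic name, and broker addresses are hypothetical placeholders, and the JSON encoding is only an assumption about the message payload.

```sql
-- Minimal sketch: all names and addresses below are placeholders.
CREATE SOURCE IF NOT EXISTS orders_src (
    order_id   BIGINT,
    amount     DOUBLE PRECISION,
    created_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',                                      -- must be 'kafka'
    topic = 'orders',                                         -- exactly one topic per source/table
    properties.bootstrap.server = 'broker1:9092,broker2:9092' -- comma-separated broker list
) FORMAT PLAIN ENCODE JSON;
```
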
Parameter | Description | Required | Default |
---|---|---|---|
scan.startup.mode | The starting offset. Options: 'earliest' (start from the earliest available offset) and 'latest' (start from the latest offset). | No | earliest |
scan.startup.timestamp.millis | Start consuming from a specific UNIX timestamp (in milliseconds). If this parameter is set, it overrides scan.startup.mode. Example: 1678838400000 (for March 15, 2023, 00:00:00 GMT) | No | |
scan.startup.mode only applies to streaming jobs, such as when building a table or creating a materialized view on a Kafka source. When you query a source directly, RisingWave always reads from the earliest available messages. For more information about querying sources, see Querying a source.
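
The following sketch shows one way to start consumption from a timestamp. It creates a table (a streaming job), so the startup setting takes effect. The table name, columns, topic, and broker address are hypothetical; the timestamp corresponds to March 15, 2023, 00:00:00 GMT.

```sql
-- Sketch only: table, topic, and broker names are placeholders.
CREATE TABLE IF NOT EXISTS orders_since_march (
    order_id BIGINT,
    amount   DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'localhost:9092',
    -- When set, this overrides scan.startup.mode.
    scan.startup.timestamp.millis = '1678838400000'
) FORMAT PLAIN ENCODE JSON;
```
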
Most of the parameters below are prefixed with properties. For detailed explanations of these parameters, refer to the official librdkafka configuration documentation.
Parameter | Description | Type |
---|---|---|
properties.client.id | A user-defined string sent with every request to the Kafka brokers for tracing, logging, and monitoring purposes. It helps identify the client application (RisingWave) in Kafka logs and metrics. | string |
group.id.prefix | Specifies a custom prefix for the consumer group ID. RisingWave uses consumer groups to track progress, but does not rely on committed offsets. The default prefix is rw-consumer . The full consumer group ID is formed as {group_id_prefix}-{fragment_id} . | string |
properties.sync.call.timeout | The timeout for synchronous calls, in seconds. Default: 5s. | string |
properties.enable.auto.commit | If true, the consumer’s offsets are committed to Kafka periodically. If false, you must manage offsets manually (generally not recommended with RisingWave). Default: true. | bool |
properties.enable.ssl.certificate.verification | Whether to verify the server’s SSL certificate. Default: true . Set to false to disable verification (not recommended for production). | bool |
properties.fetch.max.bytes | The maximum amount of data the server will return per fetch request. | int |
properties.fetch.queue.backoff.ms | The initial backoff time in milliseconds between fetch requests. This helps avoid overwhelming the Kafka brokers if there are temporary issues. | int |
properties.fetch.wait.max.ms | The maximum time (in milliseconds) the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy the fetch.min.bytes requirement. | int |
properties.message.max.bytes | The maximum size of a message that can be received by the consumer. | int |
properties.queued.max.messages.kbytes | The maximum total size (in kilobytes) of messages buffered in the consumer’s local queue. | int |
properties.queued.min.messages | The minimum number of messages to keep in the local queue. | int |
properties.receive.message.max.bytes | The maximum size of a message that can be received, including protocol overhead. This should generally be larger than message.max.bytes . | int |
properties.statistics.interval.ms | The interval (in milliseconds) at which RisingWave emits Kafka consumer statistics. Set to a non-zero value to enable statistics collection. These statistics can be monitored using tools like Grafana. | int |
properties.ssl.endpoint.identification.algorithm | Set it to none to skip the hostname verification if you don’t provide the client certificate. See more details at TLS/SSL encryption and SASL authentication. | string |
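
A sketch of how a few of these optional parameters might be combined in one definition. The source name, columns, topic, and every value shown are illustrative assumptions rather than recommended settings.

```sql
-- Sketch only: names and values are placeholders, not tuning advice.
CREATE SOURCE IF NOT EXISTS clicks_src (
    user_id BIGINT,
    url     VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'clicks',
    properties.bootstrap.server = 'localhost:9092',
    group.id.prefix = 'analytics',                -- group ID becomes analytics-{fragment_id}
    properties.client.id = 'risingwave-clicks',   -- appears in broker logs and metrics
    properties.statistics.interval.ms = '60000',  -- non-zero value enables statistics
    properties.fetch.wait.max.ms = '500'
) FORMAT PLAIN ENCODE JSON;
```
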
Parameter | Description | Required (Conditional) |
---|---|---|
properties.security.protocol | The security protocol to use, for example 'SSL', 'SASL_PLAINTEXT', or 'SASL_SSL'. | Yes (for secure setups) |
properties.sasl.mechanism | The SASL mechanism to use: 'PLAIN' , 'SCRAM-SHA-256' , 'SCRAM-SHA-512' , 'GSSAPI' , 'OAUTHBEARER' . Note: RisingWave’s OAUTHBEARER implementation only supports unsecured client-side tokens. | Yes (if using SASL) |
properties.sasl.username | The SASL username. | Yes (if using SASL) |
properties.sasl.password | The SASL password. | Yes (if using SASL) |
properties.ssl.ca.location | Path to the Certificate Authority (CA) certificate file used to verify the Kafka broker’s certificate. Example: '/path/to/ca.pem' | Yes (for SSL) |
properties.ssl.certificate.location | Path to the client’s SSL certificate file. Example: '/path/to/client.pem' | Conditional |
properties.ssl.key.location | Path to the client’s SSL private key file. Example: '/path/to/client.key' | Conditional |
properties.ssl.key.password | Password for the client’s SSL private key (if it’s encrypted). | Conditional |
properties.sasl.kerberos.service.name | The Kerberos service name. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.keytab | The path to the Kerberos keytab file. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.principal | The Kerberos principal name. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.kinit.cmd | The shell command used to obtain or refresh a Kerberos ticket. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.min.time.before.relogin | The minimum time in milliseconds that must pass before the client automatically refreshes the Kerberos ticket. | Yes (for GSSAPI) |
properties.sasl.oauthbearer.config | The configuration string for SASL/OAUTHBEARER. See the librdkafka documentation for details on the format. Important: RisingWave only supports unsecured client-side tokens for OAUTHBEARER . | Yes (for OAUTHBEARER) |
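
To show how the security parameters fit together, here is a sketch of a SASL/SCRAM connection over TLS. It assumes the broker accepts the 'SASL_SSL' protocol; the source name, columns, broker address, credentials, and certificate path are placeholders.

```sql
-- Sketch only: credentials, paths, and names are placeholders.
CREATE SOURCE IF NOT EXISTS secure_orders_src (
    order_id BIGINT,
    amount   DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'broker1:9093',
    properties.security.protocol = 'SASL_SSL',       -- SASL authentication over TLS
    properties.sasl.mechanism = 'SCRAM-SHA-256',
    properties.sasl.username = 'my_user',
    properties.sasl.password = 'my_password',
    properties.ssl.ca.location = '/path/to/ca.pem'   -- CA used to verify the broker certificate
) FORMAT PLAIN ENCODE JSON;
```
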
Use FORMAT and ENCODE to specify how RisingWave should interpret the data in your Kafka messages. See Data formats and encoding options for details on available formats and encodings.
Parameter | Notes |
---|---|
data_format | Data format. Supported formats: DEBEZIUM , MAXWELL , CANAL , UPSERT , PLAIN . |
data_encode | Data encode. Supported encodes: JSON , AVRO , PROTOBUF , CSV , BYTES , PARQUET . |
message | Message name of the main Message in schema definition. Required for Protobuf. |
location | Web location of the schema file in http://... , https://... , or S3://... format. |
schema.registry | Confluent Schema Registry URL. Example: http://127.0.0.1:8081 . |
schema.registry.username | Conditional. User name for the schema registry. It must be specified with schema.registry.password . |
schema.registry.password | Conditional. Password for the schema registry. It must be specified with schema.registry.username . |
access_key | Required if loading descriptors from S3. The access key ID of AWS. |
secret_key | Required if loading descriptors from S3. The secret access key of AWS. |
region | Required if loading descriptors from S3. The AWS service region. |
arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
external_id | Optional. The external id used to authorize access to third-party resources. |
map.handling.mode | For Avro data. How to ingest Avro map type to RisingWave. Available values: 'map' (default) and 'jsonb' . |
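
A sketch of an Avro source whose schema is fetched from a Confluent Schema Registry. No column list is given because the columns are assumed to be derived from the registered schema. The source name, topic, broker address, and registry credentials are placeholders.

```sql
-- Sketch only: names, URL, and credentials are placeholders.
CREATE SOURCE IF NOT EXISTS users_src
WITH (
    connector = 'kafka',
    topic = 'users',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://127.0.0.1:8081',
    schema.registry.username = 'registry_user',   -- must be set together with the password
    schema.registry.password = 'registry_password',
    map.handling.mode = 'jsonb'                   -- ingest Avro maps as jsonb instead of map
);
```
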
Specify the following parameters in the FORMAT PLAIN ENCODE AVRO (...) or FORMAT UPSERT ENCODE AVRO (...) clause to read schemas from the AWS Glue Schema Registry.
Auth-related configurations:
Parameter | Description |
---|---|
aws.region | The region of the AWS Glue Schema Registry. For example, us-west-2 . |
aws.credentials.access_key_id | Your AWS access key ID. |
aws.credentials.secret_access_key | Your AWS secret access key. |
aws.credentials.role.arn | The Amazon Resource Name (ARN) of the role to assume. For example, arn:aws:iam::123456123456:role/MyGlueRole . This IAM role must be granted permission for the glue:GetSchemaVersion action. |
Parameter | Description |
---|---|
aws.glue.schema_arn | The ARN of the schema in AWS Glue Schema Registry. For example, 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent' . |
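
A sketch of a source that reads its Avro schema from the AWS Glue Schema Registry using the parameters in the two tables above. The source name, topic, broker address, and credentials are placeholders; the schema ARN reuses the example value from the table.

```sql
-- Sketch only: names and credentials are placeholders.
CREATE SOURCE IF NOT EXISTS glue_events_src
WITH (
    connector = 'kafka',
    topic = 'events',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
    aws.region = 'ap-southeast-1',
    aws.credentials.access_key_id = 'my_access_key_id',
    aws.credentials.secret_access_key = 'my_secret_access_key',
    aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'
);
```
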
Parameter | Description | Required |
---|---|---|
privatelink.endpoint | The DNS name of your VPC endpoint. This is provided by AWS PrivateLink when you create the endpoint. | Yes |
privatelink.targets | A JSON array specifying the PrivateLink targets (port mappings) corresponding to your Kafka brokers. The order must match the order of brokers in properties.bootstrap.server . Example: '[{"port": 9094}, {"port": 9095}]' | Yes |
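
A sketch of a PrivateLink connection with two brokers. The endpoint DNS name, broker hostnames, source name, and columns are hypothetical; note that the order of the privatelink.targets entries follows the order of brokers in properties.bootstrap.server.

```sql
-- Sketch only: endpoint, hostnames, and names are placeholders.
CREATE SOURCE IF NOT EXISTS privatelink_orders_src (
    order_id BIGINT,
    amount   DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'broker1.example.internal:9094,broker2.example.internal:9095',
    privatelink.endpoint = 'my-vpc-endpoint.example.vpce.amazonaws.com',
    -- First target maps to broker1, second to broker2.
    privatelink.targets = '[{"port": 9094}, {"port": 9095}]'
) FORMAT PLAIN ENCODE JSON;
```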