Kafka configuration options
Reference for all configuration parameters when connecting RisingWave to Apache Kafka.
This page provides a comprehensive reference for all configuration parameters available when creating a Kafka source or table in RisingWave using the WITH clause of a CREATE SOURCE or CREATE TABLE statement.
Basic connection parameters
These parameters are typically required to establish a basic connection to your Kafka cluster.
Parameter | Description | Required |
---|---|---|
connector | Must be set to 'kafka' . | Yes |
topic | The name of the Kafka topic to consume from. RisingWave can only consume from a single topic per source/table definition. | Yes |
properties.bootstrap.server | The address(es) of your Kafka broker(s). Use a comma-separated list for multiple brokers. Format: host1:port1,host2:port2 . Example: 'localhost:9092' or 'broker1:9092,broker2:9092' | Yes |
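
As a minimal sketch of how the three required parameters fit together, the statement below creates a source over a single topic. The source name, column definitions, topic name, and broker addresses are illustrative assumptions, not values from a real deployment.

```sql
-- Hypothetical source: the name, columns, topic, and brokers are placeholders.
CREATE SOURCE IF NOT EXISTS orders_source (
  order_id varchar,
  amount integer
)
WITH (
  connector = 'kafka',                                        -- must be 'kafka'
  topic = 'orders',                                           -- exactly one topic per source/table
  properties.bootstrap.server = 'broker1:9092,broker2:9092'   -- comma-separated broker list
) FORMAT PLAIN ENCODE JSON;
```

The same WITH clause applies to CREATE TABLE if you want RisingWave to persist the consumed data.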
Data consumption options
These parameters control how RisingWave consumes data from the Kafka topic.
Parameter | Description | Required | Default |
---|---|---|---|
scan.startup.mode | The starting offset. Options: 'earliest' (start from the earliest available offset) or 'latest' (start from the latest offset). | No | earliest |
scan.startup.timestamp.millis | Start consuming from a specific UNIX timestamp (in milliseconds). If this parameter is set, it overrides scan.startup.mode . Example: 1678838400000 (for March 15, 2023, 00:00:00 GMT) | No | None |
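
The two consumption options can be sketched as follows. The source name, columns, topic, and broker address are hypothetical; the timestamp corresponds to 2023-03-15 00:00:00 UTC. Both options are set here only to illustrate the precedence described in the table: the timestamp wins.

```sql
-- Hypothetical source that starts reading from a point in time.
CREATE SOURCE IF NOT EXISTS clicks_source (
  user_id varchar,
  url varchar
)
WITH (
  connector = 'kafka',
  topic = 'clicks',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'latest',                      -- overridden by the timestamp below
  scan.startup.timestamp.millis = '1678838400000'    -- 2023-03-15 00:00:00 UTC, in milliseconds
) FORMAT PLAIN ENCODE JSON;
```

In practice you would normally set only one of the two parameters to avoid ambiguity.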
General Kafka properties
These parameters provide additional control over the Kafka consumer behavior. Most correspond to standard Kafka configuration properties; for those, the RisingWave parameter name is the Kafka property name with the properties. prefix (for example, properties.client.id ).
For detailed explanations of these parameters, refer to the official librdkafka configuration documentation.
Parameter | Description | Type |
---|---|---|
properties.client.id | A user-defined string sent with every request to the Kafka brokers for tracing, logging, and monitoring purposes. It helps identify the client application (RisingWave) in Kafka logs and metrics. | string |
group.id.prefix | Specifies a custom prefix for the consumer group ID. RisingWave uses consumer groups to track progress, but does not rely on committed offsets. The default prefix is rw-consumer . The full consumer group ID is formed as {group_id_prefix}-{fragment_id} . | string |
properties.sync.call.timeout | The timeout, in seconds, for synchronous calls. Default: 5s . | string |
properties.enable.auto.commit | If true , periodically commit the consumer’s offsets to Kafka. If false , you’ll need to handle offset management manually (generally not recommended with RisingWave). Default is false . | boolean |
properties.enable.ssl.certificate.verification | Whether to verify the server’s SSL certificate. Default: true . Set to false to disable verification (not recommended for production). | boolean |
properties.fetch.max.bytes | The maximum amount of data the server will return per fetch request. | int |
properties.fetch.queue.backoff.ms | The initial backoff time in milliseconds between fetch requests. This helps avoid overwhelming the Kafka brokers if there are temporary issues. | int |
properties.fetch.wait.max.ms | The maximum time (in milliseconds) the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy the fetch.min.bytes requirement. | int |
properties.message.max.bytes | The maximum size of a message that can be received by the consumer. | int |
properties.queued.max.messages.kbytes | The maximum total size (in kilobytes) of messages buffered in the consumer’s local queue. | int |
properties.queued.min.messages | The minimum number of messages to keep in the local queue. | int |
properties.receive.message.max.bytes | The maximum size of a message that can be received, including protocol overhead. This should generally be larger than message.max.bytes . | int |
properties.statistics.interval.ms | The interval (in milliseconds) at which RisingWave emits Kafka consumer statistics. Set to a non-zero value to enable statistics collection. These statistics can be monitored using tools like Grafana. | int |
properties.ssl.endpoint.identification.algorithm | Set it to none to skip the hostname verification if you don’t provide the client certificate. See more details at TLS/SSL encryption and SASL authentication. | string |
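
A short sketch of how a few of these properties might be combined in one definition. The table name, columns, topic, broker address, and all property values are illustrative assumptions chosen for readability, not tuning recommendations.

```sql
-- Hypothetical table with a handful of consumer properties set.
CREATE TABLE IF NOT EXISTS events_table (
  event_id varchar,
  payload varchar
)
WITH (
  connector = 'kafka',
  topic = 'events',
  properties.bootstrap.server = 'localhost:9092',
  properties.client.id = 'risingwave-events',     -- shows up in broker logs and metrics
  group.id.prefix = 'analytics',                  -- consumer group becomes analytics-{fragment_id}
  properties.statistics.interval.ms = '60000',    -- non-zero value enables statistics
  properties.fetch.wait.max.ms = '500'
) FORMAT PLAIN ENCODE JSON;
```

Note that group.id.prefix is a RisingWave-specific option and therefore does not take the properties. prefix.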
Security (TLS/SSL and SASL)
RisingWave supports connecting to Kafka clusters secured with TLS/SSL encryption and/or SASL authentication. See the main Connect to Kafka guide for detailed examples.
The following table summarizes the parameters used for configuring security:
Parameter | Description | Required (Conditional) |
---|---|---|
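properties.security.protocol | The security protocol to use: 'PLAINTEXT' (no encryption or authentication), 'SSL' (TLS/SSL encryption), 'SASL_PLAINTEXT' (SASL authentication without encryption), or 'SASL_SSL' (SASL authentication over TLS/SSL). | Yes (for secure setups) |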
properties.sasl.mechanism | The SASL mechanism to use: 'PLAIN' , 'SCRAM-SHA-256' , 'SCRAM-SHA-512' , 'GSSAPI' , 'OAUTHBEARER' . Note: RisingWave’s OAUTHBEARER implementation only supports unsecured client-side tokens. | Yes (if using SASL) |
properties.sasl.username | The SASL username. | Yes (if using SASL) |
properties.sasl.password | The SASL password. | Yes (if using SASL) |
properties.ssl.ca.location | Path to the Certificate Authority (CA) certificate file used to verify the Kafka broker’s certificate. Example: '/path/to/ca.pem' | Yes (for SSL) |
properties.ssl.certificate.location | Path to the client’s SSL certificate file. Example: '/path/to/client.pem' | Conditional |
properties.ssl.key.location | Path to the client’s SSL private key file. Example: '/path/to/client.key' | Conditional |
properties.ssl.key.password | Password for the client’s SSL private key (if it’s encrypted). | Conditional |
properties.sasl.kerberos.service.name | The Kerberos service name. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.keytab | The path to the Kerberos keytab file. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.principal | The Kerberos principal name. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.kinit.cmd | The shell command used to obtain or refresh a Kerberos ticket. Required for SASL/GSSAPI. | Yes (for GSSAPI) |
properties.sasl.kerberos.min.time.before.relogin | The minimum time in milliseconds that must pass before the client automatically refreshes the Kerberos ticket. | Yes (for GSSAPI) |
properties.sasl.oauthbearer.config | The configuration string for SASL/OAUTHBEARER. See the librdkafka documentation for details on the format. Important: RisingWave only supports unsecured client-side tokens for OAUTHBEARER . | Yes (for OAUTHBEARER) |
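
As a hedged illustration of the table above, the following sketch connects over SASL_SSL using SCRAM authentication. The source name, columns, topic, broker address, credentials, and certificate path are placeholders that you would replace with your own values.

```sql
-- Hypothetical source using SASL/SCRAM authentication over TLS.
CREATE SOURCE IF NOT EXISTS secure_source (
  column1 varchar,
  column2 integer
)
WITH (
  connector = 'kafka',
  topic = 'quickstart-events',
  properties.bootstrap.server = 'broker1:9093',
  properties.security.protocol = 'SASL_SSL',
  properties.sasl.mechanism = 'SCRAM-SHA-512',
  properties.sasl.username = '[your-username]',
  properties.sasl.password = '[your-password]',
  properties.ssl.ca.location = '/path/to/ca.pem'   -- CA used to verify the broker certificate
) FORMAT PLAIN ENCODE JSON;
```

For mutual TLS you would additionally set properties.ssl.certificate.location and properties.ssl.key.location, as listed in the table.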
Format and encoding options
These parameters are used with FORMAT and ENCODE to specify how RisingWave should interpret the data in your Kafka messages. See Data formats and encoding options for details on available formats and encodings.
Parameter | Notes |
---|---|
data_format | Data format. Supported formats: DEBEZIUM , MAXWELL , CANAL , UPSERT , PLAIN . |
data_encode | Data encode. Supported encodes: JSON , AVRO , PROTOBUF , CSV , BYTES , PARQUET . |
message | Message name of the main Message in schema definition. Required for Protobuf. |
location | Web location of the schema file in http://... , https://... , or S3://... format. |
schema.registry | Confluent Schema Registry URL. Example: http://127.0.0.1:8081 . |
schema.registry.username | Conditional. User name for the schema registry. It must be specified with schema.registry.password . |
schema.registry.password | Conditional. Password for the schema registry. It must be specified with schema.registry.username . |
access_key | Required if loading descriptors from S3. The access key ID of AWS. |
secret_key | Required if loading descriptors from S3. The secret access key of AWS. |
region | Required if loading descriptors from S3. The AWS service region. |
arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
external_id | Optional. The external id used to authorize access to third-party resources. |
map.handling.mode | For Avro data. How to ingest Avro map type to RisingWave. Available values: 'map' (default) and 'jsonb' . |
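
The sketch below shows one way the format and encoding options attach to a statement: the data format and encode are given by the FORMAT and ENCODE keywords, and encoding-specific options follow in parentheses. The source name, topic, and broker address are assumptions; the registry URL is the example value from the table. No columns are declared because, in this sketch, the schema is assumed to be resolved from the registry.

```sql
-- Hypothetical Avro source whose schema comes from a schema registry.
CREATE SOURCE IF NOT EXISTS avro_source
WITH (
  connector = 'kafka',
  topic = 'avro-events',
  properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
  schema.registry = 'http://127.0.0.1:8081'
);
```

If the registry requires authentication, schema.registry.username and schema.registry.password would be added together inside the same parentheses.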
PrivateLink configuration
If your Kafka cluster is in a different VPC and you’re using AWS PrivateLink for connectivity, you’ll need to specify these parameters:
Parameter | Description | Required |
---|---|---|
privatelink.endpoint | The DNS name of your VPC endpoint. This is provided by AWS PrivateLink when you create the endpoint. | Yes |
privatelink.targets | A JSON array specifying the PrivateLink targets (port mappings) corresponding to your Kafka brokers. The order must match the order of brokers in properties.bootstrap.server . Example: '[{"port": 9094}, {"port": 9095}]' | Yes |
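
A minimal sketch of a PrivateLink setup, assuming two brokers. The table name, columns, topic, endpoint DNS name, and broker addresses are hypothetical; the point is that the entries in privatelink.targets line up one-to-one, in order, with the brokers in properties.bootstrap.server.

```sql
-- Hypothetical table reached through an AWS PrivateLink endpoint.
CREATE TABLE IF NOT EXISTS privatelink_table (
  product_id varchar,
  price varchar
)
WITH (
  connector = 'kafka',
  topic = 'prices',
  properties.bootstrap.server = 'broker1-endpoint:9094,broker2-endpoint:9095',
  privatelink.endpoint = 'vpce-placeholder.example.vpce.amazonaws.com',   -- placeholder DNS name
  privatelink.targets = '[{"port": 9094}, {"port": 9095}]'                -- same order as the brokers
) FORMAT PLAIN ENCODE JSON;
```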
AWS MSK configuration
To connect to Amazon Managed Streaming for Apache Kafka (MSK), see Access MSK in RisingWave.