This page provides a comprehensive reference for the configuration parameters available when creating a Kafka source or table in RisingWave using the `WITH` clause of a `CREATE SOURCE` or `CREATE TABLE` statement.

Basic connection parameters

These parameters are typically required to establish a basic connection to your Kafka cluster.

| Parameter | Description | Required |
| --- | --- | --- |
| `connector` | Must be set to `'kafka'`. | Yes |
| `topic` | The name of the Kafka topic to consume from. RisingWave can only consume from a single topic per source/table definition. | Yes |
| `properties.bootstrap.server` | The address(es) of your Kafka broker(s). Use a comma-separated list for multiple brokers. Format: `host1:port1,host2:port2`. Example: `'localhost:9092'` or `'broker1:9092,broker2:9092'`. | Yes |
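
For example, a minimal source definition using these parameters might look like the following sketch; the source name, columns, topic, and broker addresses are placeholder values:

```sql
CREATE SOURCE my_kafka_source (
    user_id INT,
    event_name VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'my_topic',
    -- Comma-separated list when there is more than one broker:
    properties.bootstrap.server = 'broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
```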

Data consumption options

These parameters control how RisingWave consumes data from the Kafka topic.

| Parameter | Description | Required | Default |
| --- | --- | --- | --- |
| `scan.startup.mode` | The starting offset. Options: `'earliest'` (start consuming from the earliest available offset, i.e., the oldest data) and `'latest'` (start consuming from the latest offset, i.e., new data only). | No | `earliest` |
| `scan.startup.timestamp.millis` | Start consuming from a specific UNIX timestamp (in milliseconds). If this parameter is set, it overrides `scan.startup.mode`. Example: `1678886400000` (March 15, 2023, 13:20:00 UTC). | No | |
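
As a sketch, the statement below begins ingestion at a specific point in time rather than at an offset boundary; the source name, columns, topic, and timestamp are placeholder values:

```sql
CREATE SOURCE orders (
    order_id BIGINT,
    amount DOUBLE PRECISION
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'localhost:9092',
    -- Overrides scan.startup.mode when both are set:
    scan.startup.timestamp.millis = '1678886400000'
) FORMAT PLAIN ENCODE JSON;
```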

General Kafka properties

These parameters provide additional control over the Kafka consumer behavior. Most correspond to standard Kafka configuration properties, and for those, the RisingWave parameter name is prefixed with `properties.`. For detailed explanations of these parameters, refer to the official librdkafka configuration documentation.

| Parameter | Description | Type |
| --- | --- | --- |
| `properties.client.id` | A user-defined string sent with every request to the Kafka brokers for tracing, logging, and monitoring purposes. It helps identify the client application (RisingWave) in Kafka logs and metrics. | string |
| `group.id.prefix` | Specifies a custom prefix for the consumer group ID. RisingWave uses consumer groups to track progress but does not rely on committed offsets. The default prefix is `rw-consumer`. The full consumer group ID is formed as `{group_id_prefix}-{fragment_id}`. | string |
| `properties.sync.call.timeout` | The timeout for synchronous calls. Default: `5s`. | string |
| `properties.enable.auto.commit` | If `true`, periodically commit the consumer's offsets to Kafka. If `false`, you'll need to handle offset management manually (generally not recommended with RisingWave). Default: `false`. | boolean |
| `properties.enable.ssl.certificate.verification` | Whether to verify the server's SSL certificate. Default: `true`. Set to `false` to disable verification (not recommended for production). | boolean |
| `properties.fetch.max.bytes` | The maximum amount of data the server returns per fetch request. | int |
| `properties.fetch.queue.backoff.ms` | The initial backoff time in milliseconds between fetch requests. This helps avoid overwhelming the Kafka brokers during temporary issues. | int |
| `properties.fetch.wait.max.ms` | The maximum time (in milliseconds) the server blocks before answering a fetch request if there isn't sufficient data to immediately satisfy the `fetch.min.bytes` requirement. | int |
| `properties.message.max.bytes` | The maximum size of a message that the consumer can receive. | int |
| `properties.queued.max.messages.kbytes` | The maximum total size (in kilobytes) of messages buffered in the consumer's local queue. | int |
| `properties.queued.min.messages` | The minimum number of messages to keep in the local queue. | int |
| `properties.receive.message.max.bytes` | The maximum size of a message that can be received, including protocol overhead. This should generally be larger than `message.max.bytes`. | int |
| `properties.statistics.interval.ms` | The interval (in milliseconds) at which RisingWave emits Kafka consumer statistics. Set to a non-zero value to enable statistics collection. These statistics can be monitored using tools like Grafana. | int |
| `properties.ssl.endpoint.identification.algorithm` | Set to `none` to skip hostname verification if you don't provide a client certificate. See TLS/SSL encryption and SASL authentication for more details. | string |
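
For illustration, the sketch below tunes a few of these properties on a basic source; the source name, columns, topic, broker address, and tuning values are placeholders, not recommendations:

```sql
CREATE SOURCE tuned_source (
    payload VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'metrics',
    properties.bootstrap.server = 'localhost:9092',
    -- Identify this consumer in broker logs and metrics:
    properties.client.id = 'risingwave-ingest',
    -- Emit consumer statistics every 5 seconds (0 disables them):
    properties.statistics.interval.ms = '5000',
    -- Accept individual messages up to 10 MB:
    properties.message.max.bytes = '10485760'
) FORMAT PLAIN ENCODE JSON;
```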

Security (TLS/SSL and SASL)

RisingWave supports connecting to Kafka clusters secured with TLS/SSL encryption and/or SASL authentication. See the main Connect to Kafka guide for detailed examples.

The following table summarizes the parameters used for configuring security:

| Parameter | Description | Required (Conditional) |
| --- | --- | --- |
| `properties.security.protocol` | The security protocol to use: `'plaintext'` (no encryption or authentication), `'ssl'` (SSL encryption), `'sasl_plaintext'` (SASL authentication without encryption), or `'sasl_ssl'` (SASL authentication with SSL encryption). | Yes (for secure setups) |
| `properties.sasl.mechanism` | The SASL mechanism to use: `'PLAIN'`, `'SCRAM-SHA-256'`, `'SCRAM-SHA-512'`, `'GSSAPI'`, or `'OAUTHBEARER'`. Note: RisingWave's OAUTHBEARER implementation only supports unsecured client-side tokens. | Yes (if using SASL) |
| `properties.sasl.username` | The SASL username. | Yes (if using SASL) |
| `properties.sasl.password` | The SASL password. | Yes (if using SASL) |
| `properties.ssl.ca.location` | Path to the Certificate Authority (CA) certificate file used to verify the Kafka broker's certificate. Example: `'/path/to/ca.pem'`. | Yes (for SSL) |
| `properties.ssl.certificate.location` | Path to the client's SSL certificate file. Example: `'/path/to/client.pem'`. | Conditional |
| `properties.ssl.key.location` | Path to the client's SSL private key file. Example: `'/path/to/client.key'`. | Conditional |
| `properties.ssl.key.password` | Password for the client's SSL private key (if it is encrypted). | Conditional |
| `properties.sasl.kerberos.service.name` | The Kerberos service name. | Yes (for GSSAPI) |
| `properties.sasl.kerberos.keytab` | The path to the Kerberos keytab file. | Yes (for GSSAPI) |
| `properties.sasl.kerberos.principal` | The Kerberos principal name. | Yes (for GSSAPI) |
| `properties.sasl.kerberos.kinit.cmd` | The shell command used to obtain or refresh a Kerberos ticket. | Yes (for GSSAPI) |
| `properties.sasl.kerberos.min.time.before.relogin` | The minimum time in milliseconds that must pass before the client automatically refreshes the Kerberos ticket. | Yes (for GSSAPI) |
| `properties.sasl.oauthbearer.config` | The configuration string for SASL/OAUTHBEARER. See the librdkafka documentation for details on the format. Important: RisingWave only supports unsecured client-side tokens for OAUTHBEARER. | Yes (for OAUTHBEARER) |
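
For example, a source that authenticates with SASL/SCRAM over SSL might look like the following sketch; the credentials, certificate path, topic, and broker address are placeholders:

```sql
CREATE SOURCE secure_source (
    payload VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'secure_topic',
    properties.bootstrap.server = 'broker1:9093',
    -- SASL authentication with SSL encryption:
    properties.security.protocol = 'sasl_ssl',
    properties.sasl.mechanism = 'SCRAM-SHA-256',
    properties.sasl.username = 'kafka_user',
    properties.sasl.password = 'kafka_password',
    -- CA certificate used to verify the broker's certificate:
    properties.ssl.ca.location = '/path/to/ca.pem'
) FORMAT PLAIN ENCODE JSON;
```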

Format and encoding options

These parameters are used in the `FORMAT` and `ENCODE` clauses to specify how RisingWave should interpret the data in your Kafka messages. See Data formats and encoding options for details on available formats and encodings.

| Parameter | Notes |
| --- | --- |
| `data_format` | Data format. Supported formats: `DEBEZIUM`, `MAXWELL`, `CANAL`, `UPSERT`, `PLAIN`. |
| `data_encode` | Data encode. Supported encodes: `JSON`, `AVRO`, `PROTOBUF`, `CSV`, `BYTES`, `PARQUET`. |
| `message` | Message name of the main message in the schema definition. Required for Protobuf. |
| `location` | Web location of the schema file in `http://...`, `https://...`, or `s3://...` format. Not supported for Avro data. For Protobuf data, you must specify either a schema location or a schema registry, but not both. |
| `schema.registry` | Confluent Schema Registry URL. Example: `http://127.0.0.1:8081`. For Avro data, you must specify a Confluent Schema Registry or an AWS Glue Schema Registry. For Protobuf data, you must specify either a schema location or a Confluent Schema Registry, but not both. |
| `schema.registry.username` | Conditional. Username for the schema registry. Must be specified together with `schema.registry.password`. |
| `schema.registry.password` | Conditional. Password for the schema registry. Must be specified together with `schema.registry.username`. |
| `access_key` | Required if loading descriptors from S3. The AWS access key ID. |
| `secret_key` | Required if loading descriptors from S3. The AWS secret access key. |
| `region` | Required if loading descriptors from S3. The AWS service region. |
| `arn` | Optional. The Amazon Resource Name (ARN) of the role to assume. |
| `external_id` | Optional. The external ID used to authorize access to third-party resources. |
| `map.handling.mode` | For Avro data. How to ingest the Avro map type into RisingWave. Available values: `'map'` (default) and `'jsonb'`. |
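
As an illustration, the sketch below reads Avro data using a Confluent Schema Registry; the registry URL and credentials are placeholders. No columns are listed because RisingWave derives them from the registered Avro schema:

```sql
CREATE SOURCE avro_source
WITH (
    connector = 'kafka',
    topic = 'avro_topic',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://127.0.0.1:8081',
    -- Only needed if the registry requires authentication:
    schema.registry.username = 'registry_user',
    schema.registry.password = 'registry_password'
);
```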

AWS PrivateLink connection

If your Kafka cluster is in a different VPC and you're using AWS PrivateLink for connectivity, you'll need to specify these parameters:

| Parameter | Description | Required |
| --- | --- | --- |
| `privatelink.endpoint` | The DNS name of your VPC endpoint. This is provided by AWS PrivateLink when you create the endpoint. | Yes |
| `privatelink.targets` | A JSON array specifying the PrivateLink targets (port mappings) corresponding to your Kafka brokers. The order must match the order of brokers in `properties.bootstrap.server`. Example: `'[{"port": 9094}, {"port": 9095}]'`. | Yes |
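
For example (the endpoint DNS name, topic, and broker addresses are placeholders), a PrivateLink-connected source might look like:

```sql
CREATE SOURCE privatelink_source (
    payload VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'my_topic',
    -- Broker order here must match the order of the targets below:
    properties.bootstrap.server = 'broker1:9092,broker2:9092',
    privatelink.endpoint = 'vpce-0123abcd.vpce-svc-0123abcd.us-east-1.vpce.amazonaws.com',
    privatelink.targets = '[{"port": 9094}, {"port": 9095}]'
) FORMAT PLAIN ENCODE JSON;
```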

AWS MSK configuration

For connecting to Amazon Managed Streaming for Apache Kafka (MSK), see Access MSK in RisingWave for more details.