Prerequisites
- An upstream source, table, or materialized view in RisingWave to output data from.
- Existing Iceberg tables that you can deliver to, or the ability to create them via external systems.
- Appropriate permissions to deliver to the target Iceberg catalog and storage.
- Access credentials for the underlying object storage (e.g., S3 access key and secret key).
Create an Iceberg sink
To write data to an external Iceberg table, create aSINK. This statement defines how data from an upstream object should be formatted and delivered to the target Iceberg table.
Configuration parameters
| Parameter | Required | Description |
|---|---|---|
connector | Yes | Must be 'iceberg'. |
type | Yes | Sink mode. 'append-only' for new records only; 'upsert' to handle updates and deletes. |
database.name | Yes | The name of the target Iceberg database. |
table.name | Yes | The name of the target Iceberg table. |
primary_key | Yes, if type is upsert | A comma-separated list of columns that form the primary key. |
force_append_only | No | If true, converts an upsert stream to append-only. Updates become inserts and deletes are ignored. Default: false. |
is_exactly_once | No | Set to true to enable exactly-once delivery semantics. This provides stronger consistency but may impact performance. Default: true. Exactly-once delivery requires sink decoupling to be enabled (the default behavior). If you SET sink_decouple = false;, exactly-once semantics will be automatically disabled for the sink. |
commit_checkpoint_interval | No | Controls how often RisingWave commits to Iceberg. Default: 60 (about every 60 seconds in the default configuration). |
commit_retry_num | No | The number of times to retry a failed commit. Default: 8. |
create_table_if_not_exists | No | If true, RisingWave creates the external Iceberg table automatically when it does not exist. Default: false. |
partition_by | No | Specify partitioning using column names or transformations. Supported transformations include identity, truncate(n), bucket(n), year, month, day, hour, and void. Multiple columns can be separated by commas. Example: partition_by = 'truncate(4,v2),bucket(5,v1)'. For more details on Iceberg partitioning, see Partition transforms. |
- Object storage: Object storage configuration
- Catalogs: Catalog configuration
commit_checkpoint_interval and commit_retry_num to manage commit frequency and retry behavior.