In RisingWave, you can stream data into Iceberg tables with the built-in Iceberg sink connector.

Apache Iceberg is a table format designed to support huge tables. For more information, see Apache Iceberg.

Prerequisites

  • Ensure you already have an Iceberg table that you can sink data to. For additional guidance on creating a table and setting up Iceberg, refer to this quickstart guide on creating an Iceberg table.
  • Ensure you have an upstream materialized view or source that you can sink data from.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='iceberg',
   connector_parameter = 'value', ...
);

Parameters

For object storage configuration (S3, GCS, Azure Blob) and catalog configuration parameters, see the corresponding object storage and catalog configuration documentation.

Sink-specific parameters

Parameter name | Description
type | Required. Allowed values: append-only and upsert.
force_append_only | Optional. If true, forces the sink to be append-only, even if it cannot be.
database.name | Required. The database of the target Iceberg table.
table.name | Required. The name of the target Iceberg table.
primary_key | The primary key for an upsert sink. It is only applicable to the upsert mode.
partition_by | Optional. Specify partitioning using column names or transformations. Supported formats include: column, transform(column), transform(n,column), and transform(n, column). Transformations can include functions like bucket or truncate, where n is an optional parameter. Ensure that the specified partition fields exist in the schema. An example follows this table.
commit_checkpoint_interval | Optional. Commit every N checkpoints (N > 0). Default value is 10. The behavior of this field also depends on the sink_decouple setting:
  • If sink_decouple is true (the default), the default value of commit_checkpoint_interval is 10.
  • If sink_decouple is set to false, the default value of commit_checkpoint_interval is 1.
  • If sink_decouple is set to false and commit_checkpoint_interval is set to larger than 1, an error will occur.
The approximate time to commit to Iceberg can be calculated as time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval. barrier_interval_ms and checkpoint_frequency are system parameters that define the base checkpointing rate; commit_checkpoint_interval is configurable in the Iceberg sink. A worked example follows this table.
commit_retry_num | Optional. The number of times to retry a commit when an Iceberg commit fails. Default is 8.
create_table_if_not_exists | Optional. When set to true, it will automatically create a table for the Iceberg sink.
is_exactly_once | Optional. When set to true, enables exactly-once delivery semantics for the Iceberg sink. Default is false.
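
For example, assuming barrier_interval_ms = 1000 (1 second) and checkpoint_frequency = 1 (illustrative values; check the actual system parameters on your cluster), the default commit_checkpoint_interval of 10 gives:

time = 1000 ms × 1 × 10 = 10,000 ms

that is, a commit to the Iceberg table roughly every 10 seconds. Lowering commit_checkpoint_interval makes commits more frequent, at the cost of producing more and smaller Iceberg snapshots.

For partition_by, a value such as partition_by = 'user_id, bucket(16, seq_id)' (hypothetical column names) partitions the table by user_id and by a 16-way bucket of seq_id, matching the column and transform(n, column) formats listed above.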

Data type mapping

RisingWave converts data types between RisingWave and Iceberg according to the following data type mapping table:

RisingWave Type | Iceberg Type
boolean | boolean
int | int
smallint | int
bigint | long
real | float
float | float
double | double
varchar | string
date | date
timestamptz | timestamptz
timestamp | timestamp
map | map
array | list
struct | struct
jsonbstring

Iceberg table format

Currently, RisingWave only supports Iceberg tables in format v2.

Exactly-once delivery

RisingWave provides exactly-once delivery semantics for Iceberg sinks. This semantics guarantees that each data event is processed once and only once, even in the presence of failures such as retries or restarts. This level of delivery assurance is essential in scenarios where duplicate records can lead to incorrect analytics or data corruption in downstream systems.

Exactly-once delivery is achieved through a two-phase commit protocol involving a pre-commit phase and a commit phase. Iceberg’s commit operations are idempotent, which allows RisingWave to safely retry failed transactions without introducing duplicates.

By default, exactly-once semantics is disabled. To enable it for an Iceberg sink, include is_exactly_once = 'true' in the WITH clause of the sink definition. Note that enabling this option introduces additional coordination overhead due to metadata pre-commit, which may impact sink performance in high-throughput workloads.
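
For illustration, the following sketch enables exactly-once delivery on an upsert sink; the sink name is hypothetical, and the storage and table settings are placeholders that mirror the examples later on this page:

-- Hypothetical sink name; storage settings are placeholders.
CREATE SINK s1_exactly_once_sink FROM s1_table
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'seq_id',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name = 'dev',
    table.name = 'table',
    is_exactly_once = 'true'
);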

Examples

This section includes several examples that you can use if you want to quickly experiment with sinking data to Iceberg.

Create an Iceberg table (if you do not already have one)

Set create_table_if_not_exists to true to automatically create an Iceberg table.
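
For example, the following sketch sets the option on an append-only sink; the sink name is hypothetical, and the storage settings and the s1_source relation mirror the examples later on this page:

-- Hypothetical sink name; storage settings are placeholders.
CREATE SINK auto_create_sink FROM s1_source
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name = 'dev',
    table.name = 'table',
    create_table_if_not_exists = 'true'
);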

Alternatively, use Spark to create a table. For example, the following spark-sql command creates an Iceberg table named table under the database dev in AWS S3. The table is in an S3 bucket named my-iceberg-bucket in region ap-southeast-1 and under the path path/to/warehouse. The table has the property format-version=2, so it supports the upsert option. There should be a folder named s3://my-iceberg-bucket/path/to/warehouse/dev/table/metadata.

Note that only S3-compatible object stores are supported, such as AWS S3 or MinIO.

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=s3a://my-iceberg-bucket/path/to/warehouse \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=${ACCESS_KEY} \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=${SECRET_KEY} \
    --conf spark.sql.defaultCatalog=demo \
    --e "drop table if exists demo.dev.`table`;

CREATE TABLE demo.dev.\`table\`
(
  seq_id bigint,
  user_id bigint,
  user_name string
) TBLPROPERTIES ('format-version'='2')";

Create an upstream materialized view or source

The following query creates an append-only source. For more details on creating a source, see CREATE SOURCE.

CREATE SOURCE s1_source (
     seq_id bigint,
     user_id bigint,
     user_name varchar)
WITH (
     connector = 'datagen',
     fields.seq_id.kind = 'sequence',
     fields.seq_id.start = '1',
     fields.seq_id.end = '10000000',
     fields.user_id.kind = 'random',
     fields.user_id.min = '1',
     fields.user_id.max = '10000000',
     fields.user_name.kind = 'random',
     fields.user_name.length = '10',
     datagen.rows.per.second = '20000'
 ) FORMAT PLAIN ENCODE JSON;

Another option is to create an upsert table, which supports in-place updates. For more details on creating a table, see CREATE TABLE.

CREATE TABLE s1_table (
     seq_id bigint,
     user_id bigint,
     user_name varchar)
WITH (
     connector = 'datagen',
     fields.seq_id.kind = 'sequence',
     fields.seq_id.start = '1',
     fields.seq_id.end = '10000000',
     fields.user_id.kind = 'random',
     fields.user_id.min = '1',
     fields.user_id.max = '10000000',
     fields.user_name.kind = 'random',
     fields.user_name.length = '10',
     datagen.rows.per.second = '20000'
 ) FORMAT PLAIN ENCODE JSON;

Append-only sink from append-only source

If you have an append-only source and want to create an append-only sink, set type = append-only in the CREATE SINK SQL query.

CREATE SINK s1_sink FROM s1_source
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name='dev',
    table.name='table'
);

Append-only sink from upsert source

If you have an upsert source and want to create an append-only sink, set type = append-only and force_append_only = true. This will ignore delete messages in the upstream and turn upstream update messages into insert messages.

CREATE SINK s1_sink FROM s1_table
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name='dev',
    table.name='table'
);

Upsert sink from upsert source

In RisingWave, you can directly sink data as upserts into Iceberg tables.

CREATE SINK s1_sink FROM s1_table
WITH (
    connector = 'iceberg',
    type = 'upsert',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name='dev',
    table.name='table',
    primary_key='seq_id'
);