In RisingWave, you can stream data into Iceberg tables with the built-in Iceberg sink connector.

Apache Iceberg is a table format designed to support huge tables. For more information, see Apache Iceberg.

Prerequisites

  • Ensure you already have an Iceberg table that you can sink data to. For additional guidance on creating a table and setting up Iceberg, refer to this quickstart guide on creating an Iceberg table.
  • Ensure you have an upstream materialized view or source that you can sink data from.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='iceberg',
   connector_parameter = 'value', ...
);
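
The AS select_query form of the syntax above can be sketched as follows. This is a minimal illustration, not a definitive setup: the sink name filtered_users_sink and the filter condition are hypothetical, s1_source is the datagen source defined in the Examples section of this page, and the connection values are placeholders copied from the storage catalog example.

```sql
-- Sketch: sink the result of a query rather than a whole source or view.
-- filtered_users_sink and the WHERE clause are hypothetical names/values.
CREATE SINK IF NOT EXISTS filtered_users_sink AS
SELECT seq_id, user_id, user_name
FROM s1_source
WHERE user_id > 100
WITH (
    connector = 'iceberg',
    type = 'append-only',
    s3.endpoint = 'http://minio-0:9301',
    s3.access.key = 'xxxxxxxxxx',
    s3.secret.key = 'xxxxxxxxxx',
    s3.region = 'ap-southeast-1',
    catalog.type = 'storage',
    catalog.name = 'demo',
    warehouse.path = 's3://icebergdata/demo',
    database.name = 's1',
    table.name = 't1'
);
```

The columns returned by the query need to match the schema of the target Iceberg table.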

Parameters

Basic parameters

Parameter Name | Description
type | Required. Allowed values: append-only and upsert.
force_append_only | Optional. If true, forces the sink to be append-only even if the upstream is not append-only.
s3.endpoint | Optional. Endpoint of the S3-compatible object store.
  • For a MinIO object store backend, it should be http://${MINIO_HOST}:${MINIO_PORT}.
  • For AWS S3, refer to S3.
s3.region | Optional. The region where the S3 bucket is hosted. Either s3.endpoint or s3.region must be specified.
s3.access.key | Required. Access key of the S3-compatible object store.
s3.secret.key | Required. Secret key of the S3-compatible object store.
s3.path.style.access | Optional. Determines the access style for S3. If true, use path-style; if false, use virtual-hosted-style.
database.name | Required. The database of the target Iceberg table.
table.name | Required. The name of the target Iceberg table.
catalog.name | Conditional. The name of the Iceberg catalog. It can be omitted for the storage catalog but is required for other catalogs.
catalog.type | Optional. The catalog type used in this table. Currently, the supported values are storage, rest, hive, jdbc, and glue. If not specified, storage is used. For details, see Catalogs.
warehouse.path | Conditional. The path of the Iceberg warehouse. Currently, only S3-compatible object storage systems, such as AWS S3 and MinIO, are supported. It is required if catalog.type is not rest.
catalog.uri | Conditional. The URL of the catalog. It is required when catalog.type is not storage.
primary_key | Conditional. The primary key for an upsert sink. It is only applicable to the upsert mode.
partition_by | Optional. Specify partitioning using column names or transformations. Supported formats include: column, transform(column), transform(n,column), and transform(n, column). Transformations can include functions like bucket or truncate, where n is an optional parameter. Ensure that the specified partition fields exist in the schema.
commit_checkpoint_interval | Optional. Commit every N checkpoints (N > 0). Default value is 10.
The behavior of this field also depends on the sink_decouple setting:
  • If sink_decouple is true (the default), the default value of commit_checkpoint_interval is 10.
  • If sink_decouple is set to false, the default value of commit_checkpoint_interval is 1.
  • If sink_decouple is set to false and commit_checkpoint_interval is set to a value larger than 1, an error will occur.
create_table_if_not_exists | Optional. When set to true, the sink automatically creates the target Iceberg table if it does not exist.
catalog.credential | Optional. Credential for accessing the Iceberg catalog, used to exchange for a token in the OAuth2 client credentials flow. Applicable only in the rest catalog.
catalog.token | Optional. A bearer token for accessing the Iceberg catalog, used for interaction with the server. Applicable only in the rest catalog.
catalog.oauth2_server_uri | Optional. The oauth2_server_uri for accessing the Iceberg catalog, serving as the token endpoint URI to fetch a token if the rest catalog is not the authorization server. Applicable only in the rest catalog.
catalog.scope | Optional. Scope for accessing the Iceberg catalog, providing additional scope for OAuth2. Applicable only in the rest catalog.
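
To illustrate how the optional parameters combine, here is a minimal sketch of a sink that creates its own partitioned table and commits every 5 checkpoints. Several things here are assumptions rather than statements from this page: that sink_decouple can be set as a session variable with SET, that partition_by takes effect together with create_table_if_not_exists, and the names partitioned_sink and t1_partitioned, which are hypothetical. The connection values are placeholders.

```sql
-- Assumption: sink_decouple is set per session. An interval larger than 1
-- is only accepted while sink_decouple is true.
SET sink_decouple = true;

CREATE SINK partitioned_sink FROM s1_source
WITH (
    connector = 'iceberg',
    type = 'append-only',
    s3.endpoint = 'http://minio-0:9301',
    s3.access.key = 'xxxxxxxxxx',
    s3.secret.key = 'xxxxxxxxxx',
    s3.region = 'ap-southeast-1',
    catalog.type = 'storage',
    catalog.name = 'demo',
    warehouse.path = 's3://icebergdata/demo',
    database.name = 's1',
    table.name = 't1_partitioned',         -- hypothetical table name
    create_table_if_not_exists = 'true',   -- let the sink create the table
    partition_by = 'bucket(16,user_id)',   -- transform(n,column) form
    commit_checkpoint_interval = '5'       -- commit every 5 checkpoints
);
```

With sink_decouple set to false, the same statement would be expected to fail because commit_checkpoint_interval is larger than 1.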

Use Amazon S3 Tables with the Iceberg sink

You can configure the RisingWave Iceberg sink connector to use Amazon S3 Tables as its catalog. This setup allows RisingWave to sink data into Iceberg tables managed by the AWS native S3 Tables catalog service.

To achieve this, specify the rest catalog type within your CREATE SINK statement and include the necessary parameters for SigV4 authentication against the S3 Tables REST API.

Required REST Catalog Parameters for S3 Tables:

Parameter Name | Description | Value for S3 Tables
catalog.rest.signing_region | The AWS region for signing REST catalog requests. | e.g., us-east-1
catalog.rest.signing_name | The service name for signing REST catalog requests. | s3tables
catalog.rest.sigv4_enabled | Enables SigV4 signing for REST catalog requests. Set to true. | true

Example CREATE SINK Statement:

CREATE SINK my_s3_tables_sink FROM source_table
WITH (
    connector = 'iceberg',
    type = 'upsert', -- Or 'append-only'
    primary_key = 'id', -- Required for 'upsert' type

    -- Specify the S3 Tables warehouse ARN
    warehouse.path = 'arn:aws:s3tables:<your-region>:<your-account-id>:bucket/<your-bucket-name>',
    -- AWS Credentials
    s3.access.key = '<your-aws-access-key-id>',
    s3.secret.key = '<your-aws-secret-access-key>',
    s3.region = '<your-region>', -- e.g., 'us-east-1'

    -- S3 Tables REST catalog endpoint
    catalog.uri = 'https://s3tables.<your-region>.amazonaws.com/iceberg',
    -- REST catalog signing configurations
    catalog.rest.signing_region = '<your-region>', -- e.g., 'us-east-1'
    catalog.rest.sigv4_enabled = true,
    catalog.rest.signing_name = 's3tables',
    -- Specify REST catalog type
    catalog.type = 'rest',

    -- Target Iceberg table details within S3 Tables catalog
    database.name = '<your-database-name>', -- Database in S3 Tables
    table.name = '<your-table-name>',       -- Table name in S3 Tables
    create_table_if_not_exists = true      -- Optional: Create table if it doesn't exist
);

Note: Replace placeholder values (<...>) with your specific AWS account, region, bucket, database, table names, and credentials.

Once created, this sink will write data from source_table into the specified Iceberg table (<your-table-name>) within the <your-database-name> database, using Amazon S3 Tables to manage the table’s metadata.

Data type mapping

RisingWave converts data types between RisingWave and Iceberg according to the following mapping table:

RisingWave Type | Iceberg Type
boolean | boolean
int | int
smallint | int
bigint | long
real | float
float | float
double | double
varchar | string
date | date
timestamptz | timestamptz
timestamp | timestamp
map | map
array | list
struct | struct
jsonb | string
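
As a rough illustration of the mapping, the following RisingWave table definition annotates each column with the Iceberg type it would correspond to according to the table above. The table name type_demo and all column names are hypothetical.

```sql
-- Hypothetical RisingWave table; comments give the Iceberg type from the mapping table.
CREATE TABLE type_demo (
    is_active   boolean,           -- Iceberg: boolean
    small_count smallint,          -- Iceberg: int
    item_count  int,               -- Iceberg: int
    total       bigint,            -- Iceberg: long
    score       real,              -- Iceberg: float
    ratio       double precision,  -- Iceberg: double
    user_name   varchar,           -- Iceberg: string
    signup_date date,              -- Iceberg: date
    created_at  timestamptz,       -- Iceberg: timestamptz
    local_time  timestamp,         -- Iceberg: timestamp
    tags        varchar[],         -- Iceberg: list
    payload     jsonb              -- Iceberg: string
);
```

A target Iceberg table for a sink from type_demo would need columns of the Iceberg types noted in the comments.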

Catalog

Iceberg supports these types of catalogs:

Storage catalog

The Storage catalog stores all metadata in the underlying file system, such as Hadoop or S3. Currently, we only support S3 as the underlying file system.

Example
CREATE SINK sink_demo_storage FROM t
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = true,
    s3.endpoint = 'http://minio-0:9301',
    s3.access.key = 'xxxxxxxxxx',
    s3.secret.key = 'xxxxxxxxxx',
    s3.region = 'ap-southeast-1',
    catalog.type = 'storage',
    catalog.name = 'demo',
    warehouse.path = 's3://icebergdata/demo',
    database.name = 's1',
    table.name = 't1'
);

REST catalog

RisingWave supports the REST catalog, which acts as a proxy to other catalogs such as the Hive, JDBC, and Nessie catalogs. This is the recommended way to use RisingWave with Iceberg tables.

Example
CREATE SINK sink_demo_rest FROM t
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = true,
    s3.endpoint = 'https://s3.ap-southeast-2.amazonaws.com',
    s3.region = 'ap-southeast-2',
    s3.access.key = 'xxxx',
    s3.secret.key = 'xxxx',
    s3.path.style.access = 'true',
    catalog.type = 'rest',
    catalog.uri = 'http://localhost:8181/api/catalog',
    warehouse.path = 'quickstart_catalog',
    database.name = 'ns',
    table.name = 't1',
    catalog.credential = '123456:123456',
    catalog.scope = 'PRINCIPAL_ROLE:ALL',
    catalog.oauth2_server_uri = 'xxx'
);
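
If the REST catalog accepts a pre-issued bearer token, the catalog.token parameter described in the parameter table can presumably be used in place of the OAuth2 credential parameters. The following is a sketch under that assumption; the sink name sink_demo_rest_token and the token placeholder are hypothetical, and the other values are copied from the example above.

```sql
-- Sketch: authenticate to the REST catalog with a bearer token
-- instead of catalog.credential / catalog.scope / catalog.oauth2_server_uri.
CREATE SINK sink_demo_rest_token FROM t
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = true,
    s3.endpoint = 'https://s3.ap-southeast-2.amazonaws.com',
    s3.region = 'ap-southeast-2',
    s3.access.key = 'xxxx',
    s3.secret.key = 'xxxx',
    s3.path.style.access = 'true',
    catalog.type = 'rest',
    catalog.uri = 'http://localhost:8181/api/catalog',
    warehouse.path = 'quickstart_catalog',
    database.name = 'ns',
    table.name = 't1',
    catalog.token = '<your-bearer-token>'  -- placeholder, replace with a real token
);
```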

Hive catalog

RisingWave supports the Hive catalog. You need to set catalog.type to hive to use it.

Example
CREATE SINK sink_demo_hive FROM t
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = true,
    catalog.type = 'hive',
    catalog.uri = 'thrift://metastore:9083',
    warehouse.path = 's3://icebergdata/demo',
    s3.endpoint = 'http://minio-0:9301',
    s3.access.key = 'xxxxxxxxxx',
    s3.secret.key = 'xxxxxxxxxx',
    s3.region = 'ap-southeast-1',
    catalog.name = 'demo',
    database.name = 's1',
    table.name = 't1'
);

JDBC catalog

RisingWave supports the JDBC catalog.

Example
CREATE SINK sink_demo_jdbc FROM t
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = true,
    warehouse.path = 's3://icebergdata/demo',
    s3.endpoint = 'http://minio-0:9301',
    s3.access.key = 'xxxxxxxxxx',
    s3.secret.key = 'xxxxxxxxxx',
    s3.region = 'ap-southeast-1',
    catalog.name = 'demo',
    catalog.type = 'jdbc',
    catalog.uri = 'jdbc:postgresql://postgres:5432/iceberg',
    catalog.jdbc.user = 'admin',
    catalog.jdbc.password = '123456',
    database.name = 's1',
    table.name = 't1'
);

Glue catalog

PREMIUM EDITION FEATURE

This is a Premium Edition feature. All Premium Edition features are available out of the box without additional cost on RisingWave Cloud. For self-hosted deployments, users need to purchase a license key to access this feature. To purchase a license key, contact the sales team at sales@risingwave-labs.com.

For a full list of Premium Edition features, see RisingWave Premium Edition.

RisingWave supports the Glue catalog. You should use AWS S3 if you use the Glue catalog. Below is an example of using this catalog:

Example
CREATE SINK sink_test FROM t
WITH (
    type = 'upsert',
    primary_key = 'col',
    connector = 'iceberg',
    catalog.type = 'glue',
    catalog.name = 'test',
    warehouse.path = 's3://my-iceberg-bucket/test',
    s3.access.key = 'xxxxxxxxxx',
    s3.secret.key = 'xxxxxxxxxx',
    s3.region = 'ap-southeast-2',
    database.name = 'test_db',
    table.name = 'test_table'
);

Iceberg table format

Currently, RisingWave only supports Iceberg tables in format v2.
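
If an existing table was created in format v1, one possible way to upgrade it is to set the format-version table property from Spark SQL. This is a sketch that assumes a Spark session with the Iceberg runtime and a catalog named demo, as configured in the spark-sql example later on this page; the table identifier is taken from that example.

```sql
-- Spark SQL (run in spark-sql, not in RisingWave): upgrade the table to format v2.
-- Assumes the Iceberg catalog is registered as "demo".
ALTER TABLE demo.dev.`table` SET TBLPROPERTIES ('format-version' = '2');
```

Upgrading the format version is a one-way change, so it is worth trying on a test table first.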

Examples

This section includes several examples that you can use if you want to quickly experiment with sinking data to Iceberg.

Create an Iceberg table (if you do not already have one)

Set create_table_if_not_exists to true to automatically create an Iceberg table.

Alternatively, use Spark to create a table. For example, the following spark-sql command creates an Iceberg table named table in the database dev on AWS S3. The table is stored in an S3 bucket named my-iceberg-bucket in the region ap-southeast-1, under the path path/to/warehouse. The table has the property format-version=2, so it supports the upsert option. After the command succeeds, there should be a folder named s3://my-iceberg-bucket/path/to/warehouse/dev/table/metadata.

Note that only S3-compatible object stores, such as AWS S3 or MinIO, are supported.

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=s3a://my-iceberg-bucket/path/to/warehouse \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=${ACCESS_KEY} \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=${SECRET_KEY} \
    --conf spark.sql.defaultCatalog=demo \
    -e "drop table if exists demo.dev.\`table\`;

CREATE TABLE demo.dev.\`table\`
(
  seq_id bigint,
  user_id bigint,
  user_name string
) TBLPROPERTIES ('format-version'='2')";

Create an upstream materialized view or source

The following query creates an append-only source. For more details on creating a source, see CREATE SOURCE.

CREATE SOURCE s1_source (
     seq_id bigint,
     user_id bigint,
     user_name varchar)
WITH (
     connector = 'datagen',
     fields.seq_id.kind = 'sequence',
     fields.seq_id.start = '1',
     fields.seq_id.end = '10000000',
     fields.user_id.kind = 'random',
     fields.user_id.min = '1',
     fields.user_id.max = '10000000',
     fields.user_name.kind = 'random',
     fields.user_name.length = '10',
     datagen.rows.per.second = '20000'
 ) FORMAT PLAIN ENCODE JSON;

Another option is to create an upsert table, which supports in-place updates. For more details on creating a table, see CREATE TABLE.

CREATE TABLE s1_table (
     seq_id bigint,
     user_id bigint,
     user_name varchar)
WITH (
     connector = 'datagen',
     fields.seq_id.kind = 'sequence',
     fields.seq_id.start = '1',
     fields.seq_id.end = '10000000',
     fields.user_id.kind = 'random',
     fields.user_id.min = '1',
     fields.user_id.max = '10000000',
     fields.user_name.kind = 'random',
     fields.user_name.length = '10',
     datagen.rows.per.second = '20000'
 ) FORMAT PLAIN ENCODE JSON;

Append-only sink from append-only source

If you have an append-only source and want to create an append-only sink, set type = 'append-only' in the CREATE SINK statement.

CREATE SINK s1_sink FROM s1_source
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name='dev',
    table.name='table'
);

Append-only sink from upsert source

If you have an upsert source and want to create an append-only sink, set type = 'append-only' and force_append_only = 'true'. This ignores delete messages from the upstream and turns upstream update messages into insert messages.

CREATE SINK s1_sink FROM s1_table
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name='dev',
    table.name='table'
);

Upsert sink from upsert source

In RisingWave, you can directly sink data as upserts into Iceberg tables. Set type = 'upsert' and specify the primary key with primary_key.

CREATE SINK s1_sink FROM s1_table
WITH (
    connector = 'iceberg',
    type = 'upsert',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name='dev',
    table.name='table',
    primary_key='seq_id'
);
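
The prerequisites also allow a materialized view as the upstream. As a minimal sketch, the following defines an aggregating materialized view over s1_table and sinks it as upserts, using the grouping column as the primary key. The names user_event_counts and user_event_counts_sink are hypothetical, and the target Iceberg table is assumed to already exist with a matching schema (user_id and event_count, both long).

```sql
-- Hypothetical materialized view: one row per user_id, updated in place as data arrives.
CREATE MATERIALIZED VIEW user_event_counts AS
SELECT user_id, count(*) AS event_count
FROM s1_table
GROUP BY user_id;

-- Upsert sink keyed on the grouping column.
CREATE SINK user_event_counts_sink FROM user_event_counts
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}',
    database.name = 'dev',
    table.name = 'user_event_counts'  -- assumed to exist in Iceberg already
);
```

Because the aggregate for a given user_id changes over time, an append-only sink would not fit here without force_append_only.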

Iceberg sink on GCS

Added in version 2.3.

RisingWave supports creating Iceberg sinks on GCS with catalog types storage or rest. For more information about gcs.credential, see parameters.

CREATE SINK sink_t FROM t WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    database.name = 'public',
    table.name = 't',
    catalog.name = 'demo',
    catalog.type = 'rest',
    catalog.uri = 'http://127.0.0.1:8181',
    warehouse.path = 'gs://bucket/path',
    gcs.credential = 'xxxxxxxx'
);