This guide explains how to stream processed data from RisingWave into existing Iceberg tables as part of the Bring Your Own Iceberg approach. Use this when you have Iceberg tables managed by external systems and want RisingWave to write processed results into these tables.

When to use Iceberg sinks

Choose Iceberg sinks when:

  • Existing data lake: You have an established data lake with Iceberg tables that need real-time updates.
  • Multi-system architecture: Multiple applications write to the same Iceberg tables.
  • Analytics pipeline: You want to stream processed results into tables consumed by analytics tools.
  • ETL/ELT workflows: RisingWave processes data and loads results into your data warehouse.

Prerequisites

  • Existing Iceberg tables that RisingWave can write to, or the ability to create them through an external system (a minimal sketch follows this list).
  • Access credentials for the underlying object storage (e.g., S3 access key and secret key).
  • Appropriate permissions to write to the target Iceberg catalog and storage.
  • An upstream source, table, or materialized view in RisingWave to sink data from.
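
If the target table does not exist yet, it must be created by an external engine before the sink can write to it. Below is a minimal sketch using Spark SQL as one possible external engine; the catalog, database, table, and column names are illustrative and should match your own setup.

-- Create the target Iceberg table from an external engine (Spark SQL shown here).
-- Catalog, database, table, and column names are illustrative.
CREATE TABLE my_glue_catalog.analytics.processed_user_events (
    event_id    BIGINT,
    user_id     BIGINT,
    event_type  STRING,
    occurred_at TIMESTAMP
)
USING iceberg;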

Basic connection example

CREATE SINK my_iceberg_sink FROM processed_events
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'processed_user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);
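
Once the sink is running, you can confirm that rows are arriving by querying the table from any engine attached to the same catalog. A sketch using Spark SQL, assuming a session configured with the same Glue catalog (names are illustrative):

-- Query the target table from an external engine to verify sink output.
SELECT count(*) FROM my_glue_catalog.analytics.processed_user_events;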

Sink modes

Append-only sinks

Use append-only mode when you only want to add new records to the target table:

-- From an append-only source
CREATE SINK events_sink FROM user_events_source
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'events',
    table.name = 'user_activity',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Upsert sinks

Use upsert mode when you need to handle updates and deletes:

-- From an upsert table with primary key
CREATE SINK users_sink FROM user_profiles
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'users',
    table.name = 'profiles',
    catalog.type = 'rest',
    catalog.uri = 'http://rest-catalog:8181',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

Force append-only

Convert upsert streams to append-only by ignoring deletes and converting updates to inserts:

CREATE SINK audit_log FROM user_changes
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'audit',
    table.name = 'user_change_log',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

Catalog configurations

AWS Glue catalog

For tables managed by AWS Glue:

CREATE SINK glue_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://my-bucket/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'glue',
    catalog.name = 'my_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

REST catalog

For REST catalog services including AWS S3 Tables:

CREATE SINK rest_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',
    warehouse.path = 's3://my-bucket/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'rest',
    catalog.uri = 'http://rest-catalog:8181',
    catalog.credential = 'username:password',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

JDBC catalog

For JDBC-based catalogs:

CREATE SINK jdbc_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://my-bucket/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'jdbc',
    catalog.uri = 'jdbc:postgresql://postgres:5432/catalog',
    catalog.jdbc.user = 'catalog_user',
    catalog.jdbc.password = 'catalog_password',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

Advanced features

Exactly-once delivery

Enable exactly-once delivery semantics for critical data pipelines:

CREATE SINK critical_data FROM important_events
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'event_id',
    is_exactly_once = 'true',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'critical',
    table.name = 'events',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

Enabling exactly-once delivery provides stronger consistency guarantees but may impact performance due to additional coordination overhead.

Commit configuration

Control commit frequency and retry behavior:

CREATE SINK configurable_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    commit_checkpoint_interval = 10,  -- Commit every 10 checkpoints
    commit_retry_num = 5,            -- Retry failed commits 5 times
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'metrics',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

The approximate time to commit is calculated as:

time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval
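
For example, assuming a cluster configured with barrier_interval_ms = 1000 (1 second) and checkpoint_frequency = 10, the sink above with commit_checkpoint_interval = 10 commits roughly every 1000 ms × 10 × 10 = 100 seconds.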

Use with different storage backends

Amazon S3

CREATE SINK s3_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://my-bucket/warehouse',
    database.name = 'analytics',
    table.name = 'events',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Google Cloud Storage

CREATE SINK gcs_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 'gs://my-bucket/warehouse',
    database.name = 'analytics', 
    table.name = 'events',
    catalog.type = 'rest',
    catalog.uri = 'http://catalog-service:8181',
    gcs.service.account = 'path/to/service-account.json'
);

Azure Blob Storage

CREATE SINK azure_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 'abfss://container@account.dfs.core.windows.net/warehouse',
    database.name = 'analytics',
    table.name = 'events',
    catalog.type = 'rest',
    catalog.uri = 'http://catalog-service:8181',
    azblob.account_name = 'your_account',
    azblob.account_key = 'your_key'
);

Amazon S3 Tables integration

AWS S3 Tables provides automatic compaction and optimization:

CREATE SINK s3_tables_sink FROM processed_data
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',
    warehouse.path = 's3://my-bucket/my-table-bucket/',
    database.name = 'analytics',
    table.name = 'user_metrics',
    catalog.type = 'rest',
    catalog.uri = 'https://s3tables.us-east-1.amazonaws.com/tables',
    catalog.rest.signing_region = 'us-east-1',
    catalog.rest.signing_name = 's3tables',
    catalog.rest.sigv4_enabled = 'true',
    s3.region = 'us-east-1'
);

Data type mapping

RisingWave data types map to Iceberg types as follows:

RisingWave Type  | Iceberg Type | Notes
BOOLEAN          | boolean      |
SMALLINT         | int          |
INT              | int          |
BIGINT           | long         |
REAL             | float        |
DOUBLE PRECISION | double       |
VARCHAR          | string       |
BYTEA            | binary       |
DECIMAL(p,s)     | decimal(p,s) |
TIME             | time         |
DATE             | date         |
TIMESTAMP        | timestamp    |
TIMESTAMPTZ      | timestamptz  |
INTERVAL         | string       | Serialized as string
JSONB            | string       | Serialized as JSON string
ARRAY            | list         |
STRUCT           | struct       |
MAP              | map          |
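
As an illustration of this mapping, consider the following sketch. The materialized view, the upstream orders table, and its columns are hypothetical; the comments note the Iceberg column type each RisingWave column would produce.

CREATE MATERIALIZED VIEW order_summary AS
SELECT
    order_id,       -- BIGINT      -> Iceberg long
    customer_name,  -- VARCHAR     -> Iceberg string
    total_amount,   -- DECIMAL     -> Iceberg decimal
    ordered_at,     -- TIMESTAMPTZ -> Iceberg timestamptz
    metadata        -- JSONB       -> Iceberg string (JSON text)
FROM orders;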

Configuration parameters

Required parameters

Parameter     | Description
connector     | Must be 'iceberg'
type          | Sink mode: 'append-only' or 'upsert'
database.name | Target Iceberg database name
table.name    | Target Iceberg table name

Optional parameters

Parameter                  | Description                                  | Default
primary_key                | Primary key for upsert sinks                 | None
force_append_only          | Force append-only mode from an upsert source | false
is_exactly_once            | Enable exactly-once delivery                 | false
commit_checkpoint_interval | Commit interval in checkpoints               | 60
commit_retry_num           | Number of commit retries                     | 8

For detailed object storage and catalog configuration options, refer to the dedicated Iceberg configuration guides.

Integration patterns

Real-time analytics pipeline

Stream aggregated results to analytics tables:

-- Process real-time events
CREATE MATERIALIZED VIEW hourly_metrics AS
SELECT 
    user_id,
    date_trunc('hour', event_timestamp) as hour,
    COUNT(*) as event_count,
    COUNT(DISTINCT session_id) as session_count
FROM user_events
GROUP BY user_id, date_trunc('hour', event_timestamp);

-- Sink to data lake for analytics
CREATE SINK analytics_sink FROM hourly_metrics
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id,hour',
    warehouse.path = 's3://analytics-lake/warehouse',
    database.name = 'metrics',
    table.name = 'hourly_user_metrics',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

Change data capture

Stream database changes to data lake:

-- CDC from PostgreSQL
CREATE SOURCE user_changes
WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'user',
    password = 'password',
    database.name = 'app_db',
    schema.name = 'public'
);

CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR,
    email VARCHAR,
    created_at TIMESTAMPTZ
)
FROM user_changes TABLE 'users';

-- Stream to data lake
CREATE SINK user_lake_sink FROM users
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'raw',
    table.name = 'users',
    catalog.type = 'glue'
);

Best practices

  1. Choose appropriate sink mode: Use append-only for event logs, upsert for dimensional data.
  2. Configure commit intervals: Balance latency vs file size based on your requirements.
  3. Enable exactly-once for critical data: Use for financial transactions or other critical data.
  4. Monitor sink lag: Track how far behind your sink is from the source data.
  5. Design proper partitioning: Ensure target tables are partitioned to match common query patterns (see the sketch after this list).
  6. Handle backpressure: Monitor sink performance and adjust resources as needed.
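
Partitioning is defined on the Iceberg table itself, so it is configured in the external engine that owns the table rather than in the sink. A minimal sketch using Spark SQL, based on the hourly metrics example above (catalog and column names are illustrative):

-- Create the target table with a partition spec suited to time-based queries.
CREATE TABLE my_glue_catalog.metrics.hourly_user_metrics (
    user_id       BIGINT,
    hour          TIMESTAMP,
    event_count   BIGINT,
    session_count BIGINT
)
USING iceberg
PARTITIONED BY (days(hour));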

Monitoring and troubleshooting

Monitor sink performance

-- Check sink status
SHOW SINKS;

-- View sink details
DESCRIBE SINK my_iceberg_sink;
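
Depending on your RisingWave version, sink metadata is also exposed through the rw_catalog system schema. A minimal check, assuming the rw_sinks system table is available:

-- List sink metadata from the system catalog.
SELECT * FROM rw_catalog.rw_sinks;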

Common issues

  • Permission errors: Ensure RisingWave has write access to the catalog and storage.
  • Schema mismatches: Verify that source and target schemas are compatible.
  • Connectivity issues: Check network access to the catalog and storage services.
  • Performance issues: Monitor commit intervals and consider adjusting the configuration.

Limitations

  • Schema evolution: Limited support for automatic schema changes.
  • Concurrent writers: Coordinate with other systems writing to the same tables.
  • File size optimization: Consider external compaction for optimal file sizes.

Next steps