This guide explains how to stream processed data from RisingWave into existing Iceberg tables. Use this when you have Iceberg tables managed by external systems and want RisingWave to deliver processed results into them.

Prerequisites

  • An upstream source, table, or materialized view in RisingWave to output data from.
  • Existing Iceberg tables that you can write to, or the ability to create them via external systems.
  • Appropriate permissions to write to the target Iceberg catalog and storage.
  • Access credentials for the underlying object storage (e.g., S3 access key and secret key).

Create an Iceberg sink

To write data to an external Iceberg table, create a SINK. This statement defines how data from an upstream object should be formatted and delivered to the target Iceberg table.
CREATE SINK my_iceberg_sink FROM processed_events
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'processed_user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Configuration parameters

Parameter | Required | Description
connector | Yes | Must be 'iceberg'.
type | Yes | Sink mode: 'append-only' for new records only; 'upsert' to handle updates and deletes.
database.name | Yes | The name of the target Iceberg database.
table.name | Yes | The name of the target Iceberg table.
primary_key | Yes, if type is 'upsert' | A comma-separated list of columns that form the primary key.
force_append_only | No | If true, converts an upsert stream to append-only. Updates become inserts and deletes are ignored. Default: false.
is_exactly_once | No | If true, enables exactly-once delivery semantics. This provides stronger consistency but may impact performance. Default: false.
commit_checkpoint_interval | No | The number of checkpoints between commits. The approximate time to commit is barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval. Default: 60.
commit_retry_num | No | The number of times to retry a failed commit. Default: 8.
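For example, here is a minimal sketch of using force_append_only to sink an upsert stream as inserts only; the upstream materialized view order_totals and the target table are hypothetical names for illustration.
CREATE SINK order_log_sink FROM order_totals
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',  -- updates become inserts; deletes are ignored
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'order_totals_log',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);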
For detailed storage and catalog configuration, see the dedicated object storage and catalog configuration guides.

Advanced features

Exactly-once delivery

Enable exactly-once delivery semantics for critical data pipelines:
CREATE SINK critical_data FROM important_events
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'event_id',
    is_exactly_once = 'true',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'critical',
    table.name = 'events',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);
Enabling exactly-once delivery provides stronger consistency guarantees but may impact performance due to additional coordination overhead.

Commit configuration

Control commit frequency and retry behavior:
CREATE SINK configurable_sink FROM my_data
WITH (
    connector = 'iceberg',
    type = 'append-only',
    commit_checkpoint_interval = 10,  -- Commit every 10 checkpoints
    commit_retry_num = 5,            -- Retry failed commits 5 times
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'metrics',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);
The approximate time to commit is calculated as:
time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval
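For example, with a barrier_interval_ms of 1000 ms and a checkpoint_frequency of 1 (illustrative values; check your cluster's actual settings), the sink above commits roughly every 1000 × 1 × 10 = 10,000 ms, or about every 10 seconds.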

Table maintenance

When you continuously sink data to an Iceberg table, it is important to perform periodic maintenance, including compaction and snapshot expiration, to maintain good query performance and manage storage costs. RisingWave provides both automatic and manual maintenance options. For complete details, see the Iceberg table maintenance guide.
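If the target table is maintained from an external engine instead, Iceberg's built-in Spark procedures are one common option. A minimal sketch, assuming a Spark session with a catalog named my_catalog and the target table from the examples above:
-- Run from Spark SQL (external engine), not RisingWave
-- Compact small data files produced by frequent commits
CALL my_catalog.system.rewrite_data_files(table => 'analytics.processed_user_events');

-- Expire old snapshots to reclaim storage
CALL my_catalog.system.expire_snapshots(table => 'analytics.processed_user_events', older_than => TIMESTAMP '2024-01-01 00:00:00');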

Integration patterns

Real-time analytics pipeline

Stream aggregated results to analytics tables:
-- Process real-time events
CREATE MATERIALIZED VIEW hourly_metrics AS
SELECT 
    user_id,
    date_trunc('hour', event_timestamp) as hour,
    COUNT(*) as event_count,
    COUNT(DISTINCT session_id) as session_count
FROM user_events
GROUP BY user_id, date_trunc('hour', event_timestamp);

-- Sink to data lake for analytics
CREATE SINK analytics_sink FROM hourly_metrics
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id,hour',
    warehouse.path = 's3://analytics-lake/warehouse',
    database.name = 'metrics',
    table.name = 'hourly_user_metrics',
    catalog.type = 'glue',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);

Change data capture

Stream database changes to data lake:
-- CDC from PostgreSQL
CREATE SOURCE user_changes
WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'user',
    password = 'password',
    database.name = 'app_db',
    schema.name = 'public'
);

CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR,
    email VARCHAR,
    created_at TIMESTAMPTZ
)
FROM user_changes TABLE 'users';

-- Stream to data lake
CREATE SINK user_lake_sink FROM users
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'raw',
    table.name = 'users',
    catalog.type = 'glue'
);

Best practices

  1. Choose appropriate sink mode: Use append-only for event logs, upsert for dimensional data.
  2. Configure commit intervals: Balance end-to-end latency against file size; shorter intervals reduce latency but produce more, smaller files.
  3. Enable exactly-once for critical data: Use for financial transactions or other critical data.
  4. Monitor sink lag: Track how far behind your sink is from the source data.
  5. Design proper partitioning: Ensure target tables are properly partitioned for query performance (see the sketch after this list).
  6. Handle backpressure: Monitor sink performance and adjust resources as needed.
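Because the target tables in this guide are created outside RisingWave, partitioning is defined there. As a sketch, a partitioned table could be created with Iceberg's Spark SQL DDL as follows; the catalog, table, and column names are illustrative and the schema must match what your sink writes.
-- Spark SQL (external engine), not RisingWave
CREATE TABLE my_catalog.analytics.processed_user_events (
    user_id BIGINT,
    event_type STRING,
    event_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(event_timestamp));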

Monitoring and troubleshooting

Monitor sink performance

-- Check sink status
SHOW SINKS;

-- View sink details
DESCRIBE SINK my_iceberg_sink;
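To confirm that data is arriving, you can also query the target table from any engine connected to the same catalog; the table name below comes from the examples above, so adjust it to your setup.
-- Run from an external query engine (e.g., Spark or Trino) connected to the catalog
SELECT COUNT(*) FROM analytics.processed_user_events;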

Limitations

  • Schema evolution: Limited support for automatic schema changes.
  • Concurrent writers: Coordinate with other systems writing to the same tables.
