This guide explains how to stream processed data from RisingWave into existing Iceberg tables as part of the Bring Your Own Iceberg approach. Use this when you have Iceberg tables managed by external systems and want RisingWave to write processed results into these tables.
When to use Iceberg sinks
Choose Iceberg sinks when:
- Existing data lake: You have an established data lake with Iceberg tables that need real-time updates.
- Multi-system architecture: Multiple applications write to the same Iceberg tables.
- Analytics pipeline: You want to stream processed results into tables consumed by analytics tools.
- ETL/ELT workflows: RisingWave processes data and loads results into your data warehouse.
Prerequisites
- Ensure you have existing Iceberg tables that you can write to, or the ability to create them via external systems.
- Access credentials for the underlying object storage (e.g., S3 access key and secret key).
- Appropriate permissions to write to the target Iceberg catalog and storage.
- An upstream source, table, or materialized view in RisingWave to sink data from (a minimal sketch follows this list).
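The basic connection example below sinks from a relation named processed_events. If you do not already have such an upstream object, a minimal sketch might look like the following (the source name, columns, and filter are illustrative assumptions, not connector requirements):
-- Hypothetical upstream: a materialized view over an existing source.
-- Replace raw_events and the column list with your own schema.
CREATE MATERIALIZED VIEW processed_events AS
SELECT
    event_id,
    user_id,
    event_type,
    event_timestamp
FROM raw_events
WHERE event_type IS NOT NULL;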
Basic connection example
CREATE SINK my_iceberg_sink FROM processed_events
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-data-lake/warehouse',
database.name = 'analytics',
table.name = 'processed_user_events',
catalog.type = 'glue',
catalog.name = 'my_glue_catalog',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
Sink modes
Append-only sinks
Use append-only mode when you only want to add new records to the target table:
-- From an append-only source
CREATE SINK events_sink FROM user_events_source
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'events',
table.name = 'user_activity',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
Upsert sinks
Use upsert mode when you need to handle updates and deletes:
-- From an upsert table with primary key
CREATE SINK users_sink FROM user_profiles
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'users',
table.name = 'profiles',
catalog.type = 'rest',
catalog.uri = 'http://rest-catalog:8181',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Force append-only
Convert upsert streams to append-only by ignoring deletes and converting updates to inserts:
CREATE SINK audit_log FROM user_changes
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'audit',
table.name = 'user_change_log',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Catalog configurations
AWS Glue catalog
For tables managed by AWS Glue:
CREATE SINK glue_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'my_database',
table.name = 'my_table',
catalog.type = 'glue',
catalog.name = 'my_catalog',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
REST catalog
For REST catalog services including AWS S3 Tables:
CREATE SINK rest_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'id',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'my_database',
table.name = 'my_table',
catalog.type = 'rest',
catalog.uri = 'http://rest-catalog:8181',
catalog.credential = 'username:password',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
JDBC catalog
For JDBC-based catalogs:
CREATE SINK jdbc_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'my_database',
table.name = 'my_table',
catalog.type = 'jdbc',
catalog.uri = 'jdbc:postgresql://postgres:5432/catalog',
catalog.jdbc.user = 'catalog_user',
catalog.jdbc.password = 'catalog_password',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Advanced features
Exactly-once delivery
Enable exactly-once delivery semantics for critical data pipelines:
CREATE SINK critical_data FROM important_events
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'event_id',
is_exactly_once = 'true',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'critical',
table.name = 'events',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Enabling exactly-once delivery provides stronger consistency guarantees but may impact performance due to additional coordination overhead.
Commit configuration
Control commit frequency and retry behavior:
CREATE SINK configurable_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
commit_checkpoint_interval = 10, -- Commit every 10 checkpoints
commit_retry_num = 5, -- Retry failed commits 5 times
warehouse.path = 's3://data-lake/warehouse',
database.name = 'analytics',
table.name = 'metrics',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
The approximate interval between commits is calculated as:
time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval
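For example, with barrier_interval_ms set to 1000 ms and checkpoint_frequency set to 1 (typical defaults, but verify your cluster configuration), commit_checkpoint_interval = 10 yields roughly 1000 ms × 1 × 10 = 10 seconds between commits.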
Use with different storage backends
Amazon S3
CREATE SINK s3_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'analytics',
table.name = 'events',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key',
s3.region = 'us-west-2'
);
Google Cloud Storage
CREATE SINK gcs_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 'gs://my-bucket/warehouse',
database.name = 'analytics',
table.name = 'events',
catalog.type = 'rest',
catalog.uri = 'http://catalog-service:8181',
gcs.service.account = 'path/to/service-account.json'
);
Azure Blob Storage
CREATE SINK azure_sink FROM my_data
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 'abfss://container@account.dfs.core.windows.net/warehouse',
database.name = 'analytics',
table.name = 'events',
catalog.type = 'rest',
catalog.uri = 'http://catalog-service:8181',
azblob.account_name = 'your_account',
azblob.account_key = 'your_key'
);
Amazon S3 Tables integration
AWS S3 Tables provides automatic compaction and optimization:
CREATE SINK s3_tables_sink FROM processed_data
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'id',
warehouse.path = 'arn:aws:s3tables:us-east-1:your-account-id:bucket/my-table-bucket',
database.name = 'analytics',
table.name = 'user_metrics',
catalog.type = 'rest',
catalog.uri = 'https://s3tables.us-east-1.amazonaws.com/iceberg',
catalog.rest.signing_region = 'us-east-1',
catalog.rest.signing_name = 's3tables',
catalog.rest.sigv4_enabled = 'true',
s3.region = 'us-east-1'
);
Data type mapping
RisingWave data types map to Iceberg types as follows:
| RisingWave Type | Iceberg Type | Notes |
|---|---|---|
| BOOLEAN | boolean | |
| SMALLINT | int | |
| INT | int | |
| BIGINT | long | |
| REAL | float | |
| DOUBLE PRECISION | double | |
| VARCHAR | string | |
| BYTEA | binary | |
| DECIMAL(p,s) | decimal(p,s) | |
| TIME | time | |
| DATE | date | |
| TIMESTAMP | timestamp | |
| TIMESTAMPTZ | timestamptz | |
| INTERVAL | string | Serialized as string |
| JSONB | string | Serialized as JSON string |
| ARRAY | list | |
| STRUCT | struct | |
| MAP | map | |
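To make the mapping concrete, consider a hypothetical upstream relation that includes JSONB and INTERVAL columns; per the table above, both are written to Iceberg as string columns (the view and column names below are illustrative):
-- Hypothetical example showing how selected columns map to Iceberg types.
CREATE MATERIALIZED VIEW session_summary AS
SELECT
    session_id,                          -- BIGINT   -> Iceberg long
    metadata,                            -- JSONB    -> Iceberg string (JSON text)
    ended_at - started_at AS duration    -- INTERVAL -> Iceberg string
FROM sessions;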
Configuration parameters
Required parameters
| Parameter | Description |
|---|---|
| connector | Must be 'iceberg' |
| type | Sink mode: 'append-only' or 'upsert' |
| database.name | Target Iceberg database name |
| table.name | Target Iceberg table name |
Optional parameters
| Parameter | Description | Default |
|---|---|---|
| primary_key | Primary key for upsert sinks | None |
| force_append_only | Force append-only mode from upsert source | false |
| is_exactly_once | Enable exactly-once delivery | false |
| commit_checkpoint_interval | Commit interval in checkpoints | 60 |
| commit_retry_num | Number of commit retries | 8 |
For detailed storage and catalog configuration options, refer to the dedicated object storage and catalog configuration documentation.
Integration patterns
Real-time analytics pipeline
Stream aggregated results to analytics tables:
-- Process real-time events
CREATE MATERIALIZED VIEW hourly_metrics AS
SELECT
user_id,
date_trunc('hour', event_timestamp) as hour,
COUNT(*) as event_count,
COUNT(DISTINCT session_id) as session_count
FROM user_events
GROUP BY user_id, date_trunc('hour', event_timestamp);
-- Sink to data lake for analytics
CREATE SINK analytics_sink FROM hourly_metrics
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id,hour',
warehouse.path = 's3://analytics-lake/warehouse',
database.name = 'metrics',
table.name = 'hourly_user_metrics',
catalog.type = 'glue',
s3.access.key = 'your-access-key',
s3.secret.key = 'your-secret-key'
);
Change data capture
Stream database changes to data lake:
-- CDC from PostgreSQL
CREATE SOURCE user_changes
WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'user',
password = 'password',
database.name = 'app_db',
schema.name = 'public'
);
CREATE TABLE users (
user_id INT PRIMARY KEY,
username VARCHAR,
email VARCHAR,
created_at TIMESTAMPTZ
)
FROM user_changes TABLE 'public.users';
-- Stream to data lake
CREATE SINK user_lake_sink FROM users
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id',
warehouse.path = 's3://data-lake/warehouse',
database.name = 'raw',
table.name = 'users',
catalog.type = 'glue'
);
Best practices
- Choose appropriate sink mode: Use append-only for event logs, upsert for dimensional data.
- Configure commit intervals: Balance latency vs file size based on your requirements.
- Enable exactly-once for critical data: Use for financial transactions or other critical data.
- Monitor sink lag: Track how far behind your sink is from the source data.
- Design proper partitioning: Ensure target tables are properly partitioned for query performance; see the sketch after this list.
- Handle backpressure: Monitor sink performance and adjust resources as needed.
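Because the target tables are owned by external systems in this approach, partitioning is defined when the table is created outside RisingWave. As one possible sketch, using Spark SQL against the same catalog (the namespace, columns, and partition transform are assumptions for illustration):
-- Hypothetical Spark SQL: create the target table with a daily partition
-- on the timestamp column before pointing a RisingWave sink at it.
CREATE TABLE metrics.hourly_user_metrics (
    user_id       BIGINT,
    hour          TIMESTAMP,
    event_count   BIGINT,
    session_count BIGINT
)
USING iceberg
PARTITIONED BY (days(hour));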
Monitoring and troubleshooting
-- Check sink status
SHOW SINKS;
-- View sink details
DESCRIBE SINK my_iceberg_sink;
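If your RisingWave version exposes the rw_catalog system views (availability and columns vary by version, so treat this as an assumption to verify), you can also query sink metadata directly:
-- Inspect sink metadata via the system catalog (verify this view exists in your version).
SELECT * FROM rw_catalog.rw_sinks;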
Common issues
- Permission errors: Ensure RisingWave has write access to the catalog and storage.
- Schema mismatches: Verify that source and target schemas are compatible.
- Connectivity issues: Check network access to the catalog and storage services.
- Performance issues: Monitor commit intervals and consider adjusting the configuration.
Limitations
- Schema evolution: Limited support for automatic schema changes.
- Concurrent writers: Coordinate with other systems writing to the same tables.
- File size optimization: Consider external compaction for optimal file sizes.