What this does: Creates a continuously running sink that writes RisingWave data to an Iceberg table, with RisingWave managing compaction, small-file cleanup, and snapshot maintenance automatically.

When to use this: You need durable, open-format storage for streaming data that is also queryable by Spark, Trino, DuckDB, or other analytical engines.

Setup

1. Ingest source data

CREATE SOURCE events (
  id VARCHAR,
  user_id INT,
  event_type VARCHAR,
  properties JSONB,
  event_time TIMESTAMPTZ,
  WATERMARK FOR event_time AS event_time - INTERVAL '10 SECONDS'
) WITH (
  connector = 'kafka',
  topic = 'app-events',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
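
To try the source without a production feed, it can help to generate a few messages shaped like the columns above. The following is a minimal sketch, not part of the recipe itself: the event types, page values, and the make_event helper are hypothetical, and only the field names come from the CREATE SOURCE statement. It prints one JSON object per line, which can then be published to the app-events topic with whatever Kafka producer you already use.

```python
import json
import random
import uuid
from datetime import datetime, timezone

# Hypothetical sample values; only the field names come from CREATE SOURCE.
EVENT_TYPES = ["page_view", "click", "signup"]
PAGES = ["/home", "/pricing", "/docs"]


def make_event(rng: random.Random) -> dict:
    """Build one event whose keys match the columns of the events source."""
    return {
        "id": str(uuid.UUID(int=rng.getrandbits(128))),
        "user_id": rng.randint(1, 1000),
        "event_type": rng.choice(EVENT_TYPES),
        "properties": {"page": rng.choice(PAGES)},
        # ISO 8601 string with an explicit UTC offset for the TIMESTAMPTZ column.
        "event_time": datetime.now(timezone.utc).isoformat(),
    }


if __name__ == "__main__":
    rng = random.Random(42)
    for _ in range(5):
        # One JSON object per line, i.e. one Kafka message per line.
        print(json.dumps(make_event(rng)))
```

Seeding the generator keeps the ids and user ids repeatable between runs, which makes it easier to compare what was sent with what later lands in Iceberg.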

2. Create a materialized view to transform data

CREATE MATERIALIZED VIEW clean_events AS
SELECT
  id,
  user_id,
  event_type,
  properties->>'page' AS page,
  event_time
FROM events
WHERE event_type IS NOT NULL;
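
As a rough illustration of what this view keeps and drops, the sketch below mirrors its logic for a single event in plain Python. The clean_event function is a hypothetical stand-in written for this explanation, not RisingWave code; it assumes the ->> operator returns the value as text and yields NULL when the key is missing or JSON null.

```python
import json
from typing import Optional


def clean_event(event: dict) -> Optional[dict]:
    """Mirror clean_events for one row: filter on event_type, extract page as text."""
    if event.get("event_type") is None:
        # Dropped by WHERE event_type IS NOT NULL.
        return None
    props = event.get("properties")
    page = props.get("page") if isinstance(props, dict) else None
    if page is not None and not isinstance(page, str):
        # Assumption: non-string JSON values come back as their JSON text.
        page = json.dumps(page)
    return {
        "id": event.get("id"),
        "user_id": event.get("user_id"),
        "event_type": event["event_type"],
        "page": page,
        "event_time": event.get("event_time"),
    }
```

An event with a null event_type returns None here, while an event whose properties lack a page key is kept with page set to None, which corresponds to a NULL page column in the view.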

3. Create the Iceberg sink

CREATE SINK events_iceberg_sink FROM clean_events
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'storage',
  warehouse.path = 's3://my-bucket/iceberg-warehouse',
  database.name = 'analytics',
  table.name = 'clean_events',
  create_table_if_not_exists = 'true',
  s3.region = 'us-east-1',
  s3.access.key = 'your-access-key',
  s3.secret.key = 'your-secret-key'
);

4. Query the Iceberg table from another engine (optional)

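# DuckDB (Python)
import duckdb

con = duckdb.connect()
# iceberg reads the table metadata; httpfs provides s3:// access
con.execute("INSTALL iceberg; LOAD iceberg; INSTALL httpfs; LOAD httpfs;")
# Same placeholder credentials and region as the sink definition
con.execute("""
  CREATE SECRET (TYPE S3, KEY_ID 'your-access-key', SECRET 'your-secret-key', REGION 'us-east-1')
""")
result = con.execute("""
  SELECT event_type, COUNT(*) AS event_count
  FROM iceberg_scan('s3://my-bucket/iceberg-warehouse/analytics/clean_events')
  GROUP BY event_type
  ORDER BY event_count DESC
""").fetchall()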

Key points

  • Use type = 'append-only' for event streams and type = 'upsert' for tables with primary keys. Upsert mode requires the primary_key option, for example: CREATE SINK ... WITH (type = 'upsert', primary_key = 'id', ...)
  • RisingWave handles Iceberg compaction, small-file optimization, and snapshot cleanup automatically, so no external scheduler is needed
  • Data written to Iceberg is readable by Spark, Trino, DuckDB, and other Iceberg-compatible engines as soon as it is committed to the table
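
The upsert variant from the key points can be spelled out by reusing the options from step 3 and changing only the type and primary key. The sketch below renders such a statement as text. The render_iceberg_sink helper, the sink name events_iceberg_upsert_sink, and the table name clean_events_upsert are hypothetical names chosen for illustration; the option keys are the ones already used in this recipe. Whether id is a suitable primary key depends on your data.

```python
def render_iceberg_sink(name: str, source: str, options: dict) -> str:
    """Render a CREATE SINK statement from a dict of connector options."""

    def quote(value: str) -> str:
        # Escape single quotes so the value is a valid SQL string literal.
        return "'" + value.replace("'", "''") + "'"

    body = ",\n".join(f"  {key} = {quote(val)}" for key, val in options.items())
    return f"CREATE SINK {name} FROM {source}\nWITH (\n{body}\n);"


# Same options as step 3, except type, primary_key, and the (hypothetical) table name.
upsert_options = {
    "connector": "iceberg",
    "type": "upsert",
    "primary_key": "id",
    "catalog.type": "storage",
    "warehouse.path": "s3://my-bucket/iceberg-warehouse",
    "database.name": "analytics",
    "table.name": "clean_events_upsert",
    "create_table_if_not_exists": "true",
    "s3.region": "us-east-1",
    "s3.access.key": "your-access-key",
    "s3.secret.key": "your-secret-key",
}

if __name__ == "__main__":
    print(render_iceberg_sink("events_iceberg_upsert_sink", "clean_events", upsert_options))
```

Writing the upsert sink to its own table keeps it from colliding with the append-only sink created in step 3.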

Next steps