What this does: Builds a full streaming lakehouse pipeline. CDC from PostgreSQL and events from Kafka flow through RisingWave transformations and land in Iceberg tables with automatic compaction.

When to use this: You want to build or extend a data lakehouse with fresh data, without managing Debezium, Kafka connectors, Flink jobs, and Iceberg maintenance separately.

Setup

1. Ingest database changes via CDC

CREATE SOURCE pg_source WITH (
  connector = 'postgres-cdc',
  hostname = '127.0.0.1',
  port = '5432',
  username = 'postgres',
  password = 'your_password',
  database.name = 'mydb',
  slot.name = 'rw_lakehouse_slot'
);

CREATE TABLE customers (
  id INT PRIMARY KEY,
  name VARCHAR,
  email VARCHAR,
  created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.customers';

CREATE TABLE orders (
  id INT PRIMARY KEY,
  customer_id INT,
  amount DOUBLE PRECISION,
  status VARCHAR,
  created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';
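
The CDC source above only works if the upstream PostgreSQL instance allows logical replication. The following is a minimal sketch of the checks, run on PostgreSQL itself rather than in RisingWave. The rw_cdc user is hypothetical and not part of the original setup, which connects as postgres. Depending on your environment, further privileges (for example, to create a publication) may be needed.

```sql
-- Run on the upstream PostgreSQL instance, not in RisingWave.

-- Logical decoding must be enabled; this should return 'logical'.
SHOW wal_level;

-- If it does not, change the setting and restart PostgreSQL for it to take effect.
ALTER SYSTEM SET wal_level = logical;

-- Hypothetical dedicated CDC user, as an alternative to the 'postgres' superuser.
CREATE USER rw_cdc WITH REPLICATION LOGIN PASSWORD 'your_password';
GRANT CONNECT ON DATABASE mydb TO rw_cdc;
GRANT USAGE ON SCHEMA public TO rw_cdc;
GRANT SELECT ON public.customers, public.orders TO rw_cdc;
```

If you use a dedicated user like this, set username and password in pg_source accordingly.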

2. Ingest events from Kafka

CREATE SOURCE clickstream (
  session_id VARCHAR,
  customer_id INT,
  page VARCHAR,
  action VARCHAR,
  event_time TIMESTAMPTZ,
  WATERMARK FOR event_time AS event_time - INTERVAL '10 SECONDS'
) WITH (
  connector = 'kafka',
  topic = 'clickstream',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

3. Create enriched views

-- Enrich orders with customer data
CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
  o.id AS order_id,
  o.customer_id,
  c.name AS customer_name,
  c.email AS customer_email,
  o.amount,
  o.status,
  o.created_at
FROM orders o
JOIN customers c ON o.customer_id = c.id;

-- Aggregate clickstream by session
CREATE MATERIALIZED VIEW session_summary AS
SELECT
  session_id,
  customer_id,
  COUNT(*) AS page_views,
  COUNT(DISTINCT page) AS unique_pages,
  MIN(event_time) AS session_start,
  MAX(event_time) AS session_end
FROM clickstream
GROUP BY session_id, customer_id;
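
The clickstream source declares a watermark, but session_summary does not depend on it. As a sketch of where the watermark matters, the view below counts page views in one-minute tumbling windows and emits each window only once the watermark has passed it. The view name page_views_per_minute and the one-minute window size are illustrative assumptions, not part of the original pipeline.

```sql
-- Hypothetical view: per-page counts in 1-minute tumbling windows.
-- EMIT ON WINDOW CLOSE relies on the watermark defined on clickstream.event_time,
-- so each window's result is emitted once and not revised afterwards.
CREATE MATERIALIZED VIEW page_views_per_minute AS
SELECT
  window_start,
  page,
  COUNT(*) AS views
FROM TUMBLE(clickstream, event_time, INTERVAL '1 MINUTE')
GROUP BY window_start, page
EMIT ON WINDOW CLOSE;
```

Because results are emitted once per closed window, a view like this produces no retractions, unlike session_summary, whose rows keep changing as new events arrive.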

4. Sink to Iceberg

-- Sink enriched orders (upsert mode — reflects updates and deletes)
CREATE SINK orders_iceberg FROM enriched_orders
WITH (
  connector = 'iceberg',
  type = 'upsert',
  primary_key = 'order_id',
  catalog.type = 'storage',
  warehouse.path = 's3://my-lakehouse/warehouse',
  database.name = 'analytics',
  table.name = 'enriched_orders',
  create_table_if_not_exists = 'true',
  s3.region = 'us-east-1',
  s3.access.key = 'your-access-key',
  s3.secret.key = 'your-secret-key'
);

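-- Sink session summaries (upsert: session state updates as new events arrive).
-- primary_key is session_id alone, while session_summary groups by (session_id, customer_id);
-- this assumes each session maps to exactly one customer_id.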
CREATE SINK sessions_iceberg FROM session_summary
WITH (
  connector = 'iceberg',
  type = 'upsert',
  primary_key = 'session_id',
  catalog.type = 'storage',
  warehouse.path = 's3://my-lakehouse/warehouse',
  database.name = 'analytics',
  table.name = 'session_summary',
  create_table_if_not_exists = 'true',
  s3.region = 'us-east-1',
  s3.access.key = 'your-access-key',
  s3.secret.key = 'your-secret-key'
);
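
Once the sinks are created, a quick check from a RisingWave SQL session can confirm that they exist and that the upstream views hold data. This is a sketch only; the column selections and LIMIT values are arbitrary.

```sql
-- List sinks in the current schema; orders_iceberg and sessions_iceberg should appear.
SHOW SINKS;

-- Inspect the most recent rows in the views that feed the sinks.
SELECT order_id, customer_name, amount, status
FROM enriched_orders
ORDER BY created_at DESC
LIMIT 5;

SELECT session_id, page_views, unique_pages
FROM session_summary
ORDER BY session_end DESC
LIMIT 5;
```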

Key points

  • Use type = 'upsert' for tables with changing state (CDC tables, aggregating MVs); type = 'append-only' only for truly append-only streams with no retractions
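  • RisingWave can host the Iceberg catalog and run compaction itself, so separate Spark compaction jobs are not needed. The sinks above use catalog.type = 'storage' and set no compaction options, so confirm how compaction is enabled for your RisingWave version before relying on it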
  • The JOIN between the two CDC tables (orders and customers) is maintained incrementally: when customer data changes, enriched_orders updates automatically
  • For production, use a proper Iceberg catalog (Glue, REST, Hive) instead of catalog.type = 'storage'
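
The first and last points above can be sketched together: an append-only sink of the raw clickstream source, written through a REST catalog instead of catalog.type = 'storage'. The sink name, table name, catalog name, and catalog URI are hypothetical placeholders. The catalog.name and catalog.uri option names are assumed from the RisingWave Iceberg sink options and should be checked against the documentation for your version.

```sql
-- Hypothetical sink: raw Kafka events are append-only, so no primary key is needed.
CREATE SINK clickstream_iceberg FROM clickstream
WITH (
  connector = 'iceberg',
  type = 'append-only',
  catalog.type = 'rest',
  catalog.name = 'lakehouse',                -- hypothetical catalog name
  catalog.uri = 'http://rest-catalog:8181',  -- hypothetical REST catalog endpoint
  warehouse.path = 's3://my-lakehouse/warehouse',
  database.name = 'analytics',
  table.name = 'clickstream_raw',            -- hypothetical table name
  create_table_if_not_exists = 'true',
  s3.region = 'us-east-1',
  s3.access.key = 'your-access-key',
  s3.secret.key = 'your-secret-key'
);
```

The upsert sinks in step 4 can be pointed at the same catalog by swapping in the same catalog.* options.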

Next steps