Skip to main content
What this does: Reads the PostgreSQL transaction log (WAL) via CDC, syncs table changes into RisingWave in real time, and enables downstream transformations and delivery. When to use this: You need to react to database changes (inserts, updates, deletes) in real time, or build a streaming lakehouse from an operational PostgreSQL database.

Prerequisites

Enable logical replication on your PostgreSQL instance:
-- Run on PostgreSQL (not RisingWave)
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL after this change

-- Create a replication slot
SELECT pg_create_logical_replication_slot('risingwave_slot', 'pgoutput');

Setup

1. Create the CDC source in RisingWave

CREATE SOURCE pg_source WITH (
  connector = 'postgres-cdc',
  hostname = '127.0.0.1',
  port = '5432',
  username = 'postgres',
  password = 'your_password',
  database.name = 'mydb',
  slot.name = 'risingwave_slot'
);

2. Create a table from the CDC source

CREATE TABLE orders (
  id INT PRIMARY KEY,
  user_id INT,
  product_id INT,
  amount DOUBLE PRECISION,
  status VARCHAR,
  created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';

3. Build a materialized view on top

CREATE MATERIALIZED VIEW order_summary AS
SELECT
  user_id,
  COUNT(*) AS order_count,
  SUM(amount) AS total_spent,
  MAX(created_at) AS last_order_at
FROM orders
WHERE status = 'completed'
GROUP BY user_id;

4. Query results

SELECT user_id, order_count, total_spent
FROM order_summary
WHERE user_id = 42;

5. Deliver to downstream (optional)

-- Sink to Kafka
CREATE SINK order_summary_sink FROM order_summary
WITH (
  connector = 'kafka',
  properties.bootstrap.server = 'localhost:9092',
  topic = 'order_summaries'
) FORMAT PLAIN ENCODE JSON (force_append_only='true');

-- Or sink to Iceberg
CREATE SINK order_summary_iceberg FROM order_summary
WITH (
  connector = 'iceberg',
  type = 'upsert',
  primary_key = 'user_id',
  catalog.type = 'storage',
  warehouse.path = 's3://my-bucket/warehouse',
  database.name = 'mydb',
  table.name = 'order_summary',
  create_table_if_not_exists = 'true'
);

Key points

  • RisingWave reads from the PostgreSQL WAL — no triggers or application changes needed
  • Changes (INSERT, UPDATE, DELETE) are all reflected in the RisingWave table
  • The slot.name must match the replication slot you created on PostgreSQL
  • For AWS RDS, Supabase, and Neon, see platform-specific setup guides

Next steps