What this does: Joins a live Kafka event stream with reference data synced from a PostgreSQL database via CDC. The JOIN is maintained incrementally: when reference data changes, the enriched output updates automatically.

When to use this: You need to augment high-volume event streams with context from an operational database (user profiles, product catalog, account data) before delivering them downstream.

Setup

1. Ingest events from Kafka

CREATE SOURCE clickstream (
  user_id    VARCHAR,
  page_url   VARCHAR,
  event_time TIMESTAMPTZ,
  WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'
) WITH (
  connector = 'kafka',
  topic = 'clickstream',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
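
For reference, a message on the clickstream topic that the source above would decode might look like the following. This is an illustrative payload; the field values are made up, and the timestamp format is one common choice for a TIMESTAMPTZ column:

{"user_id": "u_1042", "page_url": "/products/widget", "event_time": "2024-01-15T09:30:00Z"}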

2. Sync reference data from PostgreSQL via CDC

CREATE SOURCE pg_source WITH (
  connector = 'postgres-cdc',
  hostname = '127.0.0.1',
  port = '5432',
  username = 'postgres',
  password = 'your_password',
  database.name = 'mydb',
  slot.name = 'rw_enrichment_slot'
);

CREATE TABLE users (
  id           VARCHAR PRIMARY KEY,
  segment      VARCHAR,
  region       VARCHAR,
  subscription VARCHAR
) FROM pg_source TABLE 'public.users';
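
The CDC connection assumes logical replication is enabled on the upstream PostgreSQL instance. A minimal sketch of the Postgres-side prerequisites, run on the source database (assumes you can restart the server and that the connector logs in as the postgres user):

-- Enable logical decoding; takes effect after a server restart
ALTER SYSTEM SET wal_level = 'logical';
-- The connector's user needs replication privileges to read the WAL
ALTER USER postgres WITH REPLICATION;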

3. Create an enriched materialized view

The JOIN is maintained incrementally: when a user's segment changes in PostgreSQL, the enriched output updates automatically.

CREATE MATERIALIZED VIEW enriched_clicks AS
SELECT
  c.user_id,
  c.page_url,
  c.event_time,
  u.segment,
  u.region,
  u.subscription
FROM clickstream c
JOIN users u ON c.user_id = u.id;
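
Once created, the materialized view can be queried like any table while it is kept up to date in the background. An illustrative ad-hoc query against the view above:

-- Click counts by user segment and region, always reflecting current data
SELECT segment, region, COUNT(*) AS clicks
FROM enriched_clicks
GROUP BY segment, region;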

4. Deliver enriched events downstream

CREATE SINK enriched_clicks_kafka FROM enriched_clicks
WITH (
  connector = 'kafka',
  properties.bootstrap.server = 'localhost:9092',
  topic = 'enriched-clicks'
) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');

Key points

  • The JOIN between a streaming source and a CDC table is maintained incrementally — no periodic recompute
  • When a row in users changes in PostgreSQL, the enriched_clicks MV updates to reflect the new value
  • For late-arriving events or joins where the reference data may not exist yet, use a LEFT JOIN to avoid dropping records
  • Use force_append_only = 'true' on Kafka sinks from materialized views that contain aggregations or joins: it converts the view's changelog into an append-only stream by dropping UPDATE and DELETE events, which plain-encoded sinks require
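
As a sketch of the LEFT JOIN variant mentioned above (the view name enriched_clicks_all is hypothetical), events with no matching user row are kept, with NULLs in the reference columns:

CREATE MATERIALIZED VIEW enriched_clicks_all AS
SELECT
  c.user_id,
  c.page_url,
  c.event_time,
  u.segment,       -- NULL until the user row arrives via CDC
  u.region,
  u.subscription
FROM clickstream c
LEFT JOIN users u ON c.user_id = u.id;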

Next steps