What this does: Creates a complete pipeline from a Kafka topic through a materialized view to low-latency query serving.

When to use this: You have event data in Kafka and need agents or applications to query continuously computed aggregates at low latency.

Setup

1. Create a Kafka source

CREATE SOURCE transactions (
  user_id VARCHAR,
  amount DOUBLE PRECISION,
  status VARCHAR,
  event_time TIMESTAMPTZ,
  WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'
) WITH (
  connector = 'kafka',
  topic = 'transactions',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
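The source expects JSON-encoded messages whose fields match the column list above. A sketch of what one event payload might look like (the producer client itself is omitted; the sample values are illustrative):

```python
import json
from datetime import datetime, timezone

# One event matching the source schema: user_id, amount, status, event_time.
# TIMESTAMPTZ columns parse ISO 8601 strings.
event = {
    "user_id": "u_1001",
    "amount": 129.95,
    "status": "completed",
    "event_time": datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc).isoformat(),
}

payload = json.dumps(event)
print(payload)
```

Any Kafka producer can publish this string to the transactions topic; the source decodes it per the declared schema.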

2. Create a materialized view

CREATE MATERIALIZED VIEW suspicious_activity AS
SELECT
  user_id,
  COUNT(*) AS tx_count,
  SUM(amount) AS total_amount,
  window_start,
  window_end
FROM TUMBLE(transactions, event_time, INTERVAL '5 MINUTES')
GROUP BY user_id, window_start, window_end
HAVING COUNT(*) > 5 AND SUM(amount) > 5000;
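TUMBLE assigns each event to exactly one fixed, non-overlapping 5-minute bucket based on event_time. The bucketing is simple floor arithmetic; a toy sketch of how window_start and window_end are derived (illustrative only, not the engine's implementation):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumble_window_start(ts: datetime, size: timedelta) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    return ts - ((ts - EPOCH) % size)

ts = datetime(2024, 5, 1, 12, 7, 42, tzinfo=timezone.utc)
start = tumble_window_start(ts, timedelta(minutes=5))
end = start + timedelta(minutes=5)
print(start, end)  # 12:05:00 and 12:10:00 — the 5-minute bucket containing 12:07:42
```

Because windows never overlap, each transaction contributes to exactly one (user_id, window_start, window_end) group.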

3. Query the results

-- Always fresh — no REFRESH needed
SELECT user_id, tx_count, total_amount
FROM suspicious_activity
ORDER BY window_end DESC
LIMIT 20;
From an application, run the same query over the Postgres wire protocol; psycopg2 works unchanged (4566 is the SQL frontend's default port here):

import psycopg2

# Connect to the SQL frontend.
conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
conn.autocommit = True
cur = conn.cursor()

# Pull the last 10 minutes of flagged windows, biggest movers first.
cur.execute("""
    SELECT user_id, tx_count, total_amount
    FROM suspicious_activity
    WHERE window_end > NOW() - INTERVAL '10 MINUTES'
    ORDER BY total_amount DESC
    LIMIT 10
""")
results = cur.fetchall()

cur.close()
conn.close()

4. Deliver results downstream (optional)

CREATE SINK suspicious_activity_alerts FROM suspicious_activity
WITH (
  connector = 'kafka',
  properties.bootstrap.server = 'localhost:9092',
  topic = 'alerts'
) FORMAT PLAIN ENCODE JSON (force_append_only='true');
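With force_append_only='true', each change to the MV lands on the alerts topic as a plain JSON record. A consumer-side sketch of decoding one such message (the field set mirrors the MV's columns; the exact payload below is an assumption, not captured output):

```python
import json

# Hypothetical message as it might appear on the 'alerts' topic.
raw = (
    b'{"user_id": "u_1001", "tx_count": 8, "total_amount": 7250.0, '
    b'"window_start": "2024-05-01 12:00:00+00:00", '
    b'"window_end": "2024-05-01 12:05:00+00:00"}'
)

alert = json.loads(raw)
message = f"suspicious: {alert['user_id']} moved ${alert['total_amount']:,.2f} in {alert['tx_count']} txs"
print(message)
```

Any Kafka consumer group can subscribe to the topic and fan these records out to paging, ticketing, or agent workflows.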

Key points

  • The WATERMARK declaration tells the engine how far event_time may lag behind the fastest event; it is required if you add EMIT ON WINDOW CLOSE to the materialized view, which holds back each window's result until the watermark passes window_end
  • Without EMIT ON WINDOW CLOSE (the default), the MV emits and revises results incrementally as data arrives, so late events update already-emitted windows
  • scan.startup.mode = 'earliest' replays all historical Kafka data on first run; use 'latest' to start from now
  • The materialized view is incrementally updated — only changed rows are recomputed when new Kafka messages arrive
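The last point is the core of incremental maintenance: the engine keeps per-group state and folds each new event into it as a delta, rather than rescanning the topic. A toy sketch of the pattern (not the actual engine, just the idea):

```python
from collections import defaultdict

# Per-(user_id, window_start) running state: [tx_count, total_amount]
state = defaultdict(lambda: [0, 0.0])

def apply_event(user_id: str, window_start: str, amount: float) -> None:
    """Fold one new event into the aggregate — O(1) work per event."""
    s = state[(user_id, window_start)]
    s[0] += 1
    s[1] += amount

# Six events for one user in one window.
for amt in (1200.0, 2500.0, 900.0, 400.0, 150.0, 300.0):
    apply_event("u_1001", "2024-05-01 12:00", amt)

count, total = state[("u_1001", "2024-05-01 12:00")]
suspicious = count > 5 and total > 5000
print(count, total, suspicious)  # 6 5450.0 True
```

This is why query latency on the MV stays low regardless of topic size: reads hit the precomputed state, and each new Kafka message costs only a constant-time update.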

Next steps