Skip to main content
What this does: Creates a subscription on a materialized view and consumes change events (insert, update, delete) in your application as they happen. When to use this: Your app or agent needs to react to data changes immediately, without polling a materialized view in a loop.

Setup

1. Create a materialized view to subscribe to

CREATE SOURCE transactions (
  user_id VARCHAR,
  amount DOUBLE PRECISION,
  event_time TIMESTAMPTZ,
  WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'
) WITH (
  connector = 'kafka',
  topic = 'transactions',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW high_value_tx AS
SELECT user_id, amount, event_time
FROM transactions
WHERE amount > 10000;

2. Create a subscription

CREATE SUBSCRIPTION high_value_alerts
FROM high_value_tx
WITH (retention = '1D');

3. Consume changes in Python

import psycopg2

conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
conn.autocommit = True
cur = conn.cursor()

# Declare cursor on the subscription
cur.execute("DECLARE alert_cursor SUBSCRIPTION CURSOR FOR high_value_alerts")

print("Listening for high-value transactions...")
while True:
    # FETCH NEXT blocks for up to 5s waiting for data, then returns empty
    cur.execute("FETCH NEXT FROM alert_cursor WITH (timeout = '5s')")
    row = cur.fetchone()
    if row:
        # Columns: user_id, amount, event_time, op, rw_timestamp
        user_id = row[0]
        amount = row[1]
        event_time = row[2]
        op = row[3]  # 'Insert', 'Delete', 'UpdateInsert', 'UpdateDelete'
        if op == 'Insert':  # New row inserted
            print(f"ALERT: user {user_id} — ${amount:.2f} at {event_time}")

4. Consume with a start timestamp (resume from checkpoint)

-- Start consuming from a specific point in time (Unix timestamp in milliseconds)
DECLARE alert_cursor SUBSCRIPTION CURSOR FOR high_value_alerts
  SINCE 1714000000000;

Key points

  • Change types: 'Insert', 'Delete', 'UpdateInsert' (new value after update), 'UpdateDelete' (old value before update)
  • retention controls how far back you can start consuming (default: 24 hours)
  • FETCH NEXT FROM cur WITH (timeout = 'Ns') blocks for up to N seconds waiting for data, then returns empty — eliminates the need for a polling sleep loop
  • A single subscription can have multiple cursors consuming independently
  • Drop a subscription when no longer needed: DROP SUBSCRIPTION high_value_alerts

Next steps