This guide explains how to connect RisingWave to a PostgreSQL database to ingest data changes in real time using change data capture (CDC). RisingWave supports PostgreSQL versions 10, 11, 12, 13, 14, 15, and 16.

Prerequisites

  • A running PostgreSQL database (self-hosted or AWS RDS/Aurora).
  • PostgreSQL wal_level set to logical.
  • A PostgreSQL user with the necessary privileges (REPLICATION, LOGIN, CREATEDB).
  • Network connectivity between RisingWave and your PostgreSQL database.

Connection methods

RisingWave supports two ways to ingest CDC data from PostgreSQL:

  • Using the built-in PostgreSQL CDC connector.
  • Using a CDC tool (like Debezium) and a message broker (like Kafka).

This guide focuses on the built-in PostgreSQL CDC connector.

Set up PostgreSQL

Before connecting RisingWave, you need to configure your PostgreSQL database for CDC. The steps vary slightly depending on whether you’re using a self-hosted PostgreSQL instance or AWS RDS/Aurora.

  1. Set wal_level to logical:

    ALTER SYSTEM SET wal_level = logical;
    

    This requires a restart of your PostgreSQL instance.

    If you are creating multiple CDC tables without a shared source, ensure max_wal_senders is greater than or equal to the number of synced tables. The default is 10.

  2. Grant privileges:

    -- For an existing user:
    ALTER USER <username> REPLICATION LOGIN CREATEDB;
    
    -- Or, for a new user:
    CREATE USER <username> REPLICATION LOGIN CREATEDB;
    
    GRANT CONNECT ON DATABASE <database_name> TO <username>;
    GRANT USAGE ON SCHEMA <schema_name> TO <username>;
    GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
    GRANT CREATE ON DATABASE <database_name> TO <username>;
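
To confirm that the configuration took effect, queries like the following can be run on the PostgreSQL side after the restart. This is a minimal sketch: your_user is a placeholder role name, and the checks use only standard PostgreSQL commands and the pg_roles catalog view.

```sql
-- Should return 'logical' once the instance has been restarted
SHOW wal_level;

-- Should be at least the number of CDC tables created without a shared source
SHOW max_wal_senders;

-- Check that the role has the REPLICATION, LOGIN, and CREATEDB attributes
SELECT rolname, rolreplication, rolcanlogin, rolcreatedb
FROM pg_roles
WHERE rolname = 'your_user';  -- placeholder role name
```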
    

Connect to PostgreSQL

RisingWave connects to PostgreSQL CDC with two statements: CREATE SOURCE and CREATE TABLE ... FROM source. Use CREATE SOURCE to define a shared source once, then create one or more tables from it, each representing a table in the upstream PostgreSQL database.

CREATE SOURCE (shared source)

Use the CREATE SOURCE statement to define the shared source with connection parameters.

CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
    connector='postgres-cdc',
    <field>=<value>, ...
);

CREATE TABLE ... FROM source (per-table definition)

Create a table on top of the defined source. A primary key is required and must be consistent with the upstream table. You must also specify the name of the upstream PostgreSQL table (pg_table_name) to ingest from.

CREATE TABLE [ IF NOT EXISTS ] table_name (
    column_name data_type PRIMARY KEY , ...
    PRIMARY KEY ( column_name, ... )
)
[ INCLUDE timestamp AS column_name ]
WITH (
    snapshot='true'
)
FROM source TABLE pg_table_name;

Basic connection example

-- Create a shared CDC source
CREATE SOURCE pg_source WITH (
    connector='postgres-cdc',
    hostname='localhost',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    schema.name='public' -- Optional, defaults to 'public'
);

-- Create a table from the source, representing a specific PostgreSQL table
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM pg_source TABLE 'public.my_upstream_table';
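
Because the source is shared, further tables can be created from the same pg_source without repeating the connection parameters. The sketch below assumes a second upstream table, public.my_other_table, with the columns shown; that table and its columns are hypothetical and only illustrate the pattern.

```sql
-- Second table from the same shared source
-- (the upstream table and its columns are hypothetical)
CREATE TABLE my_other_table (
    id INT PRIMARY KEY,
    amount DOUBLE PRECISION
)
FROM pg_source TABLE 'public.my_other_table';

-- Ingested rows can then be queried like any other RisingWave table
SELECT * FROM my_table LIMIT 10;
```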

Data format

The PostgreSQL CDC connector uses the Debezium JSON format for data.

Supported data types

The following table shows the data type mapping from PostgreSQL to RisingWave.

RisingWave does not support creating tables directly from PostgreSQL composite types. To read composite type data, use a source and create a materialized view based on that source.

| PostgreSQL type | RisingWave type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| BIT(1) | BOOLEAN |
| BIT( > 1) | No support |
| BIT VARYING[(M)] | No support |
| SMALLINT, SMALLSERIAL | SMALLINT |
| INTEGER, SERIAL | INTEGER |
| BIGINT, BIGSERIAL, OID | BIGINT |
| REAL | REAL |
| DOUBLE PRECISION | DOUBLE PRECISION |
| CHAR[(M)] | CHARACTER VARYING |
| VARCHAR[(M)] | CHARACTER VARYING |
| CHARACTER[(M)] | CHARACTER VARYING |
| CHARACTER VARYING[(M)] | CHARACTER VARYING |
| TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
| TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
| INTERVAL [P] | INTERVAL |
| BYTEA | BYTEA |
| JSON, JSONB | JSONB |
| XML | CHARACTER VARYING |
| UUID | CHARACTER VARYING |
| POINT | STRUCT (with form <x REAL, y REAL>) |
| LTREE | No support |
| CITEXT | CHARACTER VARYING* |
| INET | CHARACTER VARYING* |
| INT4RANGE | CHARACTER VARYING* |
| INT8RANGE | CHARACTER VARYING* |
| NUMRANGE | CHARACTER VARYING* |
| TSRANGE | CHARACTER VARYING* |
| TSTZRANGE | CHARACTER VARYING* |
| DATERANGE | CHARACTER VARYING* |
| ENUM | CHARACTER VARYING* |
| DATE | DATE |
| TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
| TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
| TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
| NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. |
| MONEY[(M[,D])] | NUMERIC |
| HSTORE | No support |
| CIDR | CHARACTER VARYING* |
| MACADDR | CHARACTER VARYING* |
| MACADDR8 | CHARACTER VARYING* |
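
As an illustration of the NUMERIC mapping, the sketch below declares one column as rw_int256 and another as varchar so that upstream values wider than 28 digits are not ingested as NULL. The upstream table public.ledger and its columns are hypothetical; pg_source is the shared source from the basic connection example.

```sql
-- Hypothetical upstream table whose NUMERIC columns may exceed 28 digits
CREATE TABLE ledger (
    id INT PRIMARY KEY,
    -- Large integer amounts: rw_int256 keeps the full value, but yields NULL
    -- for inf, -inf, nan, or values with a decimal part
    balance rw_int256,
    -- Values that may carry decimals: keep the exact text representation
    exact_ratio VARCHAR
)
FROM pg_source TABLE 'public.ledger';
```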

Extracting metadata from sources

The INCLUDE clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are:

  • timestamp
  • partition
  • offset
  • database_name
  • collection_name
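
A minimal sketch of the INCLUDE clause is shown below, assuming the pg_source shared source and the upstream table from the basic connection example. The table name my_table_with_meta and the column names commit_ts and db_name are arbitrary choices for this illustration.

```sql
CREATE TABLE my_table_with_meta (
    id INT PRIMARY KEY,
    name VARCHAR
)
-- Expose the change event timestamp as an extra column
INCLUDE timestamp AS commit_ts
-- Expose the upstream database name as an extra column
INCLUDE database_name AS db_name
FROM pg_source TABLE 'public.my_upstream_table';

-- The metadata columns can be selected like regular columns
SELECT id, name, commit_ts, db_name FROM my_table_with_meta LIMIT 10;
```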

Automatically map upstream table schema

RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source. Instead of defining columns individually, you can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.

CREATE TABLE <table_name> (*) FROM <source_name> TABLE '<schema_name>.<table_name>';
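
For instance, with the pg_source shared source from the basic connection example, the template above might be filled in as follows. The RisingWave table name my_table_auto is an arbitrary choice for this sketch.

```sql
-- Ingest every column of the upstream table without listing them
CREATE TABLE my_table_auto (*) FROM pg_source TABLE 'public.my_upstream_table';

-- Inspect the columns and types that RisingWave derived
DESCRIBE my_table_auto;
```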

Time travel

RisingWave does not support time travel for the native PostgreSQL CDC connector.
