Connect to PostgreSQL CDC
Ingest real-time changes from your PostgreSQL database using change data capture (CDC).
This guide explains how to connect RisingWave to a PostgreSQL database to ingest data changes in real time using change data capture (CDC). RisingWave supports PostgreSQL versions 10, 11, 12, 13, 14, 15, and 16.
Prerequisites
- A running PostgreSQL database (self-hosted or AWS RDS/Aurora).
- PostgreSQL
wal_level
set tological
. - A PostgreSQL user with the necessary privileges (
REPLICATION
,LOGIN
,CREATEDB
). - Network connectivity between RisingWave and your PostgreSQL database.
Connection methods
RisingWave supports two ways to ingest CDC data from PostgreSQL:
- Using the built-in PostgreSQL CDC connector.
- Using a CDC tool (like Debezium) and message broker (like Kafka).
This guide focuses on using built-in PostgreSQL CDC connector.
Set up PostgreSQL
Before connecting RisingWave, you need to configure your PostgreSQL database for CDC. The steps vary slightly depending on whether you’re using a self-hosted PostgreSQL instance or AWS RDS/Aurora.
-
Set
wal_level
tological
:This requires a restart of your PostgreSQL instance.
If you are creating multiple CDC tables without a shared source, ensure
max_wal_senders
is greater than or equal to the number of synced tables. The default is 10. -
Grant privileges:
Connect to PostgreSQL
RisingWave uses the CREATE SOURCE
or CREATE TABLE ... FROM source
syntax to connect to PostgreSQL CDC. You’ll use CREATE SOURCE
to create a shared source, and then create multiple tables from it, each representing a table in the upstream PostgreSQL database.
CREATE SOURCE
(shared source)
Use the CREATE SOURCE
statement to define the shared source with connection parameters.
CREATE TABLE ... FROM source
(per-table definition)
Create a table on top of the defined source. Note that a primary key is required and must be consistent with the upstream table. We must also specify the Postgres table name (pg_table_name
) which we are selecting from.
Basic connection example
Data format
The PostgreSQL CDC connector uses the Debezium JSON format for data.
Supported data types
The following table shows the data type mapping from PostgreSQL to RisingWave.
RisingWave does not support directly creating tables from PostgreSQL composite types. If you want to read composite type data, you will need to use a source and create a materialized view based off that source.
PostgreSQL type | RisingWave type |
---|---|
BOOLEAN | BOOLEAN |
BIT(1) | BOOLEAN |
BIT( > 1) | No support |
BIT VARYING[(M)] | No support |
SMALLINT, SMALLSERIAL | SMALLINT |
INTEGER, SERIAL | INTEGER |
BIGINT, BIGSERIAL, OID | BIGINT |
REAL | REAL |
DOUBLE PRECISION | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
CHARACTER[(M)] | CHARACTER VARYING |
CHARACTER VARYING[(M)] | CHARACTER VARYING |
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
INTERVAL [P] | INTERVAL |
BYTEA | BYTEA |
JSON, JSONB | JSONB |
XML | CHARACTER VARYING |
UUID | CHARACTER VARYING |
POINT | STRUCT (with form <x REAL, y REAL> ) |
LTREE | No support |
CITEXT | CHARACTER VARYING* |
INET | CHARACTER VARYING* |
INT4RANGE | CHARACTER VARYING* |
INT8RANGE | CHARACTER VARYING* |
NUMRANGE | CHARACTER VARYING* |
TSRANGE | CHARACTER VARYING* |
TSTZRANGE | CHARACTER VARYING* |
DATERANGE | CHARACTER VARYING* |
ENUM | CHARACTER VARYING* |
DATE | DATE |
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. |
MONEY[(M[,D])] | NUMERIC |
HSTORE | No support |
HSTORE | No support |
INET | CHARACTER VARYING* |
CIDR | CHARACTER VARYING* |
MACADDR | CHARACTER VARYING* |
MACADDR8 | CHARACTER VARYING* |
Extracting metadata from sources
The INCLUDE
clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are:
timestamp
partition
offset
database_name
collection_name
Automatically map upstream table schema
RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source.
Instead of defining columns individually, you can use *
when creating a table to ingest all columns from the source table. Note that *
cannot be used if other columns are specified in the table creation process.
Time travel
RisingWave does not support time travel for the native PostgreSQL CDC connector.
What’s next?
- All configuration options: PostgreSQL CDC configuration options
- Supported data formats: Data formats and encoding options
- Ingest data with dbt
- Monitor CDC ingestion progress
Was this page helpful?