Connect to PostgreSQL CDC
Ingest real-time changes from your PostgreSQL database using change data capture (CDC).
This guide explains how to connect RisingWave to a PostgreSQL database to ingest data changes in real time using the native PostgreSQL CDC source connector.
RisingWave’s PostgreSQL CDC connector is compatible with any PostgreSQL-compliant database that supports logical replication, and supports PostgreSQL versions 10 through 17.
Prerequisites
Before connecting to RisingWave, you must enable logical replication on your upstream database. The specific steps depend on your provider. Select your platform below for detailed setup instructions:
- Set up a self-hosted PostgreSQL database
- Set up AWS RDS or Aurora PostgreSQL
- Set up a Neon serverless PostgreSQL database
- Set up a Supabase project
Once setup is complete, return to this guide to connect RisingWave.
Connect to PostgreSQL
RisingWave connects to PostgreSQL CDC with two statements: CREATE SOURCE and CREATE TABLE ... FROM source. Use CREATE SOURCE to create a shared source, then create one or more tables from it, each representing a table in the upstream PostgreSQL database.
CREATE SOURCE (shared source)
Use the CREATE SOURCE statement to define the shared source with connection parameters.
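As a minimal sketch, a shared source that sets only the required parameters from the connector parameters table might look like the following. The source name pg_mydb and every connection value are placeholders, not values taken from a real deployment.

```sql
-- Sketch: define a shared PostgreSQL CDC source with the required parameters only.
-- Replace the placeholder host, credentials, and database name with your own.
CREATE SOURCE pg_mydb WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'your_password',
    database.name = 'mydb'
);
```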
CREATE TABLE ... FROM source (per-table definition)
Create a table on top of the defined source. A primary key is required and must be consistent with the upstream table. You must also specify the name of the upstream PostgreSQL table (pg_table_name) to ingest from.
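A per-table definition can be sketched as follows, assuming a shared source named pg_mydb already exists and that the upstream table public.tt3 has the two columns shown. The column names and types are illustrative assumptions.

```sql
-- Sketch: create a RisingWave table backed by one upstream PostgreSQL table.
-- The primary key must match the primary key of public.tt3 upstream.
CREATE TABLE tt3 (
    v1 integer PRIMARY KEY,
    v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';
```

The string after TABLE is the upstream table name, qualified with its schema.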
Basic connection example
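The sketch below puts both statements together: one shared source and two tables created from it. The source name pg_shop, the slot name, the host, the credentials, and the table schemas are all hypothetical and only illustrate the shape of a typical setup.

```sql
-- Shared source; every value below is a placeholder.
CREATE SOURCE pg_shop WITH (
    connector = 'postgres-cdc',
    hostname = 'db.example.com',
    port = '5432',
    username = 'rw_user',
    password = 'your_password',
    database.name = 'shop',
    schema.name = 'public',
    slot.name = 'rw_shop_slot'  -- lowercase letters, numbers, underscore only
);

-- One RisingWave table per upstream table, all reading from the same source.
CREATE TABLE customers (
    customer_id integer PRIMARY KEY,
    email varchar
) FROM pg_shop TABLE 'public.customers';

CREATE TABLE orders (
    order_id bigint PRIMARY KEY,
    customer_id integer,
    created_at timestamp with time zone
) FROM pg_shop TABLE 'public.orders';
```

Because both tables share one source, they also share its replication slot and publication.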
Configuration Parameters
This section contains a complete reference for all parameters used when creating a PostgreSQL CDC source.
Connector parameters
These parameters are used in the WITH
clause of a CREATE SOURCE
statement.
Parameter | Description | Required |
---|---|---|
connector | Must be set to 'postgres-cdc'. | Yes |
hostname | The hostname or IP address of your PostgreSQL database server. | Yes |
port | The port number of your PostgreSQL database server. The default is 5432. | Yes |
username | The username for connecting to your PostgreSQL database. | Yes |
password | The password for the PostgreSQL user. | Yes |
database.name | The name of the PostgreSQL database to connect to. | Yes |
schema.name | The name of the PostgreSQL schema to capture changes from. Defaults to 'public'. | No |
slot.name | The name of the PostgreSQL replication slot to use. If not specified, a unique name is generated. Each source needs a unique slot. Valid characters: lowercase letters, numbers, underscore. Max length: 63. | No |
publication.name | The name of the PostgreSQL publication to use. Defaults to 'rw_publication'. | No |
publication.create.enable | Whether to automatically create the publication if it doesn’t exist. Defaults to true. | No |
auto.schema.change | Set to true to enable automatic replication of DDL changes from Postgres. Defaults to false. | No |
ssl.mode | SSL/TLS encryption mode. Accepted values: disabled, preferred, required, verify-ca, verify-full. Defaults to disabled. | No |
ssl.root.cert | The PEM-encoded root certificate for verify-ca or verify-full mode. Use a secret. | No |
transactional | Ensures that changes from a single upstream transaction are processed atomically. Defaults to true for shared sources. | No |
Per-table parameters
These parameters are used in the WITH clause of a CREATE TABLE ... FROM source statement.
Parameter | Description | Required | Default |
---|---|---|---|
snapshot | If false, disables the initial snapshot (backfill) of the table. Only new changes will be ingested. | No | true |
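For illustration, the following sketch disables the initial backfill for a single table. It assumes a shared source named pg_mydb and a hypothetical upstream table public.orders.

```sql
-- Sketch: skip the initial snapshot so only changes made after creation are ingested.
CREATE TABLE orders_new_only (
    order_id integer PRIMARY KEY,
    status varchar
)
WITH (snapshot = 'false')
FROM pg_mydb TABLE 'public.orders';
```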
Debezium parameters
You can also specify any valid Debezium PostgreSQL connector configuration property in the WITH clause. Prefix the Debezium parameter name with debezium. (including the trailing dot).
For example, to skip unknown DDL statements, use:
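A sketch of how such a prefixed property sits next to the regular connector parameters is shown below. The property name is an assumption based on Debezium's schema history settings; verify against the Debezium documentation that it applies to your connector version before relying on it. The source name and connection values are placeholders.

```sql
-- Sketch: pass a Debezium property through the WITH clause using the debezium. prefix.
-- The Debezium property name below is an assumption; confirm it before use.
CREATE SOURCE pg_dbz WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'your_password',
    database.name = 'mydb',
    debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);
```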
Features and reference
Data format
The PostgreSQL CDC connector uses the Debezium JSON format for data.
Supported data types
The following table shows the data type mapping from PostgreSQL to RisingWave.
RisingWave does not support creating tables directly from PostgreSQL composite types. To read composite type data, use a source and create a materialized view based on that source.
PostgreSQL type | RisingWave type |
---|---|
BOOLEAN | BOOLEAN |
BIT(1) | BOOLEAN |
BIT( > 1) | No support |
BIT VARYING[(M)] | No support |
SMALLINT, SMALLSERIAL | SMALLINT |
INTEGER, SERIAL | INTEGER |
BIGINT, BIGSERIAL, OID | BIGINT |
REAL | REAL |
DOUBLE PRECISION | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
CHARACTER[(M)] | CHARACTER VARYING |
CHARACTER VARYING[(M)] | CHARACTER VARYING |
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
INTERVAL [P] | INTERVAL |
BYTEA | BYTEA |
JSON, JSONB | JSONB |
XML | CHARACTER VARYING |
UUID | CHARACTER VARYING |
POINT | STRUCT (with form <x REAL, y REAL>) |
LTREE | No support |
CITEXT | CHARACTER VARYING* |
INET | CHARACTER VARYING* |
INT4RANGE | CHARACTER VARYING* |
INT8RANGE | CHARACTER VARYING* |
NUMRANGE | CHARACTER VARYING* |
TSRANGE | CHARACTER VARYING* |
TSTZRANGE | CHARACTER VARYING* |
DATERANGE | CHARACTER VARYING* |
ENUM | CHARACTER VARYING* |
DATE | DATE |
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits; values beyond this precision are treated as NULL. To process values exceeding 28 digits, declare the corresponding column as rw_int256 or varchar when creating the table. Note that rw_int256 treats inf, -inf, nan, and numeric values with decimal parts as NULL. |
MONEY[(M[,D])] | NUMERIC |
HSTORE | No support |
CIDR | CHARACTER VARYING* |
MACADDR | CHARACTER VARYING* |
MACADDR8 | CHARACTER VARYING* |
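As a sketch of the NUMERIC mapping described in the table, the following definition declares high-precision upstream columns as rw_int256 and varchar instead of numeric. The source name pg_mydb, the table public.balances, and its columns are hypothetical.

```sql
-- Sketch: handle upstream NUMERIC columns that exceed 28 digits of precision.
CREATE TABLE balances (
    account_id integer PRIMARY KEY,
    -- Upstream NUMERIC holding large whole numbers; fractional values would become NULL.
    raw_amount rw_int256,
    -- Upstream NUMERIC kept as text, preserving any fractional digits.
    exact_amount varchar
) FROM pg_mydb TABLE 'public.balances';
```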
Use dbt to ingest data from PostgreSQL CDC
Here is an example of how to use dbt to ingest data from PostgreSQL CDC. This example uses the source and table_with_connector models. For more details about these two models, see Use dbt for data transformations.
First, we create a source model, pg_mydb.sql.
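A sketch of what pg_mydb.sql could contain is shown below, assuming the dbt-risingwave adapter's source materialization. The connection values and the slot name are placeholders.

```sql
-- models/pg_mydb.sql (sketch; connection values are placeholders)
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'your_password',
    database.name = 'mydb',
    slot.name = 'pg_mydb_slot'
)
```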
Then we create a table_with_connector model, tt3.sql.
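A sketch of tt3.sql follows. It assumes a source model named pg_mydb exists in the same dbt project and that the upstream table public.tt3 has the columns shown, which are illustrative.

```sql
-- models/tt3.sql (sketch; column definitions are assumptions)
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
    v1 integer PRIMARY KEY,
    v2 timestamp with time zone
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3'
```

Using ref() lets dbt build the source model before the table that depends on it.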
Extract metadata from sources
The INCLUDE clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are:
- timestamp
- partition
- offset
- database_name
- collection_name
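The clause can be sketched as follows, using two of the fields listed above. The source name pg_mydb, the table public.person, and the alias names commit_ts and source_db are assumptions chosen for illustration.

```sql
-- Sketch: add metadata columns next to the upstream columns.
CREATE TABLE person (
    id integer PRIMARY KEY,
    name varchar
)
INCLUDE timestamp AS commit_ts
INCLUDE database_name AS source_db
FROM pg_mydb TABLE 'public.person';
```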
Automatically map upstream table schema
RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source.
Instead of defining columns individually, you can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.
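A minimal sketch of the wildcard form, assuming a shared source named pg_mydb and a hypothetical upstream table public.person:

```sql
-- Sketch: let RisingWave derive all columns from the upstream table definition.
CREATE TABLE person (*) FROM pg_mydb TABLE 'public.person';
```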
Automatic schema changes
PREMIUM EDITION FEATURE
This is a Premium Edition feature. All Premium Edition features are available out of the box at no additional cost on RisingWave Cloud. For self-hosted deployments, users need to purchase a license key to access this feature. To purchase a license key, please contact the sales team at sales@risingwave-labs.com.
For a full list of Premium Edition features, see RisingWave Premium Edition.
RisingWave supports auto schema changes in Postgres CDC. It ensures that your RisingWave pipeline stays synchronized with any schema changes in the source database, reducing the need for manual updates and preventing inconsistencies.
Currently, RisingWave supports the ALTER TABLE command with the following operations, and we plan to add support for additional DDL operations in future releases.
- ADD COLUMN [DEFAULT expr]: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value.
- DROP COLUMN: Allows you to remove an existing column from a table.
To enable this feature, set auto.schema.change = 'true' in your PostgreSQL CDC source configuration:
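A sketch of a source with the option enabled is shown below. The source name pg_auto and the connection values are placeholders.

```sql
-- Sketch: shared source with automatic schema change replication enabled.
CREATE SOURCE pg_auto WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'your_password',
    database.name = 'mydb',
    auto.schema.change = 'true'
);
```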
Create a RisingWave table from the PostgreSQL source:
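For example, assuming a source named pg_auto exists with auto.schema.change enabled and the upstream table public.my_table has the two columns shown (both assumptions):

```sql
-- Sketch: RisingWave table that tracks public.my_table upstream.
CREATE TABLE my_table (
    id integer PRIMARY KEY,
    name varchar
) FROM pg_auto TABLE 'public.my_table';
```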
Add columns to the PostgreSQL table and observe the changes in RisingWave:
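The statements below are a sketch of upstream changes to run in PostgreSQL, not in RisingWave. The column names are hypothetical, and the default value is a constant, in line with the restriction on ADD COLUMN described above.

```sql
-- Run in the upstream PostgreSQL database.
ALTER TABLE public.my_table ADD COLUMN age integer;
ALTER TABLE public.my_table ADD COLUMN status varchar DEFAULT 'new';
```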
After the upstream table changes, the schema of the table in RisingWave changes as well. You can verify this by running DESCRIBE my_table; in RisingWave.
Ingest data from a partitioned table
RisingWave supports ingesting data from a partitioned table. By default, PostgreSQL creates publications with publish_via_partition_root = false. With this setting, the replication stream contains separate events for each partition rather than for the root partitioned table.
If you need to read from the partitioned table, you should explicitly set this property to TRUE when creating a publication. Execute the following command in your upstream PostgreSQL database:
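A sketch of such a command follows. The publication name rw_publication is the connector's documented default, while the table name public.orders is a hypothetical partitioned table.

```sql
-- Run in the upstream PostgreSQL database.
-- publish_via_partition_root is available in PostgreSQL 13 and later.
CREATE PUBLICATION rw_publication
    FOR TABLE public.orders
    WITH (publish_via_partition_root = true);
```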
If you let RisingWave create the publication, it will automatically set publish_via_partition_root = true.
Please be aware that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication; however, it does not generate an error if attempted. If you need to ingest data from both the root table and its partitions, you should create separate publications for each. Otherwise, you will not be able to read from the table partitions. Meanwhile, in RisingWave, you should create separate sources with dedicated publication names for the partitioned table and its partitions.
Expression as a column
RisingWave allows users to define expressions as table columns. For example, in the SQL statement below, next_id is not a column from the source PostgreSQL table. Instead, it is a generated column that RisingWave computes dynamically while ingesting data. The value of next_id for each row is always equal to id + 1:
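A sketch of such a definition, assuming a shared source named pg_mydb and an upstream table public.person with columns id and name:

```sql
-- Sketch: next_id is computed by RisingWave and does not exist upstream.
CREATE TABLE person (
    id integer PRIMARY KEY,
    name varchar,
    next_id integer AS id + 1
) FROM pg_mydb TABLE 'public.person';
```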
Currently, generated columns must appear at the end of the schema definition. If a column from the upstream source appears after a generated column, RisingWave will return an error. For example, the following statement will fail because name, an upstream column, is placed after the generated column next_id:
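The invalid ordering can be sketched as follows, using the same assumed source pg_mydb and upstream table public.person:

```sql
-- Sketch: expected to be rejected, because the upstream column name
-- follows the generated column next_id.
CREATE TABLE person (
    id integer PRIMARY KEY,
    next_id integer AS id + 1,
    name varchar
) FROM pg_mydb TABLE 'public.person';
```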
To avoid errors, ensure that all generated columns are positioned at the end of the schema definition.
Time travel
RisingWave does not support time travel for the native PostgreSQL CDC connector.
What’s next?
- Supported data formats: Data formats and encoding options
- Monitor CDC ingestion progress