This guide explains how to connect RisingWave to a PostgreSQL database to ingest data changes in real time using the native PostgreSQL CDC source connector.

RisingWave’s PostgreSQL CDC connector is compatible with any PostgreSQL-compliant database that supports logical replication, and supports PostgreSQL versions 10 through 17.

Prerequisites

Before connecting to RisingWave, you must enable logical replication on your upstream database. The specific steps depend on your provider, so follow the setup instructions for your platform.

Once setup is complete, return to this guide to connect RisingWave.
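
As a rough sketch of what this setup usually involves on a self-managed PostgreSQL instance, the statements below check and enable logical decoding and give the connecting user the replication attribute. The user name your_user and the public schema are assumptions for illustration, and managed providers typically expose these settings through their own consoles or parameter groups instead.

```sql
-- Run in the upstream PostgreSQL database (self-managed instance assumed).

-- Check the current WAL level; logical replication requires 'logical'.
SHOW wal_level;

-- Enable logical decoding. This takes effect only after a server restart.
ALTER SYSTEM SET wal_level = logical;

-- Hypothetical user name: the connecting user needs the REPLICATION attribute
-- and read access to the tables being captured.
ALTER USER your_user REPLICATION;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO your_user;
```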

Connect to PostgreSQL

RisingWave connects to PostgreSQL CDC in two steps. First, use CREATE SOURCE to create a shared source. Then use CREATE TABLE ... FROM source to create one or more tables from it, each representing a table in the upstream PostgreSQL database.

CREATE SOURCE (shared source)

Use the CREATE SOURCE statement to define the shared source with connection parameters.

CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
    connector='postgres-cdc',
    <field>=<value>, ...
);

CREATE TABLE ... FROM source (per-table definition)

Create a table on top of the defined source. A primary key is required and must be consistent with the primary key of the upstream table. You must also specify the name of the upstream PostgreSQL table (pg_table_name) to ingest from.

CREATE TABLE [ IF NOT EXISTS ] table_name (
    column_name data_type [ PRIMARY KEY ], ...
    [ PRIMARY KEY ( column_name, ... ) ]
)
[ INCLUDE timestamp AS column_name ]
[ WITH (
    snapshot='true'
) ]
FROM source TABLE pg_table_name;

Basic connection example

-- Create a shared CDC source
CREATE SOURCE pg_source WITH (
    connector='postgres-cdc',
    hostname='localhost',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    schema.name='public' -- Optional, defaults to 'public'
);

-- Create a table from the source, representing a specific PostgreSQL table
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM pg_source TABLE 'public.my_upstream_table';

Configuration Parameters

This section contains a complete reference for all parameters used when creating a PostgreSQL CDC source.

Connector parameters

These parameters are used in the WITH clause of a CREATE SOURCE statement.

| Parameter | Description | Required |
| --- | --- | --- |
| connector | Must be set to 'postgres-cdc'. | Yes |
| hostname | The hostname or IP address of your PostgreSQL database server. | Yes |
| port | The port number of your PostgreSQL database server. The default is 5432. | Yes |
| username | The username for connecting to your PostgreSQL database. | Yes |
| password | The password for the PostgreSQL user. | Yes |
| database.name | The name of the PostgreSQL database to connect to. | Yes |
| schema.name | The name of the PostgreSQL schema to capture changes from. Defaults to 'public'. | No |
| slot.name | The name of the PostgreSQL replication slot to use. If not specified, a unique name is generated. Each source needs a unique slot. Valid characters: lowercase letters, numbers, underscore. Max length: 63. | No |
| publication.name | The name of the PostgreSQL publication to use. Defaults to 'rw_publication'. | No |
| publication.create.enable | Whether to automatically create the publication if it doesn’t exist. Defaults to true. | No |
| auto.schema.change | Set to true to enable automatic replication of DDL changes from Postgres. Defaults to false. | No |
| ssl.mode | SSL/TLS encryption mode. Accepted values: disabled, preferred, required, verify-ca, verify-full. Defaults to disabled. | No |
| ssl.root.cert | The PEM-encoded root certificate for verify-ca or verify-full mode. Use a secret. | No |
| transactional | Ensures that changes from a single upstream transaction are processed atomically. Defaults to true for shared sources. | No |
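
To show how the optional parameters combine with the required ones, here is a minimal sketch of a source that names its own replication slot and publication and requires an encrypted connection. The source name, host, credentials, slot name, and publication name are all hypothetical placeholders.

```sql
-- A sketch only: source, host, credential, slot, and publication names are hypothetical.
CREATE SOURCE pg_orders_source WITH (
    connector = 'postgres-cdc',
    hostname = 'db.example.com',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    schema.name = 'public',
    slot.name = 'rw_orders_slot',          -- lowercase letters, numbers, underscore; max length 63
    publication.name = 'rw_orders_pub',    -- overrides the default 'rw_publication'
    publication.create.enable = 'true',    -- create the publication if it does not exist
    ssl.mode = 'required'                  -- one of the accepted ssl.mode values
);
```

Naming the slot explicitly makes it easier to find and clean up on the PostgreSQL side, at the cost of having to keep the name unique across sources.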

Per-table parameters

These parameters are used in the WITH clause of a CREATE TABLE ... FROM source statement.

| Parameter | Description | Required | Default |
| --- | --- | --- | --- |
| snapshot | If false, disables the initial snapshot (backfill) of the table. Only new changes will be ingested. | No | true |
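
As an illustration, the following sketch creates a table that skips the initial backfill and ingests only changes made after the table is created. It assumes the pg_source source and the public.my_upstream_table table from the basic connection example; the RisingWave table name is hypothetical.

```sql
-- Skip the initial snapshot; only new upstream changes are ingested.
CREATE TABLE my_table_changes_only (
    id INT PRIMARY KEY,
    name VARCHAR
)
WITH (
    snapshot = 'false'
)
FROM pg_source TABLE 'public.my_upstream_table';
```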

Debezium parameters

You can also specify any valid Debezium PostgreSQL connector configuration property in the WITH clause. Prefix the Debezium parameter name with debezium. (including the trailing dot).

For example, to skip unknown DDL statements, use:

debezium.schema.history.internal.skip.unparseable.ddl = 'true'

Features and reference

Data format

The PostgreSQL CDC connector uses the Debezium JSON format for data.

Supported data types

The following table shows the data type mapping from PostgreSQL to RisingWave.

RisingWave does not support creating tables directly from PostgreSQL composite types. To read composite type data, use a source and create a materialized view based on that source.

| PostgreSQL type | RisingWave type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| BIT(1) | BOOLEAN |
| BIT(> 1) | No support |
| BIT VARYING[(M)] | No support |
| SMALLINT, SMALLSERIAL | SMALLINT |
| INTEGER, SERIAL | INTEGER |
| BIGINT, BIGSERIAL, OID | BIGINT |
| REAL | REAL |
| DOUBLE PRECISION | DOUBLE PRECISION |
| CHAR[(M)] | CHARACTER VARYING |
| VARCHAR[(M)] | CHARACTER VARYING |
| CHARACTER[(M)] | CHARACTER VARYING |
| CHARACTER VARYING[(M)] | CHARACTER VARYING |
| TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
| TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
| INTERVAL [P] | INTERVAL |
| BYTEA | BYTEA |
| JSON, JSONB | JSONB |
| XML | CHARACTER VARYING |
| UUID | CHARACTER VARYING |
| POINT | STRUCT (with form <x REAL, y REAL>) |
| LTREE | No support |
| CITEXT | CHARACTER VARYING* |
| INET | CHARACTER VARYING* |
| INT4RANGE | CHARACTER VARYING* |
| INT8RANGE | CHARACTER VARYING* |
| NUMRANGE | CHARACTER VARYING* |
| TSRANGE | CHARACTER VARYING* |
| TSTZRANGE | CHARACTER VARYING* |
| DATERANGE | CHARACTER VARYING* |
| ENUM | CHARACTER VARYING* |
| DATE | DATE |
| TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
| TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
| TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
| NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. |
| MONEY[(M[,D])] | NUMERIC |
| HSTORE | No support |
| CIDR | CHARACTER VARYING* |
| MACADDR | CHARACTER VARYING* |
| MACADDR8 | CHARACTER VARYING* |
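
The NUMERIC mapping is the one most likely to need an explicit choice. The sketch below assumes a hypothetical upstream table public.ledger whose big_counter column is a whole-number NUMERIC wider than 28 digits and whose raw_amount column is a high-precision NUMERIC with a fractional part. The table and column names are illustrative only.

```sql
-- Hypothetical upstream table: public.ledger (id INT, big_counter NUMERIC, raw_amount NUMERIC)
CREATE TABLE ledger (
    id INT PRIMARY KEY,
    big_counter rw_int256,   -- whole numbers beyond 28 digits; fractional or non-finite values become NULL
    raw_amount VARCHAR       -- keeps the full textual value, including the decimal part
)
FROM pg_source TABLE 'public.ledger';
```

Declaring either column as numeric instead would turn any value beyond 28 digits of precision into NULL, as described in the table above.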

Use dbt to ingest data from PostgreSQL CDC

Here is an example of how to use dbt to ingest data from PostgreSQL CDC. In this dbt example, source and table_with_connector models will be used. For more details about these two models, please refer to Use dbt for data transformations.

First, we create a source model pg_mydb.sql.

{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '8306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    slot.name = 'mydb_slot'
);

And then we create a table_with_connector model tt3.sql.

{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
    v1 integer primary key,
    v2 timestamp with time zone
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3';

Extract metadata from sources

The INCLUDE clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are:

  • timestamp
  • partition
  • offset
  • database_name
  • collection_name
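
For example, the following sketch adds two metadata columns to a CDC table, following the INCLUDE syntax shown in the CREATE TABLE definition earlier. It assumes the pg_source source from the basic connection example; the column aliases commit_ts and db_name are hypothetical.

```sql
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
INCLUDE timestamp AS commit_ts       -- metadata column: timestamp field
INCLUDE database_name AS db_name     -- metadata column: upstream database name
FROM pg_source TABLE 'public.my_upstream_table';
```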

Automatically map upstream table schema

RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source. Instead of defining columns individually, you can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.

CREATE TABLE <table_name> (*) FROM <source_name> TABLE '<schema_name>.<table_name>';
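
A concrete instance of this template, assuming the pg_source source and upstream table from the basic connection example, might look like the following. Running DESCRIBE afterwards is one way to inspect the columns that were mapped.

```sql
-- Ingest every column of the upstream table without listing them.
CREATE TABLE my_table (*) FROM pg_source TABLE 'public.my_upstream_table';

-- Inspect the resulting schema in RisingWave.
DESCRIBE my_table;
```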

Automatic schema changes

PREMIUM EDITION FEATURE

This is a Premium Edition feature. On RisingWave Cloud, all Premium Edition features are available out of the box at no additional cost. Self-hosted deployments require a license key to access this feature. To purchase a license key, contact the sales team at sales@risingwave-labs.com.

For a full list of Premium Edition features, see RisingWave Premium Edition.

RisingWave supports auto schema changes in Postgres CDC. It ensures that your RisingWave pipeline stays synchronized with any schema changes in the source database, reducing the need for manual updates and preventing inconsistencies.

Currently, RisingWave supports the ALTER TABLE command with the following operations, and we plan to add support for additional DDL operations in future releases.

  • ADD COLUMN [DEFAULT expr]: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value.
  • DROP COLUMN: Allows you to remove an existing column from a table.

To enable this feature, set auto.schema.change = 'true' in your PostgreSQL CDC source configuration:

CREATE SOURCE pg_source WITH (
 connector = 'postgres-cdc',
 hostname = 'localhost',
 port = '5432',
 username = 'your_user',
 password = 'your_password',
 database.name = 'your_database',
 schema.name = 'public',
 auto.schema.change = 'true'
);

Create a RisingWave table from the PostgreSQL source:

CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM pg_source TABLE 'public.my_upstream_table';

Add columns to the PostgreSQL table and observe the changes in RisingWave:

-- In your PostgreSQL database:
ALTER TABLE my_upstream_table ADD COLUMN v1 VARCHAR(255);
ALTER TABLE my_upstream_table ADD COLUMN v2 NUMERIC(5,2);

After these changes are applied upstream, the schema of my_table in RisingWave is updated to match. You can verify this by running DESCRIBE my_table; in RisingWave.

-- In RisingWave:
DESCRIBE my_table;
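
The other supported operations follow the same pattern. As a sketch that continues the example above (the status column is hypothetical), dropping a column and adding one with a constant default would look like this:

```sql
-- In your PostgreSQL database:
ALTER TABLE my_upstream_table DROP COLUMN v2;
-- Only constant expressions are supported as the default value.
ALTER TABLE my_upstream_table ADD COLUMN status VARCHAR(16) DEFAULT 'new';

-- In RisingWave:
DESCRIBE my_table;
```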

Ingest data from a partitioned table

RisingWave supports ingesting data from a partitioned table. By default, PostgreSQL creates publications with publish_via_partition_root = false. With this setting, changes are published as separate events for each partition rather than for the root partitioned table.

If you need to read from the partitioned table, you should explicitly set this property to TRUE when creating a publication. Execute the following command in your upstream PostgreSQL database:

CREATE PUBLICATION publication_name FOR TABLE table_name WITH (publish_via_partition_root = true);

If you let RisingWave create the publication, it will automatically set publish_via_partition_root = true.

PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication, although it does not raise an error if you attempt it. If you need to ingest data from both the root table and its partitions, create a separate publication for each; otherwise, you will not be able to read from the table partitions. In RisingWave, likewise create separate sources, each with its own publication name, for the partitioned table and for its partitions.
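
A minimal sketch of this arrangement is shown below. It assumes a hypothetical partitioned table orders with a partition orders_2024; all publication, slot, and source names are illustrative, and the connection settings are placeholders.

```sql
-- In the upstream PostgreSQL database: one publication per level.
CREATE PUBLICATION pub_orders_root FOR TABLE orders
    WITH (publish_via_partition_root = true);
CREATE PUBLICATION pub_orders_2024 FOR TABLE orders_2024;

-- In RisingWave: one source per publication, each with its own replication slot.
CREATE SOURCE pg_orders_root WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    publication.name = 'pub_orders_root',
    slot.name = 'rw_orders_root_slot'
);

CREATE SOURCE pg_orders_2024 WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    publication.name = 'pub_orders_2024',
    slot.name = 'rw_orders_2024_slot'
);
```

Tables for the root table would then be created from pg_orders_root, and tables for the individual partition from pg_orders_2024.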

Expression as a column

RisingWave allows users to define expressions as table columns. For example, in the SQL statement below, next_id is not a column from the source PostgreSQL table. Instead, it is a generated column that RisingWave computes dynamically while ingesting data. The value of next_id for each row is always equal to id + 1:

CREATE TABLE person (
  id integer,
  name varchar,
  next_id int AS id + 1,
  PRIMARY KEY (id)
) FROM pg_mydb TABLE 'public.person';

Currently, generated columns must appear at the end of the schema definition. If a column from the upstream source appears after a generated column, RisingWave will return an error. For example, the following statement will fail because name, an upstream column, is placed after the generated column next_id:

CREATE TABLE person (
  id integer,
  next_id int AS id + 1,
  name varchar,
  PRIMARY KEY (id)
) FROM pg_mydb TABLE 'public.person';

To avoid errors, ensure that all generated columns are positioned at the end of the schema definition.

Time travel

RisingWave does not support time travel for the native PostgreSQL CDC connector.
