Tip: Try our SQL generator tool! We’ve built an interactive tool to help you generate SQL statements for connecting to data sources. Check it out at https://sql.risingwave.com/source/postgres-cdc.
This guide explains how to connect RisingWave to a PostgreSQL database to ingest data changes in real time using the native PostgreSQL CDC source connector. RisingWave’s PostgreSQL CDC connector works with any PostgreSQL-compatible database that supports logical replication, and supports PostgreSQL versions 10 through 17.

Prerequisites

Before using the native PostgreSQL CDC connector in RisingWave, you need to configure your PostgreSQL database to allow logical replication.
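At a minimum, logical replication must be enabled and the connecting user needs replication privileges. The following is a minimal sketch of the typical setup for a self-managed PostgreSQL instance; the role name your_user is illustrative, and managed services (for example, Amazon RDS) expose these settings through their own configuration interfaces.

-- Check whether logical replication is enabled; CDC requires wal_level = 'logical'.
SHOW wal_level;

-- If it is not 'logical', enable it (a PostgreSQL server restart is required to take effect).
ALTER SYSTEM SET wal_level = 'logical';

-- Give the connecting role (illustrative name) login and replication privileges.
ALTER USER your_user WITH REPLICATION LOGIN;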

Connect to PostgreSQL

To ingest CDC data from PostgreSQL, first create a shared source using the CREATE SOURCE statement; this establishes the connection to the PostgreSQL database. Then, for each upstream table you want to ingest, define a corresponding table in RisingWave using a CREATE TABLE ... FROM <source> TABLE ... statement.

Create a shared source

Use the CREATE SOURCE statement to create a shared source.
CREATE SOURCE [ IF NOT EXISTS ] <shared_source_name> WITH (
    connector='postgres-cdc',
    <field>=<value>, ...
);

Create a table from the shared source

Next, create a table from the shared source to ingest data from a specific upstream PostgreSQL table. When defining this table in RisingWave, you must specify a primary key that matches the primary key of the upstream table. You also need to provide the name of the upstream table.
CREATE TABLE [ IF NOT EXISTS ] <rw_table_name> (
    <column_name> <data_type> PRIMARY KEY , ...
    PRIMARY KEY ( <column_name>, ... )
)
[ INCLUDE timestamp AS <column_name> ]
WITH (
    snapshot='true'
)
FROM <shared_source_name> TABLE <upstream_table_name>;

Basic connection example

-- Create a shared CDC source
CREATE SOURCE shared_source WITH (
    connector='postgres-cdc',
    hostname='localhost',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    schema.name='public' -- Optional, defaults to 'public'
);

-- Create a table from the source, representing a specific PostgreSQL table
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM shared_source TABLE 'public.my_upstream_table';
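If both statements succeed, RisingWave backfills a snapshot of the upstream table and then ingests ongoing changes. As a quick sanity check, you can query the table directly (using the example names above):

SELECT count(*) FROM my_table;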

Parameters

These parameters are used in the WITH clause of a CREATE SOURCE statement.
| Parameter | Description | Required |
| --- | --- | --- |
| connector | Must be set to 'postgres-cdc'. | Yes |
| hostname | The hostname or IP address of your PostgreSQL database server. | Yes |
| port | The port number of your PostgreSQL database server. The default is 5432. | Yes |
| username | The username for connecting to your PostgreSQL database. | Yes |
| password | The password for the PostgreSQL user. | Yes |
| database.name | The name of the PostgreSQL database to connect to. | Yes |
| schema.name | The name of the PostgreSQL schema to capture changes from. Defaults to 'public'. | No |
| slot.name | The name of the PostgreSQL replication slot to use. If not specified, a unique name is generated. Each source needs a unique slot. Valid characters: lowercase letters, numbers, underscore. Max length: 63. | No |
| publication.name | The name of the PostgreSQL publication to use. Defaults to 'rw_publication'. | No |
| publication.create.enable | Whether to automatically create the publication if it doesn’t exist. Defaults to true. | No |
| auto.schema.change | Set to true to enable automatic replication of DDL changes from PostgreSQL. Defaults to false. | No |
| ssl.mode | SSL/TLS encryption mode. Accepted values: disabled, preferred, required, verify-ca, verify-full. Defaults to disabled. | No |
| ssl.root.cert | The PEM-encoded root certificate for verify-ca or verify-full mode. Use a secret. | No |
| transactional | Ensures that changes from a single upstream transaction are processed atomically. Defaults to true for shared sources. | No |
These parameters are used in the WITH clause of a CREATE TABLE ... FROM <source> statement.
| Parameter | Description |
| --- | --- |
| snapshot | Optional. If false, disables the initial snapshot (backfill) of the table; only new changes will be ingested. The default value is true. |
| snapshot.interval | Optional. Specifies the barrier interval for buffering upstream events. The default value is 1. |
| snapshot.batch_size | Optional. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000. |
| backfill.parallelism | Optional. Controls the parallelism of CDC table backfill. When set to a value greater than 0, enables parallelized CDC backfill. The default value is 0 (disabled). |
| backfill.num_rows_per_split | Optional. Specifies the number of rows per split for parallelized CDC backfill. Only effective when backfill.parallelism > 0. The default value is 100000. |
| backfill.as_even_splits | Optional. Whether to create even splits for parallelized CDC backfill. Only effective when backfill.parallelism > 0. The default value is true. |
For large tables, you can significantly speed up the initial data load by enabling parallelized backfill. Configure this feature using the backfill.parallelism, backfill.num_rows_per_split, and backfill.as_even_splits parameters.
CREATE TABLE large_table (
  id integer primary key,
  data varchar
)
WITH (
  backfill.parallelism = '4',
  backfill.num_rows_per_split = '50000',
  backfill.as_even_splits = 'true'
)
FROM pg_mydb TABLE 'public.large_table';

Debezium parameters

You can also specify any valid Debezium PostgreSQL connector configuration property in the WITH clause. Prefix the Debezium parameter name with debezium.. For example, to skip unknown DDL statements, use:
debezium.schema.history.internal.skip.unparseable.ddl = 'true'
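For example, a source definition that passes this property through might look like the following sketch; the source name and connection values are placeholders:

CREATE SOURCE pg_source_with_dbz WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);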

Features and reference

Data format

The PostgreSQL CDC connector uses the Debezium JSON format for data.

Supported data types

The following table shows the data type mapping from PostgreSQL to RisingWave.
RisingWave does not support directly creating tables from PostgreSQL composite types. To read composite type data, use a source and create a materialized view based on that source.
| PostgreSQL type | RisingWave type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| BIT(1) | BOOLEAN |
| BIT( > 1) | No support |
| BIT VARYING[(M)] | No support |
| SMALLINT, SMALLSERIAL | SMALLINT |
| INTEGER, SERIAL | INTEGER |
| BIGINT, BIGSERIAL, OID | BIGINT |
| REAL | REAL |
| DOUBLE PRECISION | DOUBLE PRECISION |
| CHAR[(M)] | CHARACTER VARYING |
| VARCHAR[(M)] | CHARACTER VARYING |
| CHARACTER[(M)] | CHARACTER VARYING |
| CHARACTER VARYING[(M)] | CHARACTER VARYING |
| TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
| TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assumes UTC time zone) |
| INTERVAL [P] | INTERVAL |
| BYTEA | BYTEA |
| JSON, JSONB | JSONB |
| XML | CHARACTER VARYING |
| UUID | CHARACTER VARYING |
| POINT | STRUCT (with form <x REAL, y REAL>) |
| LTREE | No support |
| CITEXT | CHARACTER VARYING* |
| INET | CHARACTER VARYING* |
| INT4RANGE | CHARACTER VARYING* |
| INT8RANGE | CHARACTER VARYING* |
| NUMRANGE | CHARACTER VARYING* |
| TSRANGE | CHARACTER VARYING* |
| TSTZRANGE | CHARACTER VARYING* |
| DATERANGE | CHARACTER VARYING* |
| ENUM | CHARACTER VARYING* |
| DATE | DATE |
| TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
| TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
| TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
| NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits; any value beyond this precision is treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead, and specify that type explicitly for the corresponding column when creating the table (see the example after this table). Note that rw_int256 treats inf, -inf, nan, and numeric values with decimal parts as NULL. |
| MONEY[(M[,D])] | NUMERIC |
| HSTORE | No support |
| CIDR | CHARACTER VARYING* |
| MACADDR | CHARACTER VARYING* |
| MACADDR8 | CHARACTER VARYING* |
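For example, to ingest an upstream NUMERIC column whose values may exceed 28 digits of precision, declare the corresponding RisingWave column as rw_int256 (or varchar). A minimal sketch, assuming a shared source pg_source and hypothetical table and column names:

CREATE TABLE big_numbers (
    id INT PRIMARY KEY,
    big_value rw_int256  -- upstream column is NUMERIC; inf, -inf, nan, and values with decimal parts become NULL
) FROM pg_source TABLE 'public.big_numbers';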

Support for PostgreSQL TOAST

Added in v2.6.0.
RisingWave supports TOASTed (The Oversized-Attribute Storage Technique) data from PostgreSQL when using the CDC connector. This ensures that even columns with very large values, such as long text or large JSON objects, are ingested completely and accurately during both the initial backfill and incremental changes. Supported TOAST-able data types in Postgres:
  • Standard types: varchar, text, xml, jsonb, bytea.
  • One-dimensional array of the above types: varchar[], text[], jsonb[], bytea[], xml[].
RisingWave currently supports the TOAST-able data types mentioned above. Other types that may trigger TOAST, mainly simple one-dimensional arrays with low probability, are not yet supported. For more details, please refer to the issue.
The example below demonstrates how RisingWave ingests large data that triggers PostgreSQL’s TOAST mechanism and ensures that large fields are not lost even when non-TOAST columns are updated.
  1. Create a table with TOAST-able columns in PostgreSQL.
CREATE TABLE test_toast (
    id int PRIMARY KEY,
    v_text text,
    v_json jsonb,
    v_bytea bytea
);
  2. Insert large TOAST data in PostgreSQL.
INSERT INTO test_toast (id, v_text, v_json, v_bytea)
VALUES (
    1,
    repeat('long_text_', 10000),  -- large text triggers TOAST
    jsonb_build_object('data', repeat('x', 50000)),  -- large JSON triggers TOAST
    convert_to(repeat('a', 2000000), 'UTF8')  -- large bytea triggers TOAST (text cannot be cast directly to bytea)
);
  3. Create a RisingWave source from PostgreSQL CDC.
CREATE SOURCE src_test_toast WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'pguser',
    password = 'pgpassword',
    database.name = 'mydb',
    schema.name = 'public',
    slot.name = 'rw_slot_toast_example'
);
  4. Create a RisingWave table from the source.
CREATE TABLE rw_test_toast(*) FROM src_test_toast TABLE 'public.test_toast';
  5. Verify that the data is ingested in RisingWave with TOAST values preserved.
SELECT
    id,
    CASE WHEN octet_length(v_text) > 15000 THEN 'toast-triggered' ELSE 'small' END AS v_text_check,
    CASE WHEN octet_length(v_json::text) > 50000 THEN 'toast-triggered' ELSE 'small' END AS v_json_check,
    CASE WHEN octet_length(v_bytea) > 1500000 THEN 'toast-triggered' ELSE 'small' END AS v_bytea_check
FROM rw_test_toast
WHERE id = 1;
  6. Update a non-TOAST column to test placeholder handling.
ALTER TABLE test_toast ADD COLUMN v_counter int DEFAULT 0;
UPDATE test_toast SET v_counter = 1 WHERE id = 1;
Verify TOAST columns remain intact.
SELECT
    id,
    octet_length(v_text) AS text_len,
    octet_length(v_json::text) AS json_len,
    octet_length(v_bytea) AS bytea_len,
    v_counter
FROM rw_test_toast
WHERE id = 1;
RisingWave can process changes to rows with TOASTed data even if the upstream table’s REPLICA IDENTITY is set to default. It achieves this by leveraging its own materialized state of the source data: when an UPDATE or DELETE event occurs, RisingWave uses the record stored in that materialized state to construct the full change event, rather than relying on the before field in the CDC message.
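To check the upstream table’s replica identity yourself, you can query PostgreSQL’s system catalog (shown here for the test_toast table from the example above):

-- Run in PostgreSQL. 'd' = default, 'f' = full, 'n' = nothing, 'i' = index.
SELECT relreplident FROM pg_class WHERE relname = 'test_toast';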

Use dbt to ingest data from PostgreSQL CDC

Here is an example of how to use dbt to ingest data from PostgreSQL CDC. This example uses the source and table_with_connector materializations; for more details about these two models, see Use dbt for data transformations. First, create a source model pg_mydb.sql.
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '8306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    slot.name = 'mydb_slot'
);
And then we create a table_with_connector model tt3.sql.
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
    v1 integer primary key,
    v2 timestamp with time zone
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3';

Extract metadata from sources

The INCLUDE clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are:
  • timestamp
  • partition
  • offset
  • database_name
  • collection_name
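For example, a table definition that also captures the Debezium commit timestamp might look like the sketch below; the column alias commit_ts is illustrative, and the source and table names reuse the placeholders from the basic connection example:

CREATE TABLE my_table_with_meta (
    id INT PRIMARY KEY,
    name VARCHAR
)
INCLUDE timestamp AS commit_ts
FROM shared_source TABLE 'public.my_upstream_table';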

Automatically map upstream table schema

RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source. Instead of defining columns individually, you can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.
CREATE TABLE <table_name> (*) FROM <source_name> TABLE '<schema_name>.<table_name>';

Auto schema change

Premium feature: This is a premium feature. For a comprehensive overview of all premium features and their usage, see RisingWave premium features.
RisingWave supports auto schema changes in Postgres CDC. It ensures that your RisingWave pipeline stays synchronized with any schema changes in the source database, reducing the need for manual updates and preventing inconsistencies. Currently, RisingWave supports the ALTER TABLE command with the following operations, and we plan to add support for additional DDL operations in future releases.
  • ADD COLUMN [DEFAULT expr]: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value.
  • DROP COLUMN: Allows you to remove an existing column from a table.
To enable this feature, set auto.schema.change = 'true' in your PostgreSQL CDC source configuration:
CREATE SOURCE pg_source WITH (
 connector = 'postgres-cdc',
 hostname = 'localhost',
 port = '5432',
 username = 'your_user',
 password = 'your_password',
 database.name = 'your_database',
 schema.name = 'public',
 auto.schema.change = 'true'
);
Create a RisingWave table from the PostgreSQL source:
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM pg_source TABLE 'public.my_upstream_table';
Add columns to the PostgreSQL table and observe the changes in RisingWave:
-- In your PostgreSQL database:
ALTER TABLE my_upstream_table ADD COLUMN v1 VARCHAR(255);
ALTER TABLE my_upstream_table ADD COLUMN v2 NUMERIC(5,2);
After the changes in the upstream table, the schema of the table in RisingWave will also be changed. You can verify this by running DESCRIBE my_table; in RisingWave.
-- In RisingWave:
DESCRIBE my_table;

Ingest data from a partitioned table

RisingWave supports ingesting data from a partitioned table. By default, PostgreSQL creates publications with publish_via_partition_root = false, which causes replication slot events to be emitted for each individual partition rather than for the root partitioned table. If you need to read from the root partitioned table, explicitly set this property to true when creating the publication. Execute the following command in your upstream PostgreSQL database:
CREATE PUBLICATION publication_name FOR table_name WITH (publish_via_partition_root = true);
If you let RisingWave create the publication, it automatically sets publish_via_partition_root = true. Be aware that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication, yet it does not raise an error if you attempt it. If you need to ingest data from both the root table and its partitions, create a separate publication for each; otherwise, you will not be able to read from the table partitions. Correspondingly, in RisingWave, create separate sources with dedicated publication names for the partitioned table and for its partitions.
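For instance, a RisingWave source that reads through a manually created publication might look like the following sketch; connection values are placeholders, and publication.create.enable is set to false so that RisingWave uses the existing publication instead of creating its own:

CREATE SOURCE pg_partitioned_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    publication.name = 'publication_name',
    publication.create.enable = 'false'
);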

Expression as a column

RisingWave allows users to define expressions as table columns. For example, in the SQL statement below, next_id is not a column from the source PostgreSQL table. Instead, it is a generated column that RisingWave computes dynamically while ingesting data. The value of next_id for each row is always equal to id + 1:
CREATE TABLE person (
  id integer,
  name varchar,
  next_id int AS id + 1,
  PRIMARY KEY (id)
) FROM pg_mydb TABLE 'public.person';
Currently, generated columns must appear at the end of the schema definition. If a column from the upstream source appears after a generated column, RisingWave will return an error. For example, the following statement will fail because name, an upstream column, is placed after the generated column next_id:
CREATE TABLE person (
  id integer,
  next_id int AS id + 1,
  name varchar,
  PRIMARY KEY (id)
) FROM pg_mydb TABLE 'public.person';
To avoid errors, ensure that all generated columns are positioned at the end of the schema definition.

Time travel

RisingWave does not support time travel for the native PostgreSQL CDC connector.
