Skip to main content

Ingest data from PostgreSQL CDC

Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, and then delivering the changes to a downstream service in real time.

RisingWave supports ingesting CDC data from PostgreSQL. Versions 10, 11, 12, 13, 14, 15, and 16 of PostgreSQL are supported.

You can ingest CDC data from PostgreSQL into RisingWave in two ways:

  • Using the built-in PostgreSQL CDC connector

    With this connector, RisingWave can connect to PostgreSQL databases directly to obtain data from the binlog without starting additional services.

  • Using a CDC tool and a message broker

    You can use a CDC tool and then use the Kafka, Pulsar, or Kinesis connector to send the CDC data to RisingWave. For more details, see the Create source via event streaming systems topic.

Set up PostgreSQL

  1. Ensure that wal_level is logical. Check by using the following statement.

    SHOW wal_level;

    By default, it is replica. For CDC, you will need to set it to logical in the database configuration file (postgresql.conf) or via a psql command. The following command will change the wal_level.

    ALTER SYSTEM SET wal_level = logical;

    Keep in mind that changing the wal_level requires a restart of the PostgreSQL instance and can affect database performance.

    note

    If you choose to create multiple CDC tables without using a shared source, be sure to set max_wal_senders to be greater than or equal to the number of synced tables. By default, max_wal_senders is 10.

  2. Assign REPLICATION, LOGIN,and CREATEDB role attributes to the user.

    For an existing user, run the following statement to assign the attributes:

    ALTER USER <username> REPLICATION LOGIN CREATEDB;

    For a new user, run the following statement to create the user and assign the attributes:

    CREATE USER <username> REPLICATION LOGIN CREATEDB;

    You can check your role attributes by using the \du psql command:

    dev-# \du
    List of roles
    Role name | Attributes | Member of
    -----------+-----------------------------------------------------------+---------
    rw | Create DB, Replication | {}
    postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}
  3. Grant required privileges to the user.

    Run the following statements to grant the required privileges to the user.

    GRANT CONNECT ON DATABASE <database_name> TO <username>;
    GRANT USAGE ON SCHEMA <schema_name> TO <username>;
    GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
    GRANT CREATE ON DATABASE <database_name> TO <username>;

    You can use the following statement to check the privileges of the user to the tables:

    postgres=# SELECT table_name, grantee, privilege_type
    FROM information_schema.role_table_grants
    WHERE grantee='<username>';

    An example result:

     table_name | grantee | privilege_type
    -----------+---------+----------------
    lineitem | rw | SELECT
    customer | rw | SELECT
    nation | rw | SELECT
    orders | rw | SELECT
    part | rw | SELECT
    partsupp | rw | SELECT
    supplier | rw | SELECT
    region | rw | SELECT
    (8 rows)

Notes about running RisingWave from binaries

If you are running RisingWave locally from binaries and intend to use the native CDC source connectors or the JDBC sink connector, make sure that you have JDK 11 or a later version installed in your environment.

Create a table using the native CDC connector

To ensure all data changes are captured, you must create a table or source and specify primary keys. See the CREATE TABLE command for more details.

Syntax

Syntax for creating a CDC source.

CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
connector='postgres-cdc',
<field>=<value>, ...
);

Syntax for creating a CDC table on top of this CDC Source. Note that a primary key is required and must be consistent with the upstream table. We must also specify the Postgres table name (pg_table_name) which we are selecting from.

CREATE TABLE [ IF NOT EXISTS ] table_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
[ INCLUDE timestamp AS column_name ]
WITH (
snapshot='true'
)
FROM source TABLE pg_table_name;

To check the progress of backfilling historical data, find the corresponding internal table using the SHOW INTERNAL TABLES command and query from it.

Connector parameters

Unless specified otherwise, the fields listed are required. Note that the value of these parameters should be enclosed in single quotation marks.

FieldNotes
hostnameHostname of the database.
portPort number of the database.
usernameUsername of the database.
passwordPassword of the database.
database.nameName of the database.
schema.nameOptional. Name of the schema. By default, the value is public.
table.nameName of the table that you want to ingest data from.
slot.nameOptional. The replication slot for this PostgreSQL source. By default, a unique slot name will be randomly generated. Each source should have a unique slot name. Valid replication slot names must contain only lowercase letters, numbers, and underscores, and be no longer than 63 characters.
ssl.modeOptional. The ssl.mode parameter determines the level of SSL/TLS encryption for secure communication with Postgres. Accepted values are disabled, preferred, required, verify-ca, and verify-full. The default value is disabled.
  • When set to required, it enforces TLS for establishing a connection;
  • When set to verify-ca, it verifies that the server is trustworthy by checking the certificate chain up to the root certificate stored on the client;
  • When set to verify-full, it verifies the certificate and also ensures the server hostname matches the name in the certificate.
ssl.root.certOptional. Specify the root certificate secret. You must create secret first and then use it here.
publication.nameOptional. Name of the publication. By default, the value is rw_publication. For more information, see Multiple CDC source tables.
publication.create.enableOptional. By default, the value is 'true'. If publication.name does not exist and this value is 'true', a publication.name will be created. If publication.name does not exist and this value is 'false', an error will be returned.
transactionalOptional. Specify whether you want to enable transactions for the CDC table that you are about to create. By default, the value is 'true' for shared sources, and 'false' otherwise. This feature is also supported for shared CDC sources for multi-table transactions. For performance considerations, transactions involving changes to more than 4096 rows cannot be guaranteed.
note

RisingWave implements CDC via PostgreSQL replication. Inspect the current progress via the pg_replication_slots view. Remove inactive replication slots via pg_drop_replication_slot(). RisingWave does not automatically drop inactive replication slots. You must do this manually to prevent WAL files from accumulating in the upstream PostgreSQL database.

The following fields are used when creating a CDC table.

FieldNotes
snapshotOptional. If false, CDC backfill will be disabled and only upstream events that have occurred after the creation of the table will be consumed. This option can only be applied for tables created from a shared source.
snapshot.intervalOptional. Specifies the barrier interval for buffering upstream events. The default value is 1.
snapshot.batch_sizeOptional. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000.

Regarding the INCLUDE timestamp AS column_name clause, it allows you to ingest the upstream commit timestamp. For historical data, the commit timestamp will be set to 1970-01-01 00:00:00+00:00. Here is an example:

CREATE TABLE mytable (v1 int PRIMARY KEY, v2 varchar)
INCLUDE timestamp AS commit_ts
FROM pg_source TABLE 'public.mytable';

SELECT * FROM t2 ORDER BY v1;

----RESULT
v1 | v2 | commit_ts
----+----+---------------------------
1 | aa | 1970-01-01 00:00:00+00:00
2 | bb | 1970-01-01 00:00:00+00:00
3 | cc | 2024-05-20 09:01:08+00:00
4 | dd | 2024-05-20 09:01:08+00:00

You can see the INCLUDE clause for more details.

Debezium parameters

Debezium v2.6 connector configuration properties can also be specified under the WITH clause when creating a table or shared source. Add the prefix debezium. to the connector property you want to include.

For instance, to skip unknown DDL statements, specify the schema.history.internal.skip.unparseable.ddl parameter as debezium.schema.history.internal.skip.unparseable.ddl.

CREATE SOURCE pg_mydb WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot',
debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);

Data format

Data is in Debezium JSON format. Debezium is a log-based CDC tool that can capture row changes from various database management systems such as PostgreSQL, MySQL, and SQL Server and generate events with consistent structures in real time. The PostgreSQL CDC connector in RisingWave supports JSON as the serialization format for Debezium data. The data format does not need to be specified when creating a table with postgres-cdc as the source.

Metadata options

Below are the metadata columns available for PostgreSQL CDC.

FieldNotes
database_nameName of the database.
schema_nameName of the schema.
table_nameName of the table.

For instance, the person table below contains columns for typical personal information. It also includes metadata fields (database_name, schema_name, table_name) to provide contextual information about where the data resides within the PostgreSQL database.

CREATE TABLE person (
id int,
name varchar,
email_address varchar,
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
FROM pg_source TABLE 'public.person';

Examples

Connect to the upstream database by creating a CDC source using the CREATE SOURCE command and PostgreSQL CDC parameters. The data format is fixed as FORMAT PLAIN ENCODE JSON so it does not need to be specified.

CREATE SOURCE pg_mydb WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot'
);

With the source created, you can create multiple CDC tables that ingest data from different tables and schemas in the upstream database without needing to specify the database connection parameters again.

For instance, the following CDC table in RisingWave ingests data from table tt3 in the schema public. When specifying the PostgreSQL table name in the FROM clause after the keyword TABLE, the schema name must also be specified.

CREATE TABLE tt3 (
v1 integer primary key,
v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';

You can also create another CDC table in RisingWave that ingests data from table tt4 in the schema ods.

CREATE TABLE tt4 (
v1 integer primary key,
v2 varchar,
PRIMARY KEY (v1)
) FROM pg_mydb TABLE 'ods.tt4';

To check the progress of backfilling historical data, find the corresponding internal table using the SHOW INTERNAL TABLES command and query from it.

Data type mapping

The following table shows the corresponding data type in RisingWave that should be specified when creating a source. For details on native RisingWave data types, see Overview of data types.

RisingWave data types marked with an asterisk indicate that while there is no corresponding RisingWave data type, the ingested data can still be consumed as the listed type.

note

RisingWave cannot correctly parse composite types from PostgreSQL as Debezium does not support composite types in PostgreSQL.

PostgreSQL typeRisingWave type
BOOLEANBOOLEAN
BIT(1)BOOLEAN
BIT( > 1)No support
BIT VARYING[(M)]No support
SMALLINT, SMALLSERIALSMALLINT
INTEGER, SERIALINTEGER
BIGINT, BIGSERIAL, OIDBIGINT
REALREAL
DOUBLE PRECISIONDOUBLE PRECISION
CHAR[(M)]CHARACTER VARYING
VARCHAR[(M)]CHARACTER VARYING
CHARACTER[(M)]CHARACTER VARYING
CHARACTER VARYING[(M)]CHARACTER VARYING
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONETIMESTAMP WITH TIME ZONE
TIMETZ, TIME WITH TIME ZONETIME WITHOUT TIME ZONE (assume UTC time zone)
INTERVAL [P]INTERVAL
BYTEABYTEA
JSON, JSONBJSONB
XMLCHARACTER VARYING
UUIDCHARACTER VARYING
POINTSTRUCT (with form <x REAL, y REAL>)
LTREENo support
CITEXTCHARACTER VARYING*
INETCHARACTER VARYING*
INT4RANGECHARACTER VARYING*
INT8RANGECHARACTER VARYING*
NUMRANGECHARACTER VARYING*
TSRANGECHARACTER VARYING*
TSTZRANGECHARACTER VARYING*
DATERANGECHARACTER VARYING*
ENUMCHARACTER VARYING*
DATEDATE
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6)TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40))
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40))
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMPTIMESTAMP WITHOUT TIME ZONE
NUMERIC[(M[,D])], DECIMAL[(M[,D])]numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL.
MONEY[(M[,D])]NUMERIC
HSTORENo support
HSTORENo support
INETCHARACTER VARYING*
CIDRCHARACTER VARYING*
MACADDRCHARACTER VARYING*
MACADDR8CHARACTER VARYING*

Use dbt to ingest data from PostgreSQL CDC

Here is an example of how to use dbt to ingest data from PostgreSQL CDC. In this dbt example, source and table_with_connector models will be used. For more details about these two models, please refer to Use dbt for data transformations.

First, we create a source model pg_mydb.sql.

{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot'
);

And then we create a table_with_connector model tt3.sql.

{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
v1 integer primary key,
v2 timestamp with time zone
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3';

Automatically map upstream table schema

Premium Edition Feature

This feature is only available in the premium edition of RisingWave. The premium edition offers additional advanced features and capabilities beyond the free and community editions. If you have any questions about upgrading to the premium edition, please contact our sales team at sales@risingwave-labs.com.

Public Preview

This feature is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.

RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source. Instead of defining columns individually, you can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.

Below is an example to create a table that ingests all columns from the upstream table from the PostgreSQL database:

CREATE TABLE supplier (*) FROM pg_source TABLE 'public.supplier';

And this it the output of DESCRIBE supplier;

       Name        |       Type        | Is Hidden | Description
-------------------+-------------------+-----------+-------------
s_suppkey | bigint | false |
s_name | character varying | false |
s_address | character varying | false |
s_nationkey | bigint | false |
s_phone | character varying | false |
s_acctbal | numeric | false |
s_comment | character varying | false |
primary key | s_suppkey | |
distribution key | s_suppkey | |
table description | supplier | |
(10 rows)

Ingest data from a partitioned table

Public Preview

This feature is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.

RisingWave supports ingesting data from a partitioned table. To configure a publication for your CDC stream, note that PostgreSQL, by default, creates publications with publish_via_partition_root = false. This setting causes replication slot events to contain separate events for each partition, rather than for the root partitioned table.

If you need to read from the partitioned table, you should explicitly set this property to TRUE when creating a publication. Execute the following command in your upstream PostgreSQL database:

CREATE PUBLICATION publication_name FOR table_name WITH (publish_via_partition_root = true);

If you let RisingWave create the publication, it will automatically set publish_via_partition_root = true.

Please be aware that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication; however, it does not generate an error if attempted. If you need to ingest data from both the root table and its partitions, you should create separate publications for each. Otherwise, you will not be able to read from the table partitions. Meanwhile, in RisingWave, you should create separate sources with dedicated publication names for the partitioned table and its partitions.

Monitor the progress of direct CDC

To observe the progress of direct CDC for PostgreSQL, use the following methods:

For historical data

Historical data needs to be backfilled into the table. You can check the internal state of the backfill executor as follows:

  1. Create a table to backfill historical data:

    CREATE TABLE t3 (id INTEGER, v1 TIMESTAMP WITH TIME ZONE, PRIMARY KEY(id)) FROM pg_source TABLE 'public.t3';
  2. List the internal tables to find the relevant backfill executor state:

    SHOW INTERNAL TABLES;

    Output:

    Name
    ---------------------------------
    __internal_t3_3_streamcdcscan_4
    __internal_pg_source_1_source_2
    (2 rows)
  3. Check the internal state of the backfill executor:

    SELECT * FROM __internal_t3_3_streamcdcscan_4;

    Output:

    split_id | id | backfill_finished | row_count | cdc_offset
    ----------+----+-------------------+-----------+--------------------------------------------------
    3 | 5 | t | 4 | {"Postgres": {"lsn": 4558482960, "txid": 35853}}
    (1 row)

For real-time data

RisingWave stores source offset in the internal state table of source executor. You can check the current consumed offset by checking this table and comparing it with the upstream database's log offset.

The Postgres connector commits offsets to the upstream database, allowing Postgres to free up space used by Write-Ahead Log (WAL) files. This offset commitment happens during checkpoint commits in the CDC source. If there is high checkpoint point latency, WAL files may accumulate on the upstream server.

To check WAL accumulation on the upstream Postgres server, run this SQL query on upstream Postgres:

SELECT slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS raw,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replicationSlotLag,
active
FROM pg_replication_slots;

Help us make this doc better!