Skip to main content

Ingest data from PostgreSQL CDC

Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, then delivering the changes to a downstream service in real time.

RisingWave supports ingesting CDC data from PostgreSQL. Versions 10, 11, 12, 13, and 14 of PostgreSQL are supported.

You can ingest CDC data from PostgreSQL into RisingWave in two ways:

  • Using the built-in PostgreSQL CDC connector

    With this connector, RisingWave can connect to PostgreSQL databases directly to obtain data from the binlog without starting additional services.

    Beta feature

    The built-in PostgreSQL CDC connector in RisingWave is currently in Beta. Please use with caution as stability issues may still occur. Its functionality may evolve based on feedback. Please report any issues encountered to our team.

  • Using a CDC tool and a message broker

    You can use a CDC tool then use the Kafka, Pulsar, or Kinesis connector to send the CDC data to RisingWave. For more details, see the Create source via event streaming systems topic.

Set up PostgreSQL

  1. Ensure that wal_level is logical. Check by using the following statement.

    SHOW wal_level;

    By default, it is replica. For CDC, you will need to set it to logical in the database configuration file (postgresql.conf) or via a psql command. The following command will change the wal_level.

    ALTER SYSTEM SET wal_level = logical;

    Keep in mind that changing the wal_level requires a restart of the PostgreSQL instance and can affect database performance.

  2. Assign REPLICATION, LOGIN and CREATEDB role attributes to the user.

    For an existing user, run the following statement to assign the attributes:

    ALTER USER <username> REPLICATION LOGIN CREATEDB;

    For a new user, run the following statement to create the user and assign the attributes:

    CREATE USER <username> REPLICATION LOGIN CREATEDB;

    You can check your role attributes by using the \du psql command:

    dev-# \du
    List of roles
    Role name | Attributes | Member of
    -----------+-----------------------------------------------------------+---------
    rw | Create DB, Replication | {}
    postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}
  3. Grant required privileges to the user.

    Run the following statements to grant the required privileges to the user.

    GRANT CONNECT ON DATABASE <database_name> TO <username>;   
    GRANT USAGE ON SCHEMA <schema_name> TO <username>;
    GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
    GRANT CREATE ON DATABASE <database_name> TO <username>;

    You can use the following statement to check the privileges of the user to the tables:

    postgres=# SELECT table_name, grantee, privilege_type
    FROM information_schema.role_table_grants
    WHERE grantee='<username>';

    An example result:

     table_name | grantee | privilege_type
    -----------+---------+----------------
    lineitem | rw | SELECT
    customer | rw | SELECT
    nation | rw | SELECT
    orders | rw | SELECT
    part | rw | SELECT
    partsupp | rw | SELECT
    supplier | rw | SELECT
    region | rw | SELECT
    (8 rows)

Enable the connector node in RisingWave

The native PostgreSQL CDC connector is implemented by the connector node in RisingWave. The connector node handles the connections with upstream and downstream systems.

The connector node is enabled by default in this docker-compose configuration. To learn about how to start RisingWave with this configuration, see Docker Compose.

If you are running RisingWave locally with the pre-built library or with the source code, the connector node needs to be started separately. To learn about how to start the connector node in this case, see Enable the connector node.

Create a table using the native CDC connector

To ensure all data changes are captured, you must create a table and specify primary keys. See the CREATE TABLE command for more details. The data format must be Debezium JSON.

Syntax

CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='postgres-cdc',
<field>=<value>, ...
);

Note that a primary key is required.

WITH parameters

Unless specified otherwise, the fields listed are required.

FieldNotes
hostnameHostname of the database.
portPort number of the database.
usernameUsername of the database.
passwordPassword of the database.
database.nameName of the database.
schema.nameOptional. Name of the schema. By default, the value is public.
table.nameName of the table that you want to ingest data from.
slot.nameOptional. The replication slot for this PostgreSQL source. By default, a unique slot name will be randomly generated. Each source should have a unique slot name.
note

RisingWave implements CDC via PostgresQL replication. Inspect the current progress via the pg_replication_slots view. Remove inactive replication slots via pg_drop_replication_slot().

Data format

Data is in Debezium JSON format. Debezium is a log-based CDC tool that can capture row changes from various database management systems such as PostgreSQL, MySQL, and SQL Server and generate events with consistent structures in real time. The PostgreSQL CDC connector in RisingWave supports JSON as the serialization format for Debezium data. The data format does not need to be specified when creating a table with postgres-cdc as the source.

Example

 CREATE TABLE shipments (
shipment_id integer,
order_id integer,
origin string,
destination string,
is_arrived boolean,
PRIMARY KEY (shipment_id)
) WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'dev',
schema.name = 'public',
table.name = 'shipments'
);
Ask AI

Help us make this doc better!

Was this page helpful?

Happy React is loading...