Ingest real-time changes from your PostgreSQL database using change data capture (CDC).
Use the CREATE SOURCE and CREATE TABLE ... FROM source syntax together to connect to PostgreSQL CDC. You'll use CREATE SOURCE to create a shared source, and then create multiple tables from it, each representing a table in the upstream PostgreSQL database.
CREATE SOURCE (shared source)

Use the CREATE SOURCE statement to define the shared source with connection parameters.
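A minimal sketch of a shared source that sets only the required connection parameters. The source name pg_source, the host, the credentials, and the database name mydb are hypothetical placeholders.

```sql
-- Hypothetical source name, host, credentials, and database; replace with your own.
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',  -- required for PostgreSQL CDC
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb'
);
```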
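CREATE TABLE ... FROM source (per-table definition)

Use the CREATE TABLE ... FROM source statement to create a table for each upstream table you want to ingest. In the statement, you must specify the name of the upstream PostgreSQL table (pg_table_name) that you are selecting from.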
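A sketch of a per-table definition, assuming a shared source named pg_source already exists and that the upstream table public.orders (hypothetical) has order_id as its primary key. The upstream table name goes after the TABLE keyword.

```sql
-- Hypothetical table; the column list is assumed to mirror public.orders upstream,
-- with the primary key declared to match the upstream primary key.
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer VARCHAR,
    created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';
```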
The following parameters can be set in the WITH clause of a CREATE SOURCE statement.
Parameter | Description | Required |
---|---|---|
connector | Must be set to 'postgres-cdc'. | Yes |
hostname | The hostname or IP address of your PostgreSQL database server. | Yes |
port | The port number of your PostgreSQL database server. The default is 5432. | Yes |
username | The username for connecting to your PostgreSQL database. | Yes |
password | The password for the PostgreSQL user. | Yes |
database.name | The name of the PostgreSQL database to connect to. | Yes |
schema.name | The name of the PostgreSQL schema to capture changes from. Defaults to 'public'. | No |
slot.name | The name of the PostgreSQL replication slot to use. If not specified, a unique name is generated. Each source needs a unique slot. Valid characters: lowercase letters, numbers, underscore. Max length: 63. | No |
publication.name | The name of the PostgreSQL publication to use. Defaults to 'rw_publication'. | No |
publication.create.enable | Whether to automatically create the publication if it doesn’t exist. Defaults to true. | No |
auto.schema.change | Set to true to enable automatic replication of DDL changes from Postgres. Defaults to false. | No |
ssl.mode | SSL/TLS encryption mode. Accepted values: disabled, preferred, required, verify-ca, verify-full. Defaults to disabled. | No |
ssl.root.cert | The PEM-encoded root certificate for verify-ca or verify-full mode. Use a secret. | No |
transactional | Ensures that changes from a single upstream transaction are processed atomically. Defaults to true for shared sources. | No |
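A sketch that also sets several optional parameters from the table above. All names and values are hypothetical; the slot name follows the documented rules (lowercase letters, numbers, underscore) and must not be shared with any other source.

```sql
-- Hypothetical values throughout.
CREATE SOURCE pg_source_custom WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    schema.name = 'public',               -- schema to capture changes from
    slot.name = 'mydb_orders_slot',       -- unique replication slot for this source
    publication.name = 'rw_publication',  -- publication to read from
    publication.create.enable = 'true',   -- create the publication if missing
    ssl.mode = 'required'                 -- require TLS without certificate checks
);
```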
The following parameter can be set in the WITH clause of a CREATE TABLE ... FROM source statement.
Parameter | Description | Required | Default |
---|---|---|---|
snapshot | If false, disables the initial snapshot (backfill) of the table. Only new changes will be ingested. | No | true |
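A sketch of a table that skips the initial backfill and ingests only new changes, assuming the shared source pg_source and the upstream table public.orders (both hypothetical). The WITH clause comes before FROM.

```sql
-- Only changes made after the table is created are ingested; existing rows are skipped.
CREATE TABLE orders_new_only (
    order_id INT PRIMARY KEY,
    customer VARCHAR
)
WITH (
    snapshot = 'false'
)
FROM pg_source TABLE 'public.orders';
```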
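Debezium connector properties can also be passed in the WITH clause. Prefix each Debezium parameter name with "debezium." (including the trailing dot).
For example, to skip unknown DDL statements, set the corresponding Debezium property with this prefix.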
The following table shows how PostgreSQL data types map to RisingWave data types.

PostgreSQL type | RisingWave type |
---|---|
BOOLEAN | BOOLEAN |
BIT(1) | BOOLEAN |
BIT(n), n > 1 | No support |
BIT VARYING[(M)] | No support |
SMALLINT, SMALLSERIAL | SMALLINT |
INTEGER, SERIAL | INTEGER |
BIGINT, BIGSERIAL, OID | BIGINT |
REAL | REAL |
DOUBLE PRECISION | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
CHARACTER[(M)] | CHARACTER VARYING |
CHARACTER VARYING[(M)] | CHARACTER VARYING |
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
INTERVAL [P] | INTERVAL |
BYTEA | BYTEA |
JSON, JSONB | JSONB |
XML | CHARACTER VARYING |
UUID | CHARACTER VARYING |
POINT | STRUCT (with form <x REAL, y REAL>) |
LTREE | No support |
CITEXT | CHARACTER VARYING* |
INET | CHARACTER VARYING* |
INT4RANGE | CHARACTER VARYING* |
INT8RANGE | CHARACTER VARYING* |
NUMRANGE | CHARACTER VARYING* |
TSRANGE | CHARACTER VARYING* |
TSTZRANGE | CHARACTER VARYING* |
DATERANGE | CHARACTER VARYING* |
ENUM | CHARACTER VARYING* |
DATE | DATE |
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) |
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. |
MONEY[(M[,D])] | NUMERIC |
HSTORE | No support |
CIDR | CHARACTER VARYING* |
MACADDR | CHARACTER VARYING* |
MACADDR8 | CHARACTER VARYING* |
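A sketch of the NUMERIC guidance above. It assumes a hypothetical upstream table public.ledger whose big_count column is NUMERIC(40, 0) and whose big_amount column is NUMERIC(40, 10). Both exceed the 28-digit precision of numeric, so big_count (integer values only) is declared as rw_int256, and big_amount is declared as varchar because rw_int256 would turn its decimal values into NULL.

```sql
-- Hypothetical table and columns.
CREATE TABLE ledger (
    id INT PRIMARY KEY,
    big_count rw_int256,  -- integer values wider than 28 digits
    big_amount VARCHAR    -- has a decimal part, so rw_int256 would yield NULL
) FROM pg_source TABLE 'public.ledger';
```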
In this dbt example, the source and table_with_connector models are used. For more details about these two models, refer to Use dbt for data transformations.
First, we create a source model pg_mydb.sql.
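A sketch of what pg_mydb.sql could contain, assuming the dbt-risingwave adapter convention in which a source model holds the full CREATE SOURCE statement and {{ this }} renders as the model's relation name. The connection values and slot name are hypothetical.

```sql
-- models/pg_mydb.sql (hypothetical connection values)
{{ config(materialized='source') }}

CREATE SOURCE {{ this }} WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    slot.name = 'mydb_dbt_slot'
);
```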
Then, we create a table_with_connector model tt3.sql.
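A sketch of tt3.sql under the same dbt-risingwave assumption, referencing the source model through ref(). The columns v1 and v2 are hypothetical stand-ins for the columns of the upstream table public.tt3.

```sql
-- models/tt3.sql (hypothetical column list)
{{ config(materialized='table_with_connector') }}

CREATE TABLE {{ this }} (
    v1 INT PRIMARY KEY,
    v2 TIMESTAMPTZ
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3';
```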
The INCLUDE clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are: timestamp, partition, offset, database_name, and collection_name.
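A sketch that adds two of the listed metadata fields as extra columns, assuming the hypothetical source pg_source and upstream table public.orders. The INCLUDE clauses follow the column list and come before FROM; the aliases commit_ts and db_name are arbitrary.

```sql
-- commit_ts and db_name are metadata columns, not upstream columns.
CREATE TABLE orders_with_meta (
    order_id INT PRIMARY KEY,
    customer VARCHAR
)
INCLUDE timestamp AS commit_ts
INCLUDE database_name AS db_name
FROM pg_source TABLE 'public.orders';
```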
You can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.
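A sketch of the * form, assuming the hypothetical source pg_source and upstream table public.orders. No other columns are listed alongside *.

```sql
-- All columns of public.orders are ingested with their mapped types.
CREATE TABLE orders_all (*) FROM pg_source TABLE 'public.orders';
```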
RisingWave can automatically replicate schema changes made by the upstream ALTER TABLE command. The following operations are currently supported, and support for additional DDL operations is planned for future releases:
ADD COLUMN [DEFAULT expr]: Adds a new column to an existing table. Only constant value expressions are supported for the default value.
DROP COLUMN: Removes an existing column from a table.
To enable this feature, set auto.schema.change = 'true' in your PostgreSQL CDC source configuration.
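A sketch of a source with automatic schema change enabled. The source name, connection values, and slot name are hypothetical, as is the upstream ALTER TABLE shown in the comments, which uses a constant default value.

```sql
-- Hypothetical source with automatic DDL replication turned on.
CREATE SOURCE pg_source_auto WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    slot.name = 'mydb_auto_schema_slot',
    auto.schema.change = 'true'
);

-- Example upstream change (run in PostgreSQL, hypothetical column):
--   ALTER TABLE public.orders ADD COLUMN note VARCHAR DEFAULT 'n/a';
```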
To view the current schema of a table, run DESCRIBE my_table; in RisingWave.
By default, a PostgreSQL publication is created with publish_via_partition_root = false. With this setting, the replication slot emits separate events for each partition rather than for the root partitioned table.
If you need to read from the partitioned table, explicitly set publish_via_partition_root = true when creating the publication in your upstream PostgreSQL database.
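A sketch of the upstream command, to be run in PostgreSQL 13 or later (the version that introduced publish_via_partition_root). The partitioned table public.measurements is hypothetical. If you create the publication yourself, set publication.name in the RisingWave source to match it.

```sql
-- Run in the upstream PostgreSQL database.
-- Changes to any partition are published as changes to public.measurements.
CREATE PUBLICATION rw_publication
    FOR TABLE public.measurements
    WITH (publish_via_partition_root = true);
```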
Be aware that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication, yet it does not raise an error if you attempt to. If you need to ingest data from both the root table and its partitions, create a separate publication for each; otherwise, you will not be able to read from the table partitions. Correspondingly, in RisingWave, create separate sources with dedicated publication names for the partitioned table and its partitions.
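A sketch of the RisingWave side, assuming two publications already exist upstream under the hypothetical names pub_measurements_root (the partitioned root table) and pub_measurements_parts (its partitions). Each source gets its own publication and its own replication slot.

```sql
-- Source for the partitioned root table.
CREATE SOURCE pg_measurements_root WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    slot.name = 'measurements_root_slot',
    publication.name = 'pub_measurements_root',
    publication.create.enable = 'false'  -- publication is managed upstream
);

-- Source for the individual partitions.
CREATE SOURCE pg_measurements_parts WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    slot.name = 'measurements_parts_slot',
    publication.name = 'pub_measurements_parts',
    publication.create.enable = 'false'
);
```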
A table definition can include columns that do not come from the source PostgreSQL table. For example, a column next_id can be a generated column that RisingWave computes dynamically while ingesting data, so that the value of next_id for each row is always equal to id + 1.
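A sketch of such a table, assuming the hypothetical source pg_source and an upstream table public.person with columns id and name. next_id is declared with AS and has no upstream counterpart.

```sql
-- id and name come from public.person; next_id is computed by RisingWave.
CREATE TABLE person (
    id INT PRIMARY KEY,
    name VARCHAR,
    next_id INT AS id + 1
) FROM pg_source TABLE 'public.person';
```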
Generated and upstream columns can also be interleaved: name, an upstream column, can be placed after the generated column next_id.
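A sketch of the interleaved ordering, under the same hypothetical source and upstream table as a plain next_id example, and assuming upstream columns are matched by name rather than by position.

```sql
-- The upstream column name appears after the generated column next_id.
CREATE TABLE person_reordered (
    id INT PRIMARY KEY,
    next_id INT AS id + 1,
    name VARCHAR
) FROM pg_source TABLE 'public.person';
```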