Prerequisites
Before connecting to RisingWave, you must enable logical replication on your upstream database. The specific steps depend on your provider. Select your platform below for detailed setup instructions:- Set up a self-hosted PostgreSQL database
- Set up AWS RDS or Aurora PostgreSQL
- Set up a Neon serverless PostgreSQL database
- Set up a Supabase project
User permissions
Before connecting to PostgreSQL CDC, you need to set up proper user permissions. The PostgreSQL user used for CDC must have specific privileges to perform logical replication.Required user permissions
The replication user needs the following permissions:- REPLICATIONprivilege
- CONNECTprivilege on the target database
- SELECTprivilege on the tables to be replicated
- Permission to create publications (if publication.create.enableis enabled)
Publication permissions
If you don’t make any special settings, RisingWave will automatically help you create publications. In this case, the user needs:- CREATE PUBLICATION permission: The user must be the database owner, have SUPERUSER privileges, or have CREATE privileges on the database
- ALTER PUBLICATION ADD TABLE permission: The user must be the table owner or have SUPERUSER privileges
publication.create.enable = 'false', the user only needs SELECT privileges on the tables, and you must manually create the publication. This approach reduces permission requirements since you don’t need to be the table owner.
Setting up permissions
For basic user creation and privilege granting, see the setup instructions for your specific PostgreSQL deployment:Connect to PostgreSQL
RisingWave uses theCREATE SOURCE or CREATE TABLE ... FROM source syntax to connect to PostgreSQL CDC. You’ll use CREATE SOURCE to create a shared source, and then create multiple tables from it, each representing a table in the upstream PostgreSQL database.
CREATE SOURCE
Use the CREATE SOURCE statement to define the source with connection parameters.
CREATE TABLE ... FROM source (per-table definition)
Create a table on top of the defined source. Note that a primary key is required and must be consistent with the upstream table. We must also specify the Postgres table name (pg_table_name) which we are selecting from.
Basic connection example
Configuration Parameters
This section contains a complete reference for all parameters used when creating a PostgreSQL CDC source.Connector parameters
These parameters are used in theWITH clause of a CREATE SOURCE statement.
| Parameter | Description | Required | 
|---|---|---|
| connector | Must be set to 'postgres-cdc'. | Yes | 
| hostname | The hostname or IP address of your PostgreSQL database server. | Yes | 
| port | The port number of your PostgreSQL database server. The default is 5432. | Yes | 
| username | The username for connecting to your PostgreSQL database. | Yes | 
| password | The password for the PostgreSQL user. | Yes | 
| database.name | The name of the PostgreSQL database to connect to. | Yes | 
| schema.name | The name of the PostgreSQL schema to capture changes from. Defaults to 'public'. | No | 
| slot.name | The name of the PostgreSQL replication slot to use. If not specified, a unique name is generated. Each source needs a unique slot. Valid characters: lowercase letters, numbers, underscore. Max length: 63. | No | 
| publication.name | The name of the PostgreSQL publication to use. Defaults to 'rw_publication'. | No | 
| publication.create.enable | Whether to automatically create the publication if it doesn’t exist. Defaults to true. | No | 
| auto.schema.change | Set to trueto enable automatic replication of DDL changes from Postgres. Defaults tofalse. | No | 
| ssl.mode | SSL/TLS encryption mode. Accepted values: disabled,preferred,required,verify-ca,verify-full. Defaults todisabled. | No | 
| ssl.root.cert | The PEM-encoded root certificate for verify-caorverify-fullmode. Use a secret. | No | 
| transactional | Ensures that changes from a single upstream transaction are processed atomically. Defaults to truefor shared sources. | No | 
Per-table parameters
These parameters are used in theWITH clause of a CREATE TABLE ... FROM source statement.
| Parameter | Description | 
|---|---|
| snapshot | Optional. If false, disables the initial snapshot (backfill) of the table. Only new changes will be ingested. The default value istrue. | 
| snapshot.interval | Optional. Specifies the barrier interval for buffering upstream events. The default value is 1. | 
| snapshot.batch_size | Optional. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000. | 
| backfill.parallelism | Optional. Controls the parallelism of CDC table backfill. When set to a value greater than 0, enables parallelized CDC backfill. The default value is 0(disabled). | 
| backfill_num_rows_per_split | Optional. Specifies number of rows per split for parallelized CDC backfill. Only effective when backfill.parallelism > 0. The default value is100000. | 
| backfill_as_even_splits | Optional. Whether to create even splits for parallelized CDC backfill. Only effective when backfill.parallelism > 0. The default value istrue. | 
Debezium parameters
You can also specify any valid Debezium PostgreSQL connector configuration property in theWITH clause. Prefix the Debezium parameter name with debezium..
For example, to skip unknown DDL statements, use:
Features and reference
Data format
The PostgreSQL CDC connector uses the Debezium JSON format for data.Supported data types
The following table shows the data type mapping from PostgreSQL to RisingWave.RisingWave does not support directly creating tables from PostgreSQL composite types. If you want to read composite type data, you will need to use a source and create a materialized view based off that source.
| PostgreSQL type | RisingWave type | 
|---|---|
| BOOLEAN | BOOLEAN | 
| BIT(1) | BOOLEAN | 
| BIT( > 1) | No support | 
| BIT VARYING[(M)] | No support | 
| SMALLINT, SMALLSERIAL | SMALLINT | 
| INTEGER, SERIAL | INTEGER | 
| BIGINT, BIGSERIAL, OID | BIGINT | 
| REAL | REAL | 
| DOUBLE PRECISION | DOUBLE PRECISION | 
| CHAR[(M)] | CHARACTER VARYING | 
| VARCHAR[(M)] | CHARACTER VARYING | 
| CHARACTER[(M)] | CHARACTER VARYING | 
| CHARACTER VARYING[(M)] | CHARACTER VARYING | 
| TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE | 
| TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) | 
| INTERVAL [P] | INTERVAL | 
| BYTEA | BYTEA | 
| JSON, JSONB | JSONB | 
| XML | CHARACTER VARYING | 
| UUID | CHARACTER VARYING | 
| POINT | STRUCT (with form <x REAL, y REAL>) | 
| LTREE | No support | 
| CITEXT | CHARACTER VARYING* | 
| INET | CHARACTER VARYING* | 
| INT4RANGE | CHARACTER VARYING* | 
| INT8RANGE | CHARACTER VARYING* | 
| NUMRANGE | CHARACTER VARYING* | 
| TSRANGE | CHARACTER VARYING* | 
| TSTZRANGE | CHARACTER VARYING* | 
| DATERANGE | CHARACTER VARYING* | 
| ENUM | CHARACTER VARYING* | 
| DATE | DATE | 
| TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) | 
| TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40)) | 
| TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE | 
| NUMERIC[(M[,D])], DECIMAL[(M[,D])] | numeric, rw_int256, or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw_int256 or varchar. Note that rw_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. | 
| MONEY[(M[,D])] | NUMERIC | 
| HSTORE | No support | 
| HSTORE | No support | 
| INET | CHARACTER VARYING* | 
| CIDR | CHARACTER VARYING* | 
| MACADDR | CHARACTER VARYING* | 
| MACADDR8 | CHARACTER VARYING* | 
Support for PostgreSQL TOAST
Added in v2.6.0.
- Standard types: varchar,text,xml,jsonb,bytea.
- One-dimensional array of the above types: varchar[],text[],jsonb[],bytea[], xml[].
RisingWave currently supports the TOAST-able data types mentioned above. Other types that may trigger TOAST, mainly simple one-dimensional arrays with low probability, are not yet supported. For more details, please refer to the issue.
- Create a table with TOAST-able columns in PostgreSQL.
- Insert large TOAST data in PostgreSQL.
- Create a RisingWave source from PostgreSQL CDC.
- Create a RisingWave table from the source.
- Verify data is ingested in RisingWave with TOAST preserved.
- Update non-TOAST column to test placeholder handling.
REPLICA IDENTITY is set to default. It achieves this by leveraging its own materialized state of the source data. When an UPDATE or DELETE event occurs, RisingWave uses the record stored within its materialized view to construct the full change event, rather than relying on the before field in the CDC message.
Use dbt to ingest data from PostgreSQL CDC
Here is an example of how to use dbt to ingest data from PostgreSQL CDC. In this dbt example,source and table_with_connector models will be used. For more details about these two models, please refer to Use dbt for data transformations.
First, we create a source model pg_mydb.sql.
table_with_connector model tt3.sql.
Extract metadata from sources
TheINCLUDE clause allows you to ingest fields not included in the main Debezium payload (such as metadata). See Extracting metadata from sources for details. The available fields are:
- timestamp
- partition
- offset
- database_name
- collection_name
Automatically map upstream table schema
RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source. Instead of defining columns individually, you can use* when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.
Auto schema change
PREMIUM FEATUREThis is a premium feature. For a comprehensive overview of all premium features and their usage, please see RisingWave premium features.
ALTER TABLE command with the following operations, and we plan to add support for additional DDL operations in future releases.
- ADD COLUMN [DEFAULT expr]: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value.
- DROP COLUMN: Allows you to remove an existing column from a table.
auto.schema.change = 'true' in your PostgreSQL CDC source configuration:
DESCRIBE my_table; in RisingWave.
Ingest data from a partitioned table
RisingWave supports ingesting data from a partitioned table. To configure a publication for your CDC stream, PostgreSQL, by default, creates publications withpublish_via_partition_root = false. This setting causes replication slot events to contain separate events for each partition, rather than for the root partitioned table.
If you need to read from the partitioned table, you should explicitly set this property to TRUE when creating a publication. Execute the following command in your upstream PostgreSQL database:
publish_via_partition_root = true.
Please be aware that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication; however, it does not generate an error if attempted. If you need to ingest data from both the root table and its partitions, you should create separate publications for each. Otherwise, you will not be able to read from the table partitions. Meanwhile, in RisingWave, you should create separate sources with dedicated publication names for the partitioned table and its partitions.
Expression as a column
RisingWave allows users to define expressions as table columns. For example, in the SQL statement below,next_id is not a column from the source PostgreSQL table. Instead, it is a generated column that RisingWave computes dynamically while ingesting data. The value of next_id for each row is always equal to id + 1:
name, an upstream column, is placed after the generated column next_id:
Time travel
RisingWave does not support time travel for the native PostgreSQL CDC connector.What’s next?
- Supported data formats: Data formats and encoding options
- Monitor CDC ingestion progress