Skip to main content
RisingWave allows you to query PostgreSQL tables directly with the postgres_query table-valued function (TVF). It offers a simpler alternative to Change Data Capture (CDC) when working with PostgreSQL data in RisingWave. Unlike CDC, which continuously syncs data changes, this function lets you fetch data directly from PostgreSQL when needed. Therefore, this approach is ideal for static or infrequently updated data, as it’s more resource-efficient than maintaining a constant CDC connection.
Not CDC. This approach fetches rows on demand via a table-valued function. For continuous change replication, use PostgreSQL CDC.

Syntax

Define postgres_query in either of the following forms:
postgres_query(
    hostname varchar,      -- Database hostname
    port varchar,          -- Database port
    username varchar,      -- Authentication username
    password varchar,      -- Authentication password
    database_name varchar, -- Target database name
    query varchar          -- SQL query to execute
)
postgres_query(
    cdc_source_name varchar, -- Existing PostgreSQL CDC source name
    query varchar            -- SQL query to execute
)
The source-based form requires an existing PostgreSQL CDC source. It reuses the connection properties already defined on that source, including credentials managed as secrets.

Example

  1. In your PostgreSQL database, create a table and populate it with sample data.
CREATE TABLE test (id bigint primary key, x int);
INSERT INTO test SELECT id, id::int FROM generate_series(1, 100) AS t(id);
  1. In RisingWave, use postgres_query function to retrieve rows where id > 90.
SELECT * 
FROM postgres_query('localhost', '5432', 'postgres', 'postgres', 'mydb', 'SELECT * FROM test WHERE id > 90;');
----RESULT
91 91
92 92
93 93
94 94
95 95
96 96
97 97
98 98
99 99
100 100
If you already have a PostgreSQL CDC source, you can reuse its connection properties instead of passing them inline again.
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'myuser',
    password = secret pg_password,
    database.name = 'mydb',
    schema.name = 'public'
);

SELECT *
FROM postgres_query(
    'pg_source',
    'SELECT * FROM test WHERE id > 90;'
);