
Sink data from RisingWave to PostgreSQL

This guide shows how to sink data from RisingWave to PostgreSQL using the JDBC connector. The sink parameters are similar to those for other databases reached through JDBC, such as MySQL. This guide covers the configuration specific to PostgreSQL and how to verify that the data has been sunk successfully.

You can test this process on your own device by using the postgres-sink demo in the integration_tests directory of the RisingWave repository.

Set up a PostgreSQL database

Launch and set up PostgreSQL

To install PostgreSQL locally, see the PostgreSQL download options.

Note: If you are using the demo, connect to PostgreSQL with the following command. Ensure that no other program is using port 5432.

psql postgresql://myuser:123456@127.0.0.1:5432/mydb

Ensure that the PostgreSQL user has the following privileges on the target table. You can grant them with this SQL statement.

GRANT SELECT, INSERT, UPDATE, DELETE ON [table_name] TO [username];

Create a table in PostgreSQL

Use the following query to set up a table in PostgreSQL. We will sink to this table from RisingWave.

CREATE TABLE target_count (
    target_id VARCHAR(128) PRIMARY KEY,
    target_count BIGINT
);
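
With the table created, the privilege grant described above can be applied to it. The following is a minimal sketch that assumes the demo names used in this guide (the myuser role from the connection string and the target_count table); substitute your own role and table names. The has_table_privilege check is an optional way to confirm that the grant took effect.

```sql
-- Run in PostgreSQL as a role that is allowed to grant privileges on the table.
-- Assumption: the sink connects as myuser, as in the demo connection string.
GRANT SELECT, INSERT, UPDATE, DELETE ON target_count TO myuser;

-- Optional check: each column returns true when myuser holds that privilege.
SELECT
    has_table_privilege('myuser', 'target_count', 'INSERT') AS can_insert,
    has_table_privilege('myuser', 'target_count', 'UPDATE') AS can_update,
    has_table_privilege('myuser', 'target_count', 'DELETE') AS can_delete;
```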

Set up RisingWave

Install and launch RisingWave

To install and start RisingWave locally, see the Get started guide. We recommend running RisingWave locally for testing purposes.

Enable the connector node in RisingWave

The JDBC sink connector is implemented by the connector node in RisingWave. The connector node handles the connections with upstream and downstream systems.

The connector node is enabled by default in this docker-compose configuration. To learn about how to start RisingWave with this configuration, see Docker Compose.

If you are running RisingWave locally with the pre-built library or with the source code, the connector node needs to be started separately. To learn about how to start the connector node in this case, see Enable the connector node.

Create a sink

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
    connector = 'jdbc',
    field_name = 'field', ...
);

Parameters

All WITH options are required.

Parameter or clause | Description
sink_name | Name of the sink to be created.
sink_from | A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified.
AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command.
connector | Sink connector type. To sink to PostgreSQL through JDBC, set it to 'jdbc'. If there is a particular sink you are interested in, go to the Integrations Overview page to see the full list of connectors and integrations we are working on.
jdbc.url | The JDBC URL of the destination database, which the driver needs to recognize and connect to the database.
table.name | The table in the destination database you want to sink to.
type | Data format. Allowed formats:
  • append-only: Output data with insert operations.
  • upsert: Output data as a changelog stream.
If creating an upsert sink, see the Overview on when to define the primary key.
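
To illustrate the type parameter, here is a hedged sketch of an append-only sink. It assumes an append-only input such as the user_behaviors source created later in this guide, and a hypothetical PostgreSQL table named user_behavior_log with matching target_id and behavior_type columns that you would need to create yourself. The sink name is also made up for illustration, and the connection values are the demo ones, so adjust them for your environment.

```sql
-- Sketch only: user_behavior_log is a hypothetical table in PostgreSQL,
-- not part of the demo. Rows are only inserted, never updated or deleted.
CREATE SINK IF NOT EXISTS user_behavior_log_sink AS
SELECT
    target_id,
    behavior_type
FROM
    user_behaviors
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
    table.name = 'user_behavior_log',
    type = 'append-only'
);
```

Because an append-only sink emits only inserts, it suits event logs. An aggregated result such as a running count changes over time and is a better fit for upsert.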

Sink data from RisingWave to PostgreSQL

Create source and materialized view

You can sink data from a table, source, or materialized view in RisingWave to PostgreSQL.

For demonstration purposes, we'll create a source and a materialized view, and then sink data from the materialized view. If you already have a table or materialized view to sink data from, you don't need to perform this step.

Run the following query to create a source to read data from a Kafka broker.

CREATE SOURCE user_behaviors (
    user_id VARCHAR,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMPTZ,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'user_behaviors',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Next, we will create a materialized view that counts the number of user behavior events for each target_id. Note that the materialized view and the target table in PostgreSQL share the same schema.

CREATE MATERIALIZED VIEW target_count AS
SELECT
    target_id,
    COUNT(*) AS target_count
FROM
    user_behaviors
GROUP BY
    target_id;
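
Before creating the sink, it can help to confirm that the materialized view is receiving data. The following is a minimal check, run in RisingWave, that assumes the Kafka topic already contains events.

```sql
-- Run in RisingWave: show the five targets with the highest counts so far.
SELECT
    target_id,
    target_count
FROM
    target_count
ORDER BY
    target_count DESC
LIMIT 5;
```

An empty result usually means that no events have arrived from the topic yet.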

Sink from RisingWave

Use the following query to sink data from the materialized view to the target table in PostgreSQL. Ensure that jdbc.url is accurate and points to the PostgreSQL database that you are connecting to. See CREATE SINK for more details.

CREATE SINK target_count_postgres_sink FROM target_count WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
    table.name = 'target_count',
    type = 'upsert'
);
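
After the statement succeeds, you may want to confirm that the sink exists, or remove it when you are done testing. The following is a short sketch that uses the sink name from the example above.

```sql
-- Run in RisingWave: list existing sinks and look for target_count_postgres_sink.
SHOW SINKS;

-- Optional cleanup: stop sinking and remove the sink definition.
-- Rows already written to PostgreSQL are not removed by this statement.
DROP SINK target_count_postgres_sink;
```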

Verify update

To confirm that the target table has been updated, query target_count in PostgreSQL.

SELECT * FROM target_count
LIMIT 10;
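
For a slightly stronger check than a row sample, you could compare the two sides. The sketch below assumes the demo table and materialized view names. Run the same query in PostgreSQL and in RisingWave and compare the results, keeping in mind that the numbers may differ briefly while new events are still being sunk.

```sql
-- Run in both PostgreSQL and RisingWave, then compare the output.
SELECT
    COUNT(*) AS distinct_targets,
    SUM(target_count) AS total_events
FROM
    target_count;
```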
