Sink data from RisingWave to PostgreSQL
This guide will show you how to sink data from RisingWave to PostgreSQL using the JDBC connector. The sink parameters are similar to those for other JDBC-available databases, such as MySQL. However, we will cover the configurations specific to PostgreSQL and how to verify that data is successfully sunk.
You can test out this process on your own device by using the postgres-sink
demo in the integration_test directory of the RisingWave repository.
Set up a PostgreSQL database
Set up a PostgreSQL RDS instance on AWS
Here we will use a standard class instance without Multi-AZ deployment as an example.
- Log in to the AWS console. Search “RDS” in services and select the RDS panel.
- Create a database with PostgreSQL as the Engine type. We recommend setting up a username and password or using other security options.
- When the new instance becomes available, click on its panel.
- From the Connectivity panel, we can find the endpoint and connection port information.
Connect to the RDS instance from Postgres
Now we can connect to the RDS instance. Make sure you have installed psql on your local machine, and start a psql prompt. Fill in the endpoint, the port, and login credentials in the connection parameters.
For more login options, refer to the RDS connection guide.
Create a table in PostgreSQL
Use the following query to set up a table in PostgreSQL. We will sink to this table from RisingWave.
Set up RisingWave
Install and launch RisingWave
To install and start RisingWave locally, see the Get started guide. We recommend running RisingWave locally for testing purposes.
Notes about running RisingWave from binaries
If you are running RisingWave locally from binaries and intend to use the native CDC source connectors or the JDBC sink connector, make sure you have JDK 11 or later versions installed in your environment.
Create a sink
Syntax
Parameters
All WITH
options are required unless noted.
Parameter or clause | Description |
---|---|
sink_name | Name of the sink to be created. |
sink_from | A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified. |
AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified.See SELECT for the syntax and examples of the SELECT command. |
connector | Sink connector type must be ‘jdbc’ for PostgresQL sink. |
jdbc.url | The JDBC URL of the destination database necessary for the driver to recognize and connect to the database. |
jdbc.query.timeout | Specifies the timeout for the operations to downstream. If not set, the default is 10 minutes. |
table.name | The table in the destination database you want to sink to. |
schema.name | Optional. The schema in the destination database you want to sink to. The default value is public. |
type | Sink data type. Supported types: append-only: Sink data as INSERT operations. upsert: Sink data as UPDATE, INSERT and DELETE operations. |
primary_key | Required if type is upsert. The primary key of the sink, which should match the primary key of the downstream table. |
Sink data from RisingWave to PostgreSQL
Create source and materialized view
You can sink data from a table or a materialized view in RisingWave to PostgreSQL.
For demonstration purposes, we’ll create a source and a materialized view, and then sink data from the materialized view. If you already have a table or materialized view to sink data from, you don’t need to perform this step.
Run the following query to create a source to read data from a Kafka broker.
Next, we will create a materialized view that queries the number of targets for each target_id
. Note that the materialized view and the target table share the same schema.
Sink from RisingWave
Use the following query to sink data from the materialized view to the target table in PostgreSQL. Ensure that the jdbc_url
is accurate and reflects the PostgreSQL database that you are connecting to. See CREATE SINK for more details.
Verify update
To ensure that the target table has been updated, query from target_count
in PostgreSQL.
Data type mapping
For the PostgreSQL data type mapping table, see the Data type mapping table under the Ingest data from PostgreSQL CDC topic.
Additional notes regarding sinking data to PostgreSQL:
- A
varchar
column in RisingWave can be sinked to auuid
column in Postgres. - Only one-dimensional arrays in RisingWave can be sinked to PostgreSQL.
- For array type, we only support
smallint
,integer
,bigint
,real
,double precision
, andvarchar
type now.
Was this page helpful?