Sink data from RisingWave to Snowflake
This guide describes how to sink data from RisingWave to Snowflake using the Snowflake sink connector in RisingWave.
Snowflake is a cloud-based data warehousing platform for scalable and efficient data storage and analysis. For more information about Snowflake, see the Snowflake official website.
RisingWave sinks data to Snowflake through Snowpipe. RisingWave first writes the data as JSON files to a user-managed S3 bucket, and Snowpipe then loads those files into the Snowflake table. For more information, see Overview of the Snowpipe REST endpoints to load data.
PREMIUM EDITION FEATURE
This is a Premium Edition feature. All Premium Edition features are available out of the box at no additional cost on RisingWave Cloud. For self-hosted deployments, you need to purchase a license key to access this feature. To purchase a license key, contact the sales team at sales@risingwave-labs.com.
For a full list of Premium Edition features, see RisingWave Premium Edition.
Prerequisites
- Ensure you have an S3 bucket that RisingWave can connect to.
- Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.
- Ensure the S3 user account has WRITE permission on the bucket.
- Ensure that Snowflake and S3 are set up as described in Automating Snowpipe for Amazon S3, as RisingWave is only responsible for writing data to S3.
RisingWave does not delete files from S3 after Snowpipe has loaded them into Snowflake. To clear out files you no longer need, configure a lifecycle rule on your S3 bucket. See Lifecycle configuration and Delete staged files for more details.
Syntax
Use the following syntax to create a sink in RisingWave:
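The general form of the statement is sketched here. It follows RisingWave's usual CREATE SINK shape; connector = 'snowflake' and type = 'append-only' are assumptions, since neither appears in the parameter table, and the values in angle brackets are placeholders.

```sql
-- General form (sketch). Square brackets mark optional clauses.
CREATE SINK [ IF NOT EXISTS ] sink_name
[ FROM sink_from | AS select_query ]
WITH (
    connector = 'snowflake',                     -- assumed connector name
    type = 'append-only',                        -- assumed; Snowpipe only appends rows
    s3.bucket_name = '<bucket_name>',
    s3.credentials.access = '<access_key_id>',
    s3.credentials.secret = '<secret_access_key>',
    s3.region_name = '<region>',
    s3.path = '<path>',                          -- optional
    force_append_only = '<true | false>'         -- optional
);
```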
Parameters
All parameters are required unless specified otherwise.
Parameter | Description |
---|---|
s3.bucket_name | The S3 bucket where intermediate sink files will be stored. |
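s3.path | Optional. The path within the S3 bucket where the intermediate sink files are written. |
s3.credentials.access | The AWS access key ID used to access the S3 bucket. |
s3.credentials.secret | The AWS secret access key used to access the S3 bucket. |
s3.region_name | The AWS region of the S3 bucket, e.g., us-east-2. |
force_append_only | Optional. If true, forces the sink to be append-only, even if the upstream data is not append-only. |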
Data type mapping
The following table shows the corresponding data types between RisingWave and Snowflake. For details on native RisingWave data types, see Overview of data types.
RisingWave type | Snowflake type |
---|---|
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
REAL | FLOAT4 |
DECIMAL | DECIMAL |
DOUBLE | FLOAT8 |
BYTEA | BINARY |
VARCHAR | VARCHAR |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_TZ |
INTERVAL | Unsupported |
ARRAY | ARRAY |
JSONB | VARIANT (You need to convert JSONB to VARIANT using parse_json.) |
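As an illustration, if a JSONB value is stored in Snowflake as a JSON string (an assumption about how the staged JSON represents it), PARSE_JSON turns it into a VARIANT. The table, view, and column names are hypothetical.

```sql
-- Hypothetical Snowflake table: payload_raw holds the JSONB value as text.
CREATE OR REPLACE TABLE events (
    id          BIGINT,
    payload_raw VARCHAR
);

-- Expose the parsed value as VARIANT so it can be queried with path syntax.
CREATE OR REPLACE VIEW events_v AS
SELECT
    id,
    PARSE_JSON(payload_raw) AS payload
FROM events;

-- Example query that reads a field from the VARIANT column.
SELECT id, payload:status::STRING AS status
FROM events_v;
```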
Example
Here is an example of how to sink data from RisingWave to Snowflake.
Set up S3
Set up an external S3 bucket and make sure you have its credentials. Both the Snowflake stage and the RisingWave sink require them. The sink parameters correspond to the stage settings as follows:
- s3.bucket_name (together with s3.path, if set): the URL of the Snowflake stage.
- s3.credentials.access: AWS_KEY_ID in the Snowflake stage.
- s3.credentials.secret: AWS_SECRET_KEY in the Snowflake stage.
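A sketch of the corresponding Snowflake stage, with hypothetical names and placeholder values. It assumes the sink writes its files under s3.path in the bucket, so the stage URL combines the two. Snowflake also supports storage integrations instead of inline credentials; inline credentials are shown here only because they map directly onto the sink parameters.

```sql
-- Snowflake: external stage over the bucket and path the sink writes to.
-- Stage name and all values are placeholders.
CREATE OR REPLACE STAGE rw_sink_stage
  URL = 's3://EXAMPLE_S3_BUCKET/EXAMPLE_S3_PATH/'   -- s3.bucket_name + s3.path
  CREDENTIALS = (
    AWS_KEY_ID = 'EXAMPLE_AWS_ACCESS'                 -- s3.credentials.access
    AWS_SECRET_KEY = 'EXAMPLE_AWS_SECRET'             -- s3.credentials.secret
  )
  FILE_FORMAT = (TYPE = 'JSON');                      -- the sink stages JSON files
```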
Set up Snowflake
Next, set up a table, a stage, and a pipe in Snowflake. In addition, configure an event notification on your S3 bucket that sends object-creation events to the SQS queue associated with the pipe.
To complete the setup, follow the instructions in Automating Snowpipe for Amazon S3.
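A sketch of the Snowflake table and an auto-ingest pipe for the append-only case. It assumes a stage named rw_sink_stage (hypothetical) that points at the sink's bucket and path, and a hypothetical three-column table. The COPY transformation extracts one JSON field per column, assuming the field names match the RisingWave column names and the timestamp strings are in a format Snowflake can cast.

```sql
-- Hypothetical target table matching the columns of the RisingWave sink.
CREATE OR REPLACE TABLE user_behaviors_sf (
    user_id         VARCHAR,
    target_id       VARCHAR,
    event_timestamp TIMESTAMP_TZ
);

-- Pipe that loads each new JSON file from the stage when S3 notifies it.
CREATE OR REPLACE PIPE rw_sink_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO user_behaviors_sf (user_id, target_id, event_timestamp)
FROM (
    SELECT
        $1:user_id::VARCHAR,
        $1:target_id::VARCHAR,
        $1:event_timestamp::TIMESTAMP_TZ
    FROM @rw_sink_stage
)
FILE_FORMAT = (TYPE = 'JSON');

-- Look up the SQS queue ARN for the S3 event notification.
SHOW PIPES LIKE 'rw_sink_pipe';
```

The notification_channel column returned by SHOW PIPES contains the ARN of the SQS queue to use as the destination of the S3 event notification.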
Sink data with append-only
Now you can start sinking data. Launch your RisingWave cluster and run SQL statements to create a source and a sink.
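A minimal sketch of the append-only flow in RisingWave, assuming a hypothetical table user_behaviors as the upstream data (a source can be used the same way) and placeholder S3 settings. The connector and type values are assumptions, not taken from the parameter table.

```sql
-- Hypothetical upstream table in RisingWave.
CREATE TABLE user_behaviors (
    user_id         VARCHAR,
    target_id       VARCHAR,
    event_timestamp TIMESTAMPTZ
);

-- Materialized view to sink from.
CREATE MATERIALIZED VIEW ss_mv AS
SELECT user_id, target_id, event_timestamp
FROM user_behaviors;

-- Sink the view to Snowflake through the S3 bucket.
CREATE SINK snowflake_sink FROM ss_mv WITH (
    connector = 'snowflake',                 -- assumed connector name
    type = 'append-only',                    -- assumed sink type
    s3.bucket_name = 'EXAMPLE_S3_BUCKET',
    s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
    s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
    s3.region_name = 'EXAMPLE_REGION',
    s3.path = 'EXAMPLE_S3_PATH',
    -- ss_mv is built on a table that accepts updates and deletes,
    -- so append-only output has to be forced.
    force_append_only = 'true'
);
```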
Sink data with upsert
Snowpipe only ingests new rows; it does not apply updates or deletes.
To capture those changes, create a materialized view using AS CHANGELOG in RisingWave. This instructs RisingWave to add two metadata columns to each changelog row:
- changelog_op: the change type (1 = INSERT, 2 = DELETE, 3 = UPDATE-AFTER, 4 = UPDATE-BEFORE).
- _changelog_row_id: a monotonically increasing identifier that captures the order of the change.
Include these columns in the SELECT list, optionally renamed (for example, to __op and __row_id), so that the sink writes them to Snowflake.
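A sketch of the RisingWave side of the upsert flow, assuming an existing table user_behaviors (hypothetical) keyed by user_id and placeholder S3 settings. The view exposes the two metadata columns as __op and __row_id. Casting _changelog_row_id to BIGINT is an assumption made so that the value lands in an ordinary numeric column in Snowflake; the Snowflake cdc_table and its pipe (not shown) need matching columns.

```sql
-- Emit every change to user_behaviors as its own row.
CREATE MATERIALIZED VIEW ss_changelog_mv AS
WITH sub AS changelog FROM user_behaviors
SELECT
    user_id,
    target_id,
    event_timestamp,
    changelog_op AS __op,                   -- 1 INSERT, 2 DELETE, 3 UPDATE-AFTER, 4 UPDATE-BEFORE
    _changelog_row_id::BIGINT AS __row_id   -- order of the change
FROM sub;

-- Sink the changelog rows to a separate S3 path that feeds cdc_table.
CREATE SINK snowflake_cdc_sink FROM ss_changelog_mv WITH (
    connector = 'snowflake',                -- assumed connector name
    type = 'append-only',                   -- assumed sink type
    s3.bucket_name = 'EXAMPLE_S3_BUCKET',
    s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
    s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
    s3.region_name = 'EXAMPLE_REGION',
    s3.path = 'EXAMPLE_CDC_PATH',           -- placeholder path for the CDC files
    force_append_only = 'true'
);
```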
RisingWave does not merge the incremental data for you. To obtain an up-to-date view in Snowflake, choose one of the following approaches:
- Merge into a new table: periodically run an explicit MERGE INTO that keeps the row with the newest _changelog_row_id for each primary key.
- Dynamic tables: create a dynamic table that expresses the same logic and lets Snowflake materialize it continuously.
The examples below demonstrate both approaches.
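For the merge approach, the periodic step can be sketched in Snowflake SQL as follows. All names are hypothetical: cdc_table is assumed to be loaded by Snowpipe with the columns user_id, target_id, event_timestamp, __op, and __row_id, and offset_table stores the last processed __row_id per CDC table. The window of rows is fixed with session variables before merging, so rows that arrive mid-run are left for the next run. Rows with op 4 (UPDATE-BEFORE) are treated like deletes; the matching UPDATE-AFTER row has a larger __row_id, so it wins when both fall in the same window and re-inserts the row otherwise.

```sql
-- One-time setup (hypothetical schemas).
CREATE TABLE IF NOT EXISTS target_table (
    user_id         VARCHAR PRIMARY KEY,
    target_id       VARCHAR,
    event_timestamp TIMESTAMP_TZ
);
CREATE TABLE IF NOT EXISTS offset_table (
    table_name  VARCHAR,
    last_row_id NUMBER
);

-- Each run: fix the window (last_offset, max_offset] of rows to process.
-- -1 as the initial offset assumes row IDs are non-negative.
SET last_offset = (SELECT COALESCE(MAX(last_row_id), -1)
                   FROM offset_table WHERE table_name = 'cdc_table');
SET max_offset = (SELECT COALESCE(MAX(__row_id), $last_offset) FROM cdc_table);

BEGIN;

-- Apply only the latest change per key within the window.
MERGE INTO target_table t
USING (
    SELECT user_id, target_id, event_timestamp, __op
    FROM cdc_table
    WHERE __row_id > $last_offset AND __row_id <= $max_offset
    QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY __row_id DESC) = 1
) s
ON t.user_id = s.user_id
WHEN MATCHED AND s.__op IN (2, 4) THEN DELETE
WHEN MATCHED AND s.__op IN (1, 3) THEN UPDATE SET
    t.target_id = s.target_id,
    t.event_timestamp = s.event_timestamp
WHEN NOT MATCHED AND s.__op IN (1, 3) THEN
    INSERT (user_id, target_id, event_timestamp)
    VALUES (s.user_id, s.target_id, s.event_timestamp);

-- Advance the offset to the end of the processed window.
MERGE INTO offset_table o
USING (SELECT 'cdc_table' AS table_name, $max_offset AS last_row_id) s
ON o.table_name = s.table_name
WHEN MATCHED THEN UPDATE SET o.last_row_id = s.last_row_id
WHEN NOT MATCHED THEN INSERT (table_name, last_row_id)
    VALUES (s.table_name, s.last_row_id);

COMMIT;
```

Run this on a schedule, for example from a Snowflake task. Snowpipe does not guarantee that files are loaded in the order they were staged, so in this sketch a row whose __row_id falls inside an already processed window would be skipped if it arrives late.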
Example workflow
- Create a materialized view in RisingWave that emits change logs.
- Create the Snowflake sink in RisingWave.
- Create a warehouse in Snowflake.
- Approach A: Merge on read
In this approach, you periodically run a MERGE INTO statement to fold the changelog records into the target table. The workflow relies on three Snowflake tables:
  - target_table: stores the consolidated, up-to-date data.
  - cdc_table: holds the incremental changelog records produced by the sink.
  - offset_table: records the last processed row ID for each CDC table. If you maintain multiple CDC tables, they can all share the same offset table.
- Approach B: Dynamic tables
In this approach, you let Snowflake maintain the up-to-date view by defining a dynamic table that refreshes automatically from the CDC table. Two Snowflake tables are involved:
  - cdc_table: holds the incremental changelog records written by the sink.
  - target_table: a dynamic table that materializes the consolidated, current state.
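For the dynamic-table approach, a sketch under a hypothetical schema in which cdc_table has the columns user_id, target_id, event_timestamp, __op, and __row_id. The warehouse name, size, and TARGET_LAG value are illustrative choices. The query keeps the latest change per user_id and drops keys whose latest change is a DELETE (2) or UPDATE-BEFORE (4).

```sql
-- Warehouse that runs the dynamic table refreshes (hypothetical name and size).
CREATE WAREHOUSE IF NOT EXISTS rw_wh
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60;

-- Current state: the newest change per key, excluding deleted keys.
CREATE OR REPLACE DYNAMIC TABLE target_table
  TARGET_LAG = '5 minutes'
  WAREHOUSE = rw_wh
AS
SELECT user_id, target_id, event_timestamp
FROM (
    SELECT
        user_id,
        target_id,
        event_timestamp,
        __op,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY __row_id DESC) AS rn
    FROM cdc_table
)
WHERE rn = 1
  AND __op IN (1, 3);
```

Snowflake refreshes target_table on its own to stay within the target lag, so this approach needs neither a MERGE statement nor an offset table.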