Snowflake is a cloud-based data warehousing platform that allows for scalable and efficient data storage and analysis. For more information about Snowflake, see the Snowflake official website.

Sinking data from RisingWave to Snowflake uses Snowpipe for data loading: RisingWave first stages the data in a user-managed S3 bucket in JSON format, and Snowpipe then loads it into the Snowflake table. For more information, see Overview of the Snowpipe REST endpoints to load data.

PREMIUM EDITION FEATURE

This is a Premium Edition feature. All Premium Edition features are available out of the box without additional cost on RisingWave Cloud. For self-hosted deployments, users need to purchase a license key to access this feature. To purchase a license key, please contact our sales team at sales@risingwave-labs.com.

For a full list of Premium Edition features, see RisingWave Premium Edition.

Prerequisites

  • Ensure you have an S3 bucket that RisingWave can connect to.
  • Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.
  • Ensure the S3 user account has WRITE permission.
  • Ensure that Snowflake and S3 are set up as described in Automating Snowpipe for Amazon S3, as RisingWave is only responsible for writing data to S3.

RisingWave does not delete data that has already been imported from S3. You can manually set the lifecycle configuration of your S3 bucket to clear out unneeded staged files. See Lifecycle configuration and Delete staged files for more details.

Syntax

Use the following syntax to create a sink in RisingWave:

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='snowflake',
   connector_parameter = 'value', ...
);

Parameters

All parameters are required unless specified otherwise.

| Parameter | Description |
| --------- | ----------- |
| s3.bucket_name | The S3 bucket where intermediate sink files will be stored. |
| s3.path | Optional. The S3 path to be specified. If specified, the actual file location is <s3_bucket>://<s3_path>/<rw_auto_gen_file_name>; otherwise, it is <s3_bucket>://<rw_auto_gen_file_name>. |
| s3.credentials.access | S3 access credentials. |
| s3.credentials.secret | S3 secret credentials. |
| s3.region_name | The S3 region, e.g., us-east-2. |
| force_append_only | Optional. If true, forces the sink to be append-only, even if it cannot be. |

Data type mapping

The following table shows the corresponding data types between RisingWave and Snowflake. For details on native RisingWave data types, see Overview of data types.

| RisingWave type | Snowflake type |
| --------------- | -------------- |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| REAL | FLOAT4 |
| DECIMAL | DECIMAL |
| DOUBLE | FLOAT8 |
| BYTEA | BINARY |
| VARCHAR | VARCHAR |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_TZ |
| INTERVAL | Unsupported |
| ARRAY | ARRAY |
| JSONB | VARIANT (You need to convert JSONB to VARIANT using parse_json; see the example below.) |
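One way to do this conversion on the Snowflake side is with the parse_json function. A minimal sketch, assuming a hypothetical staging table raw_events with a string column payload that holds the sinked JSONB value:

-- Hypothetical table and column names; convert the sinked JSONB string into a VARIANT value.
SELECT PARSE_JSON(payload) AS payload_variant
FROM raw_events;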

Example

Here is an example of how you can sink data from RisingWave to Snowflake.

Set up S3

Set up an external S3 bucket and ensure you have the corresponding credentials. Both the Snowflake stage and the RisingWave sink creation require these credentials (see the stage sketch after this list):

  • s3.bucket_name: corresponds to URL in the Snowflake stage.
  • s3.credentials.access: corresponds to AWS_KEY_ID in the Snowflake stage.
  • s3.credentials.secret: corresponds to AWS_SECRET_KEY in the Snowflake stage.
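
For reference, the Snowflake stage that points at this bucket might look like the following sketch. The stage name my_s3_stage is a placeholder; the exact definition should follow the Snowflake guide referenced below:

CREATE OR REPLACE STAGE my_s3_stage
  URL = 's3://EXAMPLE_S3_BUCKET/EXAMPLE_S3_PATH/'
  CREDENTIALS = (AWS_KEY_ID = 'EXAMPLE_AWS_ACCESS' AWS_SECRET_KEY = 'EXAMPLE_AWS_SECRET')
  FILE_FORMAT = (TYPE = JSON);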

Set up Snowflake

Next, you need to set up a table, a stage, and a pipe. Additionally, make sure to configure the S3 bucket's event notifications to send to the SQS queue provided by the Snowpipe, so that Snowpipe is notified of newly staged files.

To complete the setup, follow the instructions in Automating Snowpipe for Amazon S3.
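
As a rough, non-authoritative sketch (the object names s1_target, s1_pipe, and my_s3_stage are placeholders; follow the guide above for the exact steps), the Snowflake-side objects typically look like this:

-- Target table that Snowpipe loads data into.
CREATE TABLE s1_target (id INT, name VARCHAR);

-- Pipe that copies the JSON files staged by RisingWave into the table as they arrive.
CREATE PIPE s1_pipe
  AUTO_INGEST = TRUE
  AS COPY INTO s1_target
     FROM @my_s3_stage
     MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- SHOW PIPES lists the notification_channel (an SQS ARN) to use when configuring
-- the S3 bucket's event notifications.
SHOW PIPES;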

Sink data with append-only

Now you can start sinking data. Launch your RisingWave cluster and execute the following SQL queries to create a source and a sink, as shown in the examples below:

Create source
CREATE SOURCE s1_source (id int, name varchar)
WITH (
    connector = 'datagen',
    fields.id.kind = 'sequence',
    fields.id.start = '1',
    fields.id.end = '10000',
    fields.name.kind = 'random',
    fields.name.length = '10',
    datagen.rows.per.second = '200'
) FORMAT PLAIN ENCODE JSON;
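The sink below reads from a materialized view named ss_mv. As a minimal sketch, you can define it on top of s1_source like this:

Create materialized view
-- Minimal materialized view over the datagen source; the sink reads from it.
CREATE MATERIALIZED VIEW ss_mv AS
SELECT id, name FROM s1_source;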
Create sink
CREATE SINK snowflake_sink FROM ss_mv WITH (
    connector = 'snowflake',
    type = 'append-only',
    s3.bucket_name = 'EXAMPLE_S3_BUCKET',
    s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
    s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
    s3.region_name = 'EXAMPLE_REGION',
    s3.path = 'EXAMPLE_S3_PATH',
    force_append_only = 'true'
);
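
Once the sink is running and files begin to land in S3, you can check from the Snowflake side whether Snowpipe is picking them up. This assumes the hypothetical pipe name s1_pipe from the earlier sketch:

-- Returns a JSON document describing the pipe's execution state and pending file count.
SELECT SYSTEM$PIPE_STATUS('s1_pipe');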

Sink data with upsert

Snowpipe only ingests new rows; it does not apply updates or deletes.

To surface those changes, you need to create a materialized view using AS CHANGELOG in RisingWave.
This instructs RisingWave to add two metadata columns to each change log row:

  • changelog_op: the change type (1 = INSERT, 2 = DELETE, 3 = UPDATE-AFTER, 4 = UPDATE-BEFORE)
  • _changelog_row_id: a monotonically increasing identifier that captures the order of the change

You can rename these columns (for example, to __op and __row_id) in the SELECT list so they appear in the Snowflake sink.

RisingWave stops here. It does not merge the incremental data for you.
You can choose one of the following approaches to obtain an up-to-date view in Snowflake:

  1. Merge into a new table: run an explicit MERGE INTO that keeps the newest _changelog_row_id per primary key.
  2. Dynamic tables: create a dynamic table that expresses the same logic and lets Snowflake materialize it continuously.

The examples below demonstrate both approaches.

Example workflow

  1. Create a materialized view in RisingWave that emits change logs.
CREATE MATERIALIZED VIEW ss_mv AS
WITH sub AS CHANGELOG FROM user_behaviors
SELECT
    user_id,
    target_id,
    event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' AS event_timestamp,
    changelog_op AS __op,
    _changelog_row_id::BIGINT AS __row_id
FROM sub;
  2. Create the Snowflake sink in RisingWave.
CREATE SINK snowflake_sink FROM ss_mv WITH (
    connector             = 'snowflake',
    type                  = 'append-only',
    s3.bucket_name        = 'EXAMPLE_S3_BUCKET',
    s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
    s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
    s3.region_name        = 'EXAMPLE_REGION',
    s3.path               = 'EXAMPLE_S3_PATH'
);
  3. Create a warehouse in Snowflake.
CREATE WAREHOUSE test_warehouse;
  4. Approach A: Merge on read

In this approach you periodically run a MERGE INTO statement to fold the change-log records into the target table. The workflow relies on three Snowflake tables:

  • target_table: Stores the consolidated, up-to-date data.
  • cdc_table: Holds the incremental change-log records produced by the sink (a sketch of its layout follows this list).
  • offset_table: Records the last processed __row_id for each CDC table. If you maintain multiple CDC tables, they can all share the same offset table.
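As a hedged sketch, assuming the sink emits two data columns v1 and v2 along with the renamed metadata columns, cdc_table could be defined as follows (adjust the data columns to match your schema):

-- Change-log table loaded by Snowpipe: data columns plus the __op / __row_id metadata columns.
CREATE TABLE cdc_table (v1 INT, v2 VARCHAR, __op INT, __row_id BIGINT);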
-- 1. Create a table to store the results
CREATE TABLE target_table(xxx, xxx);

-- 2. Create a table to store the offset.
CREATE TABLE offset_table (table_name string, offset_row_id int); 

-- 3. Create a task to perform incremental updates on a regular basis.
CREATE OR REPLACE TASK multi_statement_task
WAREHOUSE = test_warehouse
SCHEDULE = '60 MINUTE'
AS
BEGIN
    -- 1. Query the offset from the table
    LET my_offset INT := 0;
    
    SELECT COALESCE(MAX(offset_row_id), 0) INTO :my_offset
    FROM offset_table
    WHERE table_name = 'cdc_table'
    LIMIT 1;

    -- 2. Query max row id
    LET max_row_id INT := 0;
    
    SELECT COALESCE(MAX(__row_id), 0) INTO :max_row_id
    FROM cdc_table;
    
    -- 3. Use MERGE INTO statement to merge the data from cdc_table to target_table
    MERGE INTO target_table AS target
    USING (
        SELECT *
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY v1 ORDER BY __row_id DESC) AS dedupe_id
            FROM cdc_table
            WHERE __row_id > :my_offset AND __row_id <= :max_row_id
        ) AS subquery
        WHERE dedupe_id = 1
    ) AS source
    ON target.v1 = source.v1
    WHEN MATCHED AND source.__op IN (2, 4) THEN DELETE
    WHEN MATCHED AND source.__op IN (1, 3) THEN UPDATE SET target.v1 = source.v1, target.v2 = source.v2
    WHEN NOT MATCHED AND source.__op IN (1, 3) THEN INSERT (v1, v2) VALUES (source.v1, source.v2);
    
    -- 4. Update offset_table
    MERGE INTO offset_table AS target
    USING (SELECT 'cdc_table' AS table_name, :max_row_id AS offset_row_id) AS source
    ON target.table_name = source.table_name
    WHEN MATCHED THEN
      UPDATE SET offset_row_id = source.offset_row_id
    WHEN NOT MATCHED THEN
      INSERT (table_name, offset_row_id) VALUES (source.table_name, source.offset_row_id);
      
    -- 5. Delete old data from cdc_table
    DELETE FROM cdc_table
    WHERE __row_id <= :max_row_id;
END;

-- 4. Start the task
ALTER TASK multi_statement_task RESUME;
-- To execute the task once, run:
EXECUTE TASK multi_statement_task;
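
To confirm that the task runs as scheduled, you can inspect its execution history; for example, using the INFORMATION_SCHEMA table function:

-- Shows recent runs of the task, including state and any error messages.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'MULTI_STATEMENT_TASK'));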
  5. Approach B: Dynamic tables

In this approach you let Snowflake maintain the up-to-date view by defining a dynamic table that refreshes automatically from the CDC table. Two Snowflake tables are involved:

  • cdc_table: Holds the incremental change-log records written by the sink.
  • target_table: A dynamic table that materializes the consolidated, current state.
CREATE OR REPLACE DYNAMIC TABLE target_table
TARGET_LAG = '1 hour'
WAREHOUSE  = test_warehouse
AS
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY {primary_key} ORDER BY __row_id DESC) AS dedupe_id
    FROM cdc_table              -- cdc_table is the table populated by the sink
) AS sub
WHERE dedupe_id = 1
  AND (__op = 1 OR __op = 3);
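
Snowflake refreshes the dynamic table automatically according to TARGET_LAG. If you want to force an immediate refresh or inspect the consolidated result, you can run, for example:

-- Trigger a manual refresh (optional) and query the consolidated state.
ALTER DYNAMIC TABLE target_table REFRESH;
SELECT * FROM target_table;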