Snowflake is a cloud-based data warehousing platform that enables scalable and efficient data storage and analysis. Starting with v2.6.0, RisingWave provides the Snowflake v2 sink connector, an improved integration with Snowflake. The original Snowflake connector remains supported for compatibility, but we recommend Snowflake v2 for its more powerful and comprehensive functionality.

Syntax

To use Snowflake v2, specify connector = 'snowflake_v2' in your CREATE SINK statement.
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='snowflake_v2',
   type='append-only' | 'upsert',
   connector_parameter = 'value', ...
);

Parameters

| Parameter | Description |
| --- | --- |
| type | Sink type: append-only or upsert. |
| intermediate.table.name | Name of the intermediate table used in upsert mode. Not required in append-only mode. |
| table.name | Name of the target table. |
| database | Snowflake database name. |
| schema | Snowflake schema name. |
| write.target.interval.seconds | Interval in seconds for scheduled writes to the target table (default: 3600). |
| warehouse | Snowflake warehouse name. |
| jdbc.url | JDBC URL used to connect to Snowflake. |
| username | Snowflake username. |
| password | Snowflake password. |
| commit_checkpoint_interval | Commit every n checkpoints (default: 10). |
| auto_schema_change | Enable automatic schema change for upsert sinks; adds new columns to the target table if needed. |
| create_table_if_not_exists | Create the target table if it does not exist. |
S3 parameters

These options only need to be set when with_s3 = true:

| Parameter | Description |
| --- | --- |
| with_s3 | Enable writing via S3 (default: true). |
| s3.region_name | AWS region of the S3 bucket. |
| s3.bucket_name | S3 bucket name. |
| s3.path | S3 folder path for sink files. |
| enable_config_load | Load S3 credentials from the environment (self-hosted only). |
| s3.credentials.access | AWS access key ID. |
| s3.credentials.secret | AWS secret access key. |
| s3.endpoint_url | Custom S3 endpoint URL (for self-hosted setups). |
| s3.assume_role | IAM role ARN to assume for S3 access. |

Auto schema change

Snowflake v2 supports automatic schema change for sinks when sink_decoupling is false. When enabled, RisingWave automatically alters the target table to add new columns from the upstream data source. To enable it:
auto_schema_change = true
This ensures your Snowflake table stays in sync with upstream schema changes with minimal manual intervention.
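As an illustrative sketch, an upsert sink with automatic schema change enabled might look like the following. The sink, table, and connection values here are placeholders, and the S3 writer parameters are omitted for brevity; see the full examples below.
CREATE SINK evolving_sink FROM test_table WITH (
    connector = 'snowflake_v2',
    type = 'upsert',
    table.name = 'test_table',
    intermediate.table.name = 'rw_test_table_stage',
    warehouse = 'COMPUTE_WH',
    database = 'RW_TEST',
    schema = 'RW_SCHEMA',

    -- JDBC connection parameters (required in upsert mode)
    jdbc.url = 'jdbc:snowflake://...',
    username = '...',
    password = '...',

    -- New upstream columns are added to the target table automatically
    auto_schema_change = 'true'
);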

Configure RisingWave to write to S3

You need to configure how RisingWave authenticates with AWS S3. There are two primary methods:
  • Access Key / Secret Key (AK/SK) authentication
This is the default method. Provide your AWS Access Key ID and Secret Access Key directly in the CREATE SINK statement.
s3.credentials.access = 'YOUR_AWS_ACCESS_KEY_ID',
s3.credentials.secret = 'YOUR_AWS_SECRET_ACCESS_KEY',
  • Assume role authentication
For enhanced security, RisingWave can assume an IAM Role in your AWS account to gain temporary credentials for S3 access.
s3.assume_role = 'arn:aws:iam::123456789012:role/YourRisingWaveS3Role',
enable_config_load = 'true',
To use this method, you need to configure an IAM role in AWS that RisingWave can assume (an example trust policy is shown in “Set up S3 IAM and role” below). This involves:
  1. Obtaining the RisingWave service ARN from our support team.
  2. Creating an IAM policy with the necessary S3 read/write permissions for your bucket and prefix.
  3. Configuring the IAM role’s trust policy to allow the RisingWave service ARN to assume it.

Append-only and upsert modes

The Snowflake v2 sink connector supports both modes for flexible data handling. In upsert mode, performance is optimized through an intermediate table:
  • An intermediate table is created to stage data before merging it into the target table. If create_table_if_not_exists is set to true, the table is automatically named rw_<target_table_name>_<uuid>.
  • Data is periodically merged from the intermediate table into the target table according to the write.target.interval.seconds setting.
  • By default, an S3 bucket is required to achieve optimal ingestion performance into the intermediate table.
  • Alternatively, you can use INSERT SQL statements to load data directly into the intermediate table, though this approach is not recommended due to performance drawbacks.
Examples:
  1. Snowflake v2 sink with S3 writer (Append-only mode):
CREATE SINK snowflake_sink FROM test_table WITH (
    connector = 'snowflake_v2',
    type = 'append-only',
    table.name = 'test_table',

    -- Snowflake concepts
    warehouse = 'COMPUTE_WH',
    database = 'RW_TEST',
    schema = 'RW_SCHEMA',

    -- S3 writer (enabled by default)
    with_s3 = true,
    s3.bucket_name = '...',
    s3.region_name = '...',
    s3.path = '...',

    -- Authentication: access key / secret (default)
    s3.credentials.access = '...',
    s3.credentials.secret = '...',

    -- Or assume role (requires cloud support)
    s3.assume_role = '...',
    enable_config_load = 'true',

    -- Required if auto_schema_change = false
    -- and create_table_if_not_exists = false
    jdbc.url = 'jdbc:snowflake://...',
    username = '...',
    password = '...',

    -- Defaults (can be overridden)
    auto_schema_change = 'false',
    create_table_if_not_exists = 'false'
);
  2. Snowflake v2 sink with S3 writer (Upsert mode):
CREATE SINK snowflake_sink FROM test_table WITH (
    connector = 'snowflake_v2',
    type = 'upsert',
    table.name = 'test_table',

    -- Snowflake concepts
    warehouse = 'COMPUTE_WH',
    database = 'RW_TEST',
    schema = 'RW_SCHEMA',

    -- Intermediate table required for upsert
    intermediate.table.name = '...',
    -- Default: 3600 seconds (1 hour)
    write.target.interval.seconds = '...',

    -- S3 writer (enabled by default)
    with_s3 = true,
    s3.bucket_name = '...',
    s3.region_name = '...',
    s3.path = '...',

    -- Authentication: access key / secret (default)
    s3.credentials.access = '...',
    s3.credentials.secret = '...',

    -- Or assume role (requires cloud support)
    s3.assume_role = '...',
    enable_config_load = 'true',

    -- Unlike append-only mode, JDBC configs are required
    jdbc.url = 'jdbc:snowflake://...',
    username = '...',
    password = '...',

    -- Defaults (can be overridden)
    auto_schema_change = 'false',
    create_table_if_not_exists = 'false'
);

Set up Snowflake

Before connecting RisingWave to Snowflake, you need to prepare a dedicated user, role, schema, and warehouse in Snowflake. If you already have a Snowflake user with the necessary privileges, you can skip this step. Follow the steps below to create and configure the required resources.
  1. Set database_name, warehouse_name, and rw_schema accordingly to use an existing database, warehouse, and/or schema.
Example
-- Set database, warehouse, role, user, schema, and password
set database_name = 'RW_DB';
set warehouse_name = 'RW_WH';
set rw_role = 'RW_ROLE';
set rw_user = 'RW_USER';
set rw_schema = 'RW_SCHEMA';
set password = 'xxxxxx';
  2. Create a role for RisingWave.
create role if not exists identifier($rw_role);
grant role identifier($rw_role) to role SYSADMIN;
  3. Create the Snowflake database and schema.
create database if not exists identifier($database_name);
use database identifier($database_name);
create schema if not exists identifier($rw_schema);
  4. Create a user for RisingWave.
create user if not exists identifier($rw_user) PASSWORD = $password
default_role = $rw_role
default_warehouse = $warehouse_name;
grant role identifier($rw_role) to user identifier($rw_user);
grant all on schema identifier($rw_schema) to role identifier($rw_role);
grant execute task on account to role identifier($rw_role);
  5. Create a warehouse for RisingWave.
create warehouse if not exists identifier($warehouse_name)
warehouse_size = xsmall
warehouse_type = standard
auto_suspend = 60
auto_resume = true
initially_suspended = true;
  6. Grant the RisingWave role access to the warehouse.
grant USAGE
on warehouse identifier($warehouse_name)
to role identifier($rw_role);
  7. Grant the RisingWave role access to the database.
grant CREATE SCHEMA, MONITOR, USAGE on database identifier($database_name) to role identifier($rw_role);
use role sysadmin;
COMMIT;
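Optionally, you can verify the setup with Snowflake's SHOW GRANTS commands (assuming the default names RW_ROLE and RW_USER from the variables in step 1):
SHOW GRANTS TO ROLE RW_ROLE;
SHOW GRANTS TO USER RW_USER;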

Set up Snowflake S3 integration

To enable RisingWave to write data into Snowflake through S3, you need to set up a Snowflake Storage Integration and a Stage in Snowflake. Follow these steps:
  1. Use the ACCOUNTADMIN role in the RW_DB.RW_SCHEMA schema created above.
USE SCHEMA RW_DB.RW_SCHEMA;
USE ROLE ACCOUNTADMIN;
  2. Create a Snowflake storage integration that connects to your S3 bucket. Replace the ARN and S3 path with your own.
CREATE STORAGE INTEGRATION S3_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::023339134545:role/mysnowflakerole'  -- IAM role created in AWS
  STORAGE_ALLOWED_LOCATIONS = ('s3://iceberg-test/mysnowflake/');  -- Your S3 bucket with proper policy
  3. Configure the IAM trust policy.
Run the following command to retrieve values required for the AWS IAM trust policy.
DESC INTEGRATION S3_INT;
From the output, copy STORAGE_AWS_IAM_USER_ARN (e.g., arn:aws:iam::398211280607:user/....) and STORAGE_AWS_EXTERNAL_ID (e.g., ....c9aPS5npkOTkSO8=) to configure the AWS IAM trust policy.
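For reference, the trust policy on your AWS IAM role typically follows the pattern below, with the two placeholders filled in from the DESC INTEGRATION output. This is a sketch of the standard Snowflake storage integration setup, not a RisingWave-specific requirement.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
                }
            }
        }
    ]
}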
  4. Validate that Snowflake can access your S3 bucket.
SELECT SYSTEM$VALIDATE_STORAGE_INTEGRATION(
  'S3_INT',
  's3://iceberg-test/mysnowflake/',
  'validate_all.txt',
  'ALL');
  5. Create an external stage.
USE SCHEMA RW_DB.RW_SCHEMA;
CREATE STAGE MY_S3_STAGE
  STORAGE_INTEGRATION = S3_INT
  URL = 's3://iceberg-test/mysnowflake/'
  FILE_FORMAT = (TYPE = JSON);
If you want to use the PUBLIC role or other roles, you need to grant them these privileges:
GRANT USAGE ON STAGE RW_DB.RW_SCHEMA.MY_S3_STAGE TO ROLE <role_name>;
GRANT READ, WRITE ON STAGE RW_DB.RW_SCHEMA.MY_S3_STAGE TO ROLE <role_name>;
GRANT USAGE ON INTEGRATION S3_INT TO ROLE <role_name>;
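Optionally, to confirm the stage is usable, you can list its contents (the listing may be empty at this point):
LIST @RW_DB.RW_SCHEMA.MY_S3_STAGE;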

Set up S3 IAM and role

RisingWave accesses S3 using Access Key / Secret Key (AK/SK) authentication by default. If you prefer to use an IAM role instead, follow the official Snowflake guide. Below are additional notes specific to RisingWave.
  1. Create an IAM policy for S3 access.
Example
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObject",
              "s3:GetObjectVersion",
              "s3:DeleteObject",
              "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::<bucket>",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "<prefix>/*"
                    ]
                }
            }
        }
    ]
}
  2. Configure the following additional trusted entities for your IAM role to allow the RisingWave ARN to assume it. Contact our support team to obtain the correct RisingWave ARN to use. Do not enable the “Require external ID” option when trusting the RisingWave ARN here.
Example
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::<account-id>:role/<placeholder-role>"
                ]
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}