
Sink data from RisingWave to Snowflake

This guide describes how to sink data from RisingWave to Snowflake using the Snowflake sink connector in RisingWave.

Snowflake is a cloud-based data warehousing platform that allows for scalable and efficient data storage and analysis. For more information about Snowflake, see the Snowflake official website.

Public Preview

This feature is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.

Prerequisites

  • Ensure you already have a Snowflake table that you can sink data to. For additional guidance on creating a table and setting up Snowflake, refer to the Getting started guide in the Snowflake documentation.

  • Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.

Required permission

To sink data into Snowflake successfully, the Snowflake user account must have the appropriate permissions (see the example grants after this list):

  • For the table: The user must have OWNERSHIP or, at a minimum, INSERT permission.
  • For the database: The user must have USAGE permission.
  • For the schema: The user must have USAGE permission.
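
Below is a minimal sketch of those grants. It assumes the placeholder names used elsewhere in this guide (EXAMPLE_USER, EXAMPLE_DB, EXAMPLE_SCHEMA) plus a hypothetical role EXAMPLE_ROLE and table EXAMPLE_TABLE; adapt the names to your environment.

-- Grant the minimum permissions required by the Snowflake user that RisingWave connects as.
GRANT USAGE ON DATABASE EXAMPLE_DB TO ROLE EXAMPLE_ROLE;
GRANT USAGE ON SCHEMA EXAMPLE_DB.EXAMPLE_SCHEMA TO ROLE EXAMPLE_ROLE;
GRANT INSERT ON TABLE EXAMPLE_DB.EXAMPLE_SCHEMA.EXAMPLE_TABLE TO ROLE EXAMPLE_ROLE;
-- Make the role available to the user the sink authenticates as.
GRANT ROLE EXAMPLE_ROLE TO USER EXAMPLE_USER;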

Syntax

Use the following syntax to create a sink in RisingWave:

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='snowflake',
connector_parameter = 'value', ...
);

Parameters

All parameters are required unless specified otherwise.

| Parameter | Description |
| :-------- | :---------- |
| snowflake.database | The Snowflake database used for sinking. |
| snowflake.schema | The name of the schema where the sink table exists. |
| snowflake.pipe | The pipe object created in Snowflake, used as the insertFiles target. |
| snowflake.account_identifier | The unique account identifier provided by Snowflake. Use the form <orgname>-<account_name>. See Account identifiers for more details. |
| snowflake.user | The user that owns the target table. The user must have been granted the corresponding role. See Grant role for more details. |
| snowflake.rsa_public_key_fp | The public key fingerprint used when generating the custom JWT token. See Authenticating to the server for more details. |
| snowflake.private_key | The RSA Privacy Enhanced Mail (PEM) key without encryption. See Key-pair authentication and key-pair rotation for more details. |
| snowflake.s3_bucket | The S3 bucket where intermediate sink files are stored. |
| snowflake.s3_path | Optional. The S3 path prefix. If specified, the actual file location is <s3_bucket>://<s3_path>/<rw_auto_gen_file_name>; otherwise, it is <s3_bucket>://<rw_auto_gen_file_name>. |
| snowflake.aws_access_key_id | The AWS access key ID used to access the S3 bucket. |
| snowflake.aws_secret_access_key | The AWS secret access key used to access the S3 bucket. |
| snowflake.aws_region | The AWS region of the S3 bucket, e.g., us-east-2. |
| snowflake.max_batch_row_num | The maximum number of rows per batch. This must be explicitly specified. |
| force_append_only | Optional. If true, forces the sink to be append-only, even if the upstream relation is not. |

Data type mapping

The following table shows the corresponding data types between RisingWave and Snowflake. For details on native RisingWave data types, see Overview of data types.

| RisingWave type | Snowflake type |
| :-------------- | :------------- |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| REAL | FLOAT4 |
| DECIMAL | DECIMAL |
| DOUBLE | FLOAT8 |
| BYTEA | BINARY |
| VARCHAR | VARCHAR |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMPTZ | TIMESTAMP_TZ |
| INTERVAL | Unsupported |
| ARRAY | ARRAY |
| JSONB | VARIANT (you need to convert JSONB to VARIANT using parse_json) |
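
As a quick illustration of that last row, Snowflake's PARSE_JSON function converts a JSON-formatted string into a VARIANT value. The table and column names below are hypothetical and only demonstrate the function:

-- Hypothetical Snowflake table with a VARIANT column to receive JSONB data.
CREATE TABLE example_json_table (payload VARIANT);
-- PARSE_JSON turns a JSON string into a VARIANT value; VALUES cannot call it directly, so use SELECT.
INSERT INTO example_json_table SELECT PARSE_JSON('{"id": 1, "name": "alice"}');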

Example

Here is an example of how you can sink data from RisingWave to Snowflake. For detailed data-pipelining and sinking logic, see Overview of the Snowpipe REST endpoints to load data.

Set up S3

Set up an external S3 bucket and ensure you have the corresponding credentials. Both the Snowflake stage and the RisingWave sink creation require these credentials (a sample stage definition follows this list):

  • snowflake.s3_bucket: corresponds to URL in the Snowflake stage.
  • snowflake.aws_access_key_id: corresponds to AWS_KEY_ID in the Snowflake stage.
  • snowflake.aws_secret_access_key: corresponds to AWS_SECRET_KEY in the Snowflake stage.
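
The following is a minimal sketch of such a stage. The stage name example_s3_stage is hypothetical, the bucket, path, and credentials are placeholders, and the JSON file format is an assumption about the intermediate files written by the sink:

-- External stage pointing at the S3 bucket that the RisingWave sink writes to.
CREATE OR REPLACE STAGE example_s3_stage
  URL = 's3://EXAMPLE_S3_BUCKET/EXAMPLE_S3_PATH/'
  CREDENTIALS = (AWS_KEY_ID = 'EXAMPLE_AWS_ID', AWS_SECRET_KEY = 'EXAMPLE_SECRET_KEY')
  FILE_FORMAT = (TYPE = JSON);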

Set up Snowflake

Next, you need to set up Snowflake, which involves the following steps:

  • Generate a key pair for authentication.
  • Create a role and grant the appropriate permissions.
  • Set up the credential for the user (e.g., RSA_PUBLIC_KEY) and retrieve snowflake.rsa_public_key_fp for later use in RisingWave.
  • Create a table to store the sink data from RisingWave.
  • Create a stage to reference the external S3 bucket, which Snowflake will use internally to load the data.
  • Create a pipe to receive the loaded data from the pre-defined stage and copy it to the Snowflake table.

This assumes that you have already created your accounts and corresponding databases in Snowflake. For detailed authentication processes, see Authenticating to the server; for detailed commands, see Reference. You can find an example of Snowflake setup commands in snowflake_prep.sql.
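
The following is a condensed, hypothetical sketch of those steps; the role, user, table, and pipe names are placeholders, and snowflake_prep.sql remains the authoritative reference. It reuses the example_s3_stage stage sketched in the S3 setup section above:

-- Create a role with the required permissions and grant it to the sink user.
CREATE ROLE EXAMPLE_ROLE;
GRANT USAGE ON DATABASE EXAMPLE_DB TO ROLE EXAMPLE_ROLE;
GRANT USAGE ON SCHEMA EXAMPLE_DB.EXAMPLE_SCHEMA TO ROLE EXAMPLE_ROLE;
GRANT ROLE EXAMPLE_ROLE TO USER EXAMPLE_USER;

-- Register the RSA public key for key-pair authentication, then read the
-- RSA_PUBLIC_KEY_FP property from DESCRIBE USER to use as snowflake.rsa_public_key_fp.
ALTER USER EXAMPLE_USER SET RSA_PUBLIC_KEY = '<your_public_key_content>';
DESCRIBE USER EXAMPLE_USER;

-- Create the target table and allow the role to insert into it.
CREATE TABLE EXAMPLE_DB.EXAMPLE_SCHEMA.EXAMPLE_TABLE (id INT, name VARCHAR);
GRANT INSERT ON TABLE EXAMPLE_DB.EXAMPLE_SCHEMA.EXAMPLE_TABLE TO ROLE EXAMPLE_ROLE;

-- Create a pipe that copies staged files into the target table.
CREATE OR REPLACE PIPE EXAMPLE_SNOWFLAKE_PIPE AS
  COPY INTO EXAMPLE_DB.EXAMPLE_SCHEMA.EXAMPLE_TABLE
  FROM @example_s3_stage
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;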

Sink data

Now you can start sinking data. Launch your RisingWave cluster and execute the following SQL queries to create the source and sink. See the examples below:

Create source
CREATE SOURCE s1_source (id int, name varchar)
WITH (
    connector = 'datagen',
    fields.id.kind = 'sequence',
    fields.id.start = '1',
    fields.id.end = '10000',
    fields.name.kind = 'random',
    fields.name.length = '10',
    datagen.rows.per.second = '200'
) FORMAT PLAIN ENCODE JSON;
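
Note that the sink below reads from a materialized view named ss_mv, which this example does not otherwise define. A minimal sketch that simply passes the source data through might look like this:

CREATE MATERIALIZED VIEW ss_mv AS
SELECT id, name FROM s1_source;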
Create sink
CREATE SINK snowflake_sink FROM ss_mv WITH (
    connector = 'snowflake',
    type = 'append-only',
    snowflake.database = 'EXAMPLE_DB',
    snowflake.schema = 'EXAMPLE_SCHEMA',
    snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
    snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
    snowflake.user = 'EXAMPLE_USER',
    snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
    snowflake.private_key = 'EXAMPLE_PK',
    snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
    snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
    snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
    snowflake.aws_region = 'EXAMPLE_REGION',
    snowflake.max_batch_row_num = '1030',
    snowflake.s3_path = 'EXAMPLE_S3_PATH'
);
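
Once the sink is running, you can optionally verify progress from the Snowflake side. SYSTEM$PIPE_STATUS reports the pipe's ingestion state, and a simple query confirms that rows have arrived; the table name below is the one you created during setup:

-- Check the ingestion status of the pipe used by the sink.
SELECT SYSTEM$PIPE_STATUS('EXAMPLE_DB.EXAMPLE_SCHEMA.EXAMPLE_SNOWFLAKE_PIPE');
-- Confirm that data has landed in the target table.
SELECT * FROM EXAMPLE_DB.EXAMPLE_SCHEMA.EXAMPLE_TABLE LIMIT 10;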
