Sink data from RisingWave to Snowflake
This guide describes how to sink data from RisingWave to Snowflake using the Snowflake sink connector in RisingWave.
Snowflake is a cloud-based data warehousing platform that allows for scalable and efficient data storage and analysis. For more information about Snowflake, see the official Snowflake website.
Sinking from RisingWave to Snowflake utilizes Snowpipe for data loading. Initially, data is staged in a user-managed S3 bucket in JSON format, and then loaded into the Snowflake table via Snowpipe. For more information, see Overview of the Snowpipe REST endpoints to load data.
PREMIUM EDITION FEATURE
This feature is only available in the premium edition of RisingWave. The premium edition offers additional advanced features and capabilities beyond the free and community editions. If you have any questions about upgrading to the premium edition, please contact our sales team at sales@risingwave-labs.com.
PUBLIC PREVIEW
This feature is in the public preview stage, meaning it’s nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.
Prerequisite
- Ensure you have an S3 bucket that RisingWave can connect to.
- Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.
- Ensure the S3 user account has WRITE permission.
- Ensure that Snowflake and S3 are set up as described in Automating Snowpipe for Amazon S3, as RisingWave is only responsible for writing data to S3.
RisingWave does not delete data from S3 after it has been imported into Snowflake. To clear out data you no longer need, you can manually set the lifecycle configuration of your S3 bucket. See Lifecycle configuration and Delete staged files for more details.
Syntax
Use the following syntax to create a sink in RisingWave:
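The general shape of the statement can be sketched as follows. This assumes RisingWave's standard `CREATE SINK ... WITH (...)` form and the connector name `snowflake`; `sink_name`, `sink_from`, and `select_query` are placeholders, and the connector parameters are the ones described in the Parameter section.

```sql
-- Sketch only: sink_name, sink_from, and select_query are placeholders.
-- Square brackets mark optional or alternative clauses.
CREATE SINK [ IF NOT EXISTS ] sink_name
[ FROM sink_from | AS select_query ]
WITH (
    connector = 'snowflake',
    connector_parameter = 'value', ...
);
```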
Parameter
All parameters are required unless specified otherwise.
Parameter | Description |
---|---|
s3.bucket_name | The S3 bucket where intermediate sink files will be stored. |
s3.path | Optional. The S3 path to be specified. |
s3.credentials.access | S3 access credentials. |
s3.credentials.secret | S3 secret credentials. |
s3.region_name | The S3 region, e.g., us-east-2. |
force_append_only | Optional. If true, forces the sink to be append-only, even if it cannot be. |
Data type mapping
The following table shows the corresponding data types between RisingWave and Snowflake. For details on native RisingWave data types, see Overview of data types.
RisingWave type | Snowflake type |
---|---|
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
REAL | FLOAT4 |
DECIMAL | DECIMAL |
DOUBLE | FLOAT8 |
BYTEA | BINARY |
VARCHAR | VARCHAR |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_TZ |
INTERVAL | Unsupported |
ARRAY | ARRAY |
JSONB | VARIANT (You need to convert JSONB to VARIANT using parse_json.) |
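As a rough illustration of the JSONB note, the conversion can be done on the Snowflake side with the built-in `PARSE_JSON` function, which returns a VARIANT. The sketch below assumes a hypothetical landing table `raw_events` whose `payload` column holds the JSONB value as a JSON-encoded string; both names are illustrative and not part of the connector.

```sql
-- Run in Snowflake. Table and column names are hypothetical.
SELECT
    event_id,
    PARSE_JSON(payload) AS payload_variant  -- JSON text -> VARIANT
FROM raw_events;
```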
Example
Here is an example of how you can sink data from RisingWave to Snowflake.
Set up S3
Set up an external S3 bucket and ensure you have the corresponding credentials. Both Snowflake stage and RisingWave sink creation require these credentials:
- snowflake.s3_bucket: URL in Snowflake stage.
- snowflake.aws_access_key_id: AWS_KEY_ID in Snowflake stage.
- snowflake.aws_secret_access_key: AWS_SECRET_KEY in Snowflake stage.
Set up Snowflake
Next, set up a table, a stage, and a pipe in Snowflake. Additionally, configure event notifications on the S3 bucket so that they are delivered to the SQS queue used by Snowpipe.
To complete the setup, follow the instructions in Automating Snowpipe for Amazon S3.
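A minimal sketch of the Snowflake side is shown here for orientation, assuming a three-column target table and a JSON file format on the stage. All object names (`example_snowflake_table`, `example_snowflake_stage`, `example_snowflake_pipe`) and credential values are hypothetical placeholders; the Snowflake guide remains the authoritative reference for the full procedure.

```sql
-- Run in Snowflake. All names and values below are placeholders.

-- Target table whose columns match the data written by the sink.
CREATE OR REPLACE TABLE example_snowflake_table (
    user_id INT,
    target_id VARCHAR,
    event_timestamp TIMESTAMP_TZ
);

-- External stage pointing at the S3 location RisingWave writes to.
CREATE OR REPLACE STAGE example_snowflake_stage
    URL = 's3://EXAMPLE_S3_BUCKET/EXAMPLE_S3_PATH/'
    CREDENTIALS = (AWS_KEY_ID = 'EXAMPLE_AWS_ID' AWS_SECRET_KEY = 'EXAMPLE_SECRET_KEY')
    FILE_FORMAT = (TYPE = JSON);

-- Pipe that loads newly staged files into the table.
CREATE OR REPLACE PIPE example_snowflake_pipe
    AUTO_INGEST = TRUE
AS
    COPY INTO example_snowflake_table
    FROM @example_snowflake_stage
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- The notification_channel column of the output holds the SQS ARN
-- to use when configuring the bucket's event notification.
SHOW PIPES;
```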
Sink data with append-only
Now you can start sinking data. Launch your RisingWave cluster and execute SQL statements to create a source and a sink. See the examples below:
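One possible append-only setup is sketched below. The table `user_behaviors`, the materialized view `ss_mv`, and the sink name `snowflake_sink` are hypothetical, the `EXAMPLE_*` values are placeholders, and `type = 'append-only'` is assumed from the usual RisingWave sink convention rather than taken from the parameter table above.

```sql
-- Run in RisingWave. Hypothetical upstream table; a source or an
-- existing materialized view works the same way.
CREATE TABLE IF NOT EXISTS user_behaviors (
    user_id INT PRIMARY KEY,
    target_id VARCHAR,
    event_timestamp TIMESTAMPTZ
);

CREATE MATERIALIZED VIEW ss_mv AS
SELECT user_id, target_id, event_timestamp
FROM user_behaviors;

-- Writes JSON files to S3; Snowpipe then loads them into Snowflake.
CREATE SINK snowflake_sink FROM ss_mv WITH (
    connector = 'snowflake',
    type = 'append-only',           -- assumed sink type
    s3.bucket_name = 'EXAMPLE_S3_BUCKET',
    s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
    s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
    s3.region_name = 'EXAMPLE_REGION',
    s3.path = 'EXAMPLE_S3_PATH',
    force_append_only = 'true'      -- upstream has a primary key, so it is not append-only by itself
);
```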
Sink data with upsert
Snowflake tables don’t support direct updates. To handle this, RisingWave writes incremental logs to Snowflake, recording the type and order of each modification to a row. You can then merge these as you read, or create dynamic tables to read the upsert results. See How dynamic tables work for more details.
Below are some examples for your reference.
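The following is a sketch of one way to do this, not a definitive recipe. It assumes a hypothetical upstream table `user_behaviors` keyed by `user_id`, a hypothetical sink name `snowflake_upsert_sink`, and the output column names `__op` and `__row_id`, which are chosen here for illustration. The sink exports the changelog as append-only rows:

```sql
-- Run in RisingWave. Table, sink, and output column names are hypothetical.
CREATE TABLE IF NOT EXISTS user_behaviors (
    user_id INT PRIMARY KEY,
    target_id VARCHAR,
    event_timestamp TIMESTAMPTZ
);

CREATE SINK snowflake_upsert_sink AS
WITH sub AS changelog FROM user_behaviors
SELECT
    user_id,
    target_id,
    event_timestamp,
    changelog_op AS __op,                  -- type of modification
    _changelog_row_id::bigint AS __row_id  -- order of modification
FROM sub
WITH (
    connector = 'snowflake',
    type = 'append-only',                  -- assumed sink type
    s3.bucket_name = 'EXAMPLE_S3_BUCKET',
    s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
    s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
    s3.region_name = 'EXAMPLE_REGION',
    s3.path = 'EXAMPLE_S3_PATH'
);
```

On the Snowflake side, a dynamic table can then keep only the latest surviving version of each key. The names `example_snowflake_table`, `example_warehouse`, and `user_behaviors_latest` are placeholders, and the filter assumes that operation codes 1 and 3 denote an insert and the new value of an update; verify the codes against the AS CHANGELOG documentation before relying on them.

```sql
-- Run in Snowflake. Object names are placeholders.
CREATE OR REPLACE DYNAMIC TABLE user_behaviors_latest
    TARGET_LAG = '1 minute'
    WAREHOUSE = example_warehouse
AS
SELECT user_id, target_id, event_timestamp
FROM (
    SELECT
        *,
        -- Rank the changes per key, newest first.
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY __row_id DESC) AS dedupe_id
    FROM example_snowflake_table
) AS ranked
-- Keep the newest change only if it leaves a row in place (assumed codes).
WHERE dedupe_id = 1 AND (__op = 1 OR __op = 3);
```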
Note that RisingWave uses changelog to transform streaming data into incremental logs. In the example above, changelog_op represents the type of modification (Insert/Update/Delete), while _changelog_row_id indicates the order of the modification. For more information, see AS CHANGELOG.