This guide describes how to ingest batch data from Snowflake tables into RisingWave using the ADBC (Arrow Database Connectivity) connector. This enables you to create refreshable tables that periodically pull data from Snowflake.
Snowflake is a cloud-based data warehousing platform that provides scalable, efficient data storage and analysis. For more information, see the Snowflake official website.
Prerequisites
- A Snowflake account with access to the database and tables you want to ingest.
- The Snowflake account identifier (e.g., myaccount.us-east-1).
- Valid authentication credentials (username/password, OAuth token, JWT private key, etc.).
- Network access from RisingWave to your Snowflake instance.
Connecting to Snowflake
RisingWave supports loading data from Snowflake tables using the adbc_snowflake connector. This creates a refreshable table that periodically fetches the latest data from Snowflake.
Syntax
```sql
CREATE TABLE table_name (
    primary key (order_id) -- Replace with your actual primary key column(s)
) WITH (
    connector = 'adbc_snowflake',
    refresh_mode = 'FULL_RELOAD',
    refresh_interval_sec = 'interval_in_seconds',
    adbc_snowflake.account = 'snowflake_account',
    adbc_snowflake.username = 'username',
    adbc_snowflake.password = 'password',
    adbc_snowflake.database = 'database_name',
    adbc_snowflake.schema = 'schema_name',
    adbc_snowflake.warehouse = 'warehouse_name',
    adbc_snowflake.table = 'source_table_in_snowflake'
);
```
**Automatic schema inference**: Column definitions are automatically inferred from the Snowflake table and should not be manually specified in the CREATE TABLE statement. You must, however, specify a primary key when creating a Snowflake table in RisingWave.
Parameters
Unless specified otherwise, parameters are required.
| Parameter | Description |
|---|---|
| connector | Must be adbc_snowflake. |
| refresh_mode | Must be FULL_RELOAD. The entire table is re-read on each refresh. |
| refresh_interval_sec | Optional. The refresh interval in seconds. Determines how frequently data is fetched from Snowflake. |
| adbc_snowflake.account | The Snowflake account identifier (e.g., myaccount.us-east-1 or myaccount). |
| adbc_snowflake.username | The Snowflake username for authentication. |
| adbc_snowflake.password | Required when using the default username/password authentication; otherwise optional. |
| adbc_snowflake.database | The name of the Snowflake database. |
| adbc_snowflake.schema | The Snowflake schema containing the table. |
| adbc_snowflake.warehouse | The Snowflake warehouse to use for queries. |
| adbc_snowflake.table | The name of the Snowflake table to ingest. |
| adbc_snowflake.auth_type | Optional. The authentication method. Default is auth_snowflake (username/password). Other options: auth_oauth, auth_jwt, auth_ext_browser, auth_okta, auth_mfa, auth_pat, auth_wif. |
| adbc_snowflake.auth_token | Optional. OAuth token for authentication. Required when adbc_snowflake.auth_type is auth_oauth. |
| adbc_snowflake.jwt_private_key_path | Optional. Local file path on the RisingWave server to the JWT private key file (e.g., /path/to/key.pem). |
| adbc_snowflake.jwt_private_key_pkcs8_value | Optional. Inline PKCS#8 private key value for JWT authentication. |
| adbc_snowflake.jwt_private_key_pkcs8_password | Optional. Password for adbc_snowflake.jwt_private_key_pkcs8_value, if the private key is encrypted. |
- If refresh_interval_sec is omitted, no automatic refresh is scheduled. Run REFRESH TABLE to load or refresh data.
- When using JWT authentication (set adbc_snowflake.auth_type to auth_jwt), provide exactly one of adbc_snowflake.jwt_private_key_path and adbc_snowflake.jwt_private_key_pkcs8_value.
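For a table created without refresh_interval_sec, or to pull fresh data ahead of the next scheduled refresh, you can trigger a reload manually. A minimal sketch, where the table name snowflake_orders is an example:

```sql
-- Manually trigger a full reload of the refreshable table from Snowflake
REFRESH TABLE snowflake_orders;
```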
Authentication methods
The Snowflake connector supports multiple authentication methods:
Username and password (default)
```sql
CREATE TABLE my_snowflake_table (
    primary key ("order_id")
) WITH (
    connector = 'adbc_snowflake',
    refresh_mode = 'FULL_RELOAD',
    refresh_interval_sec = '3600',
    adbc_snowflake.account = 'myaccount.us-east-1',
    adbc_snowflake.username = 'myuser',
    adbc_snowflake.password = 'mypassword',
    adbc_snowflake.database = 'SALES_DB',
    adbc_snowflake.schema = 'PUBLIC',
    adbc_snowflake.warehouse = 'COMPUTE_WH',
    adbc_snowflake.table = 'ORDERS'
);
```
JWT authentication
```sql
CREATE TABLE my_snowflake_table (
    primary key ("order_id")
) WITH (
    connector = 'adbc_snowflake',
    refresh_mode = 'FULL_RELOAD',
    refresh_interval_sec = '7200',
    adbc_snowflake.account = 'myaccount',
    adbc_snowflake.username = 'myuser',
    adbc_snowflake.database = 'SALES_DB',
    adbc_snowflake.schema = 'PUBLIC',
    adbc_snowflake.warehouse = 'COMPUTE_WH',
    adbc_snowflake.table = 'ORDERS',
    adbc_snowflake.auth_type = 'auth_jwt',
    adbc_snowflake.jwt_private_key_path = '/path/to/key.pem'
);
```
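JWT (key-pair) authentication requires a PKCS#8 private key. As a sketch, assuming OpenSSL is installed, an unencrypted key pair could be generated as follows; the file names rsa_key.p8 and rsa_key.pub are examples:

```shell
# Generate a 2048-bit RSA key and convert it to unencrypted PKCS#8 PEM format
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Derive the public key, which must be registered with the Snowflake user
# (e.g., via ALTER USER ... SET RSA_PUBLIC_KEY = '...')
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

Point adbc_snowflake.jwt_private_key_path at the generated rsa_key.p8 file on the RisingWave server.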
OAuth authentication
```sql
CREATE TABLE my_snowflake_table (
    primary key ("order_id")
) WITH (
    connector = 'adbc_snowflake',
    refresh_mode = 'FULL_RELOAD',
    refresh_interval_sec = '3600',
    adbc_snowflake.account = 'myaccount.us-east-1',
    adbc_snowflake.username = 'myuser',
    adbc_snowflake.database = 'SALES_DB',
    adbc_snowflake.schema = 'PUBLIC',
    adbc_snowflake.warehouse = 'COMPUTE_WH',
    adbc_snowflake.table = 'ORDERS',
    adbc_snowflake.auth_type = 'auth_oauth',
    adbc_snowflake.auth_token = 'your_oauth_token'
);
```
Data type mapping
The following table shows the corresponding data types between Snowflake and RisingWave. For details on native RisingWave data types, see Overview of data types.
| Snowflake type | RisingWave type | Notes |
|---|---|---|
| STRING | VARCHAR | |
| NUMBER | DECIMAL or BIGINT | Depends on scale and precision |
| FLOAT | DOUBLE PRECISION | |
| DECIMAL | DECIMAL | |
| CHAR | VARCHAR | |
| TEXT | VARCHAR | |
| DATE | DATE | |
| TIME | Not supported | Will report an error |
| TIMESTAMP_NTZ | TIMESTAMP WITHOUT TIME ZONE | |
| TIMESTAMP_LTZ | TIMESTAMP WITH TIME ZONE | |
| TIMESTAMP_TZ | TIMESTAMP WITH TIME ZONE | |
| BOOLEAN | BOOLEAN | |
| BINARY | BYTEA | |
| VARIANT | VARCHAR | JSON data stored as string |
| OBJECT | VARCHAR | JSON objects stored as string |
| ARRAY | VARCHAR | Arrays stored as string |
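Since VARIANT, OBJECT, and ARRAY values arrive as VARCHAR strings, they can be parsed downstream in RisingWave. A minimal sketch, assuming a hypothetical table snowflake_products with a VARIANT column attributes holding JSON objects:

```sql
-- Cast the stringified VARIANT column to JSONB, then extract a field as text
SELECT (attributes::jsonb) ->> 'color' AS color
FROM snowflake_products;
```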
Complete example
This example demonstrates how to create a refreshable table that loads data from a Snowflake table every hour.
Step 1: Create the refreshable table
```sql
CREATE TABLE snowflake_orders (
    primary key ("order_id")
) WITH (
    connector = 'adbc_snowflake',
    refresh_mode = 'FULL_RELOAD',
    refresh_interval_sec = '3600', -- Refresh every hour
    -- Snowflake connection parameters
    adbc_snowflake.account = 'myaccount.us-east-1',
    adbc_snowflake.username = 'analytics_user',
    adbc_snowflake.password = 'secure_password',
    adbc_snowflake.database = 'PRODUCTION',
    adbc_snowflake.schema = 'SALES',
    adbc_snowflake.warehouse = 'ANALYTICS_WH',
    adbc_snowflake.table = 'ORDERS'
);
```
Step 2: Query the data
```sql
SELECT * FROM snowflake_orders LIMIT 10;
```
Step 3: Create materialized views
You can create materialized views based on the Snowflake data:
```sql
-- The columns order_date and total_amount are automatically inferred from the Snowflake table
CREATE MATERIALIZED VIEW daily_sales AS
SELECT
    DATE_TRUNC('day', order_date) AS sale_date,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue
FROM snowflake_orders
GROUP BY DATE_TRUNC('day', order_date);
```
Limitations and requirements
- Refresh mode: Only FULL_RELOAD mode is supported. If refresh_interval_sec is set, RisingWave refreshes on that schedule; if it is omitted, refresh manually with REFRESH TABLE.
- Primary key: You must define a primary key when creating a Snowflake table in RisingWave.
- Schema inference: Column definitions are automatically inferred from the Snowflake table. Do not manually specify columns in the CREATE TABLE statement.
- Feature flag: The Snowflake connector requires the source-adbc_snowflake feature to be enabled at compile time. It is enabled by default in official RisingWave builds.
- Snapshot consistency: The connector uses Snowflake’s time travel feature when available to read data from a consistent point in time; when unavailable (for example, due to Snowflake retention settings), it falls back to reading current table data.
- Performance: For large tables, consider the refresh interval carefully to balance data freshness with query costs in Snowflake.
What’s next?