Sink data from RisingWave to Delta Lake
This guide describes how to sink data from RisingWave to Delta Lake. Delta Lake is an open-source storage framework that lets you build a lakehouse architecture with compute engines such as Spark. For more information, see Delta Lake.
Prerequisites
- Ensure you already have a Delta Lake table that you can sink data to. For additional guidance on creating a table and setting up Delta Lake, refer to this quickstart guide.
- Ensure you have an upstream materialized view or source that you can sink data from.
Spark compatibility
Version 2.4 of Delta Lake is compatible with version 3.4 of Spark.
Versions 2.3, 2.2, and 2.1 of Delta Lake are compatible with version 3.3 of Spark.
Syntax
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='deltalake',
connector_parameter = 'value', ...
);
Parameters
Parameter Names | Description |
---|---|
type | Required. Currently, only append-only is supported. |
location | Required. The file path that the Delta Lake table is reading data from, as specified when creating the Delta Lake table. |
s3.endpoint | Required. Endpoint of the S3-compatible object store. |
s3.access.key | Required. Access key of the S3 compatible object store. |
s3.secret.key | Required. Secret key of the S3 compatible object store. |
gcs.service.account | Required for GCS. Specifies the service account JSON file as a string. |
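As a rough illustration of how these parameters fit together, the following sketch points a sink at a MinIO bucket rather than AWS S3. The sink name, the upstream name my_mv, the bucket, and the MinIO host and port are hypothetical placeholders, and my_mv is assumed to be append-only.

```sql
-- Sketch only: my_minio_sink, my_mv, my-bucket, and the MinIO host/port are hypothetical.
CREATE SINK IF NOT EXISTS my_minio_sink FROM my_mv
WITH (
    connector = 'deltalake',
    type = 'append-only',                                -- the only supported sink type
    location = 's3a://my-bucket/path/to/table',          -- location of the existing Delta Lake table
    s3.endpoint = 'http://minio.example.internal:9000',  -- assumed MinIO endpoint
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}'
);
```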
Example
Here is a step-by-step example on how you can sink data from RisingWave to Delta Lake.
Create a Delta Lake table
In a spark-sql shell, create a Delta table. For more information, see the Delta Lake quickstart.
For example, the following spark-sql command creates a Delta Lake table in AWS S3. The table is in an S3 bucket named my-delta-lake-bucket in region ap-southeast-1, under the path path/to/table. Before running the command, create an empty directory path/to/table in the bucket. The full URL of the table location is s3://my-delta-lake-bucket/path/to/table. The commands in this guide address the same location with the s3a:// scheme, which is the scheme used by Hadoop's S3A connector.
Note that only S3-compatible object stores, such as AWS S3 or MinIO, are supported.
spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
    --conf 'spark.hadoop.fs.s3a.access.key=${ACCESS_KEY}' \
    --conf 'spark.hadoop.fs.s3a.secret.key=${SECRET_KEY}' \
    --conf 'spark.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com' \
    --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
    -e "create table delta.\`s3a://my-delta-lake-bucket/path/to/table\`(id int, name string) using delta"
Create an upstream materialized view or source
The following query creates a source using the built-in load generator, which creates mock data. For more details, see CREATE SOURCE and Generate test data. You can transform the data using additional SQL queries if needed.
CREATE SOURCE s1_source (id int, name varchar)
WITH (
connector = 'datagen',
fields.id.kind = 'sequence',
fields.id.start = '1',
fields.id.end = '10000',
fields.name.kind = 'random',
fields.name.length = '10',
datagen.rows.per.second = '200'
) FORMAT PLAIN ENCODE JSON;
You can also choose to create an upsert table, which supports in-place updates. For more details on creating a table, see CREATE TABLE.
CREATE TABLE s1_table (id int, name varchar)
WITH (
connector = 'datagen',
fields.id.kind = 'sequence',
fields.id.start = '1',
fields.id.end = '10000',
fields.name.kind = 'random',
fields.name.length = '10',
datagen.rows.per.second = '200'
) FORMAT UPSERT ENCODE JSON;
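The upstream can also be a materialized view. The following is a minimal sketch, assuming the s1_source source defined above. The view name s1_mv and the transformation are illustrative only. The view keeps the (id, name) shape of the Delta Lake table so that it can serve as the sink's upstream.

```sql
-- Hypothetical materialized view over s1_source.
-- It keeps even ids only and upper-cases the generated names.
CREATE MATERIALIZED VIEW s1_mv AS
SELECT id, upper(name) AS name
FROM s1_source
WHERE id % 2 = 0;
```

A sink could then use FROM s1_mv in place of FROM s1_source. If RisingWave does not treat the view as append-only, the force_append_only option described below would apply.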
Create a sink
Append-only sink from append-only source
If you have an append-only source and want to create an append-only sink, set type = 'append-only' in the CREATE SINK query.
CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'deltalake',
type = 'append-only',
location = 's3a://my-delta-lake-bucket/path/to/table',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}'
);
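The syntax also allows a sink to be defined with AS select_query instead of FROM. The sketch below assumes the same s1_source and Delta Lake table as above. The sink name s1_filtered_sink and the filter are hypothetical. Treat it as an alternative to s1_sink rather than an addition, since two sinks pointed at the same table would both write rows to it.

```sql
-- Hypothetical sink that filters rows inline instead of sinking the whole source.
-- The selected columns match the Delta Lake table schema (id int, name string).
CREATE SINK s1_filtered_sink AS
SELECT id, name
FROM s1_source
WHERE id <= 5000
WITH (
    connector = 'deltalake',
    type = 'append-only',
    location = 's3a://my-delta-lake-bucket/path/to/table',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}'
);
```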
Append-only sink from upsert table
If you have a table or source that is not append-only and want to create an append-only sink, set type = 'append-only' and force_append_only = 'true' in the CREATE SINK query.
CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'deltalake',
type = 'append-only',
force_append_only = 'true',
location = 's3a://my-delta-lake-bucket/path/to/table',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}'
);
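Both sink examples above use the name s1_sink, so they cannot coexist. A small sketch of how you might inspect and remove an existing sink before trying the other variant:

```sql
-- List the sinks that currently exist.
SHOW SINKS;

-- Remove the earlier sink so the name s1_sink can be reused.
DROP SINK IF EXISTS s1_sink;
```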
Query data in Delta Lake
To ensure that data is flushed to the sink, use the FLUSH command in RisingWave.
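For instance, run the following in a RisingWave SQL session before querying the Delta Lake table:

```sql
-- Force pending data to be committed before checking the sink's output.
FLUSH;
```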
The following query uses spark-sql to check the total number of records written to the Delta Lake table.
spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
    --conf 'spark.hadoop.fs.s3a.access.key=${ACCESS_KEY}' \
    --conf 'spark.hadoop.fs.s3a.secret.key=${SECRET_KEY}' \
    --conf 'spark.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com' \
    --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
    -e "select count(*) from delta.\`s3a://my-delta-lake-bucket/path/to/table\`"