Prerequisites

  • Ensure you already have a Delta Lake table that you can sink data to. For additional guidance on creating a table and setting up Delta Lake, refer to this quickstart guide.
  • Ensure you have an upstream materialized view or source that you can sink data from.

Spark compatibility

Version 2.4 of Delta Lake is compatible with version 3.4 of Spark.

Versions 2.3, 2.2, and 2.1 of Delta Lake are compatible with version 3.3 of Spark.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='deltalake',
   connector_parameter = 'value', ...
);

Parameters

Parameter NamesDescription
typeRequired. Currently, only append-only is supported.
locationRequired. The file path that the Delta Lake table is reading data from, as specified when creating the Delta Lake table. For AWS, start with s3:// or s3a://;For GCS, start with gs://; For local files, start with file://.
s3.endpointRequired. Endpoint of the S3. For MinIO object store backend, it should be http://MINIOHOST:{MINIO_HOST}:. For AWS S3, refer to S3.
s3.access.keyRequired. Access key of the S3 compatible object store.
s3.secret.keyRequired. Secret key of the S3 compatible object store.
gcs.service.accountRequired for GCS. Specifies the service account JSON file as a string.
commit_checkpoint_intervalOptional. Commit every N checkpoints (N > 0). Default value is 10. The behavior of this field also depends on the sink_decouple setting:If sink_decouple is true (the default), the default value of commit_checkpoint_interval is 10. If sink_decouple is set to false, the default value of commit_checkpoint_interval is 1. If sink_decouple is set to false and commit_checkpoint_interval is set to larger than 1, an error will occur.

Example

Here is a step-by-step example on how you can sink data from RisingWave to Delta Lake.

Create a Delta Lake table

In a spark-sql shell, create a Delta table. For more information, see the Delta Lake quickstart.

For example, the following spark-sql command creates a Delta Lake table in AWS S3. The table is in an S3 bucket named my-delta-lake-bucket in region ap-southeast-1 and under the path path/to/table. Before running the following command to create a Delta Lake table, create an empty directory path/to/table. The full URL of the table location is s3://my-delta-lake-bucket/path/to/table.

Note that only S3-compatible object store is supported, such as AWS S3 or MinIO.

spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2\
    --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
    --conf 'spark.hadoop.fs.s3a.access.key=${ACCESS_KEY}' \
    --conf 'spark.hadoop.fs.s3a.secret.key=${SECRET_KEY}' \
    --conf 'spark.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com' \
    --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
    --e "create table delta.\`s3a://my-delta-lake-bucket/path/to/table\`(id int, name string) using delta"

Create an upstream materialized view or source

The following query creates a source using the built-in load generator, which creates mock data. For more details, see CREATE SOURCE and Generate test data. You can transform the data using additional SQL queries if needed.

CREATE SOURCE s1_source (id int, name varchar)
WITH (
     connector = 'datagen',
     fields.id.kind = 'sequence',
     fields.id.start = '1',
     fields.id.end = '10000',
     fields.name.kind = 'random',
     fields.name.length = '10',
     datagen.rows.per.second = '200'
 ) FORMAT PLAIN ENCODE JSON;

You can also choose to create an upsert table, which supports in-place updates. For more details on creating a table, see CREATE TABLE.

CREATE TABLE s1_table (id int, name varchar)
WITH (
    connector = 'datagen',
    fields.id.kind = 'sequence',
    fields.id.start = '1',
    fields.id.end = '10000',
    fields.name.kind = 'random',
    fields.name.length = '10',
    datagen.rows.per.second = '200'
) FORMAT UPSERT ENCODE JSON;

Create a sink

Append-only sink from append-only source

If you have an append-only source and want to create an append-only sink, set type = append-only in the CREATE SINK query.

CREATE SINK s1_sink FROM t1_table
WITH (
    connector = 'deltalake',
    type = 'append-only',
    location = 's3a://my-delta-lake-bucket/path/to/table',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}'
);

Append-only sink from upsert table

If you have a table or source that is not of type append-only and want to create an append-only sink, set type = append-only and set force_append_only = true in the CREATE SINK query.

CREATE SINK s1_sink FROM s1_table
WITH (
    connector = 'deltalake',
    type = 'append-only',
    force_append_only = 'true',
    location = 's3a://my-delta-lake-bucket/path/to/table',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.access.key = '${ACCESS_KEY}',
    s3.secret.key = '${SECRET_KEY}'
);

Query data in Delta Lake

To ensure that data is flushed to the sink, use the FLUSH command in RisingWave.

The following query checks the total number of records sinked to the Delta Lake table using spark-sql.

spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2\
    --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
    --conf 'spark.hadoop.fs.s3a.access.key=${ACCESS_KEY}' \
    --conf 'spark.hadoop.fs.s3a.secret.key=${SECRET_KEY}' \
    --conf 'spark.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com' \
    --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
    --e "select count(*) from delta.\`s3a://my-delta-lake-bucket/path/to/table\`"