Ingest data from Google Cloud Storage

Use the SQL statement below to connect RisingWave to a Google Cloud Storage source.

Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
[INCLUDE { header | key | offset | partition | timestamp } [AS <column_name>]]
WITH (
    connector = 'gcs',
    connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
    without_header = 'true' | 'false',
    delimiter = 'delimiter'
);

schema_definition:

(
    column_name data_type [ PRIMARY KEY ], ...
    [ PRIMARY KEY ( column_name, ... ) ]
)

Connector parameters

Field: Notes
gcs.bucket_name: Required. The name of the bucket the data source is stored in.
gcs.credential: Optional. The base64-encoded credential key, obtained from the GCS service account key JSON file. To get this JSON file, refer to the GCS documentation. To encode it with base64 on macOS, run cat ~/Downloads/rwc-byoc-test-464bdd851bce.json | base64 -b 0 | pbcopy, then paste the output as the value of this parameter. With GNU coreutils on Linux, the flag for unwrapped output is base64 -w 0. If this field is not specified, ADC (Application Default Credentials) is used.
match_pattern: Conditional. Selects the object keys in the bucket that match the given pattern. Standard Unix-style glob syntax is supported.

Other parameters

Field: Notes
data_format: Supported data format: PLAIN.
data_encode: Supported data encodings: CSV, JSON.
without_header: Whether the first line is a header. Accepted values: 'true', 'false'. Default: 'true'.
delimiter: How RisingWave splits the file contents. For JSON encoding, the delimiter is \n.
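
The parameters above can be combined for CSV input. The following is a minimal sketch rather than a definitive configuration: the source name gcs_csv_source, the bucket example-bucket, the pattern '*.csv', and the columns are all placeholders. It also assumes, as the parameter name suggests, that without_header = 'false' tells RisingWave that each file starts with a header line. Because gcs.credential is omitted, ADC is used.

```sql
-- Hypothetical names throughout: gcs_csv_source, example-bucket, and the columns.
CREATE SOURCE IF NOT EXISTS gcs_csv_source (
    id int,
    name varchar,
    age int
)
WITH (
    connector = 'gcs',
    gcs.bucket_name = 'example-bucket',
    -- Only object keys matching this glob are read.
    match_pattern = '*.csv'
    -- gcs.credential is omitted, so ADC is used.
)
FORMAT PLAIN ENCODE CSV (
    -- Assumption: 'false' means the first line of each file is a header.
    without_header = 'false',
    delimiter = ','
);
```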

Loading order of GCS files

The GCS connector does not guarantee that files are read sequentially.

For example, if RisingWave reads file F1 up to offset O1 and then crashes, there is no guarantee that the next task, after RisingWave rebuilds the task queue, will resume reading F1.
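
Because read order is not guaranteed, queries that need a stable order should sort on a column carried in the data rather than rely on arrival order. A minimal sketch, assuming a hypothetical table gcs_events whose JSON records contain an event_time field (the table name, bucket, pattern, and columns are illustrative only):

```sql
-- Hypothetical table: gcs_events, example-bucket, and all columns are placeholders.
CREATE TABLE gcs_events (
    id int,
    event_time timestamp,
    payload varchar,
    PRIMARY KEY (id)
)
WITH (
    connector = 'gcs',
    gcs.bucket_name = 'example-bucket',
    match_pattern = '*.json'
) FORMAT PLAIN ENCODE JSON;

-- Sort by the event time recorded in the data, not by ingestion order.
SELECT id, event_time, payload
FROM gcs_events
ORDER BY event_time;
```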

Examples

The following example creates a table that reads data from a GCS source.

CREATE TABLE s (
    id int,
    name varchar,
    age int,
    PRIMARY KEY (id)
)
WITH (
    connector = 'gcs',
    gcs.bucket_name = 'example-bucket',
    gcs.credential = 'xxxxx'
) FORMAT PLAIN ENCODE JSON (
    without_header = 'true',
    delimiter = '\n'
);