Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
[INCLUDE { file | offset | payload } [AS <column_name>]]
WITH (
   connector = 'gcs',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
   without_header = 'true' | 'false',
   delimiter = 'delimiter'
);

schema_definition:

(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)

Connector parameters

Field | Notes
gcs.bucket_name | Required. The name of the bucket the data source is stored in.
gcs.credential | Required. Base64-encoded credential key obtained from the GCS service account key JSON file. To get this JSON file, refer to the GCS documentation.
  • To encode it in base64 on macOS, run the following command: cat ~/Downloads/rwc-byoc-test-464bdd851bce.json | base64 -b 0 | pbcopy, and then paste the output as the value for this parameter.
  • If this field is not specified, ADC (application default credentials) will be used.
gcs.service_account | Optional. The service account of the target GCS source. If neither gcs.credential nor ADC is specified, the credentials will be derived from the service account.
match_pattern | Conditional. This field is used to find object keys in the bucket that match the given pattern. Standard Unix-style glob syntax is supported.
compression_format | Optional. This field specifies the compression format of the file being read. You can define compression_format in the CREATE TABLE statement. When set to gzip or gz, the file reader reads all files with the .gz suffix. When set to None or not defined, the file reader automatically reads and decompresses .gz and .gzip files.
refresh.interval.sec | Optional. The time interval, in seconds, between file-listing operations. It determines the delay in discovering new files. Default: 60.
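
As a minimal sketch of how the optional connector parameters above fit together, the following statement reads gzip-compressed JSON files and lists the bucket every 30 seconds. The source name gcs_events, the column list, and the pattern '*.json.gz' are hypothetical values chosen for illustration, and passing the interval as a quoted string is an assumption.

```sql
-- Hypothetical source: the name, columns, and pattern are illustrative only.
CREATE SOURCE IF NOT EXISTS gcs_events (
    id int,
    name varchar
)
WITH (
    connector = 'gcs',
    gcs.bucket_name = 'example-bucket',
    gcs.credential = 'xxxxx',          -- base64-encoded service account key
    match_pattern = '*.json.gz',       -- glob over object keys (assumed pattern)
    compression_format = 'gzip',       -- read files with the .gz suffix
    refresh.interval.sec = '30'        -- list files every 30 seconds (default is 60)
) FORMAT PLAIN ENCODE JSON;
```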

Other parameters

Field | Notes
data_format | Supported data format: PLAIN.
data_encode | Supported data encodes: CSV, JSON, PARQUET.
without_header | This field applies only to the CSV encode and indicates whether the first line is a header. Accepted values: 'true', 'false'. Default: 'true'.
delimiter | How RisingWave splits contents. For the JSON encode, the delimiter is \n; for the CSV encode, the delimiter can be a comma (,), a semicolon (;), or a tab (E'\t').
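
The encode options above can be sketched for tab-separated files as follows. The table name t_tsv and the pattern '*.tsv' are hypothetical, and the example assumes that without_header = 'true' means the files contain no header row.

```sql
-- Hypothetical table reading tab-separated files from the bucket.
CREATE TABLE t_tsv (
    id int,
    name varchar,
    age int
)
WITH (
    connector = 'gcs',
    gcs.bucket_name = 'example-bucket',
    gcs.credential = 'xxxxx',
    match_pattern = '*.tsv'            -- assumed naming scheme for the files
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',           -- assumed: files have no header row
    delimiter = E'\t'                  -- tab as the field delimiter
);
```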

Additional columns

Field | Notes
file | Optional. The column contains the name of the file that the current record comes from.
offset | Optional. The column contains the byte offset (record offset for Parquet files) at which the current record begins.
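
Once included, these columns can be queried like any other column. A minimal sketch, assuming a table t created with INCLUDE file AS file_name (as in the example at the end of this page), counts the ingested records per source file:

```sql
-- Count ingested records per source file.
-- Assumes table t was created with: INCLUDE file AS file_name
SELECT file_name, count(*) AS record_count
FROM t
GROUP BY file_name
ORDER BY file_name;
```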

Loading order of GCS files

The GCS connector does not guarantee that files are read sequentially.

For example, suppose RisingWave reads file F1 up to offset O1 and then crashes. After RisingWave rebuilds the task queue, the next task is not guaranteed to resume reading file F1.

Examples

The following example creates a table that reads data from a GCS bucket.

CREATE TABLE t(
    id int,
    name varchar,
    age int,
    primary key(id)
)
INCLUDE file as file_name
INCLUDE offset -- default column name is `_rw_gcs_offset`
WITH (
    connector = 'gcs',
    gcs.bucket_name = 'example-bucket',
    gcs.credential = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ',' -- set delimiter = E'\t' for tab-separated files
);