Skip to main content

Ingest data from S3 buckets

Use the SQL statement below to connect RisingWave to an Amazon S3 source.

Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
WITH (
connector='s3',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
without_header = 'true' | 'false',
delimiter = 'delimiter'
);
info

For CSV data, specify the delimiter in the delimiter option in ENCODE properties.

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)

Parameters

FieldNotes
s3.region_nameRequired. The service region.
s3.bucket_nameRequired. The name of the bucket the data source is stored in.
s3.credentials.accessConditional. This field indicates the access key ID of AWS. It must be used with s3.credentials.secret. If not specified, RisingWave will automatically try to use ~/.aws/credentials.
s3.credentials.secretConditional. This field indicates the secret access key of AWS. It must be used with s3.credentials.access. If not specified, RisingWave will automatically try to use ~/.aws/credentials.
match_patternConditional. This field is used to find object keys in s3.bucket_name that match the given pattern. Standard Unix-style glob syntax is supported.
s3.endpoint_urlConditional. The host URL for an S3-compatible object storage server. This allows users to use a different server instead of the standard S3 server.
note

Empty cells in CSV files will be parsed to NULL.

FieldNotes
data_formatSupported data format: PLAIN.
data_encodeSupported data encodes: CSV, JSON.
without_headerWhether the first line is header. Accepted values: 'true', 'false'. Default: 'true'.
delimiterHow RisingWave splits contents. For JSON encode, the delimiter is \n.

Example

Here are examples of connecting RisingWave to an S3 source to read data from individual streams.

CREATE TABLE s(
id int,
name varchar,
age int,
primary key(id)
)
WITH (
connector = 's3',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ','
);

Help us make this doc better!