Ingest data from S3 buckets
Use the SQL statement below to connect RisingWave to an Amazon S3 source.
Syntax
CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
WITH (
connector='s3',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
without_header = 'true' | 'false',
delimiter = 'delimiter'
);
info
For CSV data, specify the delimiter in the delimiter
option in ENCODE properties
.
schema_definition:
(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
Parameters
Field | Notes |
---|---|
s3.region_name | Required. The service region. |
s3.bucket_name | Required. The name of the bucket the data source is stored in. |
s3.credentials.access | Required. This field indicates the access key ID of AWS. |
s3.credentials.secret | Required. This field indicates the secret access key of AWS. |
match_pattern | Conditional. This field is used to find object keys in s3.bucket_name that match the given pattern. Standard Unix-style glob syntax is supported. |
s3.endpoint_url | Conditional. The host URL for an S3-compatible object storage server. This allows users to use a different server instead of the standard S3 server. |
note
Empty cells in CSV files will be parsed to NULL
.
Field | Notes |
---|---|
data_format | Supported data format: PLAIN . |
data_encode | Supported data encodes: CSV , JSON . |
without_header | Whether the first line is header. Accepted values: 'true' , 'false' . Default: 'true' . |
delimiter | How RisingWave splits contents. For JSON encode, the delimiter is \n . |
Example
Here are examples of connecting RisingWave to an S3 source to read data from individual streams.
- CSV
- JSON
CREATE TABLE s(
id int,
name varchar,
age int,
primary key(id)
)
WITH (
connector = 's3',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ','
);
CREATE TABLE s3(
id int,
name TEXT,
age int,
mark int,
)
WITH (
connector = 's3',
match_pattern = '%Ring%*.ndjson',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx',
s3.endpoint_url = 'https://s3.us-east-1.amazonaws.com'
) FORMAT PLAIN ENCODE JSON;