Ingest data from S3 buckets
Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports both CSV and ndjson (newline-delimited JSON) file formats.
Syntax
```sql
CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
WITH (
    connector = { 's3' | 's3_v2' },
    connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
    without_header = 'true' | 'false',
    delimiter = 'delimiter'
);
```
For CSV data, specify the delimiter in the `delimiter` option in the ENCODE properties.
```sql
schema_definition:
(
    column_name data_type [ PRIMARY KEY ], ...
    [ PRIMARY KEY ( column_name, ... ) ]
)
```
Parameters
| Field | Notes |
|---|---|
| connector | Required. Select between the `s3` and `s3_v2` (recommended) connectors. Learn more about `s3_v2` below. |
| s3.region_name | Required. The service region. |
| s3.bucket_name | Required. The name of the bucket the data source is stored in. |
| s3.credentials.access | Required. The AWS access key ID. |
| s3.credentials.secret | Required. The AWS secret access key. |
| match_pattern | Conditional. Used to find object keys in `s3.bucket_name` that match the given pattern. Standard Unix-style glob syntax is supported. |
| s3.endpoint_url | Conditional. The host URL for an S3-compatible object storage server, allowing you to use a server other than standard Amazon S3. |
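To illustrate the Unix-style glob syntax that `match_pattern` accepts, the following Python sketch uses the standard `fnmatch` module. The object keys and pattern are made up for illustration, and RisingWave's matcher may differ in edge cases:

```python
from fnmatch import fnmatch

# Hypothetical object keys in the bucket
keys = [
    "logs/2024/orders-01.ndjson",
    "logs/2024/orders-02.csv",
    "tmp/scratch.ndjson",
]

# A Unix-style glob: keys under logs/ ending in .ndjson
pattern = "logs/*/*.ndjson"

matched = [k for k in keys if fnmatch(k, pattern)]
print(matched)
```

Only the `.ndjson` key under `logs/2024/` matches; the `.csv` file and the key outside `logs/` are filtered out.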
Empty cells in CSV files will be parsed to NULL.
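As a rough illustration of the NULL rule, this Python sketch parses CSV text with the standard `csv` module and maps empty cells to `None`, analogous to how RisingWave maps them to NULL. The sample data is made up:

```python
import csv
import io

# Sample CSV with a header and an empty 'age' cell in the second row
sample = "id,name,age\n1,Alice,30\n2,Bob,\n"

rows = []
for row in csv.DictReader(io.StringIO(sample)):
    # Map empty cells to None, analogous to SQL NULL
    rows.append({k: (v if v != "" else None) for k, v in row.items()})

print(rows[1]["age"])  # None: the empty cell becomes NULL
```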
| Field | Notes |
|---|---|
| data_format | Supported data format: PLAIN. |
| data_encode | Supported data encodes: CSV, JSON. |
| without_header | Whether the first line is a header. Accepted values: 'true', 'false'. Default: 'true'. |
| delimiter | The character RisingWave uses to split contents. For JSON encode, the delimiter is `\n`. |
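Because JSON encode fixes the delimiter to `\n`, source files are effectively ndjson: one JSON object per line. A Python sketch of how such content is split and decoded (the records are made up):

```python
import json

# ndjson: records separated by '\n', one JSON object per line
payload = '{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}\n'

records = [json.loads(line) for line in payload.splitlines() if line]
print(len(records))  # 2
```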
s3_v2 connector
The `s3_v2` connector is currently in Beta. Please contact us if you encounter any issues or have feedback.
The `s3` connector treats files as splits, resulting in poor scalability and potential timeouts when dealing with a large number of files. The `s3_v2` connector is designed to address the scalability and performance limitations of the `s3` connector by implementing a more efficient listing and fetching mechanism. If you want to explore the technical details of this new approach, refer to the design document.
Example
Here are examples of connecting RisingWave to an S3 source to read data from files in a bucket.
CSV:

```sql
CREATE TABLE s(
    id int,
    name varchar,
    age int,
    PRIMARY KEY (id)
)
WITH (
    connector = 's3_v2',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-s3-source',
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);
```
JSON:

```sql
CREATE TABLE s3(
    id int,
    name TEXT,
    age int,
    mark int
)
WITH (
    connector = 's3_v2',
    match_pattern = '%Ring%*.ndjson',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-s3-source',
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx',
    s3.endpoint_url = 'https://s3.us-east-1.amazonaws.com'
) FORMAT PLAIN ENCODE JSON;
```
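For reference, a file this JSON example could ingest would contain one object per line with the declared columns. A Python sketch producing such made-up content:

```python
import json

# Made-up rows matching the declared columns: id, name, age, mark
rows = [
    {"id": 1, "name": "Alice", "age": 30, "mark": 85},
    {"id": 2, "name": "Bob", "age": 25, "mark": 90},
]

# ndjson: one JSON object per line, separated by '\n'
ndjson = "\n".join(json.dumps(r) for r in rows) + "\n"
print(ndjson)
```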
Important considerations
Object filtering in S3 buckets
RisingWave has a prefix argument designed for filtering objects in the S3 bucket. It relies on Apache OpenDAL, whose prefix filter implementation is expected to be released soon.
Source file name as column
A feature to create a column with the source file name is currently under development. You can track the progress here.
Handling new files in the bucket
RisingWave automatically ingests new files added to the bucket. However, it does not detect that a file has changed if the file is deleted and a new file with the same name is uploaded at the same time. Additionally, RisingWave ignores file deletions.
Reading data from the source
You need to create a materialized view from the source or create a table with the S3 connector to read the data. Here are some examples:
```sql
-- Create a materialized view from the source
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table with the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3_v2', ... );
```
Handling unexpected file types or poorly formatted files
RisingWave attempts to interpret and parse files as CSV or ndjson according to the specified rules, regardless of their actual type. Warnings are reported for parts of a file that cannot be parsed, but the source itself will not fail; poorly formatted parts of a file are discarded.
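This tolerant behavior can be sketched in Python (an illustration, not RisingWave's parser): malformed lines produce a warning and are discarded, while well-formed lines are still ingested.

```python
import json

# Sample ndjson payload with one malformed line in the middle
payload = '{"id": 1}\nnot json at all\n{"id": 2}\n'

records, warnings = [], []
for lineno, line in enumerate(payload.splitlines(), start=1):
    try:
        records.append(json.loads(line))
    except json.JSONDecodeError as e:
        # The bad line is reported and skipped; parsing continues
        warnings.append(f"line {lineno}: {e.msg}")

print(len(records), len(warnings))  # 2 records kept, 1 warning
```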