Skip to main content

Ingest data from S3 buckets

Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports both CSV and ndjson file formats.

The S3 connector does not guarantee the sequential reading of files or complete file reading.

Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name 
schema_definition
[INCLUDE { file | offset } [AS <column_name>]]
WITH (
connector='s3',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
without_header = 'true' | 'false',
delimiter = 'delimiter'
);
info

For CSV data, specify the delimiter in the delimiter option in ENCODE properties.

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)

Parameters

FieldNotes
connectorRequired. Support the s3 connector only.
s3.region_nameRequired. The service region.
s3.bucket_nameRequired. The name of the bucket the data source is stored in.
s3.credentials.accessRequired. This field indicates the access key ID of AWS.
s3.credentials.secretRequired. This field indicates the secret access key of AWS.
s3.endpoint_urlConditional. The host URL for an S3-compatible object storage server. This allows users to use a different server instead of the standard S3 server.
compression_formatOptional. This field specifies the compression format of the file being read. You can define compression_format in the CREATE TABLE statement. When set to gzip or gz, the file reader reads all files with the .gz suffix. When set to None or not defined, the file reader will automatically read and decompress .gz and .gzip files.
match_patternConditional. This field is used to find object keys in s3.bucket_name that match the given pattern. Standard Unix-style glob syntax is supported.
s3.assume_roleOptional. Specifies the ARN of an IAM role to assume when accessing S3. It allows temporary, secure access to S3 resources without sharing long-term credentials.
note

Empty cells in CSV files will be parsed to NULL.

FieldNotes
data_formatSupported data format: PLAIN.
data_encodeSupported data encodes: CSV, JSON, PARQUET.
without_headerThis field is only for CSV encode, and it indicates whether the first line is header. Accepted values: 'true', 'false'. Default: 'true'.
delimiterHow RisingWave splits contents. For JSON encode, the delimiter is \n; for CSV encode, the delimiter can be one of ,, ;, E'\t'.

Additional columns

FieldNotes
fileOptional. The column contains the file name where current record comes from.
offsetOptional. The column contains the corresponding bytes offset (record offset for parquet files) where current message begins

Examples

Here are examples of connecting RisingWave to an S3 source to read data from individual streams.

CREATE TABLE s(
id int,
name varchar,
age int
)
WITH (
connector = 's3',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ',' -- set delimiter = E'\t' for tab-separated files
);

Important considerations

Object filtering in S3 buckets

RisingWave has a prefix argument designed for filtering objects in the S3 bucket. It relies on Apache Opendal whose prefix filter implementation is expected to be released soon.

Handle new files in the bucket

RisingWave automatically ingests new files added to the bucket. However, it does not detect updates to a file if a file is deleted and a new file with the same name is added simultaneously. Additionally, RisingWave will ignore file deletions.

Read data from the source

You need to create a materialized view from the source or create a table with the S3 connector to read the data. Here are some examples:

-- Create a materialized view from the source
CREATE SOURCE s3_source WITH ( connector = 's3', ... );
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table with the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... );

To view the progress of the source, you can also read the source directly

-- Create a materialized view from the source
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
SELECT count(*) from s3_source;

Read Parquet files from S3

You can use the table function file_scan() to read Parquet files from S3, either a single file or a directory of Parquet files.

Function signature
file_scan(file_format, storage_type, s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
note

When reading a directory of Parquet files, the schema will be based on the first Parquet file listed. Please ensure that all Parquet files in the directory have the same schema.

For example, assume you have a Parquet file named sales_data.parquet that stores a company's sales data, containing the following fields:

  • product_id: Product ID

  • sales_date: Sales date

  • quantity: Sales quantity

  • revenue: Sales revenue

You can use the following SQL statement to read this Parquet file:

Read a single Parquet file
SELECT
product_id,
sales_date,
quantity,
revenue
FROM file_scan(
'parquet',
's3',
'ap-southeast-2',
'xxxxxxxxxx',
'yyyyyyyy',
's3://your-bucket/path/to/sales_data.parquet'
);

----RESULT
product_id | sales_date | quantity | revenue
------------+-------------+----------+----------
12 | 2023-04-01 | 50 | 1000.00
12 | 2023-04-02 | 30 | 600.00
15 | 2023-04-01 | 20 | 400.00
(3 rows)

If you have several such Parquet files, you can also read by their file directory:

Read a directory of Parquet files
SELECT
product_id,
sales_date,
quantity,
revenue
FROM file_scan(
'parquet',
's3',
'ap-southeast-2',
'xxxxxxxxxx',
'yyyyyyyy',
's3://your-bucket/path/to/sales_data_file_directory/'
);

----RESULT
product_id | sales_date | quantity | revenue
------------+-------------+----------+----------
12 | 2023-04-01 | 50 | 1000.00
12 | 2023-04-02 | 30 | 600.00
15 | 2023-04-01 | 20 | 400.00
15 | 2023-04-03 | 40 | 800.00
18 | 2023-04-02 | 25 | 500.00
18 | 2023-04-04 | 35 | 700.00
(6 rows)

Handle unexpected file types or poorly formatted files

RisingWave will attempt to interpret and parse files, regardless of their type, as CSV or ndjson, based on the specified rules. Warnings will be reported for parts of the file that cannot be parsed, but the source part will not fail. Poorly formatted parts of a file will be discarded.

Help us make this doc better!