
Ingest data from Azure Blob

Azure Blob Storage, provided by Microsoft Azure, allows you to store and manage large amounts of unstructured data.

Use the SQL statement below to connect RisingWave to Azure Blob Storage using the Azblob connector. Note that the Azblob connector does not guarantee that files are read in sequential order or that each file is read completely.

Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
[INCLUDE { file | offset } [AS <column_name>]]
WITH (
    connector = 'azblob',
    connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
    without_header = 'true' | 'false',
    delimiter = 'delimiter'
);

schema_definition:

(
    column_name data_type [ PRIMARY KEY ], ...
    [ PRIMARY KEY ( column_name, ... ) ]
)

Connector parameters

| Field | Notes |
| --- | --- |
| azblob.container_name | Required. The name of the container the data source is stored in. |
| azblob.credentials.account_name | Optional. The name of the Azure Blob Storage account. |
| azblob.credentials.account_key | Optional. The account key for the Azure Blob Storage account. |
| azblob.endpoint_url | Required. The URL of the Azure Blob Storage service endpoint. |
| match_pattern | Conditional. Set this to read only the object keys in azblob.container_name that match the given pattern. Standard Unix-style glob syntax is supported. |
| compression_format | Optional. The compression format of the files being read. When set to gzip or gz, the file reader reads all files with the .gz suffix; when set to None or not defined, the file reader automatically reads and decompresses .gz and .gzip files. |
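
As a rough sketch of how these parameters combine, the statement below restricts a source to gzip-compressed CSV objects. The source name, the column list, and the pattern value are illustrative assumptions, and the 'xxx' values are placeholders to replace with your own settings.

```sql
-- Hypothetical source: the name, columns, and pattern are placeholders.
CREATE SOURCE IF NOT EXISTS azblob_gz_source (
    id int,
    name varchar
)
WITH (
    connector = 'azblob',
    azblob.container_name = 'xxx',
    azblob.credentials.account_name = 'xxx',
    azblob.credentials.account_key = 'xxx',
    azblob.endpoint_url = 'xxx',
    match_pattern = '*.csv.gz',     -- assumed Unix-style glob over object keys
    compression_format = 'gzip'     -- read files with the .gz suffix
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);
```

Leaving out match_pattern and compression_format should fall back to the behavior described in the table: no key filtering, and automatic decompression of .gz and .gzip files.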

Other parameters

| Field | Notes |
| --- | --- |
| data_format | Supported data format: PLAIN. |
| data_encode | Supported data encodes: CSV, JSON, PARQUET. |
| without_header | Applies to CSV encode only. Indicates whether the files lack a header row: set to 'true' when the first line is data, or 'false' when the first line is a header. Accepted values: 'true', 'false'. Default: 'true'. |
| delimiter | How RisingWave splits contents. For JSON encode, the delimiter is \n; for CSV encode, the delimiter can be one of ',', ';', or E'\t'. |
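
For comparison with the CSV options, here is a minimal sketch of a newline-delimited JSON source. It assumes the parenthesized encode options can be omitted for JSON, since without_header applies only to CSV and the JSON delimiter is fixed at \n. The source name and columns are hypothetical.

```sql
-- Hypothetical JSON source: each line of a file is one JSON object.
CREATE SOURCE IF NOT EXISTS azblob_json_source (
    id int,
    name varchar,
    age int
)
WITH (
    connector = 'azblob',
    azblob.container_name = 'xxx',
    azblob.credentials.account_name = 'xxx',
    azblob.credentials.account_key = 'xxx',
    azblob.endpoint_url = 'xxx'
) FORMAT PLAIN ENCODE JSON;  -- no CSV-specific options needed (assumption)
```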

Additional columns

| Field | Notes |
| --- | --- |
| file | Optional. A column containing the name of the file that the current record comes from. |
| offset | Optional. A column containing the byte offset (record offset for Parquet files) at which the current record begins. |
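
The INCLUDE clause from the syntax section is what exposes these columns. The sketch below adds the file name to each record under an assumed column name, source_file; the source name and schema are likewise hypothetical.

```sql
-- Hypothetical source that records which file each row came from.
CREATE SOURCE IF NOT EXISTS azblob_with_file (
    id int,
    name varchar
)
INCLUDE file AS source_file   -- use INCLUDE offset AS <column_name> for the offset instead
WITH (
    connector = 'azblob',
    azblob.container_name = 'xxx',
    azblob.credentials.account_name = 'xxx',
    azblob.credentials.account_key = 'xxx',
    azblob.endpoint_url = 'xxx'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);
```

Because the connector does not guarantee read order, a file-name column like this can help trace a record back to its object when debugging.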

Examples

The following example connects RisingWave to an Azblob source that reads CSV files.

CREATE SOURCE s (
    id int,
    name varchar,
    age int
)
WITH (
    connector = 'azblob',
    azblob.container_name = 'xxx',
    azblob.credentials.account_name = 'xxx',
    azblob.credentials.account_key = 'xxx',
    azblob.endpoint_url = 'xxx'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ',' -- set delimiter = E'\t' for tab-separated files
);
