
Ingest data from Kinesis

Use the SQL statement below to connect RisingWave to Kinesis Data Streams.

To persist the data from the source in RisingWave, use CREATE TABLE instead of CREATE SOURCE, and specify the connection settings and data format in the same way.

Syntax

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
   connector='kinesis',
   connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
   message = 'message',
   schema.location = 'location' | schema.registry = 'schema_registry_url'
);

schema_definition:

(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)
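For JSON data, the columns are declared in schema_definition. The following is a minimal sketch rather than an official example: the source name kinesis_json_demo, the stream name demo_stream, the region, the credentials, and the columns user_id, event_type, and event_time are all hypothetical placeholders, and it assumes the stream carries flat JSON objects whose keys match the column names.

```sql
-- Hypothetical source reading flat JSON records from a Kinesis stream.
-- Column names are assumed to match the JSON keys in each record.
CREATE SOURCE IF NOT EXISTS kinesis_json_demo (
  user_id INT,
  event_type VARCHAR,
  event_time TIMESTAMP
)
WITH (
  connector = 'kinesis',
  stream = 'demo_stream',      -- hypothetical stream name
  aws.region = 'us-east-1',    -- region code, not the display name
  aws.credentials.access_key_id = 'your_access_key',
  aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE JSON;
```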
Info

For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE or CREATE TABLE statement. Instead, provide the web location of the schema file in the schema.location option of the ENCODE section.
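To illustrate this rule, a Protobuf source omits the column list and points ENCODE at the schema file. This is a sketch under assumptions: the source name, stream, and credentials are placeholders, the message name demo_package.Event is hypothetical (use the name of the main message in your own .proto file, typically package-qualified), and the schema URL reuses the placeholder host from the parameter table.

```sql
-- No column list: for PROTOBUF, the columns are derived from the schema file.
CREATE SOURCE IF NOT EXISTS kinesis_proto_demo
WITH (
  connector = 'kinesis',
  stream = 'demo_stream',      -- hypothetical stream name
  aws.region = 'us-east-1',
  aws.credentials.access_key_id = 'your_access_key',
  aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE PROTOBUF (
  message = 'demo_package.Event',  -- hypothetical main message name
  schema.location = 'https://<example_host>/risingwave/proto-simple-schema.proto'
);
```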

Note

RisingWave enforces primary key constraints on tables with connector settings, but not on sources. If you need these checks, create a table with connector settings instead of a source.

For a table with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
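To get primary key checks and the overwrite behavior described above, declare the key in a CREATE TABLE statement. A minimal sketch, assuming JSON records keyed by a hypothetical user_id field; the table name, columns, stream, and credentials are placeholders, and whether a primary key is accepted together with FORMAT PLAIN may depend on your RisingWave version.

```sql
-- Data is persisted in RisingWave. A later record with the same user_id
-- overwrites the earlier one.
CREATE TABLE IF NOT EXISTS kinesis_users (
  user_id INT PRIMARY KEY,
  user_name VARCHAR,
  updated_at TIMESTAMP
)
WITH (
  connector = 'kinesis',
  stream = 'demo_user_stream',  -- hypothetical stream name
  aws.region = 'us-east-1',
  aws.credentials.access_key_id = 'your_access_key',
  aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE JSON;
```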

Connector parameters

| Field | Notes |
|---|---|
| stream | Required. Name of the Kinesis data stream. |
| aws.region | Required. AWS region of the stream, given as a region code, for example us-east-1 for US East (N. Virginia). |
| endpoint | Optional. URL of the entry point for the AWS Kinesis service. |
| aws.credentials.access_key_id | Required. AWS access key ID. |
| aws.credentials.secret_access_key | Required. AWS secret access key. |
| aws.credentials.session_token | Optional. Session token associated with temporary security credentials. |
| aws.credentials.role.arn | Optional. Amazon Resource Name (ARN) of the role to assume. |
| aws.credentials.role.external_id | Optional. External ID used to authorize access to third-party resources. |
| scan.startup.mode | Optional. Startup mode for the Kinesis consumer. Supported modes: earliest (starts from the earliest offset), latest (starts from the latest offset), and timestamp (starts from the timestamp specified by scan.startup.timestamp.millis). Default: earliest. |
| scan.startup.timestamp.millis | Optional. Timestamp to start consuming from, as an i64 Unix timestamp in milliseconds. Used when scan.startup.mode is timestamp. |
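As a sketch of the startup parameters, the source below starts reading from a fixed point in time instead of from the earliest record. It assumes the value is read as milliseconds since the Unix epoch: 1704067200000 corresponds to 2024-01-01 00:00:00 UTC. The source name, stream, columns, region, and credentials are placeholders.

```sql
CREATE SOURCE IF NOT EXISTS kinesis_from_timestamp (
  user_id INT,
  event_type VARCHAR
)
WITH (
  connector = 'kinesis',
  stream = 'demo_stream',      -- hypothetical stream name
  aws.region = 'us-east-1',
  aws.credentials.access_key_id = 'your_access_key',
  aws.credentials.secret_access_key = 'your_secret_key',
  -- Start from a timestamp instead of the default 'earliest'.
  scan.startup.mode = 'timestamp',
  -- 2024-01-01 00:00:00 UTC, in milliseconds since the Unix epoch.
  scan.startup.timestamp.millis = '1704067200000'
) FORMAT PLAIN ENCODE JSON;
```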

Other parameters

| Field | Notes |
|---|---|
| data_format | Supported formats: PLAIN, DEBEZIUM, MAXWELL, CANAL. |
| data_encode | Supported encodes: JSON, AVRO, PROTOBUF, CSV. |
| message | Name of the main message in the schema definition. Required when data_encode is PROTOBUF. |
| location | Web location of the schema file, in http://..., https://..., or s3://... format. Required when data_encode is AVRO or PROTOBUF, unless schema.registry is specified instead. Examples: https://<example_host>/risingwave/proto-simple-schema.proto, s3://risingwave-demo/schema-location |
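The syntax also accepts schema.registry in place of schema.location. A sketch for Avro records whose schema is kept in a schema registry, assuming the records were written by a registry-aware serializer; the registry URL http://schema-registry:8081, the source name, and the other values are hypothetical placeholders. No column list is given, because for Avro the columns come from the schema.

```sql
CREATE SOURCE IF NOT EXISTS kinesis_avro_registry_demo
WITH (
  connector = 'kinesis',
  stream = 'demo_stream',      -- hypothetical stream name
  aws.region = 'us-east-1',
  aws.credentials.access_key_id = 'your_access_key',
  aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE AVRO (
  -- Hypothetical registry URL, used instead of schema.location.
  schema.registry = 'http://schema-registry:8081'
);
```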

Example

Here is an example of connecting RisingWave to Kinesis Data Streams to read data from individual streams.

CREATE SOURCE IF NOT EXISTS kinesis_demo
WITH (
   connector = 'kinesis',
   stream = 'demo_stream',
   aws.region = 'us-west-2',
   endpoint = 'https://kinesis.us-west-2.amazonaws.com',
   aws.credentials.session_token = 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
   aws.credentials.role.arn = 'arn:aws:iam::602389639824:role/demo_role',
   aws.credentials.role.external_id = 'demo_external_id',
   aws.credentials.access_key_id = 'your_access_key',
   aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE AVRO (
   schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
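A source does not persist data by itself; one common way to consume it is a materialized view. A minimal sketch, assuming the source above was created under the hypothetical name kinesis_demo (substitute your own source_name). COUNT(*) is used so that the query does not depend on the columns defined in demo.avsc, and kinesis_demo_count is likewise a hypothetical name.

```sql
-- Incrementally maintained count of records read from the stream.
CREATE MATERIALIZED VIEW kinesis_demo_count AS
SELECT COUNT(*) AS total_records
FROM kinesis_demo;

-- Query the current result.
SELECT total_records FROM kinesis_demo_count;
```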
