
Sink to AWS Kinesis

This topic describes how to sink data from RisingWave to AWS Kinesis Data Streams.

Beta Feature

The AWS Kinesis sink connector in RisingWave is currently in Beta. Please contact us if you encounter any issues or have feedback.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector = 'kinesis',
   connector_parameter = 'value', ...
);

Basic parameters

Field | Notes
stream | Required. Name of the stream.
aws.region | Required. AWS service region. For example, us-east-1 for US East (N. Virginia).
endpoint | Optional. URL of the entry point for the AWS Kinesis service.
aws.credentials.access_key_id | Conditional. The AWS access key ID. It must be specified together with aws.credentials.secret_access_key.
aws.credentials.secret_access_key | Conditional. The AWS secret access key. It must be specified together with aws.credentials.access_key_id.
aws.credentials.session_token | Optional. The session token associated with the temporary security credentials.
aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume.
aws.credentials.role.external_id | Optional. The external ID used to authorize access to third-party resources.
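
As a sketch of how the credential fields combine, the following statement uses temporary security credentials: the access key pair plus a session token. The sink name s_temp_creds, the source table t, the stream name, and all credential values are placeholders assumed for illustration, not values from this topic.

```sql
-- Hypothetical sink name and placeholder credentials; replace with your own.
CREATE SINK IF NOT EXISTS s_temp_creds FROM t WITH (
    connector = 'kinesis',
    stream = 'kinesis-sink-demo',
    aws.region = 'us-east-1',
    -- The access key ID and the secret access key must be given together.
    aws.credentials.access_key_id = 'your_access_key',
    aws.credentials.secret_access_key = 'your_secret_key',
    -- Session token issued along with the temporary credentials.
    aws.credentials.session_token = 'your_session_token',
    type = 'debezium'
);
```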

Sink parameters

Field | Notes
type | Data format. Allowed formats:
  • append-only: Output data with insert operations.
  • debezium: Output change data capture (CDC) log in Debezium format.
  • upsert: Output data as a changelog stream. primary_key must be specified in this case.
To learn when to define the primary key for an upsert sink, see the Overview.
force_append_only | If true, forces the sink to be append-only, even if the upstream data is not append-only.
primary_key | The primary key of the sink. Use a comma (,) to delimit the primary key columns. If the external sink has its own primary key, this field should not be specified.
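
To illustrate how type and force_append_only interact, here is a minimal sketch that writes an append-only stream from a table that may also receive updates or deletes. The sink name s_append and the table t are hypothetical, and the credential values are placeholders.

```sql
-- Hypothetical sink over a table t that is assumed NOT to be append-only.
CREATE SINK IF NOT EXISTS s_append FROM t WITH (
    connector = 'kinesis',
    stream = 'kinesis-sink-demo',
    aws.region = 'us-east-1',
    aws.credentials.access_key_id = 'your_access_key',
    aws.credentials.secret_access_key = 'your_secret_key',
    -- Emit insert operations only.
    type = 'append-only',
    -- Force append-only output even though the upstream is not append-only.
    force_append_only = 'true'
);
```

If t is already append-only, the force_append_only line should not be needed.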

Examples

CREATE SINK s1 FROM t WITH (
   connector = 'kinesis',
   stream = 'kinesis-sink-demo',
   aws.region = 'us-east-1',
   aws.credentials.access_key_id = 'your_access_key',
   aws.credentials.secret_access_key = 'your_secret_key',
   type = 'debezium'
);
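
An upsert sink additionally needs primary_key, as noted in the sink parameters. The variant below is a sketch that assumes the table t has a column named id that identifies each row. The sink name s2 and the column name are illustrative assumptions.

```sql
-- Hypothetical upsert sink; assumes t has a column named id.
CREATE SINK s2 FROM t WITH (
    connector = 'kinesis',
    stream = 'kinesis-sink-demo',
    aws.region = 'us-east-1',
    aws.credentials.access_key_id = 'your_access_key',
    aws.credentials.secret_access_key = 'your_secret_key',
    -- Output a changelog stream keyed by the primary key.
    type = 'upsert',
    -- Separate multiple key columns with commas, for example 'id,region'.
    primary_key = 'id'
);
```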
