Syntax
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector = 'kinesis',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
    key = 'value'
) ]
[KEY ENCODE key_encode [(...)]]
;
Basic parameters
| Field | Notes | 
|---|
| stream | Required. Name of the stream. | 
| aws.region | Required. AWS service region. For example, US East (N. Virginia). | 
| endpoint | Optional. URL of the entry point for the AWS Kinesis service. | 
| aws.credentials.access_key_id | Required. This field indicates the access key ID of AWS. | 
| aws.credentials.secret_access_key | Required. This field indicates the secret access key of AWS. | 
| aws.credentials.session_token | Optional. The session token associated with the temporary security credentials. | 
| aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume. | 
| aws.credentials.role.external_id | Optional. The external id used to authorize access to third-party resources. | 
| primary_key | Required. The primary keys of the sink. Use ,to delimit the primary key columns. | 
In the Kinesis sink, we use PutRecords API to send multiple records in batches to achieve higher throughput. Due to the limitations of Kinesis, records might be out of order when using this API. Nevertheless, the current implementation of the Kinesis sink guarantees at-least-once delivery and eventual consistency.
These options should be set in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause
| Field | Notes | 
|---|
| data_format | Data format. Allowed formats: To learn about when to define the primary key if creating an UPSERT sink, see the Overview. PLAIN: Output data with insert operations. DEBEZIUM: Output change data capture (CDC) log in Debezium format. UPSERT: Output data as a changelog stream.primary_keymust be specified in this case.
 | 
| data_encode | Data encode. Supported encode: JSON. | 
| force_append_only | If true, forces the sink to bePLAIN(also known asappend-only), even if it cannot be. | 
| timestamptz.handling.mode | Controls the timestamptz output format. This parameter specifically applies to append-only or upsert sinks using JSON encoding. If omitted, the output format of timestamptz is 2023-11-11T18:30:09.453000Zwhich includes the UTC suffixZ.When utc_without_suffixis specified, the format is changed to2023-11-11 18:30:09.453000.
 | 
| key_encode | Optional. When specified, the key encode can only be TEXT, and the primary key should be one and only one of the following types: varchar,bool,smallint,int, andbigint; When absent, both key and value will use the same setting ofENCODE data_encode ( ... ). | 
Examples
CREATE SINK s1 FROM t WITH (
 connector = 'kinesis',
 stream = 'kinesis-sink-demo',
 aws.region = 'us-east-1',
 aws.credentials.access_key_id = 'your_access_key',
 aws.credentials.secret_access_key = 'your_secret_key'
)
FORMAT DEBEZIUM ENCODE JSON;