
# Ingest data from Kinesis

> Use the SQL statement below to connect RisingWave to Kinesis Data Streams.

To persist the ingested data in RisingWave, use `CREATE TABLE` instead of `CREATE SOURCE`. In either case, specify the connection settings and the data format.

## Syntax

```sql theme={null}
CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
[INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]]
WITH (
   connector='kinesis',
   connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
   message = 'message',
   schema.location = 'location'
);
```

**schema\_definition**:

```sql theme={null}
(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)
```

<Note>
  For Avro and Protobuf data, do not specify `schema_definition` in the `CREATE SOURCE` or `CREATE TABLE` statement. Instead, host the schema file at a web location and pass that location through the `schema.location` option in the `ENCODE` clause.
</Note>

RisingWave enforces primary key constraints on tables with connector settings but not on regular sources. If you need these checks, create a table with connector settings.

For a table with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
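
As a minimal sketch of this overwrite behavior, the statement below creates a table with a primary key on top of a Kinesis stream. The table name, column names, stream name, region, and credentials are all placeholders chosen for illustration, not values from a real deployment.

```sql
-- Hypothetical table: names, stream, region, and credentials are placeholders.
CREATE TABLE IF NOT EXISTS user_profiles (
   user_id integer PRIMARY KEY,
   display_name varchar
)
WITH (
   connector='kinesis',
   stream='demo_stream',
   aws.region='us-west-2',
   aws.credentials.access_key_id = 'your_access_key',
   aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE JSON;

-- If the stream delivers {"user_id": 1, "display_name": "a"} and later
-- {"user_id": 1, "display_name": "b"}, the second record overwrites the first,
-- so this query should return a single row for user_id = 1.
SELECT * FROM user_profiles WHERE user_id = 1;
```

Declaring the same statement with `CREATE SOURCE` would skip the primary key check, as described above.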

### Connector parameters

| Field                               | Notes                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| :---------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| stream                              | **Required**. Name of the stream.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| aws.region                          | **Required**. AWS service region, given as a region code. For example, `us-east-1` for US East (N. Virginia). |
| endpoint                            | **Optional**. URL of the entry point for the AWS Kinesis service.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| aws.credentials.access\_key\_id     | **Conditional**. This field indicates the access key ID of AWS.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| aws.credentials.secret\_access\_key | **Conditional**. This field indicates the secret access key of AWS.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| aws.credentials.session\_token      | **Optional**. The session token associated with the temporary security credentials. Using this field is not recommended as RisingWave contains long-running jobs and the token may expire. Creating a [new role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id%5Froles%5Fcommon-scenarios%5Faws-accounts.html) is preferred.                                                                                                                                                                                                                                                                                                               |
| aws.credentials.role.arn            | **Optional**. The Amazon Resource Name (ARN) of the role to assume.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| aws.credentials.role.external\_id   | **Optional**. The [external id](https://aws.amazon.com/blogs/security/how-to-use-external-id-when-granting-access-to-your-aws-resources/) used to authorize access to third-party resources.                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| scan.startup.mode                   | **Optional**. The startup mode for Kinesis consumer. Supported modes: `earliest` (corresponding to [starting position](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html) `TRIM_HORIZON`), `latest` (corresponding to [starting position](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html) `LATEST`), and `timestamp` (starts from a specific timestamp specified by `scan.startup.timestamp.millis`, corresponding to [starting position](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html) `AT_TIMESTAMP`). The default mode is `earliest`. |
| scan.startup.timestamp.millis       | **Optional**. The timestamp to start consuming from, in milliseconds, represented as an i64. Applies when `scan.startup.mode` is `timestamp`. |
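
The following sketch shows how `scan.startup.mode` and `scan.startup.timestamp.millis` might be combined to start reading from a fixed point in time. It assumes the timestamp is expressed in milliseconds since the Unix epoch; the source name, columns, stream name, and region are placeholders.

```sql
-- Hypothetical source: names, stream, and region are placeholders.
CREATE SOURCE IF NOT EXISTS kinesis_from_timestamp (
   event_id varchar,
   amount integer
)
WITH (
   connector='kinesis',
   stream='demo_stream',
   aws.region='us-west-2',
   scan.startup.mode='timestamp',
   -- Assumed to be Unix epoch milliseconds:
   -- 1700000000000 ms corresponds to 2023-11-14 22:13:20 UTC.
   scan.startup.timestamp.millis='1700000000000'
) FORMAT PLAIN ENCODE JSON;
```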

<Note>
  In RisingWave Cloud, AWS credentials (`aws.credentials.access_key_id`, `aws.credentials.secret_access_key`) are required and cannot be omitted. In self-hosted deployments, you may omit them to rely on the AWS SDK default credential chain (for example, EC2 instance profile or environment variables).
</Note>
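
For a self-hosted deployment, a source that relies on the default credential chain could look like the sketch below. It simply leaves out every `aws.credentials.*` field; the source name, columns, stream name, and region are placeholders.

```sql
-- Self-hosted only: no aws.credentials.* fields are set, so credentials are
-- expected to come from the AWS SDK default credential chain
-- (for example, an EC2 instance profile or environment variables).
CREATE SOURCE IF NOT EXISTS kinesis_default_credentials (
   event_id varchar,
   amount integer
)
WITH (
   connector='kinesis',
   stream='demo_stream',
   aws.region='us-west-2'
) FORMAT PLAIN ENCODE JSON;
```

The same statement is not expected to work in RisingWave Cloud, where the access key ID and secret access key cannot be omitted.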

### Other parameters

| Field          | Notes                                                                                                                                                                                                                                                          |
| :------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| *data\_format* | Supported formats: DEBEZIUM, UPSERT, PLAIN.                                                                                                                                                                                                                    |
| *data\_encode* | Supported encodes: JSON, AVRO, PROTOBUF, CSV, BYTES.                                                                                                                                                                                                           |
| *message*      | Message name of the main Message in schema definition. Required when data\_encode is PROTOBUF.                                                                                                                                                                 |
| *location*     | Web location of the schema file in `http://...`, `https://...`, or `s3://...` format. Required when `data_encode` is `AVRO` or `PROTOBUF`. <br />Examples: `https://<example_host>/risingwave/proto-simple-schema.proto`, `s3://risingwave-demo/schema-location` |
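
As an illustration of an encode that needs no schema file, the sketch below reads raw records with `ENCODE BYTES`. It assumes that this encode expects the schema to consist of a single `bytea` column; verify this against your RisingWave version. The source name, column name, stream name, and region are placeholders.

```sql
-- Hypothetical source: each Kinesis record is stored as-is in one bytea column.
-- Assumption: ENCODE BYTES requires exactly one column of type bytea.
CREATE SOURCE IF NOT EXISTS kinesis_raw_bytes (
   raw_record bytea
)
WITH (
   connector='kinesis',
   stream='demo_stream',
   aws.region='us-west-2'
) FORMAT PLAIN ENCODE BYTES;
```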

## Example

Here is an example of connecting RisingWave to Kinesis Data Streams to read data from individual streams.

<Tabs>
  <Tab title="Avro">
    ```sql theme={null}
    CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name
    WITH (
       connector='kinesis',
       stream='demo_stream',
       aws.region='us-west-2',
       endpoint='https://kinesis.us-west-2.amazonaws.com',
       aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
       aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
       aws.credentials.role.external_id='demo_external_id',
       aws.credentials.access_key_id = 'your_access_key',
       aws.credentials.secret_access_key = 'your_secret_key'
    ) FORMAT PLAIN ENCODE AVRO (
        schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
    );
    ```
  </Tab>

  <Tab title="JSON">
    ```sql theme={null}
    CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name (
       column1 varchar,
       column2 integer
    )
    WITH (
       connector='kinesis',
       stream='demo_stream',
       aws.region='us-west-2',
       endpoint='https://kinesis.us-west-2.amazonaws.com',
       aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
       aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
       aws.credentials.role.external_id='demo_external_id',
       aws.credentials.access_key_id = 'your_access_key',
       aws.credentials.secret_access_key = 'your_secret_key'
    ) FORMAT PLAIN ENCODE JSON;
    ```

    Use the `payload` keyword to ingest JSON data when you are unsure of the exact schema beforehand. Instead of defining specific column names and types at the very beginning, you can load all JSON data first and then prune and filter the data during runtime. Check the example below:

    ```sql theme={null}
    CREATE TABLE table_include_payload (v1 int, v2 varchar)
    INCLUDE payload
    WITH (
        connector = 'kinesis',
        stream = 'demo_stream',
        aws.region = 'us-west-2',
        aws.credentials.access_key_id = 'your_access_key',
        aws.credentials.secret_access_key = 'your_secret_key',
        scan.startup.mode = 'earliest'
    ) FORMAT PLAIN ENCODE JSON;
    ```
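
    The sketch below shows one way to prune the payload at query time. It assumes the payload column is stored as `jsonb`, and it names that column explicitly with `AS` so the query does not depend on a default column name. The table name, the `raw_payload` alias, the `v3` field, the stream name, and the region are placeholders.

    ```sql
    -- Hypothetical table: keeps two typed columns plus the full JSON payload.
    CREATE TABLE table_payload_demo (v1 int, v2 varchar)
    INCLUDE payload AS raw_payload
    WITH (
        connector = 'kinesis',
        stream = 'demo_stream',
        aws.region = 'us-west-2',
        scan.startup.mode = 'earliest'
    ) FORMAT PLAIN ENCODE JSON;

    -- Extract a field (v3) that was not declared in the schema.
    -- Assumption: raw_payload is a jsonb column, so ->> returns the field as text.
    SELECT v1, v2, raw_payload ->> 'v3' AS v3
    FROM table_payload_demo
    WHERE raw_payload ->> 'v3' IS NOT NULL;
    ```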
  </Tab>

  <Tab title="Protobuf">
    ```sql theme={null}
    CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name
    WITH (
       connector='kinesis',
       stream='demo_stream',
       aws.region='us-west-2',
       endpoint='https://kinesis.us-west-2.amazonaws.com',
       aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
       aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
       aws.credentials.role.external_id='demo_external_id',
       aws.credentials.access_key_id = 'your_access_key',
       aws.credentials.secret_access_key = 'your_secret_key'
    ) FORMAT PLAIN ENCODE PROTOBUF (
        message = 'package.message_name',
        schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.proto'
    );
    ```
  </Tab>
</Tabs>
