Ingest data from Amazon MSK
Amazon Managed Streaming for Apache Kafka (MSK) is a fully managed service that simplifies the setup, scaling, and management of Apache Kafka clusters, a popular open-source distributed streaming platform.
Kafka is designed to handle real-time data feeds and follows the publisher-subscriber (pub-sub) model. Kafka’s ability to handle high-volume real-time data makes it crucial for data pipelines, analytics, and event-driven architectures.
To ingest data from Amazon MSK into RisingWave, you need an operational Amazon MSK cluster and a Kafka topic established. Once set, you’ll leverage the Kafka connector in RisingWave to consume data from your MSK topic.
This guide will detail the ingesting streaming data from Amazon MSK into RisingWave.
Set up Amazon MSK
To learn about how to set up an Amazon MSK account and create a cluster, see Getting started using Amazon MSK. For this demo, we will assume the selection of Quick create for the Cluster creation method and Provisioned for the Cluster type. The cluster creation can take about 15 minutes.
While creating your cluster, note down the following information regarding the cluster you want to connect to.
- Get the VPC value from the All cluster settings.
- Get the Security groups associated with VPC from the All cluster settings.
- Get the ARN value from the Cluster summary.
To customize the IAM policy, see IAM access control.
Set up EC2 on AWS
To learn how to create an EC2 client machine and add the security group of the client to the inbound rules of the cluster’s security group from the VPC console, see Create a client machine.
Configure MSK Kafka
Enable SASL
- Access the Amazon MSK console and select the MSK cluster.
- Click on the Properties tab, and click Edit in the Security settings section.
- Select SASL/SCRAM authentication and click Save changes.
For more information regarding SASL settings, see Sign-in credentials authentication with AWS Secrets Manager.
Create a symmetric key
- Access the AWS Key Management Service (AWS KMS) console.
- Click Create Key, select Symmetric, and click Next.
- Give the key an Alias and click Next.
- Under Administrative permissions, select AWSServiceRoleForKafka and click Next.
- Under Key usage permissions, again select AWSServiceRoleForKafka and click Next.
- Lastly, review the details and click Finish.
For more information, see Creating symmetric encryption KMS keys.
Store a new Secret
- Access the AWS Secrets Manager console.
- Click Store a new secret.
- Under Secret type, select Other type of secret.
- Under Key/value pairs, click on Plaintext, paste the following in the space below, and replace
<your-username>
and<your-password>
with the username and password you want to set for the cluster.
- Under Encryption key, select the symmetric key alias you previously created.
- On the next page, enter a Secret name that starts with
AmazonMSK_
. - After creating the secret, record the Secret ARN (Amazon Resource Name) value.
For more information, see Sign-in credentials authentication with AWS Secrets Manager.
Link the Secret with the MSK cluster
- Access the Amazon MSK console and select the MSK cluster.
- Click the Actions tab and select Edit security settings.
- Select SASL/SCRAM authentication and click Save changes.
- Back on the main page, click the Properties tab, and in the Security settings section, under SASL/SCRAM authentication, click Associate secrets.
- Paste the Secret ARN value you recorded in the previous step and click Associate secrets.
Use SSH to log into the EC2 machine
To find your specific command values:
- Access the EC2 console and select the instance you created.
- Click Connect, select SSH client, and copy the command example provided.
Install AWS CLI and Java
Download Kafka client
Configure AWS IAM credentials on EC2
- Run the following command to configure AWS credentials and default settings.
- Place the
users_jaas.conf
with the following contents in/home/ubuntu
.
- Run the following command to define the specific security settings Kafka should use.
- Use the following command to copy the JDK key store file from your JVM
cacerts
folder into thekafka.client.truststore.jks
copy.
- Create
client_sasl.properties
at/home/ubuntu
with the following contents.
Create a topic using the broker address with SASL
- Access the Amazon MSK console and select the cluster.
- Click View client information and copy the URL under Private endpoint for SASL/SCRAM. This will be your
<broker-url>
from now on. - Run the following command to create a topic.
Optional: The following command will list the topics.
- Insert test data.
Once you run the kafka-console-producer
command, you will be prompted to enter messages into the console. Each message should be entered on a new line; you can enter as many messages as you like.
After entering messages, you can close the console window or press Ctrl + C to exit the producer.
Consume data from Amazon MSK in RisingWave
Install and launch RisingWave
See Quick start for options on how you can run RisingWave.
Connect the cluster
Create a source in RisingWave
To learn about the specific syntax used to consume data from a Kafka topic, see Ingest data from Kafka.
For example, the following query creates a table that consumes data from an MSK topic connected to Kafka.
Then, you can count the records for accuracy.
Access MSK using IAM
Create cluster and set IAM role
To create a cluster and set up an IAM role for the cluster, see Getting started using Amazon MSK.
RisingWave requires the following permissions to access MSK:
kafka-cluster:Connect
kafka-cluster:DescribeTopic
kafka-cluster:DescribeGroup
kafka-cluster:AlterGroup
kafka-cluster:ReadData
kafka-cluster:WriteData
Access MSK in RisingWave
To access MSK using IAM, you need to use the AWS_MSK_IAM
SASL mechanism. You also need to specify the following parameters.
Parameter | Notes |
---|---|
aws.region | Required. AWS service region. For example, US East (N. Virginia). |
aws.endpoint | Optional. URL of the entry point for the AWS Kinesis service. |
aws.credentials.access_key_id | Required. This field indicates the access key ID of AWS. |
aws.credentials.secret_access_key | Required. This field indicates the secret access key of AWS. |
aws.credentials.session_token | Optional. The session token associated with the temporary security credentials. Using this field is not recommended as RisingWave contains long-running jobs and the token may expire. Creating a new role is preferred. |
aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
aws.credentials.role.external_id | Optional. The external id used to authorize access to third-party resources. |
Here is an example of creating a sink authenticated with AWS_MSK_IAM
on AWS.
Was this page helpful?