CREATE CONNECTION
Use the CREATE CONNECTION command to create an AWS PrivateLink connection for a Kafka source connector. This is required to consume messages from a Kafka service located in a different VPC from the RisingWave cluster in the cloud.
Syntax
CREATE CONNECTION [ IF NOT EXISTS ] connection_name
WITH (
    connection_parameter = 'value'
);
Parameters
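Parameter or clause | Description |
---|---|
connection_name | The name of the connection to be created. |
type | The type of connection. Use 'privatelink' for an AWS PrivateLink connection, as in the example below. |
provider | The provider of the connection. Use 'aws' for an AWS PrivateLink connection, as in the example below. |
service.name | The service name of the VPC endpoint service, for example 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'. |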
Example
The statement below creates an AWS PrivateLink connection.
CREATE CONNECTION connection_name WITH (
    type = 'privatelink',
    provider = 'aws',
    service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'
);
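The syntax above also allows IF NOT EXISTS, which the example does not use. As a minimal sketch, the statements below create a connection only when no connection with the same name exists, then list and remove it. The name my_connection and the service name are placeholders. SHOW CONNECTIONS and DROP CONNECTION are not described on this page, so treat them as assumptions about your RisingWave version.

```sql
-- Create the connection only if it does not already exist.
-- 'my_connection' and the service name are placeholders, not real resources.
CREATE CONNECTION IF NOT EXISTS my_connection WITH (
    type = 'privatelink',
    provider = 'aws',
    service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'
);

-- Assumption: SHOW CONNECTIONS lists existing connections in this version.
SHOW CONNECTIONS;

-- Assumption: DROP CONNECTION removes a connection that no source or sink uses.
DROP CONNECTION my_connection;
```

Using IF NOT EXISTS keeps a setup script safe to rerun, because a second run leaves the existing connection in place.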
Create an AWS PrivateLink connection
If you are using a cloud-hosted source or sink, such as AWS MSK, you might run into connectivity issues when the service is located in a different VPC from the one where RisingWave is deployed. To establish a secure, direct connection between the two VPCs and allow RisingWave to consume messages from the broker or send messages to it, use the AWS PrivateLink service.
Support for AWS PrivateLink connections is a beta feature, and the syntax for CREATE CONNECTION is subject to change in future versions.
Follow the steps below to create an AWS PrivateLink connection.
- Create a target group for each broker. Set the target type to IP addresses and the protocol to TCP. Ensure that the target group is in the same VPC as your cloud-hosted source.
- Create a Network Load Balancer. Ensure that it is enabled in the same subnets as your brokers and that cross-zone load balancing is enabled.
- Create a TCP listener for each MSK broker that corresponds to the target group created for it. Ensure that each listener uses a unique port.
- Complete the health check for each target group.
- Create a VPC endpoint service associated with the Network Load Balancer created. Be sure to add the AWS principal of the account that will access the endpoint service to allow the service consumer to connect. See Manage permissions for more details.
- Use the CREATE CONNECTION command in RisingWave to create an AWS PrivateLink connection referencing the endpoint service created. Here is an example of creating an AWS PrivateLink connection.
  CREATE CONNECTION connection_name WITH (
      type = 'privatelink',
      provider = 'aws',
      service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'
  );
- Create a source or sink with AWS PrivateLink connection.
  - Use the CREATE SOURCE command to create a Kafka source with a PrivateLink connection. For more details on the syntax, see Ingest data from Kafka topic. Here is an example of connecting to a Kafka source through AWS PrivateLink.
    CREATE SOURCE tcp_metrics_rw (
        device_id VARCHAR,
        metric_name VARCHAR,
        report_time TIMESTAMP,
        metric_value DOUBLE PRECISION
    ) WITH (
        connector = 'kafka',
        topic = 'tcp_metrics',
        properties.bootstrap.server = 'ip1:9092, ip2:9092',
        connection.name = 'my_connection',
        privatelink.targets = '[{"port": 8001}, {"port": 8002}]',
        scan.startup.mode = 'earliest'
    ) FORMAT PLAIN ENCODE JSON;
  - Use the CREATE SINK command to create a Kafka sink with a PrivateLink connection. For more details on the syntax, see Sink to Kafka topic. Here is an example of sinking to Kafka through AWS PrivateLink.
    CREATE SINK sink2 FROM mv2
    WITH (
        connector = 'kafka',
        type = 'append-only',
        properties.bootstrap.server = 'b-1.xxx.amazonaws.com:9092,b-2.test.xxx.amazonaws.com:9092',
        topic = 'msk_topic',
        force_append_only = 'true',
        connection.name = 'connection1',
        privatelink.targets = '[{"port": 8001}, {"port": 8002}]'
    );
  - Use the