
# CREATE CONNECTION

> Use the `CREATE CONNECTION` command to create a reusable catalog for connector parameters.

The `CREATE CONNECTION` command creates a reusable connection configuration that can be referenced when creating sources, sinks, or tables. Currently supported connection types are Kafka, Schema Registry, and Iceberg.

<Note>
  Added in v2.3.0: Support `iceberg` connection.
</Note>

## Syntax

```sql theme={null}
CREATE CONNECTION [ IF NOT EXISTS ] connection_name
WITH (
    type = '<connector_type>',
    connection_parameter = SECRET <secret_name>,
    ...
);
```

## Parameters

| Parameter or clause | Description                                                                                                                                                                                                                                      |
| :------------------ | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| type                | **Required**. The type of connection. Supported values: `kafka`, `schema_registry`, `iceberg`.                                                                                                                                                   |
| secret\_name        | Use the `SECRET` keyword to reference secrets, allowing sensitive information to be stored securely and referenced in the connection configuration. Changes to the secret are automatically applied, so there's no need to alter the connection. |
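
The `SECRET` reference described above can be sketched as follows. This is a minimal illustration, not a definitive setup: the secret names, secret values, connection name, and broker address are hypothetical, and it assumes secrets are created with the `CREATE SECRET` command using the `meta` backend.

```sql theme={null}
-- Hypothetical secrets; replace the names and values with your own.
CREATE SECRET kafka_username WITH (backend = 'meta') AS 'my_user';
CREATE SECRET kafka_password WITH (backend = 'meta') AS 'my_password';

-- Reference the secrets by name instead of embedding the credentials.
CREATE CONNECTION IF NOT EXISTS conn_kafka_secure WITH (
    type = 'kafka',
    properties.bootstrap.server = 'broker1:9092',  -- hypothetical address
    properties.security.protocol = 'SASL_PLAINTEXT',
    properties.sasl.mechanism = 'PLAIN',
    properties.sasl.username = SECRET kafka_username,
    properties.sasl.password = SECRET kafka_password
);
```

Because the connection stores a reference rather than the value, updating the secret later should not require recreating or altering the connection.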

<Accordion title="Click to see all supported properties for Kafka connection.">
  **Required**

  * `properties.bootstrap.server`: The Kafka bootstrap server addresses.

  **Optional**

  For SSL/SASL authentication:

  * `properties.security.protocol`
  * `properties.ssl.endpoint.identification.algorithm`
  * `properties.ssl.ca.location`
  * `properties.ssl.ca.pem`
  * `properties.ssl.certificate.location`
  * `properties.ssl.certificate.pem`
  * `properties.ssl.key.location`
  * `properties.ssl.key.pem`
  * `properties.ssl.key.password`
  * `properties.sasl.mechanism`
  * `properties.sasl.username`
  * `properties.sasl.password`
  * `properties.sasl.kerberos.service.name`
  * `properties.sasl.kerberos.keytab`
  * `properties.sasl.kerberos.principal`
  * `properties.sasl.kerberos.kinit.cmd`
  * `properties.sasl.kerberos.min.time.before.relogin`
  * `properties.sasl.oauthbearer.config`

  For PrivateLink connection:

  * `privatelink.targets`
  * `privatelink.endpoint`

  For AWS authentication:

  * `aws.region`
  * `endpoint`
  * `aws.credentials.access_key_id`
  * `aws.credentials.secret_access_key`
  * `aws.credentials.session_token`
  * `aws.credentials.role.arn`
  * `aws.credentials.role.external_id`
</Accordion>

<Accordion title="Click to see all supported properties for Schema Registry.">
  | Property name            | Description                                                     |
  | :----------------------- | :-------------------------------------------------------------- |
  | schema.registry          | The base URL of the Schema Registry service.                    |
  | schema.registry.username | The username for authenticating to the Schema Registry service. |
  | schema.registry.password | The password for authenticating to the Schema Registry service. |
</Accordion>

<Accordion title="Click to see all supported properties for Iceberg connection.">
  | Property name                | Description                                                                                                          |
  | :--------------------------- | :------------------------------------------------------------------------------------------------------------------- |
  | catalog.type                 | Type of the Iceberg catalog.                                                                                         |
  | s3.region                    | AWS S3 region.                                                                                                       |
  | s3.endpoint                  | AWS S3 endpoint.                                                                                                     |
  | s3.access.key                | AWS S3 access key.                                                                                                   |
  | s3.secret.key                | AWS S3 secret key.                                                                                                   |
  | gcs.credential               | Google Cloud Storage credential.                                                                                     |
  | azblob.account\_name         | The Azure Storage account name.                                                                                      |
  | azblob.account\_key          | The Azure Storage account key associated with the account name.                                                      |
  | azblob.endpoint\_url         | The full endpoint URL of the Azure Blob service.                                                                     |
  | warehouse.path               | Path of the Iceberg warehouse, applicable only in `storage` catalog.                                                 |
  | catalog.name                 | Name of the catalog, optional for `storage` catalog, required for others.                                            |
  | catalog.uri                  | URI of the Iceberg catalog, applicable only in `rest` catalog.                                                       |
  | catalog.credential           | Credential for accessing Iceberg catalog, applicable only in `rest` catalog.                                         |
  | catalog.token                | Token for interacting with `rest` catalog server.                                                                    |
  | catalog.oauth2\_server\_uri  | OAuth2 token endpoint URI, applicable only in `rest` catalog.                                                        |
  | catalog.scope                | Additional OAuth2 scope for accessing the Iceberg catalog, applicable only in `rest` catalog.                        |
  | catalog.rest.signing\_region | The signing region when signing requests to the `rest` catalog.                                                      |
  | catalog.rest.signing\_name   | The signing name when signing requests to the `rest` catalog.                                                        |
  | catalog.rest.sigv4\_enabled  | Specifies whether to use SigV4 for signing requests to the `rest` catalog.                                           |
  | s3.path.style.access         | Enables path-style access for AWS S3.                                                                                |
  | catalog.jdbc.user            | JDBC user for catalog access.                                                                                        |
  | catalog.jdbc.password        | JDBC password for catalog access.                                                                                    |
  | enable\_config\_load         | If set to `true`, load warehouse credentials from environment variables. Only supported in self-hosted environments. |

  <Note>
    In RisingWave Cloud, AWS credentials (`s3.access.key`, `s3.secret.key`) are required and cannot be omitted. In self-hosted deployments, you may omit them to rely on the AWS SDK default credential chain (for example, EC2 instance profile or environment variables).
  </Note>
</Accordion>

## Example

To connect to a schema registry:

```sql theme={null}
CREATE CONNECTION sr_conn WITH (
  type = 'schema_registry',
  schema.registry = 'http://...',
  schema.registry.username = 'admin_user',
  schema.registry.password = 'schema_registry_password'
);
```

To create a Kafka connection that securely integrates secrets:

```sql theme={null}
CREATE CONNECTION conn_kafka WITH (
    type = 'kafka',
    properties.bootstrap.server = '<broker addr>',
    properties.sasl.mechanism = 'PLAIN',
    properties.security.protocol = 'SASL_PLAINTEXT',
    properties.sasl.username = SECRET <username>,
    properties.sasl.password = SECRET <password>
);
```

```sql theme={null}
CREATE TABLE t WITH (
    connector = 'kafka', 
    topic = 'demo-topic', 
    connection = conn_kafka
) FORMAT PLAIN ENCODE AVRO (connection = sr_conn);
```

To create an Iceberg connection:

```sql theme={null}
CREATE CONNECTION conn WITH (
    type = 'iceberg',
    catalog.name = 'demo',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/iceberg-data',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin'
);
```

```sql theme={null}
CREATE SINK sink1 FROM s1 WITH (
    connector = 'iceberg',
    type = 'upsert',
    database.name = 'demo_db',
    table.name = 'test_connection_table',
    connection = conn,
    create_table_if_not_exists = 'true',
    commit_checkpoint_interval = 1,
    primary_key = 'i1,i2'
);
```
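
The Iceberg example above uses a `storage` catalog. For a `rest` catalog, the connection might instead look like the following sketch. It uses only properties listed in the Iceberg property table; the connection name, catalog URI, and credential placeholders are hypothetical, and your catalog server may need further properties such as `catalog.credential` or `catalog.token`.

```sql theme={null}
-- Hypothetical names, URI, and credentials; adjust to your environment.
CREATE CONNECTION conn_rest WITH (
    type = 'iceberg',
    catalog.type = 'rest',
    catalog.name = 'demo',
    catalog.uri = 'http://127.0.0.1:8181',  -- hypothetical REST catalog endpoint
    s3.region = 'us-east-1',
    s3.access.key = '<access key>',
    s3.secret.key = '<secret key>'
);
```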

To create a source, table, or sink from a connection, the `connector` value must match the connection's `type`, and `connection` must reference the connection by name. In addition, a property defined in the connection cannot be set again in the source, table, or sink:

```sql theme={null}
CREATE SINK sink_kafka FROM data_table WITH (
  connector = 'kafka',
  connection = conn_kafka,
  topic = 'connection_ddl_1'
) FORMAT PLAIN ENCODE JSON (
  force_append_only='true'
);
```
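
As a counter-example, the following sketch shows the kind of statement that is expected to be rejected because a property overlaps with the connection. The sink name `sink_kafka_overlap` is hypothetical, and the exact error message is not shown because it depends on the RisingWave version.

```sql theme={null}
-- Expected to fail: conn_kafka already defines properties.bootstrap.server,
-- so setting it again on the sink overlaps with the connection.
CREATE SINK sink_kafka_overlap FROM data_table WITH (
  connector = 'kafka',
  connection = conn_kafka,
  properties.bootstrap.server = '<broker addr>',  -- overlaps with conn_kafka
  topic = 'connection_ddl_1'
) FORMAT PLAIN ENCODE JSON (
  force_append_only='true'
);
```

Removing the duplicated property from the sink definition, as in the preceding example, resolves the overlap.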

## Create an AWS PrivateLink connection

If you are using a cloud-hosted source or sink, such as AWS MSK, RisingWave may be unable to reach it when the service is located in a different VPC from the one where RisingWave is deployed. To establish a secure, direct connection between the two VPCs, so that RisingWave can consume messages from the broker or send messages to it, use the [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/privatelink-share-your-services.html) service.

Follow the steps below to create an AWS PrivateLink connection.

1. Create a [target group](https://docs.aws.amazon.com/elasticloadbalancing/latest/network/create-target-group.html) for each broker. Set the **target type** as **IP addresses** and the **protocol** as **TCP**. Ensure that the VPC of the target group is the same as that of your cloud-hosted source.
2. Create a [Network Load Balancer](https://docs.aws.amazon.com/elasticloadbalancing/latest/network/create-network-load-balancer.html). Ensure that it is enabled in the same subnets as your brokers and that cross-zone load balancing is enabled.
3. Create a [TCP listener](https://docs.aws.amazon.com/elasticloadbalancing/latest/network/create-listener.html) for each MSK broker that corresponds to the target groups created. Ensure the ports are unique.
4. Complete the [health check](https://docs.aws.amazon.com/elasticloadbalancing/latest/network/target-group-health-checks.html) for each target group.
5. Create a [VPC endpoint service](https://docs.aws.amazon.com/vpc/latest/privatelink/create-endpoint-service.html) associated with the Network Load Balancer created. Be sure to add the AWS principal of the account that will access the endpoint service to allow the service consumer to connect. See [Manage permissions](https://docs.aws.amazon.com/vpc/latest/privatelink/configure-endpoint-service.html#add-remove-permissions) for more details.
6. Use the `CREATE CONNECTION` command in RisingWave to create an AWS PrivateLink connection referencing the endpoint service created. Here is an example of creating an AWS PrivateLink connection.

```sql theme={null}
CREATE CONNECTION connection_name WITH (
    type = 'privatelink',
    provider = 'aws',
    service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'
);
```

7. Create a source or sink with the AWS PrivateLink connection.
   * Use the `CREATE SOURCE/TABLE` command to create a Kafka source with a PrivateLink connection. For more details, see [Create source with AWS PrivateLink connection](/integrations/sources/kafka#create-source-with-privatelink-connection).
   * Use the `CREATE SINK` command to create a Kafka sink with a PrivateLink connection. For more details, see [Create sink with AWS PrivateLink connection](/integrations/destinations/apache-kafka#create-sink-with-vpc-connection).
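
Once the connection from step 6 exists, it can be inspected or removed like any other connection. A minimal sketch, assuming the connection was named `connection_name` as in the example above and that no source or sink still depends on it when it is dropped:

```sql theme={null}
-- List existing connections to confirm that the PrivateLink connection was created.
SHOW CONNECTIONS;

-- Remove the connection when it is no longer needed.
-- Assumption: dependent sources and sinks have been dropped first.
DROP CONNECTION connection_name;
```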
