Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. The RisingWave Redshift sink connector provides efficient data ingestion with support for automatic schema changes and both S3-based and direct loading methods.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='redshift',
   connector_parameter = 'value', ...
);

Parameters

Parameter | Description
jdbc.url | JDBC URL to connect to Redshift
user | Redshift username
password | Redshift password
schema | Redshift schema name
table.name | Name of the target table
intermediate.table.name | Name of the intermediate table used for upsert mode. Not required in append-only mode
auto_schema_change | Enable automatic schema change for upsert sinks; adds new columns to the target table if needed
create_table_if_not_exists | Create the target table if it does not exist
write.target.interval.seconds | Interval in seconds for scheduled writes (default: 3600)
batch_insert_rows | Number of rows per batch insert (default: 4096)
S3 parameters

These options only need to be set when with_s3 = true:
Parameter | Description
with_s3 | Enable writing via S3 (default: true)
s3.region_name | AWS region for S3
s3.bucket_name | S3 bucket name
s3.path | S3 folder path for sink files
enable_config_load | Load S3 credentials from the environment (self-hosted only)
s3.credentials.access | AWS access key ID
s3.credentials.secret | AWS secret access key
s3.endpoint_url | Custom S3 endpoint URL (for self-hosted setups)
s3.assume_role | IAM role ARN to assume for S3 access

Auto schema change

The Redshift sink supports auto schema change only when sink_decoupling is false. With auto_schema_change enabled, RisingWave automatically evolves the schema of your Redshift table: when new columns are detected in the source data, RisingWave adds them to the Redshift table before ingesting new records. This simplifies data pipeline maintenance and ensures that your Redshift table structure always reflects the latest upstream schema. To enable it, set auto_schema_change to true.
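For illustration, the effect of this automatic change on the target table is the same as adding the column manually. The statement below uses a hypothetical schema, table, column name, and type; RisingWave applies the equivalent change for you, so you do not run it yourself:

-- Illustrative only: equivalent of the change RisingWave applies when a new upstream column appears
ALTER TABLE rw_schema.test_table ADD COLUMN new_col VARCHAR;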

Configure RisingWave to write to S3

You need to configure how RisingWave authenticates with AWS S3. There are two primary methods:
  • Access Key / Secret Key (AK/SK) authentication
This is the default method. Provide your AWS Access Key ID and Secret Access Key directly in the CREATE SINK statement.
s3.credentials.access = 'YOUR_AWS_ACCESS_KEY_ID',
s3.credentials.secret = 'YOUR_AWS_SECRET_ACCESS_KEY',
  • Assume role authentication
For enhanced security, RisingWave can assume an IAM Role in your AWS account to gain temporary credentials for S3 access.
s3.assume_role = 'arn:aws:iam::123456789012:role/YourRisingWaveS3Role',
enable_config_load = 'true',
To use this method, you need to configure an IAM Role in AWS that RisingWave can assume (a trust policy sketch follows these steps). This involves:
  1. Obtaining the RisingWave service ARN from our support team.
  2. Creating an IAM policy with the necessary S3 read/write permissions for your bucket and prefix.
  3. Configuring the IAM Role’s trust policy to allow the RisingWave service ARN to assume it.
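As a rough sketch, the trust policy on that IAM Role allows the RisingWave service ARN you obtain from support to assume the role. The principal below is a placeholder; substitute the ARN you receive:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<RISINGWAVE_SERVICE_ARN>"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}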

Append-only and upsert modes

The Amazon Redshift sink connector supports both append-only and upsert modes for flexible data handling. In upsert mode, performance is optimized through the use of an intermediate table:
  • An intermediate table is created to stage data before merging it into the target table. If create_table_if_not_exists is set to true, the table is automatically named rw_<target_table_name>_<uuid>.
  • Data is periodically merged from the intermediate table into the target table according to the write.target.interval.seconds setting (see the conceptual sketch after this list).
  • By default, an S3 bucket is required to achieve optimal ingestion performance into the intermediate table.
  • Alternatively, you can use INSERT SQL statements to load data directly into the intermediate table, though this approach is not recommended due to performance drawbacks.
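Conceptually, the periodic merge from the intermediate table into the target table behaves like a Redshift MERGE keyed on the sink's primary key. The sketch below uses hypothetical table and column names and only illustrates the effect; it is not RisingWave's actual implementation:

-- Illustrative only: fold staged rows into the target, keyed on a hypothetical primary key `id`
MERGE INTO test_table
USING rw_test_table_stage AS stage
ON test_table.id = stage.id
WHEN MATCHED THEN UPDATE SET v = stage.v
WHEN NOT MATCHED THEN INSERT (id, v) VALUES (stage.id, stage.v);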
Examples:
  1. Redshift sink with S3 writer (Append-only mode)
CREATE SINK redshift_sink FROM test_table WITH (
    connector = 'redshift',
    type = 'append-only',
    table.name = 'test_table',
    schema = 'RW_SCHEMA',

    -- JDBC configs are always required for Redshift
    jdbc.url = 'jdbc:redshift://...',
    user = '...',
    password = '...',

    -- `with_s3` defaults to true. If `with_s3` is set to false (not recommended), you need to specify `batch_insert_rows`, and the value must be a power of 2, such as 1024 or 4096.
    with_s3 = true,
    s3.bucket_name = '...',
    s3.region_name = '...',
    s3.path = '...',

    -- Authentication: access key / secret (default)
    s3.credentials.access = '...',
    s3.credentials.secret = '...',

    -- Or assume role (requires cloud support)
    s3.assume_role = '...',
    enable_config_load = 'true',

    -- Defaults (can be overridden)
    auto_schema_change = 'false',
    create_table_if_not_exists = 'false'
);
  2. Redshift sink with S3 writer (Upsert mode)
CREATE SINK redshift_sink FROM test_table WITH (
    connector = 'redshift',
    type = 'upsert',
    table.name = 'test_table',
    schema = 'RW_SCHEMA',

    -- Intermediate table required for upsert
    intermediate.table.name = '...',
    -- Default: 3600 seconds (1 hour)
    write.target.interval.seconds = '...',

    -- JDBC configs are always required for Redshift
    jdbc.url = 'jdbc:redshift://...',
    user = '...',
    password = '...',

    -- `with_s3` defaults to true. If `with_s3` is set to false (not recommended), you need to specify `batch_insert_rows`, and the value must be a power of 2, such as 1024 or 4096.
    with_s3 = true,
    s3.bucket_name = '...',
    s3.region_name = '...',
    s3.path = '...',

    -- Authentication: access key / secret (default)
    s3.credentials.access = '...',
    s3.credentials.secret = '...',

    -- Or assume role (requires cloud support)
    s3.assume_role = '...',
    enable_config_load = 'true',

    -- Defaults (can be overridden)
    auto_schema_change = 'false',
    create_table_if_not_exists = 'false'
);

Set up Redshift

When configuring RisingWave to write to Redshift, the JDBC user must have appropriate permissions depending on whether the create_table_if_not_exists option is enabled.
  • If create_table_if_not_exists is not enabled, the user must have permissions on the target table:
GRANT SELECT, INSERT, UPDATE, DELETE, ALTER ON [table_name] TO [username];
These permissions allow RisingWave to read, insert, update, delete, and alter the existing table.
  • If create_table_if_not_exists is enabled, the user needs the table permissions above plus schema-level permissions to create new tables:
GRANT CREATE ON SCHEMA [schema_name] TO [username];
This ensures that RisingWave can create tables in the specified schema when they do not already exist. A combined example with placeholder names follows this list.
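For instance, assuming hypothetical identifiers (schema rw_schema, table test_table, user rw_sink_user), a sink with create_table_if_not_exists enabled would need both grants:

GRANT SELECT, INSERT, UPDATE, DELETE, ALTER ON rw_schema.test_table TO rw_sink_user;
GRANT CREATE ON SCHEMA rw_schema TO rw_sink_user;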

Set up Redshift S3 integration

When using S3 as intermediate storage for Redshift sinks, you need to configure permissions so that RisingWave can write to your S3 bucket and Redshift can read from it.
  1. Permissions for the S3 account configured in RisingWave
The S3 account credentials used in RisingWave must have the following permissions on the target bucket/path:
s3:GetObject
s3:ListBucket
s3:PutObject
s3:DeleteObject
These permissions allow RisingWave to read, list, write, and delete files in the staging S3 location.
  2. Permissions for Redshift to access the S3 bucket
Redshift itself also needs permissions to read from the same S3 bucket. Grant the following permissions to Redshift’s IAM role:
s3:GetObject
s3:ListBucket
This allows Redshift to copy data from S3 into the Redshift table. An example IAM policy covering both sets of permissions follows this list.
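As a rough sketch, an IAM policy granting the RisingWave-side permissions listed above might look like the following; the bucket name and path are placeholders, and the policy for Redshift's IAM role is the same minus s3:PutObject and s3:DeleteObject:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::your-staging-bucket/your/sink/path/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::your-staging-bucket"
    }
  ]
}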

Set up S3 IAM and role

To guarantee that the IAM role has sufficient permissions to connect to Redshift and access AWS resources:
  1. Attach policy to the IAM Role
Attach the policy AmazonRedshiftAllCommandsFullAccess to the role.
  2. Configure trusted entities
In the IAM Role trust relationship, allow the Redshift service to assume the role:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "redshift.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
  3. Attach the IAM role to the Redshift cluster
  • Navigate to the Amazon Redshift service in the AWS Management Console.
  • In the left navigation pane, select Clusters and choose the cluster you want to configure.
  • From the Actions menu, select Manage IAM Roles, and attach the configured IAM role.