To modify the SQL definition of a sink, please refer to Alter a streaming job.

Syntax

ALTER SINK sink_name
    alter_option;

alter_option depends on the operation you want to perform on the sink. For all supported clauses, see the sections below.

Clause

OWNER TO

ALTER SINK sink_name
    OWNER TO new_user;

Parameter or clause | Description
OWNER TO | This clause changes the owner of the sink. The change cascades to all related internal objects.
new_user | The new owner you want to assign to the sink.

-- Change the owner of the sink named "sink1" to user "user1"
ALTER SINK sink1 OWNER TO user1;

SET SCHEMA

ALTER SINK sink_name
    SET SCHEMA schema_name;

Parameter or clause | Description
SET SCHEMA | This clause moves the sink to a different schema.
schema_name | The name of the schema to which the sink will be moved.

-- Move the sink named "test_sink" to the schema named "test_schema"
ALTER SINK test_sink SET SCHEMA test_schema;
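
If the target schema does not exist yet, it presumably has to be created before the sink can be moved into it. A minimal sketch under that assumption, reusing the names from the example above:

```sql
-- Assumption: "test_schema" may not exist yet, so create it first.
CREATE SCHEMA IF NOT EXISTS test_schema;

-- Then move the sink into it.
ALTER SINK test_sink SET SCHEMA test_schema;
```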

SET PARALLELISM

ALTER SINK sink_name
    SET PARALLELISM { TO | = } parallelism_number;

Parameter or clause | Description
SET PARALLELISM | This clause controls the degree of parallelism for the targeted streaming job.
parallelism_number | Can be ADAPTIVE or a fixed number (e.g., 1, 2, 3). Setting it to ADAPTIVE expands the job’s parallelism to all available units, while a fixed number locks it at that value. Setting it to 0 is equivalent to ADAPTIVE. The maximum allowed value is determined by the max_parallelism of the job. For more information, see Configuring maximum parallelism.

-- Set the parallelism of the sink "s" to 4.
ALTER SINK s SET PARALLELISM = 4;
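
The parameter description also allows ADAPTIVE in place of a fixed number. As a brief sketch, the same sink "s" can be returned to adaptive parallelism using the TO form of the syntax:

```sql
-- Let the sink "s" use all available parallel units again.
-- Per the description above, a value of 0 is equivalent to ADAPTIVE.
ALTER SINK s SET PARALLELISM TO ADAPTIVE;
```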

SET SINK_RATE_LIMIT

ALTER SINK sink_name
    SET SINK_RATE_LIMIT { TO | = } { default | rate_limit_number };

Use this statement to modify the rate limit of a sink. For the specific value of SINK_RATE_LIMIT, refer to How to view runtime parameters.

Example

-- Alter the rate limit of a sink to default
ALTER SINK s1 SET SINK_RATE_LIMIT = default;

-- Alter the rate limit of a sink to 1000
ALTER SINK s1 SET SINK_RATE_LIMIT = 1000;

RENAME TO

ALTER SINK sink_name
    RENAME TO new_name;

Parameter or clause | Description
RENAME TO | This clause changes the name of the sink.
new_name | The new name of the sink.

-- Change the name of the sink named "sink0" to "sink1"
ALTER SINK sink0 RENAME TO sink1;

SWAP WITH

ALTER SINK name
    SWAP WITH target_name;

Parameter | Description
name | The current name of the sink to swap.
target_name | The target name of the sink you want to swap with.

-- Swap the names of the log_sink sink and the error_sink sink.
ALTER SINK log_sink
    SWAP WITH error_sink;

CONNECTOR WITH

Added in v2.5.0.
ALTER SINK sink_name CONNECTOR WITH (
  'property_name' = 'value',
  ...
);

Parameter | Description
CONNECTOR WITH | Allows you to update connector-specific properties for an existing sink without recreating it.
property_name | The properties vary by connector type:
  • For ClickHouse, Iceberg, DeltaLake, and StarRocks, the supported parameter is commit_checkpoint_interval.
  • For Kafka, the supported parameters include: properties.allow.auto.create.topics, properties.batch.num.messages, properties.batch.size, properties.enable.idempotence, properties.max.in.flight.requests.per.connection, properties.message.max.bytes, properties.message.send.max.retries, properties.message.timeout.ms, properties.queue.buffering.max.kbytes, properties.queue.buffering.max.messages, properties.queue.buffering.max.ms, properties.request.required.acks, properties.retry.backoff.ms, properties.receive.message.max.bytes.
-- Enable idempotence for the sink
ALTER SINK my_kafka_sink CONNECTOR WITH (
  'properties.enable.idempotence' = 'true'
);
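
The same clause covers the other properties listed above. The sketch below is illustrative only: the sink name my_iceberg_sink and all values are hypothetical, and the values are quoted as strings to match the syntax template.

```sql
-- Hypothetical Iceberg sink: change how often data is committed.
ALTER SINK my_iceberg_sink CONNECTOR WITH (
  'commit_checkpoint_interval' = '10'
);

-- Update several Kafka producer properties in one statement.
ALTER SINK my_kafka_sink CONNECTOR WITH (
  'properties.retry.backoff.ms' = '500',
  'properties.message.timeout.ms' = '60000'
);
```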