Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name [ ( col_name [ , ... ] ) ]
[FROM sink_from | AS select_query]
[ WITH ( sink_option = value [, ...] ) ];

Parameters

Parameter or clause | Description
sink_name | The name of the sink. If a schema name is given (for example, CREATE SINK <schema>.<sink> ...), the sink is created in the specified schema. Otherwise, it is created in the current schema.
col_name | The columns of the target table that receive the sink's output. Columns that are not listed are filled with the default value defined in the table.
WITH clause | Use options such as type = 'append-only' and force_append_only = 'true' to explicitly control the append-only behavior.
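As an illustration of the col_name list, the sketch below sinks two columns into a three-column table. The table and sink names (target_t, source_t, s_cols) are hypothetical, and the example assumes the target table declares a DEFAULT for the unlisted column.

```sql
-- Hypothetical target table: "note" has a default and is not fed by the sink.
CREATE TABLE target_t (
  id INT PRIMARY KEY,
  amount INT,
  note VARCHAR DEFAULT 'n/a'
);

-- Hypothetical upstream table with only the two columns being sunk.
CREATE TABLE source_t (
  id INT PRIMARY KEY,
  amount INT
);

-- Only id and amount are written; note should take its default value.
CREATE SINK s_cols INTO target_t (id, amount) FROM source_t;
```

Listing the columns explicitly keeps the mapping readable when the target table has more columns than the upstream relation.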

A table without a primary key can only accept append-only sinks.

Currently, a table that has sinks writing into it cannot be altered to add or drop columns.
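A minimal sketch of an append-only sink into a table that has no primary key, using the two options named in the WITH clause description. The table and sink names (clicks, clicks_log, s_append) are hypothetical, and whether force_append_only is required depends on whether the upstream can emit updates or deletes.

```sql
-- Hypothetical upstream table; it has a primary key, so it may emit updates.
CREATE TABLE clicks (
  user_id INT PRIMARY KEY,
  url VARCHAR
);

-- Hypothetical target table without a primary key.
CREATE TABLE clicks_log (
  user_id INT,
  url VARCHAR
);

-- Request append-only behavior explicitly for the sink into clicks_log.
CREATE SINK s_append INTO clicks_log FROM clicks
WITH (
  type = 'append-only',
  force_append_only = 'true'
);
```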

Enable sink decoupling

Added in v2.5.0.

By default, sink decoupling is disabled for sinks created with CREATE SINK INTO. To enable it, set sink_decouple = true before creating the sink.

SET sink_decouple = true;

CREATE TABLE t (
  v1 INT PRIMARY KEY,
  v2 INT
);

CREATE TABLE t2 (
  v1 INT PRIMARY KEY,
  v2 INT
);

CREATE SINK s1 INTO t FROM t2;

-- If this downstream MV is slow, it will not block the sink s1,
-- since the upstream and downstream are decoupled.
CREATE MATERIALIZED VIEW m1 AS
SELECT v1, SUM(v2)
FROM t
GROUP BY v1;

Examples

You can union data from two different Kafka topics by creating one sink per source into the same table.

CREATE TABLE orders (
    id int primary key,
    price int,
    item_id int,
    customer_id int
);

CREATE SOURCE orders_s0 (
    id int primary key,
    price int,
    item_id int,
    customer_id int
) WITH (
    connector = 'kafka',
    topic = 'topic_0',
    ...
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE orders_s1 (
    id int primary key,
    price int,
    item_id int,
    customer_id int
) WITH (
    connector = 'kafka',
    topic = 'topic_1',
    ...
) FORMAT PLAIN ENCODE JSON;

CREATE SINK orders_sink0 INTO orders FROM orders_s0;
CREATE SINK orders_sink1 INTO orders FROM orders_s1;

If you no longer want data from one of the topics, drop the corresponding sink.

DROP SINK orders_sink0;
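The syntax also allows AS select_query in place of FROM. The following sketch, with hypothetical names (payments, raw_payments, payments_sink) and an arbitrary filter, shows a sink that writes only the rows selected by a query.

```sql
-- Hypothetical target table.
CREATE TABLE payments (
  id INT PRIMARY KEY,
  amount INT
);

-- Hypothetical upstream table.
CREATE TABLE raw_payments (
  id INT PRIMARY KEY,
  amount INT
);

-- Sink the result of a query instead of a whole relation.
CREATE SINK payments_sink INTO payments AS
SELECT id, amount
FROM raw_payments
WHERE amount > 0;
```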

See also