Syntax
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name [ ( col_name [ , ... ] ) ]
[FROM sink_from | AS select_query]
[ WITH ( sink_option = value [, ...] ) ];
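For instance, a minimal statement (with illustrative names only) sinks everything from an existing upstream relation into an existing target table:
CREATE SINK IF NOT EXISTS my_sink INTO my_table
FROM my_upstream;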
Parameters
| Parameter or clause | Description |
|---|---|
| sink_name | The name of the sink. If a schema name is given (for example, CREATE SINK <schema>.<sink> ...), then the sink is created in the specified schema. Otherwise it is created in the current schema. |
| col_name | The target table columns that the sink writes to. Columns not listed are filled with the default values defined in the table. |
| WITH clause | You can use options such as type = 'append-only' and force_append_only = 'true' to explicitly control the append-only behavior. |
A table without a primary key can only accept append-only sinks.
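As an illustration of the col_name list and the WITH options above, the following sketch (table and source names are hypothetical) writes only two of the target table's columns and forces append-only output; omitted columns fall back to their defaults:
-- Hypothetical names: t_events has columns id, payload, and created_at (with a default).
CREATE SINK s_events INTO t_events (id, payload)
FROM events_src
WITH (
    type = 'append-only',
    force_append_only = 'true'
);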
Enable sink decoupling
By default, sink decoupling is disabled for sinks created with CREATE SINK INTO. To enable it, set sink_decouple = true before creating the sink.
SET sink_decouple = true;
CREATE TABLE t (
v1 INT PRIMARY KEY,
v2 INT
);
CREATE TABLE t2 (
v1 INT PRIMARY KEY,
v2 INT
);
CREATE SINK s1 INTO t FROM t2;
-- If this downstream MV is slow, it will not block the sink s1,
-- since the upstream and downstream are decoupled.
CREATE MATERIALIZED VIEW m1 AS
SELECT v1, SUM(v2)
FROM t
GROUP BY v1;
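Once s1 is created, data written to t2 flows into t and then into m1 at the view's own pace; a quick check (the exact output depends on when the sink and the view catch up):
-- Feed rows through the decoupled path, then inspect the downstream view.
INSERT INTO t2 VALUES (1, 10), (2, 20);
SELECT * FROM m1;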
Examples
You can union data from two different Kafka topics.
CREATE TABLE orders (
id int primary key,
price int,
item_id int,
customer_id int
);
CREATE SOURCE orders_s0 (
id int primary key,
price int,
item_id int,
customer_id int
) WITH (
connector = 'kafka',
topic = 'topic_0',
...
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE orders_s1 (
id int primary key,
price int,
item_id int,
customer_id int
) WITH (
connector = 'kafka',
topic = 'topic_1',
...
) FORMAT PLAIN ENCODE JSON;
CREATE SINK orders_sink0 INTO orders FROM orders_s0;
CREATE SINK orders_sink1 INTO orders FROM orders_s1;
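Both sinks now merge their topics into the single orders table, which you can query directly (the rows returned depend on the messages in topic_0 and topic_1):
SELECT * FROM orders ORDER BY id;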
If you no longer need the data from one of the topics, you can drop the corresponding sink; the target table and the other sink are not affected.
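For example, to stop ingesting from topic_0 while keeping the rows that have already been written to orders:
DROP SINK orders_sink0;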
See also