CREATE TABLE

Use the CREATE TABLE command to create a new table. Tables consist of fixed columns and insertable rows. Rows can be added using the INSERT command. When creating a table, you can specify connector settings and data format.

If you choose not to persist the data from the source in RisingWave, use CREATE SOURCE instead.

Syntax

CREATE TABLE [ IF NOT EXISTS ] table_name (
    col_name data_type [ PRIMARY KEY ] [ DEFAULT default_expr ] [ AS generation_expression ],
    ...
    [ PRIMARY KEY (col_name, ... ) ]
    [ watermark_clause ]
)
[ APPEND ONLY ]
[ INCLUDE { header | key | offset | partition | timestamp } [ AS <column_name> ] ]
[ WITH (
    connector='connector_name',
    connector_parameter='value', ...) ]
[ FORMAT data_format ENCODE data_encode [ (
    message='message',
    schema.location='location', ...) ]
];

Notes

For tables with primary key constraints, if you insert a new data record with an existing key, the new record will overwrite the existing record.
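
The overwrite behavior described above can be sketched with a small table. The table name pk_demo and the sample values are hypothetical, and the FLUSH statement is included on the assumption that pending writes should be made visible before reading.

```sql
-- Hypothetical table used only for illustration.
CREATE TABLE pk_demo (
    id INT PRIMARY KEY,
    city VARCHAR
);

INSERT INTO pk_demo VALUES (1, 'Berlin');

-- Same key again: per the note above, this record replaces the earlier one.
INSERT INTO pk_demo VALUES (1, 'Paris');

-- Make the pending writes visible before querying.
FLUSH;

-- Only one row with id = 1 should remain.
SELECT * FROM pk_demo;
```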

A generated column that is defined with non-deterministic functions cannot be specified as part of the primary key. For example, if A1 is defined as current_timestamp(), then it cannot be part of the primary key.

Names and unquoted identifiers are case-insensitive. To make an identifier case-sensitive, enclose it in double quotes.
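
As a rough illustration of the quoting rule, the sketch below uses a hypothetical table "TripLog" with one quoted and one unquoted column name.

```sql
-- Hypothetical names, used only to illustrate identifier case handling.
CREATE TABLE "TripLog" (
    "TripId" INT,
    city VARCHAR
);

-- Quoted identifiers must be written with exactly the same case.
SELECT "TripId" FROM "TripLog";

-- Unquoted identifiers are case-insensitive, so CITY resolves to city.
SELECT CITY FROM "TripLog";
```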

The syntax for creating a table with connector settings and the supported connectors are the same as for creating a source. See CREATE SOURCE for a full list of supported connectors and data formats.

To record when a data record is loaded into RisingWave, define a column that is generated from the processing time (<column_name> timestamptz AS proctime()) when creating the table or source. See also proctime().
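
A minimal sketch of such a processing-time column follows. The table and column names (events_with_load_time, loaded_at) are hypothetical; only the AS proctime() form comes from the note above.

```sql
-- Hypothetical table; loaded_at is filled in by RisingWave at load time.
CREATE TABLE events_with_load_time (
    event_id INT,
    payload VARCHAR,
    loaded_at TIMESTAMPTZ AS proctime()
);

-- Insert only the regular columns; the generated column is not supplied.
INSERT INTO events_with_load_time (event_id, payload) VALUES (1, 'hello');
```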

For a table whose schema comes from an external connector, list * first to represent all columns from the connector. You can then define generated columns after it. See the example below.

CREATE TABLE from_kafka (
    *,
    gen_i32_field INT AS int32_field + 2,
    PRIMARY KEY (some_key)
)
INCLUDE KEY AS some_key
-- Optionally add further clauses of the form:
-- INCLUDE { header | offset | partition | timestamp } [AS <column_name>]
WITH (
    connector = 'kafka',
    topic = 'test-rw-sink-upsert-avro',
    properties.bootstrap.server = 'message_queue:29092'
)
FORMAT upsert ENCODE AVRO (
    schema.registry = 'http://message_queue:8081'
);

Parameters

| Parameter or clause | Description |
| --- | --- |
| table_name | The name of the table. If a schema name is given (for example, CREATE TABLE <schema>.<table> ...), then the table is created in the specified schema. Otherwise, it is created in the current schema. |
| col_name | The name of a column. |
| data_type | The data type of a column. With the struct data type, you can create a nested table. Elements in a nested table need to be enclosed in angle brackets (<>). |
| DEFAULT | The DEFAULT clause allows you to assign a default value to a column. This default value is used when a new row is inserted and no explicit value is provided for that column. default_expr is any constant value or variable-free expression that does not reference other columns in the current table or involve subqueries. The data type of default_expr must match the data type of the column. |
| generation_expression | The expression for the generated column. For details about generated columns, see Generated columns. |
| watermark_clause | A clause that defines the watermark for a timestamp column. The syntax is WATERMARK FOR column_name as expr. For the watermark clause to be valid, the table must be an append-only table. That is, the APPEND ONLY option must be specified. This restriction applies only to tables. For details about watermarks, refer to Watermarks. |
| APPEND ONLY | When this option is specified, the table is created as an append-only table. An append-only table cannot have primary keys. UPDATE and DELETE statements are not valid for append-only tables. Note that append-only tables are a Beta feature. |
| INCLUDE clause | Extract fields not included in the payload as separate columns. For more details on its usage, see INCLUDE clause. |
| WITH clause | Specify the connector settings here if you want to store all the source data. See the Data ingestion page for the full list of supported sources, as well as links to specific connector pages detailing the syntax for each source. |
| FORMAT and ENCODE options | Specify the data format and the encoding format of the source data. To learn about the supported data formats, see Data formats. |
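
To make the DEFAULT clause concrete, here is a small sketch. The table default_demo, its columns, and the default values are hypothetical; each default is a constant whose type matches its column, as the description requires.

```sql
-- Hypothetical table illustrating constant defaults.
CREATE TABLE IF NOT EXISTS default_demo (
    id INT PRIMARY KEY,
    status VARCHAR DEFAULT 'new',
    retries INT DEFAULT 0
);

-- No values are given for status or retries, so their defaults apply.
INSERT INTO default_demo (id) VALUES (1);
```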

Watermarks

RisingWave supports generating watermarks when creating an append-only streaming table. Watermarks are markers that track the progress of event time, allowing you to process events within their corresponding time windows. For the syntax for creating a watermark, see Watermarks.
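
The following sketch shows one way the watermark clause and the APPEND ONLY option might be combined. The table wm_demo, its columns, and the five-second delay are assumptions chosen for illustration.

```sql
-- Hypothetical append-only table with a watermark on event_time.
-- The watermark trails the observed event time by 5 seconds (illustrative value).
CREATE TABLE wm_demo (
    sensor_id INT,
    event_time TIMESTAMP,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) APPEND ONLY;
```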

Examples

The statement below creates a table that has three columns.

CREATE TABLE taxi_trips (
    id VARCHAR,
    distance DOUBLE PRECISION,
    city VARCHAR
);
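
Once the table exists, rows can be added with INSERT. The values in this sketch are made up for illustration.

```sql
-- Sample values are hypothetical.
INSERT INTO taxi_trips (id, distance, city)
VALUES ('trip-001', 3.2, 'Berlin');

-- Make the pending write visible, then read it back.
FLUSH;
SELECT id, distance, city FROM taxi_trips;
```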

The statement below creates a table that includes nested tables.

CREATE TABLE IF NOT EXISTS taxi_trips (
    id VARCHAR,
    distance DOUBLE PRECISION,
    duration DOUBLE PRECISION,
    fare STRUCT<
        initial_charge DOUBLE PRECISION,
        subsequent_charge DOUBLE PRECISION,
        surcharge DOUBLE PRECISION,
        tolls DOUBLE PRECISION>
);
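
As a sketch of how a nested column might be written and read, the example below uses a separate, hypothetical table (nested_trips) with the same shape, so that it does not collide with the taxi_trips table created earlier. It assumes the ROW constructor for struct values and the (column).field form for field access.

```sql
-- Hypothetical table with the same nested layout as above.
CREATE TABLE IF NOT EXISTS nested_trips (
    id VARCHAR,
    distance DOUBLE PRECISION,
    duration DOUBLE PRECISION,
    fare STRUCT<
        initial_charge DOUBLE PRECISION,
        subsequent_charge DOUBLE PRECISION,
        surcharge DOUBLE PRECISION,
        tolls DOUBLE PRECISION>
);

-- ROW(...) supplies the struct fields in declaration order (sample values).
INSERT INTO nested_trips VALUES ('trip-001', 6.1, 11.3, ROW(1.0, 4.0, 1.5, 2.0));

-- Parenthesize the struct column before selecting one of its fields.
SELECT id, (fare).initial_charge FROM nested_trips;
```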

The statement below creates a table with a Kafka broker as the source.

CREATE TABLE IF NOT EXISTS table_abc (
    column1 varchar,
    column2 integer
)
WITH (
    connector='kafka',
    topic='demo_topic',
    properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
    scan.startup.mode='latest',
    scan.startup.timestamp_millis='140000000'
) FORMAT PLAIN ENCODE JSON;