Alter a table or source

To add or drop columns in a table or source, use the ALTER TABLE or ALTER SOURCE command. For example, to add a column:

ALTER TABLE customers ADD COLUMN birth_date date;

ALTER SOURCE customers ADD COLUMN birth_date date;

The new column will be NULL for existing records.
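The examples above only add a column. As a minimal sketch of the other direction, the statements below first check that existing rows report NULL for the new column and then drop it again. This assumes the customers table from the example exists and that no downstream materialized view or sink references birth_date; a column that other objects depend on may not be droppable.

```sql
-- Existing rows should report NULL for the newly added column.
SELECT COUNT(*) AS rows_without_birth_date
FROM customers
WHERE birth_date IS NULL;

-- Remove the column again once it is no longer needed.
ALTER TABLE customers DROP COLUMN birth_date;
```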

Alter a materialized view

To alter a materialized view, you need to create a new materialized view and drop the existing one.

For example, suppose we want to add a new column to the materialized view cust_sales, which is currently defined as follows:

CREATE MATERIALIZED VIEW cust_sales AS
    SELECT
        customer_id,
        SUM(total_price) AS sales_amount
    FROM orders
    GROUP BY customer_id;

Here we create a new materialized view cust_sales_new with the new column sales_count:

CREATE MATERIALIZED VIEW cust_sales_new AS
    SELECT
        customer_id,
        SUM(total_price) AS sales_amount,
        COUNT(*) AS sales_count -- The new column
    FROM orders
    GROUP BY customer_id;
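Before dropping the old view, it can be useful to confirm that the new one agrees with it on the columns they share. The query below is one possible check, not a required step. On a live stream the two views may briefly differ while cust_sales_new is still catching up, so a few transient rows are not necessarily a problem.

```sql
-- List customers whose sales_amount differs between the old and new views,
-- including customers that appear in only one of them.
SELECT
    COALESCE(o.customer_id, n.customer_id) AS customer_id,
    o.sales_amount AS old_amount,
    n.sales_amount AS new_amount
FROM cust_sales AS o
FULL OUTER JOIN cust_sales_new AS n
    ON o.customer_id = n.customer_id
WHERE o.sales_amount IS DISTINCT FROM n.sales_amount;
```

An empty result suggests that the shared column matches at the moment the query runs.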

After the new materialized view is created, we can drop the old materialized view cust_sales and rename cust_sales_new to cust_sales:

DROP MATERIALIZED VIEW cust_sales;
ALTER MATERIALIZED VIEW cust_sales_new RENAME TO cust_sales;

Alter a sink

To alter a sink, create a new sink and then drop the old one. The procedure is the same as for the materialized view in the previous section.

If your sink is created based on a materialized view using the CREATE SINK ... FROM ... statement, you have the option to specify without_backfill = true to exclude existing data.

CREATE SINK ... FROM ... WITH (without_backfill = true);
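As a rough sketch of replacing a sink, the statements below create a new sink on cust_sales without backfill and then drop the old one. The sink names cust_sales_sink and cust_sales_sink_new, the Kafka connector, the broker address, and the topic are all assumptions made for illustration; substitute the connector and options your existing sink actually uses.

```sql
-- Hypothetical names and connector settings; adjust to your environment.
CREATE SINK cust_sales_sink_new FROM cust_sales
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'cust_sales',
    primary_key = 'customer_id',
    without_backfill = true  -- skip rows already present in cust_sales
) FORMAT UPSERT ENCODE JSON;

-- Drop the old sink once the new one is running.
DROP SINK cust_sales_sink;
```

Skipping the backfill avoids re-sending rows the old sink has already delivered, at the cost of a possible gap or overlap around the switch.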

Alter streaming jobs with dependencies

If other materialized views or sinks depend on the materialized view you are modifying, or if several dependent materialized views or sinks need to change at the same time, the approach is the same: create all the new objects first, then drop all the old ones.

Let’s continue with the previous example, but suppose the upstream orders is not a table but another materialized view, derived from tables order_items and price.

CREATE MATERIALIZED VIEW orders AS
    SELECT
        order_id,
        customer_id,
        SUM(price * quantity) AS total_price
    FROM order_items, price
    WHERE order_items.product_id = price.product_id
    GROUP BY order_id, customer_id;

CREATE MATERIALIZED VIEW cust_sales AS
    SELECT
        customer_id,
        SUM(total_price) AS sales_amount
    FROM orders
    GROUP BY customer_id;

To add a new column sales_count to cust_sales, we first need to create the new materialized views orders_new and cust_sales_new, in that order, because cust_sales_new reads from orders_new:

CREATE MATERIALIZED VIEW orders_new AS
    SELECT
        order_id,
        customer_id,
        SUM(price * quantity) AS total_price,
        COUNT(*) AS item_count -- The new column
    FROM order_items, price
    WHERE order_items.product_id = price.product_id
    GROUP BY order_id, customer_id;

CREATE MATERIALIZED VIEW cust_sales_new AS
    SELECT
        customer_id,
        SUM(total_price) AS sales_amount,
        SUM(item_count) AS sales_count -- The new column
    FROM orders_new -- the new one
    GROUP BY customer_id;

After the new materialized views are created, we can drop the old materialized views and rename the new ones to the original names. Drop cust_sales before orders, because cust_sales depends on orders:

DROP MATERIALIZED VIEW cust_sales;
ALTER MATERIALIZED VIEW cust_sales_new RENAME TO cust_sales;

DROP MATERIALIZED VIEW orders;
ALTER MATERIALIZED VIEW orders_new RENAME TO orders;
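After the swap, it may help to confirm that only the intended views remain and to look at the definitions stored under the original names. A minimal sketch using the object names from this example:

```sql
-- List the materialized views that exist after the swap.
SHOW MATERIALIZED VIEWS;

-- Inspect the definitions now stored under the original names.
SHOW CREATE MATERIALIZED VIEW orders;
SHOW CREATE MATERIALIZED VIEW cust_sales;
```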

Modifying a streaming job in place

Only SINK INTO TABLE supports modifying the streaming job logic in place. This approach decouples the streaming processing logic from the persistence layer (the materialization step).

Why not always use SINK INTO TABLE? There are performance implications to consider. Materialized views can trust their upstreams to issue consistent records (for example, no duplicate inserts or deletes for the same primary key). However, tables can have multiple sinks writing records to them, which may cause conflicts. These conflicts require resolution, as specified by the ON CONFLICT OVERWRITE clause.

This trade-off allows for dynamic changes to SINK INTO TABLE, but comes with some performance overhead for conflict resolution.
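To make the conflict case concrete, the sketch below writes the same primary key twice into a table declared with ON CONFLICT OVERWRITE. The table kv_demo is hypothetical, and the two INSERT statements stand in for two sinks writing to the same table. Under the overwrite policy, the later write is expected to replace the earlier one rather than produce a duplicate.

```sql
-- Hypothetical table used only to illustrate conflict resolution.
CREATE TABLE kv_demo (
    k int PRIMARY KEY,
    v varchar
) ON CONFLICT OVERWRITE;

-- Two writers emit a row for the same primary key.
INSERT INTO kv_demo VALUES (1, 'from writer A');
INSERT INTO kv_demo VALUES (1, 'from writer B');

-- Make the writes visible to the following query.
FLUSH;

SELECT * FROM kv_demo WHERE k = 1;
```

Each incoming row must be checked against the existing row for its key. That check is the overhead a materialized view avoids by trusting its upstream.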

Example

Here is an example of modifying a streaming job in place.

1. Create the initial objects

Given the source table t:

CREATE TABLE t(c1 int, c2 varchar);

Create an initial materialized view:

CREATE MATERIALIZED VIEW m1 as select c2, count(*) from t group by c2;

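Suppose we then want to change its definition to also compute max(c1). Written as a separate materialized view m2, the new definition would be: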

CREATE MATERIALIZED VIEW m2 as select c2, count(*), max(c1) from t group by c2;

2. Convert to SINK INTO TABLE

The output schema changes from (varchar, int) to (varchar, int, int). Since m1 is a materialized view with potential downstream dependencies, we cannot simply drop and recreate it. Instead, let’s convert it to use SINK INTO TABLE:

-- The ON CONFLICT clause ensures that only newer job versions can overwrite existing data.
CREATE TABLE m1_store(
    c2 varchar primary key,
    cnt int,
    -- The version column is not required, but recommended. See the notes after this example.
    version int
) ON CONFLICT OVERWRITE WITH VERSION COLUMN(version);

CREATE SINK m1_stream_v1 INTO m1_store AS SELECT c2, count(*) as cnt, 1 as version from t group by c2;

3. Add column

Alter the table and add a column to it:

ALTER TABLE m1_store ADD COLUMN max int DEFAULT 0;

Create a new sink with the new definition, incrementing the version number. Two versions of the job now coexist and write to the same table.

CREATE SINK m1_stream_v2 INTO m1_store AS 
SELECT c2, count(*) as cnt, max(c1) as max, 2 as version
from t group by c2;
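One way to watch the transition is to count rows per version in m1_store. This is a sketch of a possible check, not an official readiness signal. Rows still at version 1 are either keys that V2 has not rewritten yet or keys that V2 no longer produces, so treating a stable count as "ready" is an assumption.

```sql
-- Show how many rows were last written by each job version.
SELECT version, COUNT(*) AS row_count
FROM m1_store
GROUP BY version
ORDER BY version;
```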

Drop the V1 streaming job after the V2 job is fully ready:

DROP SINK m1_stream_v1;

Clean up V1 data:

DELETE FROM m1_store WHERE version < 2;

The version column is not required, but is recommended. The benefits are:

  1. You can modify the streaming job without the table becoming stale (i.e., the old streaming job continues producing data).
  2. You can clean up old version data with a DELETE statement when some keys do not exist in the new version of the streaming job.

If temporary downtime is acceptable and you don’t need to clean up old version data, you can omit the version column. You can use a plain ON CONFLICT OVERWRITE clause instead, but you will need to first drop the old sink before creating the new one.
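The variant without a version column could look like the following sketch. The names m1_plain, m1_plain_v1, and m1_plain_v2 are hypothetical and chosen so they do not clash with the objects above; the table t is the one from step 1. Between DROP SINK and the new CREATE SINK the table receives no updates, which is the temporary downtime mentioned above.

```sql
-- Store table without a version column (hypothetical names).
CREATE TABLE m1_plain(
    c2 varchar primary key,
    cnt int
) ON CONFLICT OVERWRITE;

CREATE SINK m1_plain_v1 INTO m1_plain AS
SELECT c2, count(*) as cnt from t group by c2;

-- Later: extend the schema, then swap the job.
ALTER TABLE m1_plain ADD COLUMN max int DEFAULT 0;

-- Without a version column, the old sink must go first.
DROP SINK m1_plain_v1;

CREATE SINK m1_plain_v2 INTO m1_plain AS
SELECT c2, count(*) as cnt, max(c1) as max from t group by c2;
```

Keys that the new job no longer emits stay in the table, because there is no version value to filter them by.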

Why is it not possible to modify a materialized view in place?

Streaming systems like RisingWave need to maintain internal state for streaming operators, such as joins and aggregations. Generally, modifying a materialized view would require consistent changes to the internal state accordingly, which is not always feasible. Let’s look at an example.

Consider a materialized view adult_users that counts the users aged 18 or older:

CREATE MATERIALIZED VIEW adult_users AS
  SELECT
    COUNT(*) as user_count
  FROM users
  WHERE age >= 18;

Suppose it is later discovered that the legal threshold for adulthood should be 16 rather than 18. Changing the filter condition from age >= 18 to age >= 16 might look like a straightforward fix. However, this is not feasible in stream processing, because records for users aged 16 and 17 have already been filtered out and are not part of the view's state. As a result, the only way to restore the missing data is to recompute the entire stream from the beginning.

For this reason, we recommend persistently storing the source data in long-term storage, such as a RisingWave table, so that the materialized view can be recomputed when its logic needs to change.
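With the source data retained, the corrected view can be rebuilt using the same create, drop, and rename pattern shown earlier. A minimal sketch, assuming users is a RisingWave table that still holds every user record and that nothing else depends on adult_users:

```sql
-- Recompute from the full users table with the corrected threshold.
CREATE MATERIALIZED VIEW adult_users_new AS
  SELECT
    COUNT(*) as user_count
  FROM users
  WHERE age >= 16;

-- Swap the new view in under the original name.
DROP MATERIALIZED VIEW adult_users;
ALTER MATERIALIZED VIEW adult_users_new RENAME TO adult_users;
```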