Scaling policies

RisingWave supports adaptive and fixed parallelism for each streaming job, including materialized views, sinks, and tables.

  • Adaptive parallelism

    Adaptive parallelism is the default setting for newly created streaming jobs since v1.7. In this mode, RisingWave automatically adjusts parallelism to utilize all CPU cores across the compute nodes in the cluster. When nodes are added or removed, parallelism adjusts accordingly based on the current number of CPU cores. To modify the scaling policy to adaptive parallelism, use the SQL command:

    ALTER TABLE t SET PARALLELISM = adaptive;

    To modify the policy for a materialized view:

    ALTER MATERIALIZED VIEW mv SET PARALLELISM = adaptive;
  • Fixed parallelism

    Fixed parallelism is an advanced mode that lets you manually specify a parallelism number that remains constant as the cluster resizes. It is commonly used to throttle stream bandwidth and to ensure predictable resource allocation. For example:

    ALTER TABLE t SET PARALLELISM = 16; -- Replace 16 with the desired parallelism

    When there are many streaming jobs running in the cluster, it’s recommended to use fixed parallelism to avoid overloading the system.
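
The same clause applies to sinks as well. As a sketch, assuming a sink named my_sink already exists (the sink name here is illustrative, and the exact ALTER SINK syntax should be verified against your RisingWave version):

ALTER SINK my_sink SET PARALLELISM = 8;         -- pin the sink to a fixed parallelism of 8
ALTER SINK my_sink SET PARALLELISM = adaptive;  -- or let it follow the cluster's CPU cores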

RisingWave distributes its computation across lightweight threads called “streaming actors,” which run simultaneously on CPU cores. By spreading these streaming actors across cores, RisingWave achieves parallel computation, resulting in improved performance, scalability, and throughput.

In both scaling modes, streaming actors are redistributed across the cluster to keep workloads balanced.

Configuring maximum parallelism

Before RisingWave 2.2, streaming jobs had a fixed maximum parallelism of 256. Starting from version 2.2, you can configure the maximum parallelism for streaming jobs through the streaming_max_parallelism session variable, which sets the upper limit on the parallelism that can be configured for a streaming job.

  • With adaptive parallelism, the parallelism of a streaming job will be capped at the configured maximum parallelism, even if the number of CPU cores in the cluster increases.
  • With fixed parallelism, you cannot set a parallelism greater than the configured maximum parallelism.

The default value is 256, and the valid range is from 2 to 32768. To modify this setting:

-- Set the maximum parallelism to 512
SET streaming_max_parallelism = 512;

-- Create a materialized view with this setting
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM source_table;

This setting affects all new streaming jobs created within the session. You can also set it system-wide using:

ALTER SYSTEM SET streaming_max_parallelism = 512;
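
To check the value currently in effect for your session, you can use the PostgreSQL-style SHOW command (a sketch, assuming it applies to this variable as it does to other RisingWave session variables):

SHOW streaming_max_parallelism;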

You can check the maximum parallelism of existing streaming jobs in the rw_streaming_parallelism system table. See Monitor parallelism for more details.
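
For example, the following query lists each job together with its configured limit, using the columns shown in Monitor parallelism below:

SELECT name, relation_type, parallelism, max_parallelism
FROM rw_streaming_parallelism;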

When deciding on a value, please note that:

  • The maximum parallelism for a job cannot be changed after creation, unless you drop and recreate it (see the sketch after this list).
  • Higher values enable compute-intensive jobs to utilize more resources.
  • Lower values may provide better range scan performance on the result table/materialized view.
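
For instance, to give the mv1 materialized view from the earlier example a larger maximum parallelism, it has to be dropped and recreated, which rebuilds its state from scratch. A minimal sketch:

DROP MATERIALIZED VIEW mv1;
SET streaming_max_parallelism = 1024;  -- new upper limit for jobs created in this session
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM source_table;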

Scale-out

Scale-out here refers to the process of adding more compute nodes to the cluster. For frontend nodes, you can simply scale out or in by adding or removing them, because they are stateless and are automatically discovered by the meta nodes.

  1. First, add more compute nodes with kubectl.
# If you are using risingwave-operator
kubectl apply -f <file-with-more-replicas>.yaml # or kubectl edit RisingWave/<name>
# If you are not using risingwave-operator
kubectl scale statefulset/risingwave-compute --replicas=<number-of-replicas>

Then wait until the new compute nodes start.

  2. If you are using fixed parallelism, you may need to manually adjust the parallelism of the streaming jobs to utilize the new compute nodes (see the example below). For adaptive parallelism, the system automatically adjusts the parallelism to utilize the new compute nodes.
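
For example, if table t was pinned to 16 actors before the scale-out, you can either raise the fixed value or switch the job to adaptive parallelism so that it follows the new core count:

ALTER TABLE t SET PARALLELISM = 32;        -- raise the fixed parallelism, or
ALTER TABLE t SET PARALLELISM = adaptive;  -- let the job track the cluster size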

Scale-in

Scale-in here refers to the process of removing compute nodes from the cluster. By default, there's a 5-minute delay before a scale-in takes effect. The delay is intentional, to prevent unnecessary heavy recovery operations caused by transient failures like network jitters and CPU stalls.

  1. Since v2.0, to trigger an immediate scale-in, decrease the number of compute nodes with kubectl:
# If you are using risingwave-operator
kubectl apply -f <file-with-less-replicas>.yaml # or kubectl edit RisingWave/<name>

# If you are not using risingwave-operator
kubectl scale statefulset/risingwave-compute --replicas=<number-of-replicas>
  2. If you are using fixed parallelism, you may need to manually adjust the parallelism of the streaming jobs (see the example below). For adaptive parallelism, the system automatically adjusts the streaming jobs to use lower parallelism.
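
As with scale-out, jobs pinned to a fixed parallelism can be adjusted manually, for example:

ALTER TABLE t SET PARALLELISM = 8;  -- lower the fixed parallelism to fit the smaller cluster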

Upgrade to v1.7

After upgrading to v1.7 from an earlier version, streaming jobs whose parallelism is unset are automatically upgraded to adaptive parallelism (adaptive). Streaming jobs whose parallelism is set continue to use fixed parallelism.
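
To see which jobs kept a fixed parallelism after the upgrade, you can filter rw_streaming_parallelism; this sketch assumes the parallelism column is reported as text, as in the sample output below:

SELECT name, relation_type, parallelism
FROM rw_streaming_parallelism
WHERE parallelism LIKE 'FIXED%';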

Monitor parallelism

You can use a system table to view the current scaling policy of tables, materialized views, and sinks:

dev=> SELECT * FROM rw_streaming_parallelism;
  id  | name |   relation_type   | parallelism | max_parallelism
------+------+-------------------+-------------+----------------
 1001 | t    | table             | FIXED(4)    |            128
 1002 | mv1  | materialized view | AUTO        |            256
 1004 | idx  | index             | AUTO        |           1024
(3 rows)

To view the parallelism of fragments within a specific streaming job, please use the system table rw_fragment_parallelism instead:

dev=> SELECT * FROM rw_fragment_parallelism WHERE name = 't';
  id  | name |   relation_type   | fragment_id | distribution_type | state_table_ids | upstream_fragment_ids |        flags        | parallelism | max_parallelism
------+------+-------------------+-------------+-------------------+-----------------+-----------------------+---------------------+-------------+----------------
 1001 | t    | table             |           2 | HASH              | {}              | {}                    | {SOURCE,DML}        |           4 |            128
 1001 | t    | table             |           1 | HASH              | {1001}          | {2}                   | {MVIEW}             |           4 |            128

To understand the output of the query, you may need to know about these two concepts: streaming actors and fragments.