RisingWave comprises several key components: the Serving node, the Streaming node, the Compactor, and the Meta service. At its core, RisingWave is a unified batch and streaming database. If you are new to RisingWave, we recommend starting with What is RisingWave to gain a comprehensive understanding.

Serving node

This service handles user requests and is designed to be compatible with the PostgreSQL wire protocol, allowing tools like psql to connect seamlessly.

  • For batch queries, the service executes them directly.
  • For streaming queries, it generates an execution plan and dispatches it to the stream engine for processing.

Batch query execution modes

There are two execution modes for serving batch queries: local and distributed. The system automatically selects one of them based on the projected computational workload of the query (see the example queries after this list).

  • For queries that don’t require extensive computation, the primary overhead is likely in the initial optimization phases. In such cases, we use local execution mode. This mode avoids full optimizer passes and opts for simpler, heuristic-based passes. Point queries typically use this execution mode.

  • For more complex queries with several joins and aggregations, we use distributed execution mode. These queries often require more time during batch execution; therefore, we thoroughly optimize them and distribute their execution across the serving worker nodes.
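The distinction is easiest to see with concrete queries. The sketch below assumes two hypothetical tables, t(id, v) and d(id, name), with id as the primary key of t; the mode actually chosen depends on the optimizer's estimate, and the query_mode session variable (where supported) can be used to override the automatic choice.

-- t and d are hypothetical tables used only for illustration.
-- Point lookup with little computation: typically served in local mode.
SELECT v FROM t WHERE id = 42;

-- Join plus aggregation: typically served in distributed mode.
SELECT d.name, COUNT(*) AS cnt
FROM t
JOIN d ON t.id = d.id
GROUP BY d.name;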

The Iceberg serving engine is used when a table is created with ENGINE = iceberg, and is powered by the Apache Iceberg table format. Data is stored in a columnar layout to improve performance for ad hoc, OLAP-style queries.
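As a minimal sketch, the table and columns below are hypothetical; depending on your deployment, an Iceberg catalog or connection may need to be configured before the statement succeeds.

-- Hypothetical table; Iceberg catalog/connection setup may be required first.
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    amount   DOUBLE PRECISION
) ENGINE = iceberg;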

Batch query lifecycle

When a user submits a query, it first goes to the serving node, where the parser converts the raw query text into an Abstract Syntax Tree (AST). Next, the binder matches the query elements in the AST to actual database objects, producing a Bound AST. During this binding step, table names are linked to their actual definitions, and wildcards (*) are expanded to all physical columns of a table. Finally, the optimizer runs this Bound AST through several optimization passes to create a batch execution plan.

The fragmenter breaks the execution plan into fragments, which are groups of execution nodes that share the same data distribution to minimize data shuffling.

The scheduler then manages the execution of this fragmented plan. It spawns an instance of each execution node for every data partition, enabling parallel computation. A final shuffle gathers the partitioned results into a single instance, where a root node performs any necessary sorting and sends the results back to the session.
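To inspect the plan that the optimizer and fragmenter produce for a batch query, prefix it with EXPLAIN. The tables here are the hypothetical ones from the earlier example, and the exact output depends on your schema and version.

-- t and d are hypothetical tables used only for illustration.
EXPLAIN SELECT d.name, COUNT(*) AS cnt
FROM t
JOIN d ON t.id = d.id
GROUP BY d.name;

The output typically lists the batch operators that make up the plan, such as the scans, the join, the aggregation, and the exchanges between fragments.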

Streaming node

The streaming node executes streaming queries. This involves managing their state and performing computations such as aggregations and joins.

Streaming query

These are queries that run incremental, “real-time” computation. Given a normal batch query like:

SELECT COUNT(*) FROM t;

Prefixing it with CREATE MATERIALIZED VIEW turns it into its streaming equivalent:

CREATE MATERIALIZED VIEW m1 AS SELECT COUNT(*) FROM t;

Instead of executing once, the query executes continuously as updates arrive from its upstream relations. For example, when table t receives a DML update like INSERT INTO t VALUES(1), this update propagates to m1. The stream graph for m1 takes the last count, adds 1 to it, and materializes this new count. You can query the latest results from m1 at any time using SELECT * FROM m1.
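Putting this together, here is a minimal end-to-end sketch. The column definition of t is hypothetical, and the comments describe the expected behavior rather than literal output.

-- Hypothetical column definition for t.
CREATE TABLE t (v INT);
CREATE MATERIALIZED VIEW m1 AS SELECT COUNT(*) FROM t;

INSERT INTO t VALUES (1);
-- FLUSH commits the pending DML so the next read reflects it.
FLUSH;
SELECT * FROM m1;  -- count is now 1

INSERT INTO t VALUES (2);
FLUSH;
SELECT * FROM m1;  -- count is now 2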

Streaming query lifecycle

The query goes through the same initial phases as the batch query lifecycle. After fragmentation, it diverges from the batch execution path: the execution plan is sent to the meta node.

Using the execution plan, the meta node schedules jobs on the streaming worker nodes by instructing them to construct the execution nodes specified in the plan. These execution nodes handle filtering, joins, aggregations, and various other computations. You can run EXPLAIN to see what the execution graph looks like, as shown below.
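For example, to preview the streaming graph of a materialized view before creating it (the view name here is arbitrary, and the exact output depends on your schema and version):

-- m2 is an arbitrary name used only for illustration.
EXPLAIN CREATE MATERIALIZED VIEW m2 AS SELECT COUNT(*) FROM t;

The output typically shows the chain of stream operators, such as the table scan, the aggregation, and the final materialize node.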

Once the execution nodes are built, historical data backfilling is triggered to ensure consistency with upstream sources. After backfilling completes, the streaming job continuously processes upstream data, materializes updates, and propagates the transformed data stream to any downstream systems. See An overview of the RisingWave streaming engine for more information.

Meta node

The meta node manages cluster metadata by interacting with a meta store, which serves as the persistence layer for metadata. RisingWave supports Postgres, MySQL, and SQLite as meta store options. All database objects are persisted in the meta store, while the serving node retrieves the database catalog from the meta node and caches it locally to serve queries.

Additionally, the meta node manages the lifecycle of streaming jobs, including their creation, state checkpointing for consistency, and eventual deletion.

  • For job creation: The serving node sends the planned query to the meta node. The meta node instantiates the query into actors, assigns them to compute nodes, and then triggers actor creation on the designated compute nodes.

  • For checkpointing: The meta node sends barriers to the streaming nodes. These nodes commit their state and propagate the barriers downstream. Terminal nodes then return the barriers to the meta node for collection. Once all checkpoint barriers are collected and the state is uploaded to the object store, the checkpoint process completes, resulting in a consistent snapshot.

  • For recovery and compaction: The meta node recreates actors on the compute nodes, starting from the last checkpoint snapshot to maintain consistency. It also manages compaction by generating compaction tasks, assigning them to compactor nodes, and updating the meta store’s metadata upon task completion. (Some of this metadata can be inspected via SQL, as shown after this list.)
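As a rough sketch, much of this metadata is exposed through system views in the rw_catalog schema; the view names and columns vary across versions, so treat the ones below as assumptions.

-- View names are version-dependent assumptions.
-- Streaming fragments and the actors instantiated from them.
SELECT * FROM rw_catalog.rw_fragments LIMIT 5;
SELECT * FROM rw_catalog.rw_actors LIMIT 5;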

Compactor

RisingWave employs a Log-Structured Merge (LSM) tree storage model, meaning that all data operations are handled in an append-only manner; even deletions are represented as tombstone records. The storage is tiered, organized hierarchically from L0 to L6 (or potentially more levels). Data is initially written to L0 and progressively compacted into higher levels (L0 → L1 → L2, and so on). As a result, hot (frequently accessed) data resides in lower levels, while cold (less frequently accessed) data migrates to higher levels.

Compaction is required to keep reads from storage fast. Data written to L0 is unsorted, but compaction sorts it from L1 onward. To read a consistent snapshot of the data, we need to merge-sort it over the accessed key range, as different parts of the data may reside in different levels from L0 to L6. If the data is fragmented across all levels and there is a large unsorted chunk in L0, this merge sort takes longer. However, if compaction runs regularly, L0 stays small and most of the data is already sorted in the higher levels. By doing this work ahead of time, compaction minimizes the sorting and merging needed during reads, keeping reads fast.

Node interactions

This section covers how different nodes interact, providing insights into resource isolation at the node level and scaling considerations.

Compute-Storage

Object storage serves as our persistence layer. We use OpenDAL to abstract over different storage backends, such as AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, and MinIO.

This architecture means there’s no limit to the amount of data that can be warehoused in RisingWave. It also allows us to scale Compute Nodes freely without scaling storage, since storage capacity is virtually unlimited.

Each Compute Node operates independently, enabling high parallelism for both streaming and batch jobs. Data streams are partitioned by hashing keys to vnodes (virtual nodes). As each Compute Node owns an independent set of vnodes, its computation remains isolated from that of other Compute Nodes.

Compute-Compactor

As the compute node scales up in resources, for instance, from 1 CPU to 8 CPUs, the parallelism of data ingestion also increases proportionally. This scaling can raise ingestion rates from 100K records per second to 800K records per second. The corresponding increase in write pressure on storage often leads to significant accumulation of unsorted data in the L0 level.

This accumulation directly impacts read performance because, when reading a range of values, we need to sort them to resolve conflicts between older and newer records. This affects both serving and streaming performance, as streaming sometimes needs to read states to update them or refresh the operator cache for stateful queries.

Hence, it is typically recommended to maintain a 2:1 ratio between compute and compactor sizes. For instance, with a compute node of 4 CPU cores and 16 GB of memory, we recommend a compactor node with 2 CPU cores and 8 GB of memory. In a write-heavy workload, such as when 500K rows are written to storage per second, we recommend a 1:8 compute-to-compactor ratio. This typically occurs during backfilling of historical data, where the ingestion volume is very large. See how to diagnose compaction bottleneck for more information.

In RisingWave Cloud, we can use Serverless Compaction (currently in Technical preview stage) to automatically scale compactors and simplify compaction resource management.

Compute-Meta

As the compute node scales up in resources, the metadata size also increases. For instance, if we scale from 1 to 64 cores, the number of actors increases by 64 times. We need to capture the metadata of these new actors, which puts pressure on the meta node.

As such, we generally recommend a 4:1 ratio between compute and meta sizes. For instance, with 8 CPU cores for the compute node, we recommend 2 CPU cores for the meta node.

CPU-Memory

When scaling vertically, we should maintain a 1:4 ratio between CPU cores and memory in GB (for example, 4 cores with 16 GB of memory). This is because memory is used by stateful operators to maintain caches that avoid latency penalties, and to cache blocks of Sorted String Table (SST) files fetched from S3.

If memory is insufficient, disk cache can serve as an intermediary tier between the in-memory block cache and the object store.
