Learn best practices for maximizing the performance of your RisingWave deployments.
This section outlines best practices for optimizing RisingWave performance. Applying these recommendations can help you achieve lower latency, higher throughput, and more efficient resource utilization. These best practices cover various aspects of working with RisingWave, from data modeling and query writing to resource allocation and data ingestion.
Proper data modeling is a foundational aspect of achieving good performance. How you structure your data and define your sources and tables can significantly impact query efficiency.
What is the difference between declaring a source, a table, and an append-only table?
In RisingWave, you can declare a source, a table, or an append-only table when connecting to an external upstream system. The key difference lies in whether RisingWave persists the ingested data and whether modifications are allowed: a source does not persist the ingested data within RisingWave, while both kinds of tables do. Once the data is stored in RisingWave, you can insert, update, and delete rows in a table, but only insert rows into an append-only table. Users therefore trade additional storage space for the ability to modify the ingested data within RisingWave. See details in CREATE SOURCE.
Another difference is the performance implication: unlike a table, a source and an append-only table never process updates or deletes, which opens up opportunities for optimization.
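For illustration, here is a minimal sketch of the three declarations against a hypothetical Kafka topic; the topic name, columns, and connector settings below are placeholders rather than part of the original example.

```sql
-- Source: data is not persisted in RisingWave; updates and deletes are never processed.
CREATE SOURCE transactions_src (
    id INT,
    amount DOUBLE PRECISION,
    ts TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;

-- Table: data is persisted; inserts, updates, and deletes are all supported.
CREATE TABLE account_table (
    id INT PRIMARY KEY,
    balance DOUBLE PRECISION
);

-- Append-only table: data is persisted, but only inserts are allowed.
CREATE TABLE transactions (
    id INT,
    amount DOUBLE PRECISION,
    ts TIMESTAMP
) APPEND ONLY WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;
```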
Suppose we have a materialized view that tracks the maximal account balance among all the clients at a retail bank, e.g., CREATE MATERIALIZED VIEW max_account_balance AS SELECT max(balance) FROM account_table. Since each client's account balance changes frequently, we cannot declare account_table as a source or an append-only table. RisingWave then has to persist the balances of all accounts to maintain the result, for example, to find the second-largest balance when the largest balance decreases.
Suppose we have another materialized view that tracks the time of the latest transaction among all transactions, e.g., CREATE MATERIALIZED VIEW time_latest_transaction AS SELECT max(timestamp) FROM transactions. Since a transaction is irreversible (even if a transaction is a mistake, we correct it with a new transaction), it is a perfect fit to declare transactions as a source or an append-only table. RisingWave then only needs to keep a single data point, the timestamp of the latest transaction, and simply compare it with the timestamp of each new transaction to update the result.
This append-only versus non-append-only difference can make an impact in a few use cases, for example Top-N queries, where the query contains an ORDER BY clause and you are only interested in the top N rows (see the sketch below).
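For example, a Top-N materialized view over the hypothetical append-only transactions stream from the sketch above only needs to maintain the current top rows, because no delete can ever remove one of them and force RisingWave to look up what the next-best row would have been.

```sql
-- Top-N over an append-only stream: RisingWave keeps only the current top 10 rows,
-- since updates and deletes can never occur on the input.
CREATE MATERIALIZED VIEW top10_largest_transactions AS
SELECT id, amount, ts
FROM transactions
ORDER BY amount DESC
LIMIT 10;
```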
Efficient resource allocation is crucial for achieving optimal performance and avoiding bottlenecks.
The discussion is limited to compute nodes and compactor nodes as other components are not involved in processing in most cases.
RisingWave was built as a scalable distributed processing system from day one. However, like any distributed system, it incurs extra overhead from network communication among machines, and resource fragmentation becomes more likely when the same total amount of resources is split across more machines. This is intrinsic to distributed systems rather than a limitation specific to RisingWave.
Therefore, we generally prefer scaling up over scaling out for compute nodes. If the streaming queries are mostly stateless (i.e., they involve no aggregation, join, or over window functions) and do not involve data shuffling, then scaling up and scaling out are equivalent. If you are experiencing high latency or slow stream processing, see Troubleshooting - High Latency.
For compactor nodes, we favor scaling up while a compactor node has fewer than 4 CPUs and 8 GB of memory, because some compaction tasks can occasionally be quite resource-intensive and use up to 4 CPUs and 8 GB of memory. Once a compactor node's resources exceed this threshold, scaling up and scaling out are equally fine. See Monitoring and metrics - Resource utilization to monitor CPU and memory usage, and Monitoring and metrics - Compaction for compaction-related metrics.
Writing efficient SQL queries is a critical aspect of achieving good performance.
You can check out the following pages for more details on SQL optimization:
Backfilling refers to the operation where a query processes all existing or historical data.
Suppose we have 100 GB of data in an upstream Kafka topic, with new data continuously flowing in. When we define a query to ingest data from this topic, it first processes all existing data, and this is the backfilling stage. Afterward, it continues to process new incoming data.
RisingWave always tries to ingest and process data as quickly as possible, so the backfilling stage tends to saturate cluster resources. Backfilling performance often worsens with more complex queries, such as large materialized view (MV) or sink queries with many common table expressions (CTEs) or subqueries. While these queries might be efficient for ongoing processing, they may not be ideal for backfilling when the cluster is overloaded.
The reason is straightforward: running a query with 10 CTEs or subqueries is similar to running 10 different queries simultaneously, which can lead to serious resource competition.
Note that this resource competition doesn’t simply mean each of the 10 queries gets 1/10 of the total resources. Extra overhead and limitations can lead to even worse performance than 1/10 of the original.
If you observe high CPU utilization or high cache miss rates while backfilling a large, complex query, we suggest decomposing it into several independent queries and creating the materialized views one by one, as sketched below. This approach can significantly improve performance. See Troubleshooting - High latency.
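A minimal, hypothetical sketch of this decomposition (the table, column, and view names are assumptions): rather than one MV whose definition chains several CTEs, each stage is materialized separately, and the next stage is created only after the previous one has finished backfilling.

```sql
-- Stage 1: filter and normalize the raw data.
CREATE MATERIALIZED VIEW orders_cleaned AS
SELECT order_id, customer_id, amount, created_at
FROM orders
WHERE amount > 0;

-- Stage 2: aggregate on top of the first MV, created once stage 1 has backfilled.
CREATE MATERIALIZED VIEW revenue_per_customer AS
SELECT customer_id, sum(amount) AS total_amount
FROM orders_cleaned
GROUP BY customer_id;
```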
During backfilling of an “MV on MV” or “MV on SINK INTO TABLE” job, the data read from the upstream MV or table may not have good locality. To optimize this, we recommend structuring the SQL query so that the order key of the upstream table or MV aligns with the group key of the downstream MV. This alignment improves the performance of the backfilling process.
Suppose we create a table t using create table t(v1 int, v2 varchar, v3 timestamp);. This table contains a large amount of historical data, so creating a new MV on top of it requires backfilling all the historical data, which can be time-consuming. We then create an MV called m using create materialized view m as select v1, count(v2) from t group by v1;. The challenge during backfilling is that when data is read from table t to build the MV m, the order of the v1 values may be random. This randomness leads to poor cache locality and triggers remote I/O, ultimately decreasing performance.
However, if we instead create the table with create table t(v1 int primary key, v2 varchar, v3 timestamp);, or define an intermediate MV such as create materialized view tmp as select * from t order by v1;, RisingWave's storage will order rows by v1, which means that rows with the same v1 are read consecutively. This adjustment enhances cache locality and thereby improves performance.
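Putting the two steps together, a sketch of the intermediate-MV approach for the hypothetical t and m above might look like this:

```sql
-- Intermediate MV whose rows are stored ordered by v1, the group key of the downstream MV.
CREATE MATERIALIZED VIEW tmp AS
SELECT * FROM t ORDER BY v1;

-- Build the aggregation on the ordered intermediate MV so that backfilling reads
-- rows with the same v1 consecutively.
CREATE MATERIALIZED VIEW m AS
SELECT v1, count(v2) FROM tmp GROUP BY v1;
```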
When building an MV on a table populated via SINK INTO TABLE, backfilling performance can be improved by ensuring the sink writes data in an order that matches the MV's partitioning or ordering keys. For example, if the MV uses ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC), writing sink data ordered by order_id, updated_at helps store related rows consecutively. This improves data locality and allows the MV to backfill more efficiently by reducing random I/O and increasing cache hit rates.
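For reference, a sketch of such an MV using the ROW_NUMBER pattern to keep the latest row per order_id; the orders table and its columns are hypothetical.

```sql
-- Keep only the most recent row per order_id from a table populated via SINK INTO TABLE.
CREATE MATERIALIZED VIEW latest_order_status AS
SELECT order_id, status, updated_at
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
    FROM orders
) ranked
WHERE rn = 1;
```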
Materialized views (MVs) can significantly improve query performance by pre-computing and storing results.
When building a materialized view (MV), it is possible to build more MVs on top of the existing one. This approach is similar to creating complex functionalities by composing simple functions in a codebase: intermediate results can be shared and reused by multiple downstream MVs, and each individual query stays simpler to write, understand, and maintain.
Users may be concerned that decomposing a complex pipeline into multiple MVs introduces additional performance overhead. More decomposition does not add computation overhead, only storage space. Since RisingWave is typically deployed with inexpensive object storage on the public cloud to store large amounts of data, this extra storage is generally a less crucial factor in most cases.
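As a small, hypothetical sketch of this layering (the events table and time columns are assumptions), a per-minute aggregate can serve both as a queryable result in its own right and as the input of a coarser per-hour aggregate:

```sql
-- Base MV: per-minute event counts.
CREATE MATERIALIZED VIEW events_per_minute AS
SELECT date_trunc('minute', event_time) AS minute_start, count(*) AS cnt
FROM events
GROUP BY date_trunc('minute', event_time);

-- Downstream MV built on the base MV: hourly counts reuse the per-minute results.
CREATE MATERIALIZED VIEW events_per_hour AS
SELECT date_trunc('hour', minute_start) AS hour_start, sum(cnt) AS cnt
FROM events_per_minute
GROUP BY date_trunc('hour', minute_start);
```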
Efficient data ingestion is crucial for maintaining low latency and high throughput in your streaming pipelines.
To effectively monitor the progress of direct Change Data Capture (CDC) for PostgreSQL and MySQL databases, you can employ two key methods tailored to historical and real-time data. For more details, see Use Direct CDC for PostgreSQL and Use Direct CDC for MySQL, as well as Monitoring and Metrics - Data Ingestion.