In RisingWave, a Top-N query includes a ranking function clause and a rank filtering condition. In the ranking function clause, you can include a PARTITION BY clause to fetch top N rows per group.

Syntax

SELECT [column_list]
  FROM (
    SELECT [column_list],
      ranking_function_clause AS rank
    FROM table_name)
WHERE rank_range;

The syntax of the ranking_function_clause is:

function_name() OVER ([PARTITION BY col1[, col2...]]
        ORDER BY col1 [ ASC | DESC ][, col2 [ ASC | DESC ]...])

rank cannot be included in column_list.

You must follow the pattern exactly to construct a valid Top-N query.

ParameterDescription
function_nameRisingWave supports two window functions in top-N queries: row_number(): Returns the sequential row ordinal (1-based) of each row for each ordered partition.rank(): Returns the ordinal (1-based) rank of each row within the ordered partition. All peer rows receive the same rank value. The next row or set of peer rows receives a rank value which increments by the number of peers with the previous rank value.
PARTITION BY clauseSpecifies the partition columns. Each partition will have a Top-N result.
ORDER BY clauseSpecifies how the rows are ordered.
rank_rangeSpecifies the range of the rank number. The rank range is required for the query to be recognized as a top-N query. The range can be specified in these forms. Examples: WHERE M < rank AND rank < N or WHERE rank between M and N. Optionally, you can specify any additional conditions to further filter the results.

Example

Create a table
CREATE TABLE t (x int, y int, z int);
Insert data
INSERT INTO t (x, y, z) VALUES
  (1, 10, 50),
  (1, 10, 60),
  (1, 10, 70),
  (1, 11, 55),
  (1, 11, 65),
  (2, 20, 30),
  (2, 20, 40),
  (2, 21, 25),
  (2, 21, 35),
  (2, 21, 45);

Run a top-N query
SELECT r
  FROM (
    SELECT
      *,
      row_number() OVER (PARTITION BY x ORDER BY y) r
    FROM t
  ) Q
WHERE Q.r < 10;

Converting StreamOverWindow to StreamGroupTopN

Given the following table:

create table t3(v1 int, v2 int);

You may create the following ranked materialized view to sort the data over the partition v1 and order by v2:

explain create materialized view m1 as select r from (select *, row_number() over (PARTITION BY v1 order by v2) r from t3);
                                                                          QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [r, t3._row_id(hidden), t3.v1(hidden)], stream_key: [t3._row_id, t3.v1], pk_columns: [t3._row_id, t3.v1], pk_conflict: NoCheck }
 └─StreamProject { exprs: [row_number, t3._row_id, t3.v1] }
   └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t3.v1 ORDER BY t3.v2 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
     └─StreamExchange { dist: HashShard(t3.v1) }
       └─StreamTableScan { table: t3, columns: [v1, v2, _row_id] }
(5 rows)

The operator responsible for sorting the data is StreamOverWindow. This is a streaming operator that maintains the state of the window and computes the row number for each row in the partition. This can be quite expensive, especially if the partition is large. If you only need the top N rows from each partition, you can use the StreamGroupTopN operator instead. This can be done by simply adding a WHERE clause to the query, and applying it to the row column (r in the above example).

You can view the following query as an example, which maintains top 10 rows for each partition:

explain create materialized view m1 as select r from (select *, row_number() over (PARTITION BY v1 order by v2) r from t3) where r < 10;
                                                                          QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [r, t3.v1(hidden), t3._row_id(hidden)], stream_key: [t3.v1, t3._row_id], pk_columns: [t3.v1, t3._row_id], pk_conflict: NoCheck }
 └─StreamProject { exprs: [row_number, t3.v1, t3._row_id] }
   └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t3.v1 ORDER BY t3.v2 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
     └─StreamGroupTopN { order: [t3.v2 ASC], limit: 9, offset: 0, group_key: [t3.v1] }
       └─StreamExchange { dist: HashShard(t3.v1) }
         └─StreamTableScan { table: t3, columns: [v1, v2, _row_id] }
(6 rows)

Given that the state size to maintain is a lot smaller, this means both reading and writing the state has less overhead. As a result, the StreamGroupTopN operator is much more efficient than the StreamOverWindow operator.