
CREATE AGGREGATE

Use the CREATE AGGREGATE command to create user-defined aggregate functions (UDAFs). Currently, UDAFs can only be written in Python or JavaScript, as embedded UDFs.

Syntax

CREATE AGGREGATE function_name ( argument_type [, ...] )
RETURNS return_type
LANGUAGE language_name
AS $$ function_body $$;

Parameters

| Parameter or clause | Description |
|---|---|
| function_name | The name of the aggregate function to declare in RisingWave. |
| argument_type | The data type of each input parameter that the function expects. As in the examples below, a parameter can also be given a name (for example, value int). |
| RETURNS return_type | The data type of the value returned by the aggregate function. |
| LANGUAGE | The programming language used to implement the UDAF. Currently, Python and JavaScript are supported. |
| AS function_body | The source code of the UDAF. |

The code in function_body defines a set of functions that together implement the aggregate.

Required functions:

  • create_state() -> state: Create a new state.
  • accumulate(state, *args) -> state: Accumulate a new value into the state, returning the updated state.

Optional functions:

  • finish(state) -> value: Compute the final result of the aggregate from the state. If not defined, the state itself is returned as the result.
  • retract(state, *args) -> state: Remove a previously accumulated value from the state, returning the updated state. This is needed when an input is withdrawn, for example because a row is deleted from an upstream table. If it is not defined, the state cannot be updated incrementally in materialized views, and performance may be affected.
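To make this calling contract concrete, here is a minimal local sketch of how an engine might drive these functions. It is not RisingWave's executor: the run_aggregate helper, the ("+", args) / ("-", args) change format, and the one-argument sum aggregate are hypothetical, chosen only to show the call order and the fallback when finish is omitted. Where retract is missing, this sketch simply refuses the change; it does not model what RisingWave does in that case.

```python
from typing import Any, Callable, Iterable, Optional, Tuple

# Hypothetical change record: ("+", args) for an added input, ("-", args) for a withdrawn one.
Change = Tuple[str, tuple]


def run_aggregate(
    create_state: Callable[[], Any],
    accumulate: Callable[..., Any],
    changes: Iterable[Change],
    retract: Optional[Callable[..., Any]] = None,
    finish: Optional[Callable[[Any], Any]] = None,
) -> Any:
    """Fold a stream of changes into one aggregate result (illustrative only)."""
    state = create_state()
    for op, args in changes:
        if op == "+":
            state = accumulate(state, *args)
        elif op == "-":
            if retract is None:
                # Without retract, this sketch has no way to undo a value.
                raise NotImplementedError("retract() is not defined")
            state = retract(state, *args)
        else:
            raise ValueError(f"unknown op: {op!r}")
    # If finish() is not defined, the state itself is the result.
    return finish(state) if finish is not None else state


# Hypothetical aggregate that defines only the two required functions.
def create_state():
    return 0


def accumulate(state, value):
    # Skip NULL (None) inputs, as the weighted_avg examples do.
    return state if value is None else state + value


print(run_aggregate(create_state, accumulate, [("+", (1,)), ("+", (None,)), ("+", (4,))]))  # 5
```

Because this aggregate has no finish, the integer state is returned directly, which is why its return type would need to match the state's type.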

See the examples below for details.

Examples

Python

The following command creates an aggregate function named weighted_avg to calculate the weighted average.

Python UDAF
create aggregate weighted_avg(value int, weight int) returns float language python as $$
# The state is a (weighted sum, total weight) tuple.
def create_state():
    return (0, 0)

def accumulate(state, value, weight):
    # Ignore rows where either input is NULL.
    if value is None or weight is None:
        return state
    (s, w) = state
    s += value * weight
    w += weight
    return (s, w)

def retract(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s -= value * weight
    w -= weight
    return (s, w)

def finish(state):
    (total, weight) = state
    # Return NULL (None) instead of dividing by zero.
    if weight == 0:
        return None
    else:
        return total / weight
$$;
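Because the function body is ordinary Python, one way to sanity-check it before running CREATE AGGREGATE is to run the same logic in a local interpreter and fold some rows through it by hand. The sketch below assumes plain Python 3 with no RisingWave involved, and uses the same rows as the batch query in Using UDAFs; it is a testing aid, not how RisingWave invokes the functions.

```python
from functools import reduce


# Same logic as the weighted_avg body above, run as plain Python outside RisingWave.
def create_state():
    return (0, 0)


def accumulate(state, value, weight):
    if value is None or weight is None:
        return state  # rows with a NULL input are skipped
    s, w = state
    return (s + value * weight, w + weight)


def finish(state):
    s, w = state
    return None if w == 0 else s / w  # avoid dividing by zero


rows = [(1, 1), (None, 2), (3, 3)]
state = reduce(lambda st, row: accumulate(st, *row), rows, create_state())
print(finish(state))           # 2.5
print(finish(create_state()))  # None
```

The (None, 2) row is ignored, so the result is (1*1 + 3*3) / (1 + 3) = 2.5, matching the batch query output.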

For more details, see Use UDFs in Python.

JavaScript

The following command creates an aggregate function named weighted_avg to calculate the weighted average.

JavaScript UDAF
create aggregate weighted_avg(value int, weight int) returns float language javascript as $$
// The state is an object holding the weighted sum and the total weight.
export function create_state() {
  return { sum: 0, weight: 0 };
}

export function accumulate(state, value, weight) {
  // Ignore rows where either input is NULL.
  if (value == null || weight == null) {
    return state;
  }
  state.sum += value * weight;
  state.weight += weight;
  return state;
}

export function retract(state, value, weight) {
  if (value == null || weight == null) {
    return state;
  }
  state.sum -= value * weight;
  state.weight -= weight;
  return state;
}

export function finish(state) {
  // Return NULL instead of dividing by zero.
  if (state.weight === 0) {
    return null;
  }
  return state.sum / state.weight;
}
$$;

For more details, see Use UDFs in JavaScript.

Using UDAFs

After creating an aggregate function, you can use it in SQL queries just like a built-in aggregate function.

Use UDAF
-- call UDAF in a batch query
select weighted_avg(value, weight) from (values (1, 1), (null, 2), (3, 3)) as t(value, weight);
-----RESULT
2.5

-- call UDAF in a materialized view
create table t(value int, weight int);
create materialized view mv as select weighted_avg(value, weight) from t;

insert into t values (1, 1), (null, 2), (3, 3);
flush;

select * from mv;
-----RESULT
2.5
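The incremental path in the materialized view example can be mimicked in plain Python. In the sketch below, a list stands in for table t and a single variable holds the aggregate state: each insert calls accumulate, a delete calls retract, and the view's answer is finish(state). This is an illustrative model under those assumptions, not how RisingWave stores or schedules state; the delete step and the recompute helper are hypothetical additions that the SQL example above does not perform.

```python
# Sketch: keep one weighted_avg state up to date under inserts and a delete,
# as an incrementally maintained view could, and compare with a full recompute.
def create_state():
    return (0, 0)


def accumulate(state, value, weight):
    if value is None or weight is None:
        return state
    s, w = state
    return (s + value * weight, w + weight)


def retract(state, value, weight):
    if value is None or weight is None:
        return state
    s, w = state
    return (s - value * weight, w - weight)


def finish(state):
    s, w = state
    return None if w == 0 else s / w


def recompute(rows):
    """Rebuild the result from scratch, as a batch query would."""
    state = create_state()
    for row in rows:
        state = accumulate(state, *row)
    return finish(state)


table = []              # stands in for table t
state = create_state()  # the single aggregate state kept by the view

for row in [(1, 1), (None, 2), (3, 3)]:  # insert into t values ...
    table.append(row)
    state = accumulate(state, *row)
print(finish(state), recompute(table))  # 2.5 2.5

table.remove((3, 3))  # hypothetical: delete the row (3, 3) from t
state = retract(state, 3, 3)
print(finish(state), recompute(table))  # 1.0 1.0
```

The incremental result stays equal to the recomputed one only because retract exactly undoes accumulate. With integer inputs this holds exactly; with floating-point inputs, repeated accumulate and retract calls can drift slightly from a full recompute.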

See also

DROP AGGREGATE — Drop a user-defined aggregate function.

CREATE FUNCTION — Create a user-defined scalar or table function.
