Syntax

CREATE AGGREGATE function_name ( argument_type [, ...] )
    RETURNS return_type
    LANGUAGE language_name
    AS $$ function_body $$;

Parameters

| Parameter or clause | Description |
| --- | --- |
| function_name | The name of the aggregate function that you want to declare in RisingWave. |
| argument_type | The data type of each input argument the function accepts. As in the examples below, an argument can also be given a name, such as value int. |
| RETURNS return_type | The data type of the value returned by the aggregate function. |
| LANGUAGE language_name | The programming language used to implement the UDAF. Currently, Python and JavaScript are supported. |
| AS function_body | The source code of the UDAF, enclosed in dollar quotes ($$ ... $$). |

The function_body must define the following functions, which together implement the aggregate.

Required functions:

  • create_state() -> state: Create a new state.
  • accumulate(state, *args) -> state: Accumulate a new value into the state, returning the updated state.

Optional functions:

  • finish(state) -> value: Compute the final result from the state. If not defined, the state itself is returned as the result.
  • retract(state, *args) -> state: Retract a value from the state, returning the updated state. If not defined, the state cannot be updated incrementally in materialized views, which may affect performance.
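To make this calling contract concrete, here is a minimal Python sketch of how an engine might drive these functions. The helpers fold and result, and the non-null counting aggregate, are hypothetical illustrations rather than RisingWave's implementation; the sketch only assumes that each input row's values are passed positionally after the state, as the *args in the signatures suggest.

```python
from typing import Any, Callable, Iterable, Optional

# Hypothetical helpers that mimic the order in which an engine calls the UDAF
# functions. Illustration only; this is not RisingWave's implementation.

def fold(state: Any, step: Callable[..., Any], rows: Iterable[tuple]) -> Any:
    """Apply step (accumulate or retract) to each row; row values become *args."""
    for row in rows:
        state = step(state, *row)
    return state


def result(state: Any, finish: Optional[Callable[[Any], Any]] = None) -> Any:
    """Return finish(state), or the state itself when finish is not defined."""
    return finish(state) if finish is not None else state


# Hypothetical UDAF that counts non-null values. It defines no finish(),
# so the integer state is returned as the result.
def create_state() -> int:
    return 0

def accumulate(state: int, value: Any) -> int:
    return state if value is None else state + 1

def retract(state: int, value: Any) -> int:
    return state if value is None else state - 1


state = fold(create_state(), accumulate, [(1,), (None,), (5,)])
count_after_insert = result(state)
state = fold(state, retract, [(5,)])  # remove the row (5,) again
count_after_delete = result(state)
print(count_after_insert, count_after_delete)  # 2 1
```

Because this counting aggregate defines no finish, result returns the integer state directly, which mirrors the rule for the optional finish function.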

See the examples below for more details.

Examples

Python

The following command creates an aggregate function named weighted_avg to calculate the weighted average.

Python UDAF
create aggregate weighted_avg(value int, weight int) returns float language python as $$
# The state is a tuple: (weighted sum of values, total weight).
def create_state():
    return (0, 0)

def accumulate(state, value, weight):
    # Rows with a NULL value or weight do not contribute.
    if value is None or weight is None:
        return state
    (s, w) = state
    s += value * weight
    w += weight
    return (s, w)

def retract(state, value, weight):
    # Undo accumulate() for a row removed from the input.
    if value is None or weight is None:
        return state
    (s, w) = state
    s -= value * weight
    w -= weight
    return (s, w)

def finish(state):
    # Return None when the total weight is zero, avoiding division by zero.
    (total, weight) = state
    if weight == 0:
        return None
    else:
        return total / weight
$$;

For more details, see Use UDFs in Python.
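Because the UDAF body is ordinary Python, one way to sanity-check it before running CREATE AGGREGATE is to call the functions directly. The sketch below copies the weighted_avg functions (with the local variable sum renamed to total) and feeds them the same rows used in the batch query later on this page. It is a local check only and does not exercise RisingWave's type conversion or runtime.

```python
# Local copy of the weighted_avg UDAF body, exercised in plain Python
# without RisingWave.

def create_state():
    return (0, 0)

def accumulate(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s += value * weight
    w += weight
    return (s, w)

def retract(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s -= value * weight
    w -= weight
    return (s, w)

def finish(state):
    (total, weight) = state
    if weight == 0:
        return None
    else:
        return total / weight


# Same rows as the batch-query example in "Using UDAFs".
rows = [(1, 1), (None, 2), (3, 3)]
state = create_state()
for value, weight in rows:
    state = accumulate(state, value, weight)

print(state)          # (10, 4)
print(finish(state))  # 2.5

# retract() should exactly undo accumulate() for the same input.
assert retract(accumulate(state, 5, 2), 5, 2) == state
# With no contributing rows, finish() returns None.
assert finish(create_state()) is None
```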

JavaScript

The following command creates an aggregate function named weighted_avg to calculate the weighted average.

JavaScript UDAF
create aggregate weighted_avg(value int, weight int) returns float language javascript as $$
    // The state is an object holding the weighted sum and the total weight.
    export function create_state() {
        return { sum: 0, weight: 0 };
    }
    export function accumulate(state, value, weight) {
        // Rows with a NULL value or weight do not contribute.
        if (value == null || weight == null) {
            return state;
        }
        state.sum += value * weight;
        state.weight += weight;
        return state;
    }
    export function retract(state, value, weight) {
        // Undo accumulate() for a row removed from the input.
        if (value == null || weight == null) {
            return state;
        }
        state.sum -= value * weight;
        state.weight -= weight;
        return state;
    }
    export function finish(state) {
        // Return null when the total weight is zero, avoiding division by zero.
        if (state.weight === 0) {
            return null;
        }
        return state.sum / state.weight;
    }
$$;

For more details, see Use UDFs in JavaScript.

Using UDAFs

After you create a UDAF, you can call it in SQL queries just like a built-in aggregate function.

Use UDAF
-- call UDAF in a batch query
select weighted_avg(value, weight) from (values (1, 1), (null, 2), (3, 3)) as t(value, weight);
-----RESULT
2.5

-- call UDAF in a materialized view
create table t(value int, weight int);
create materialized view mv as select weighted_avg(value, weight) from t;

insert into t values (1, 1), (null, 2), (3, 3);
flush;

select * from mv;
-----RESULT
2.5
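The following conceptual Python model illustrates why retract matters for the materialized view above. It is a sketch under stated assumptions, not a description of RisingWave internals: with retract, deleting the row (3, 3) only touches the existing state, while without it the state has to be rebuilt from every remaining row, here via a hypothetical recompute helper. The DELETE statement in the comments is illustrative only.

```python
# Conceptual model of keeping weighted_avg up to date in a materialized view.
# Illustration only; it does not describe RisingWave's internal state storage.

def create_state():
    return (0, 0)

def accumulate(state, value, weight):
    if value is None or weight is None:
        return state
    s, w = state
    return (s + value * weight, w + weight)

def retract(state, value, weight):
    if value is None or weight is None:
        return state
    s, w = state
    return (s - value * weight, w - weight)

def finish(state):
    total, weight = state
    return None if weight == 0 else total / weight


def recompute(rows):
    """Hypothetical fallback when retract() is absent: rebuild from scratch."""
    state = create_state()
    for value, weight in rows:
        state = accumulate(state, value, weight)
    return state


# INSERT INTO t VALUES (1, 1), (null, 2), (3, 3): accumulate each new row.
table = [(1, 1), (None, 2), (3, 3)]
state = create_state()
for value, weight in table:
    state = accumulate(state, value, weight)
print(finish(state))  # 2.5, as in the mv query above

# Illustrative DELETE of the row (3, 3): with retract(), only that row is touched.
table.remove((3, 3))
state = retract(state, 3, 3)
print(finish(state))  # 1.0

# Without retract(), the result is recomputed over every remaining row.
assert finish(recompute(table)) == finish(state)
```

Both paths give 1.0, but in this model the retract path does work proportional to the deleted rows rather than to the whole table.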

See also