This article provides a step-by-step guide for defining and running external Python UDFs, and calling them from RisingWave.
Define your functions with the `udf` (for scalar functions) and `udtf` (for set-returning/table functions) decorators provided by the `arrow-udf` module.

As an example, let's define some simple UDFs in a Python file named `udf.py`.
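The sketch below assumes the `arrow_udf` Python package and its `udf`, `udtf`, and `UdfServer` API; the exact decorator arguments (`input_types`, `result_type`, `io_threads`, `batch`) and the struct type string may differ across versions, and the OpenAI call is illustrative only:

```python
# udf.py -- sketch of the functions described in the explanation below.
import time
from typing import List

from arrow_udf import udf, udtf, UdfServer


@udf(input_types=["INT", "INT"], result_type="INT")
def gcd(x: int, y: int) -> int:
    # Greatest common divisor of two integers.
    while y != 0:
        x, y = y, x % y
    return x


@udf(input_types=["INT"], result_type="INT", io_threads=32)
def blocking(x: int) -> int:
    # Simulates an IO-bound call; io_threads allows such calls to run concurrently.
    time.sleep(0.01)
    return x


@udf(input_types=["VARCHAR"], result_type="STRUCT<key: VARCHAR, value: VARCHAR>")
def key_value(pair: str):
    # Returns a struct as a dictionary whose keys match the declared field names.
    key, value = pair.split("=", 1)
    return {"key": key, "value": value}


@udtf(input_types="INT", result_types="INT")
def series(n: int):
    # Table function: yields the integers 0, 1, ..., n-1.
    for i in range(n):
        yield i


@udf(input_types=["VARCHAR"], result_type="REAL[]", batch=True)
def text_embedding(texts: List[str]) -> List[List[float]]:
    # batch=True: receives a list of inputs and must return a list of outputs
    # of the same length, in the same order. The embedding client is illustrative.
    from openai import OpenAI

    client = OpenAI()
    response = client.embeddings.create(model="text-embedding-3-small", input=texts)
    return [item.embedding for item in response.data]


if __name__ == "__main__":
    # Start the UDF server only when this script is run directly.
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(blocking)
    server.add_function(key_value)
    server.add_function(series)
    server.add_function(text_embedding)
    server.serve()
```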
Code explanation:

- The scalar function `gcd`, decorated with `@udf`, takes two integer inputs and returns their greatest common divisor.
- The scalar function `blocking`, also decorated with `@udf`, uses the `io_threads` parameter, which specifies the number of threads that the Python UDF will use during execution to improve the performance of IO-intensive functions. Note that multithreading cannot speed up compute-intensive functions due to the GIL.
- The scalar function `key_value`, decorated with `@udf`, takes a single string input and returns a structured output.
- The table function `series`, decorated with `@udtf`, takes an integer input and yields a sequence of integers from 0 to `n-1`.
- The scalar function `text_embedding`, decorated with `@udf`, calls the OpenAI API to generate text embeddings for input texts. The `batch=True` parameter indicates that the function accepts batch input and returns batch output; each embedding vector in the returned list must correspond to the input text at the same index.
- Finally, the script starts a UDF server using `UdfServer` and listens for incoming requests on address `0.0.0.0:8815`. All defined functions are registered with the server using `server.add_function` before the server is started with the `serve()` method. The `if __name__ == '__main__':` conditional ensures that the server is only started when the script is run directly, rather than imported as a module.

After the UDF server is running, declare the functions in RisingWave with `CREATE FUNCTION`. The signature in the `CREATE FUNCTION` statement must match the one defined in the Python function decorator, and the field names in the `STRUCT` type must exactly match the ones defined in the Python decorator.
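For reference, declarations for the functions above could look roughly like the following, assuming the UDF server is reachable at `localhost:8815`; check the exact `CREATE FUNCTION ... AS ... USING LINK` syntax against your RisingWave version:

```sql
-- Hypothetical declarations matching the Python signatures above.
CREATE FUNCTION gcd(int, int) RETURNS int
AS gcd USING LINK 'http://localhost:8815';

CREATE FUNCTION key_value(varchar) RETURNS struct<key varchar, value varchar>
AS key_value USING LINK 'http://localhost:8815';

CREATE FUNCTION series(int) RETURNS TABLE (x int)
AS series USING LINK 'http://localhost:8815';
```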
If you are running RisingWave using Docker, you may need to replace the host `localhost` with `host.docker.internal` in the `USING LINK` clause.
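For example, the declaration for `gcd` would then use a link like the following (hypothetical address):

```sql
CREATE FUNCTION gcd(int, int) RETURNS int
AS gcd USING LINK 'http://host.docker.internal:8815';
```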
You can use the `top` command to monitor the CPU usage of the UDF server. If the CPU usage is close to 100%, it indicates that the CPU resources of the UDF server are insufficient, and scaling is necessary.

The data types used in UDFs map between SQL and Python as follows:

SQL Type | Python Type | Notes |
---|---|---|
BOOLEAN | bool | |
SMALLINT | int | |
INT | int | |
BIGINT | int | |
REAL | float | |
DOUBLE PRECISION | float | |
DECIMAL | decimal.Decimal | |
DATE | datetime.date | |
TIME | datetime.time | |
TIMESTAMP | datetime.datetime | |
INTERVAL | pyarrow.MonthDayNano | Fields can be obtained by months(), days() and nanoseconds() from MonthDayNano |
VARCHAR | str | |
BYTEA | bytes | |
JSONB | Any | Parsed / Serialized by json.loads / json.dumps |
T[] | List[T] | |
STRUCT<> | Dict[str, Any] | |
…others | | Not supported yet. |
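As a small illustration of this mapping, the sketch below assumes that `JSONB` values arrive as already-parsed Python objects and that `DECIMAL` maps to `decimal.Decimal`, as listed in the table:

```python
from decimal import Decimal
from typing import Any, List

from arrow_udf import udf


# JSONB[] arrives as a list of parsed Python values; the JSONB result is
# serialized back to JSON by the framework.
@udf(input_types=["JSONB[]"], result_type="JSONB")
def jsonb_concat(elems: List[Any]) -> Any:
    return elems


# DECIMAL maps to decimal.Decimal on both input and output.
@udf(input_types=["DECIMAL", "DECIMAL"], result_type="DECIMAL")
def decimal_add(a: Decimal, b: Decimal) -> Decimal:
    return a + b
```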
If you are migrating from the earlier `risingwave.udf` API, note the following changes:

- Import from the `arrow_udf` package instead of `risingwave.udf`.
- `FLOAT4` and `FLOAT8` are removed and replaced by `REAL` and `DOUBLE PRECISION`.
- The `STRUCT` type now requires field names, and the field names must exactly match the ones defined in `CREATE FUNCTION`. A function that returns a struct type now returns a dictionary instead of a tuple, and the field names of the dictionary must match the definition in the signature.
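For a struct-returning function, the change looks roughly like this sketch (the comment describes the previous tuple-based behavior for contrast):

```python
from arrow_udf import udf


# Previously (risingwave.udf), the struct type carried no field names, e.g.
# result_type="STRUCT<VARCHAR, VARCHAR>", and the function returned a tuple
# such as (key, value). With arrow_udf, field names are declared in the type
# string and the function returns a dict keyed by those names.
@udf(input_types=["VARCHAR"], result_type="STRUCT<key: VARCHAR, value: VARCHAR>")
def key_value(pair: str):
    key, value = pair.split("=", 1)
    return {"key": key, "value": value}
```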