External Python UDFs
This article provides a step-by-step guide for defining and running external Python UDFs, and calling them from RisingWave.
Prerequisites
- Ensure that you have Python (3.8 or later) installed.
- Ensure that you have started and connected to RisingWave.
1. Install the UDF framework
RisingWave uses arrow-udf as its remote UDF framework. The framework provides a Python SDK for defining and running UDFs outside of the RisingWave process.
Install arrow-udf with pip by running: pip install arrow-udf
The minimum version of RisingWave that supports arrow-udf Python UDFs is 1.10. If you are using an older version of RisingWave, please refer to the historical version of the documentation. If you have used an older version of the RisingWave UDF SDK (risingwave 0.1), we strongly encourage you to upgrade to the latest version. You can refer to the migration guide for instructions.
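To confirm that the package is available in the Python environment that will run the UDF server, a small check like the following can help. This is a minimal sketch using only the standard library; the helper name installed_version is made up for illustration.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "arrow-udf" is the distribution name used with pip.
    version = installed_version("arrow-udf")
    print(f"arrow-udf: {version or 'not installed'}")
```

If the script reports that the package is not installed, check that pip and the interpreter running the script belong to the same environment.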
2. Define functions in a Python file
To define UDFs in Python, create a Python file and define your functions using the udf (for scalar functions) and udtf (for set-returning/table functions) decorators provided by the arrow-udf module.
As an example, let’s define some simple UDFs in a Python file named udf.py:
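A minimal sketch of such a file is shown here. The function names (gcd, key_value, series), the type strings, and the address localhost:8815 are illustrative assumptions, not necessarily the ones used elsewhere in this guide. The sketch applies udf and udtf as plain function calls inside main() so that the functions can be unit-tested without the SDK installed; in a typical file you would put the decorators directly on the functions with the @ syntax.

```python
"""udf.py: a sketch of a UDF module for the arrow-udf Python SDK."""


def gcd(x, y):
    """Scalar function: greatest common divisor of two integers."""
    while y != 0:
        x, y = y, x % y
    return x


def key_value(pair):
    """Scalar function returning a struct as a dict.

    The dict keys must match the STRUCT field names in the signature.
    """
    key, value = pair.split("=", 1)
    return {"key": key, "value": value}


def series(n):
    """Table function: yields one row per value, 0 to n - 1."""
    for i in range(n):
        yield i


def main():
    # Imported lazily so that the functions above stay importable
    # even when arrow-udf is not installed.
    from arrow_udf import udf, udtf, UdfServer

    # Assumption: the server listens on localhost:8815.
    server = UdfServer(location="localhost:8815")
    server.add_function(
        udf(input_types=["INT", "INT"], result_type="INT")(gcd)
    )
    server.add_function(
        udf(
            input_types=["VARCHAR"],
            result_type="STRUCT<key: VARCHAR, value: VARCHAR>",
        )(key_value)
    )
    server.add_function(udtf(input_types="INT", result_types="INT")(series))
    server.serve()  # blocks while serving requests
```

To make the file start the server when it is run directly, call main() under an if __name__ == "__main__": guard at the end of the file.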
For more examples of UDFs, such as functions handling complex data types like JSONB, see this test file in RisingWave source code.
3. Start the UDF server
Run the Python file to start the UDF server: python3 udf.py
The UDF server will start serving requests, allowing you to call the defined UDFs from RisingWave.
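Before declaring functions in RisingWave, it can be useful to confirm that the server process is accepting connections. The sketch below only checks that a TCP port is open; it does not verify that the listener is a UDF server. The helper name is_listening and the example address localhost:8815 are assumptions for illustration.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution failures.
        return False
```

For example, is_listening("localhost", 8815) should return True once a server bound to that address is running.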
4. Declare external functions in RisingWave
In RisingWave, use the CREATE FUNCTION command to declare the functions you defined.
Here are the SQL statements for declaring the functions defined in step 2.
The function signature in the CREATE FUNCTION statement must match the signature defined in the Python function decorator. The field names in the STRUCT type must exactly match the ones defined in the Python decorator.
If you are running RisingWave using Docker, you may need to replace the host localhost with host.docker.internal in the USING LINK clause.
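To make the shape of these statements concrete, the following sketch builds CREATE FUNCTION statements as strings. The helper names, the example functions (gcd, key_value, series), and the default port 8815 are assumptions for illustration. The helper interpolates its arguments without quoting, so it is only suitable for trusted, hand-written input.

```python
from typing import Sequence


def udf_link(host: str = "localhost", port: int = 8815) -> str:
    """Build the URL for the USING LINK clause (port 8815 is an assumption)."""
    return f"http://{host}:{port}"


def create_function_sql(
    name: str, arg_types: Sequence[str], returns: str, link: str
) -> str:
    """Build a CREATE FUNCTION statement for an external UDF."""
    args = ", ".join(arg_types)
    return (
        f"CREATE FUNCTION {name}({args}) RETURNS {returns}\n"
        f"AS {name} USING LINK '{link}';"
    )


if __name__ == "__main__":
    # Use udf_link("host.docker.internal") when RisingWave runs in Docker.
    link = udf_link()
    print(create_function_sql("gcd", ["int", "int"], "int", link))
    print(
        create_function_sql(
            "key_value",
            ["varchar"],
            "struct<key varchar, value varchar>",
            link,
        )
    )
    print(create_function_sql("series", ["int"], "TABLE (x int)", link))
```

The printed statements can be pasted into a SQL client connected to RisingWave. Note that the struct field names in the RETURNS clause mirror the ones in the Python signature.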
5. Use the functions in RisingWave
Once the UDFs are created in RisingWave, you can use them in SQL queries just like built-in functions. For example, a scalar function declared as gcd(int, int) could be called with SELECT gcd(25, 15);
6. Scale the UDF Server
Because of the Python interpreter’s Global Interpreter Lock (GIL), the UDF server can use only a single CPU core when processing requests. If the throughput of the UDF server is insufficient, consider scaling out the UDF server.
How to determine if the UDF server needs scaling?
You can use tools like top to monitor the CPU usage of the UDF server. If the CPU usage is close to 100%, the UDF server is CPU-bound and scaling is necessary.
To scale the UDF server, you can launch multiple UDF servers on different ports and use a load balancer to distribute requests among these servers.
The specific code is as follows:
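One way to do this is to start one server process per port with multiprocessing. The sketch below assumes four servers on ports 8816 to 8819, matching the load balancer setup described next; the helper names worker_ports and start_all are made up for illustration, and the function registration is left as a placeholder.

```python
from multiprocessing import Pool
from typing import List


def worker_ports(first_port: int, count: int) -> List[int]:
    """Return `count` consecutive ports starting at `first_port`."""
    return list(range(first_port, first_port + count))


def start_server(port: int) -> None:
    """Start one UDF server on the given port; blocks while serving."""
    # Imported lazily so that this module loads without arrow-udf installed.
    from arrow_udf import UdfServer

    server = UdfServer(location=f"localhost:{port}")
    # Register your decorated functions here, for example:
    # server.add_function(gcd)
    server.serve()


def start_all(first_port: int = 8816, count: int = 4) -> None:
    """Start `count` servers, each in its own process and on its own port."""
    with Pool(count) as pool:
        pool.map(start_server, worker_ports(first_port, count))
```

Call start_all() under an if __name__ == "__main__": guard. Because each server runs in a separate process, each has its own interpreter and its own GIL.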
Then, you can start a load balancer, such as Nginx. It listens on port 8815 and forwards requests to UDF servers on ports 8816-8819.
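As a rough illustration, the following sketch renders an Nginx configuration for this layout. It assumes that the UDF servers speak gRPC (arrow-udf serves requests over Arrow Flight), so it uses grpc_pass with an HTTP/2 listener; the upstream name udf_servers and the helper name are made up, and the output should be reviewed against your Nginx version before use.

```python
from typing import Iterable


def render_nginx_conf(listen_port: int, backend_ports: Iterable[int]) -> str:
    """Render a minimal Nginx config that balances gRPC traffic across ports."""
    backends = "\n".join(
        f"        server localhost:{port};" for port in backend_ports
    )
    return (
        "events {}\n"
        "http {\n"
        "    upstream udf_servers {\n"
        f"{backends}\n"
        "    }\n"
        "    server {\n"
        f"        listen {listen_port} http2;\n"
        "        location / {\n"
        "            grpc_pass grpc://udf_servers;\n"
        "        }\n"
        "    }\n"
        "}\n"
    )


if __name__ == "__main__":
    # Listen on 8815 and forward to the UDF servers on ports 8816-8819.
    print(render_nginx_conf(8815, range(8816, 8820)))
```

With this layout, the USING LINK clause in RisingWave keeps pointing at port 8815, so existing function declarations do not need to change.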
Data Types
The RisingWave Python UDF SDK supports the following data types:
SQL Type | Python Type | Notes |
---|---|---|
BOOLEAN | bool | |
SMALLINT | int | |
INT | int | |
BIGINT | int | |
REAL | float | |
DOUBLE PRECISION | float | |
DECIMAL | decimal.Decimal | |
DATE | datetime.date | |
TIME | datetime.time | |
TIMESTAMP | datetime.datetime | |
INTERVAL | pyarrow.MonthDayNano | Fields are available as the months, days and nanoseconds attributes of MonthDayNano |
VARCHAR | str | |
BYTEA | bytes | |
JSONB | Any | Parsed / Serialized by json.loads / json.dumps |
T[] | List[T] | |
STRUCT<> | Dict[str, Any] | |
…others | Not supported yet. |
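To illustrate the mapping, here are a few plain Python functions written against the Python types in the table. The function names and the SQL signatures in the docstrings are hypothetical examples; to expose them as UDFs, each function would additionally need the udf decorator with matching input_types and result_type.

```python
import datetime
import decimal
from typing import Any, Dict, List


def add_tax(price: decimal.Decimal) -> decimal.Decimal:
    """DECIMAL -> DECIMAL: the argument arrives as decimal.Decimal."""
    return price * decimal.Decimal("1.08")


def days_between(start: datetime.date, end: datetime.date) -> int:
    """DATE, DATE -> INT: dates arrive as datetime.date."""
    return (end - start).days


def jsonb_get(doc: Any, key: str) -> Any:
    """JSONB, VARCHAR -> JSONB: doc is already parsed, so an object is a dict."""
    return doc.get(key) if isinstance(doc, dict) else None


def min_max(values: List[int]) -> Dict[str, Any]:
    """INT[] -> STRUCT<lo: INT, hi: INT>: a list comes in, a dict goes out."""
    return {"lo": min(values), "hi": max(values)}
```

Keeping the type handling in plain functions like these makes it straightforward to test the logic locally before wiring it into a UDF server.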
Migration Guide from risingwave 0.1 to arrow-udf 0.2
If you have used the Python UDF SDK in RisingWave 1.9 or earlier versions, please refer to the following steps for upgrading.
- Import the arrow_udf package instead of risingwave.udf.
- The type aliases FLOAT4 and FLOAT8 are removed and replaced by REAL and DOUBLE PRECISION.
- The STRUCT type now requires field names. The field names must exactly match the ones defined in CREATE FUNCTION. A function that returns a struct type now returns a dictionary instead of a tuple. The field names of the dictionary must match the definition in the signature.