Use UDFs in Python
This article provides a step-by-step guide for installing the RisingWave UDF API, defining functions in a Python file, starting the UDF server, and declaring and using UDFs in RisingWave.
Prerequisites
- Ensure that you have Python (3.8 or later) installed on your computer.
- Ensure that you have started and connected to RisingWave.
1. Install the RisingWave UDF API for Python
Run the following command to download and install the RisingWave UDF API package and its dependencies.
pip install arrow-udf
The current Python UDF SDK requires RisingWave 1.10 or later and does not work with older versions. If you are using an older version of RisingWave, refer to the historical version of this documentation. If you have used the older RisingWave UDF SDK (risingwave 0.1), we strongly encourage you to upgrade to the latest version; see the migration guide for the steps. The older SDK is still supported but will not receive new features or bug fixes.
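Before moving on, you may want to confirm that the interpreter you will use to run udf.py meets the prerequisites. The following is a minimal standard-library sketch, not part of the SDK. It assumes the package installs an importable top-level module named arrow_udf (the name used in the migration guide); check_udf_environment is a hypothetical helper name.

```python
import importlib.util
import sys


def check_udf_environment(min_version=(3, 8)):
    """Report whether this interpreter is recent enough and can import arrow_udf."""
    return {
        # The prerequisites require Python 3.8 or later.
        "python_ok": sys.version_info[:2] >= tuple(min_version),
        # find_spec returns None when a top-level module cannot be found.
        "arrow_udf_installed": importlib.util.find_spec("arrow_udf") is not None,
    }


if __name__ == "__main__":
    print(check_udf_environment())
```

If arrow_udf_installed is False, make sure pip installed the package into the same interpreter, for example by running pip as python3 -m pip.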
2. Define your functions in a Python file
To demonstrate this step, we have prepared a sample script for you to try out. Create a Python file named udf.py and insert the sample script into it.
New sample functions, such as JSONB functions, are frequently added to udf.py. See the source file.
Some of the sample functions are still being tested and may not be fully functional or optimized.
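If you do not have the sample script at hand, the following is a minimal sketch of a udf.py in the same spirit; it is not the official sample. It assumes the arrow_udf package exposes udf, udtf, and UdfServer with the parameters shown (input_types, result_type for scalar functions, result_types for table functions, location for the server) and that STRUCT field names are written as name: TYPE. The four function names are illustrative. Registration is kept inside serve() so the plain function logic can be imported and tested without starting a server.

```python
import socket
import struct


def gcd(x: int, y: int) -> int:
    """Greatest common divisor of two integers (Euclid's algorithm)."""
    while y != 0:
        x, y = y, x % y
    return x


def gcd3(x: int, y: int, z: int) -> int:
    """Greatest common divisor of three integers."""
    return gcd(gcd(x, y), z)


def series(n: int):
    """Table function logic: yield one row per integer 0..n-1."""
    for i in range(n):
        yield i


def extract_tcp_info(tcp_packet: bytes) -> dict:
    """Read IPv4 addresses and TCP ports from a raw packet.

    Assumes a 20-byte IPv4 header (no options) directly followed by the TCP header.
    The keys must match the STRUCT field names declared in SQL.
    """
    src_addr, dst_addr = struct.unpack("!4s4s", tcp_packet[12:20])
    src_port, dst_port = struct.unpack("!HH", tcp_packet[20:24])
    return {
        "src_addr": socket.inet_ntoa(src_addr),
        "dst_addr": socket.inet_ntoa(dst_addr),
        "src_port": src_port,
        "dst_port": dst_port,
    }


def serve(location: str = "0.0.0.0:8815") -> None:
    """Register the functions above and block while serving requests."""
    # Imported lazily so the logic above can be used without the SDK installed.
    from arrow_udf import UdfServer, udf, udtf

    server = UdfServer(location=location)
    server.add_function(udf(input_types=["INT", "INT"], result_type="INT")(gcd))
    server.add_function(udf(input_types=["INT", "INT", "INT"], result_type="INT")(gcd3))
    server.add_function(udtf(input_types="INT", result_types="INT")(series))
    server.add_function(
        udf(
            input_types=["BYTEA"],
            # Ports can exceed 32767, so INT is used instead of SMALLINT.
            result_type="STRUCT<src_addr: VARCHAR, dst_addr: VARCHAR, src_port: INT, dst_port: INT>",
        )(extract_tcp_info)
    )
    server.serve()
```

To make running the file start the server, end udf.py with a main guard that calls serve(), that is: if __name__ == "__main__": serve()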
3. Start the UDF server
- In a terminal window, navigate to the directory where udf.py is saved.
- Run the command python3 udf.py to execute the file.
The UDF server will start running, allowing you to call the defined UDFs from RisingWave.
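To check from another terminal that the server is accepting connections, a small standard-library probe is enough. This sketch assumes the server listens on port 8815 on the same machine; adjust host and port to match your udf.py. It only confirms that a TCP connection succeeds, not that the process behind the port is a working UDF server. udf_server_reachable is a hypothetical helper name.

```python
import socket


def udf_server_reachable(host: str = "localhost", port: int = 8815, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and connects; closing it right away is enough.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    print("UDF server reachable:", udf_server_reachable())
```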
4. Declare your functions in RisingWave
In RisingWave, use the CREATE FUNCTION command to declare the functions you defined.
Here are the SQL statements for declaring the four UDFs defined in step 2.
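RisingWave speaks the PostgreSQL wire protocol, so CREATE FUNCTION statements can also be issued from Python through a PostgreSQL driver such as psycopg2. The sketch below assumes four hypothetical functions, gcd, gcd3, series, and extract_tcp_info, served at http://localhost:8815, and uses the AS <function name> USING LINK '<address>' form of CREATE FUNCTION. The names, signatures, and address are illustrative; declare_udfs accepts any DB-API cursor. If RisingWave runs in Docker, localhost refers to the container itself, so the link must use an address reachable from RisingWave (for example host.docker.internal on Docker Desktop).

```python
UDF_LINK = "http://localhost:8815"  # address of the UDF server as seen from RisingWave

CREATE_FUNCTION_STATEMENTS = [
    f"CREATE FUNCTION gcd(int, int) RETURNS int AS gcd USING LINK '{UDF_LINK}';",
    f"CREATE FUNCTION gcd3(int, int, int) RETURNS int AS gcd3 USING LINK '{UDF_LINK}';",
    # A table function declares its output columns with RETURNS TABLE.
    f"CREATE FUNCTION series(int) RETURNS TABLE (x int) AS series USING LINK '{UDF_LINK}';",
    # Struct field names and types must match the keys the Python function returns.
    "CREATE FUNCTION extract_tcp_info(bytea) "
    "RETURNS struct<src_addr varchar, dst_addr varchar, src_port int, dst_port int> "
    f"AS extract_tcp_info USING LINK '{UDF_LINK}';",
]


def declare_udfs(cursor, statements=CREATE_FUNCTION_STATEMENTS) -> int:
    """Execute each CREATE FUNCTION statement on a DB-API cursor; return how many ran."""
    count = 0
    for statement in statements:
        cursor.execute(statement)
        count += 1
    return count
```

With psycopg2, for example, you could pass conn.cursor() from a connection to your RisingWave instance; setting conn.autocommit = True is a common choice so that each DDL statement runs outside an explicit transaction.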
5. Use your functions in RisingWave
Once the UDFs are created in RisingWave, you can use them in SQL queries just like built-in functions.
Example
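As an illustration from Python, the sketch below runs two example queries against hypothetical gcd and series functions through any DB-API cursor and returns the fetched rows. A scalar UDF is called in the SELECT list; a table function is used in the FROM clause. If gcd computes the greatest common divisor and series(n) yields 0..n-1, the first query should return a single row containing 5 and the second the rows 0, 1, and 2; the exact row representation depends on your driver. run_examples is a hypothetical helper name.

```python
EXAMPLE_QUERIES = {
    # Scalar UDF: called like a built-in function in the SELECT list.
    "gcd": "SELECT gcd(25, 15);",
    # Table function: used in the FROM clause like a table.
    "series": "SELECT * FROM series(3);",
}


def run_examples(cursor) -> dict:
    """Execute each example query on a DB-API cursor and collect its rows."""
    results = {}
    for name, sql in EXAMPLE_QUERIES.items():
        cursor.execute(sql)
        results[name] = cursor.fetchall()
    return results
```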
6. Scale the UDF server
Because of the Python interpreter’s Global Interpreter Lock (GIL), a single UDF server process can use only one CPU core when processing requests. If the throughput of the UDF server is insufficient, consider scaling it out to multiple server processes.
How do you determine whether the UDF server needs scaling?
Use a tool such as top to monitor the CPU usage of the UDF server process. If its CPU usage stays close to 100% (one fully used core), the server is CPU-bound and needs to be scaled out.
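top is the simplest option. If you would rather log the figure from inside the UDF server process itself, the following standard-library sketch (not part of the SDK) measures the CPU time consumed by the whole process per second of wall-clock time. Calling the hypothetical start_cpu_logger() near the start of udf.py, before the server begins serving, runs the sampler in a daemon thread. Values that stay near 100% match what top reports for a saturated single core; native code that releases the GIL can push the figure above 100%.

```python
import threading
import time


def sample_cpu_utilization(interval: float = 1.0) -> float:
    """Return CPU seconds used by this whole process per wall-clock second.

    1.0 means the equivalent of one core was busy for the whole interval.
    """
    cpu_start, wall_start = time.process_time(), time.perf_counter()
    time.sleep(interval)  # other threads keep running (and using CPU) meanwhile
    cpu_used = time.process_time() - cpu_start
    wall_elapsed = time.perf_counter() - wall_start
    return cpu_used / wall_elapsed if wall_elapsed > 0 else 0.0


def start_cpu_logger(interval: float = 5.0) -> threading.Thread:
    """Print the utilization every `interval` seconds from a daemon thread."""

    def loop() -> None:
        while True:
            print(f"UDF server CPU: {sample_cpu_utilization(interval):.0%} of one core", flush=True)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```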
To scale the UDF server, you can launch multiple UDF servers on different ports and use a load balancer to distribute requests among these servers.
The specific code is as follows:
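A minimal sketch of this idea with the standard multiprocessing module follows; it is not the original code. It assumes you have a module-level function serve_fn(location) that creates a UdfServer for the given location, registers your functions, and calls serve(). launch_servers, server_locations, and wait_for_servers are hypothetical names. With the defaults it starts four servers on ports 8816-8819, matching the load balancer ports described in the next paragraph. Each process has its own interpreter and GIL, so together they can use several cores.

```python
import multiprocessing


def server_locations(count: int, base_port: int = 8816, host: str = "localhost") -> list:
    """Return one host:port location per server, on consecutive ports."""
    return [f"{host}:{base_port + i}" for i in range(count)]


def launch_servers(serve_fn, count: int = 4, base_port: int = 8816) -> list:
    """Start one process per location, each running serve_fn(location).

    Separate processes each get their own interpreter and GIL, so together
    they can use up to `count` CPU cores.
    """
    processes = []
    for location in server_locations(count, base_port):
        process = multiprocessing.Process(target=serve_fn, args=(location,))
        process.start()
        processes.append(process)
    return processes


def wait_for_servers(processes: list) -> None:
    """Block until every server process exits."""
    for process in processes:
        process.join()
```

Call these under if __name__ == "__main__": in udf.py, for example wait_for_servers(launch_servers(serve_fn)). The guard is required by multiprocessing on platforms that start child processes by spawning a fresh interpreter.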
Then start a load balancer, such as Nginx, that listens on port 8815 and forwards requests to the UDF servers on ports 8816-8819.
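The Python UDF server is built on Apache Arrow Flight, which runs over gRPC, so Nginx must proxy HTTP/2 gRPC traffic (grpc_pass) rather than plain HTTP. The helper below renders one possible nginx.conf for this setup: port 8815 in front of 8816-8819. It is a sketch. The upstream name udf_servers is arbitrary, and the "listen ... http2" form is the older syntax that newer Nginx releases deprecate in favor of a separate "http2 on;" directive. render_nginx_config is a hypothetical name.

```python
def render_nginx_config(listen_port: int = 8815, backend_ports=range(8816, 8820), host: str = "localhost") -> str:
    """Return an nginx.conf that load-balances gRPC traffic across UDF servers."""
    upstream_lines = "\n".join(f"        server {host}:{port};" for port in backend_ports)
    return f"""events {{}}

http {{
    # Pool of UDF server processes; Nginx uses round-robin by default.
    upstream udf_servers {{
{upstream_lines}
    }}

    server {{
        # Arrow Flight uses gRPC, which requires HTTP/2.
        listen {listen_port} http2;

        location / {{
            grpc_pass grpc://udf_servers;
        }}
    }}
}}
"""


if __name__ == "__main__":
    print(render_nginx_config())
```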
Data Types
The RisingWave Python UDF SDK supports the following data types:
SQL Type | Python Type | Notes |
---|---|---|
BOOLEAN | bool | |
SMALLINT | int | |
INT | int | |
BIGINT | int | |
REAL | float | |
DOUBLE PRECISION | float | |
DECIMAL | decimal.Decimal | |
DATE | datetime.date | |
TIME | datetime.time | |
TIMESTAMP | datetime.datetime | |
INTERVAL | MonthDayNano / (int, int, int) | Fields can be read from the months, days, and nanoseconds attributes of MonthDayNano |
VARCHAR | str | |
BYTEA | bytes | |
JSONB | any | Any JSON-compatible value (dict, list, str, int, float, bool, or None); parsed with json.loads and serialized with json.dumps |
T[] | list[T] | |
STRUCT<> | dict | |
Other types | Not supported yet | |
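To make the mapping concrete, here are a few plain Python functions written against the types in the table, receiving values as the SDK would pass them in: DECIMAL as decimal.Decimal, arrays as lists, INTERVAL as a (months, days, nanoseconds) triple, DATE as datetime.date, and JSONB as already-parsed Python objects. The function names are hypothetical and SDK registration is omitted; simulate_jsonb_call mimics the json.loads / json.dumps round trip described in the table.

```python
from __future__ import annotations  # lets list[...] hints work on Python 3.8

import datetime
import json
from decimal import Decimal


def order_total(prices: list[Decimal], quantities: list[int]) -> Decimal:
    """DECIMAL[] and INT[] arrive as lists; Decimal keeps exact arithmetic."""
    return sum((p * q for p, q in zip(prices, quantities)), Decimal("0"))


def interval_parts(interval) -> dict:
    """INTERVAL: MonthDayNano and a plain (int, int, int) tuple unpack the same way."""
    months, days, nanoseconds = interval
    return {"months": months, "days": days, "nanoseconds": nanoseconds}


def days_between(start: datetime.date, end: datetime.date) -> int:
    """DATE values arrive as datetime.date objects."""
    return (end - start).days


def jsonb_get(doc, key: str):
    """JSONB arrives already parsed (dict, list, str, number, bool, or None)."""
    return doc.get(key) if isinstance(doc, dict) else None


def simulate_jsonb_call(jsonb_text: str, key: str) -> str:
    """Mimic the SDK: parse the input with json.loads, serialize the result with json.dumps."""
    return json.dumps(jsonb_get(json.loads(jsonb_text), key))
```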
Migration Guide from risingwave 0.1 to arrow-udf 0.2
If you have used the Python UDF SDK in RisingWave 1.9 or earlier versions, please refer to the following steps for upgrading.
- Import the arrow_udf package instead of risingwave.udf.
- The type aliases FLOAT4 and FLOAT8 have been removed; use REAL and DOUBLE PRECISION instead.
- The STRUCT type now requires field names, which must exactly match the ones defined in CREATE FUNCTION. A function that returns a struct type now returns a dictionary instead of a tuple, and the dictionary's keys must match the field names in the signature.
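The struct change is the one most likely to require code edits. The sketch below contrasts a function written in the 0.1 style (returning a tuple) with its 0.2 equivalent (returning a dictionary keyed by field names), plus a hypothetical helper, tuple_to_struct, for wrapping existing tuple-returning code during migration. The decorator lines are shown only as comments because the exact STRUCT field syntax they accept (name: TYPE) is an assumption here; the function name parse_endpoint and its fields are illustrative.

```python
# Before (risingwave 0.1), for comparison:
#   from risingwave.udf import udf
#   @udf(input_types=["VARCHAR"], result_type="STRUCT<VARCHAR, INT>")
#   (FLOAT4 / FLOAT8 in type strings become REAL / DOUBLE PRECISION)
#
# After (arrow-udf 0.2), assuming named fields written as name: TYPE:
#   from arrow_udf import udf
#   @udf(input_types=["VARCHAR"], result_type="STRUCT<host: VARCHAR, port: INT>")
#
# Matching SQL declaration (field names must be identical):
#   CREATE FUNCTION parse_endpoint(varchar) RETURNS struct<host varchar, port int>
#   AS parse_endpoint USING LINK 'http://localhost:8815';

STRUCT_FIELDS = ("host", "port")


def parse_endpoint_v01(endpoint: str) -> tuple:
    """0.1 style: struct results were positional tuples."""
    host, port = endpoint.rsplit(":", 1)
    return (host, int(port))


def parse_endpoint(endpoint: str) -> dict:
    """0.2 style: struct results are dicts keyed by the declared field names."""
    host, port = endpoint.rsplit(":", 1)
    return {"host": host, "port": int(port)}


def tuple_to_struct(values: tuple, field_names: tuple = STRUCT_FIELDS) -> dict:
    """Hypothetical migration helper: name the fields of an old tuple result."""
    if len(values) != len(field_names):
        raise ValueError(f"expected {len(field_names)} fields, got {len(values)}")
    return dict(zip(field_names, values))
```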