RisingWave uses the arrow-udf as its remote UDF framework. The framework provides a Python SDK for defining and running UDFs outside of the RisingWave process.
Run the following command to install arrow-udf:
pip install arrow-udf
The minimum version of RisingWave that supports arrow-udf Python UDFs is 1.10. If you are using an older version of RisingWave, please refer to the historical version of the documentation. If you have used an older version of the RisingWave UDF SDK (risingwave 0.1), we strongly encourage you to upgrade to the latest version. You can refer to the migration guide for instructions.
To define UDFs in Python, you need to create a Python file and define your functions using the udf (for scalar functions) and udtf (for set-returning/table functions) decorators provided by the arrow-udf module.
As an example, let’s define some simple UDFs in a Python file named udf.py:
udf.py
from arrow_udf import udf, udtf, UdfServer# Define a scalar function that returns a single value@udf(input_types=["INT", "INT"], result_type="INT")def gcd(x: int, y: int) -> int: while y != 0: (x, y) = (y, x % y) return x# Define a scalar function to perform some blocking operation, setting the `io_threads` parameter to run multiple function calls concurrently in a thread pool@udf(input_types=["INT"], result_type="INT", io_threads=32)def blocking(x): time.sleep(0.01) return x# Define a scalar function that returns multiple values within a struct@udf(input_types=['VARCHAR'], result_type='STRUCT<key: VARCHAR, value: VARCHAR>')def key_value(pair: str): key, value = pair.split('=') return {'key': key, 'value': value}# Define a table function over a Python generator function@udtf(input_types='INT', result_types='INT')def series(n): for i in range(n): yield i# Define a function that accepts batch input and returns a batch output@udf(input_types=["VARCHAR"], result_type="REAL[]", batch=True)def text_embedding(texts: List[str]) -> List[List[float]]: from openai import OpenAI client = OpenAI("<your-api-key>") embeddings = [ e.embedding for e in client.embeddings.create( model="text-embedding-ada-002", input=texts, encoding_format="float", ).data ] return embeddingsif __name__ == '__main__': # Create a UDF server and register the functions server = UdfServer(location="0.0.0.0:8815") # You can use any available port in your system. Here we use port 8815. server.add_function(gcd) server.add_function(blocking) server.add_function(key_value) server.add_function(series) server.add_function(text_embedding) # Start the UDF server server.serve()
The scalar function gcd, decorated with @udf, takes two integer inputs and returns the greatest common divisor of the two integers.
The scalar function blocking, decorated with @udf. The io_threads parameter specifies the number of threads that the Python UDF will use during execution to enhance processing performance of IO-intensive functions. Please note that multithreading can not speed up compute-intensive functions due to the GIL.
The scalar function key_value, decorated with @udf, takes a single string input and returns a structured output.
The table function series, decorated with @udtf, takes an integer input and yields a sequence of integers from 0 to n-1.
The scalar function text_embedding, decorated with @udf, calls the OpenAI API to generate text embeddings for input texts. The batch=True parameter indicates that the function accepts batch input and returns batch output. Each embedding vector in the returned list should correspond to the input text at the same index.
Finally, the script starts a UDF server using UdfServer and listens for incoming requests on address 0.0.0.0:8815. All defined functions are registered to the server using server.add_function before starting the server using the serve() method. The if __name__ == '__main__': conditional is used to ensure that the server is only started if the script is run directly, rather than being imported as a module.
For more examples of UDFs, such as functions handling complex data types like JSONB, see this test file in RisingWave source code.
In RisingWave, use the CREATE FUNCTION command to declare the functions you defined.
Here are the SQL statements for declaring the functions defined in step 2.
CREATE FUNCTION gcd(int, int) RETURNS intAS gcd USING LINK 'http://localhost:8815';CREATE FUNCTION blocking(int) RETURNS intAS blocking USING LINK 'http://localhost:8815';CREATE FUNCTION key_value(varchar) RETURNS struct<key varchar, value varchar> -- the field names must exactly match the ones in Python decoratorAS key_value USING LINK 'http://localhost:8815';CREATE FUNCTION series(int) RETURNS TABLE (x int)AS series USING LINK 'http://localhost:8815';CREATE FUNCTION text_embedding(varchar) RETURNS real[]AS text_embedding USING LINK 'http://localhost:8815';
The function signature in the CREATE FUNCTION statement must match the signature defined in the Python function decorator. The field names in the STRUCT type must exactly match the ones defined in the Python decorator.
If you are running RisingWave using Docker, you may need to replace the host localhost with host.docker.internal in the USING LINK clause.
Due to the limitations of the Python interpreter’s Global Interpreter Lock (GIL), the UDF server can only utilize a single CPU core when processing requests. If you find that the throughput of the UDF server is insufficient, consider scaling out the UDF server.
How to determine if the UDF server needs scaling?
You can use tools like top to monitor the CPU usage of the UDF server. If the CPU usage is close to 100%, it indicates that the CPU resources of the UDF server are insufficient, and scaling is necessary.
To scale the UDF server, you can launch multiple UDF servers on different ports and use a load balancer to distribute requests among these servers.
The specific code is as follows:
udf.py
from multiprocessing import Pooldef start_server(port: int): """Start a UDF server listening on the specified port.""" server = UdfServer(location=f"localhost:{port}") # add functions ... server.serve()if __name__ == "__main__": """Start multiple servers listening on different ports.""" n = 4 with Pool(n) as p: p.map(start_server, range(8816, 8816 + n))
Then, you can start a load balancer, such as Nginx. It listens on port 8815 and forwards requests to UDF servers on ports 8816-8819.
The STRUCT type now requires field names. The field names must exactly match the ones defined in CREATE FUNCTION. The function that returns a struct type now returns a dictionary instead of a tuple. The field names of the dictionary must match the definition in the signature.