RisingWave provides a Python SDK, risingwave-py, to help users develop event-driven applications.
This SDK provides a simple way to perform ad-hoc queries, subscribe to changes, and define event handlers for tables and materialized views, making it easier to integrate real-time data into applications.
Use risingwave-py to connect to RisingWave
risingwave-py is a RisingWave Python SDK that provides the following capabilities:
- Interact with RisingWave via Pandas DataFrames.
- Subscribe to and process changes from RisingWave tables or materialized views.
- Run SQL commands supported in RisingWave.
Run RisingWave
To learn how to run RisingWave, see Run RisingWave.
Connect to RisingWave
To connect to RisingWave via risingwave-py:
from risingwave import RisingWave, RisingWaveConnOptions

# Connect to a RisingWave instance on localhost with named parameters
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)

# Connect to a RisingWave instance on localhost with a connection string
rw = RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))

# You can create a new SQL connection and execute operations within a with statement.
# This is the recommended way to use the Python SDK.
with rw.getconn() as conn:
    conn.insert(...)
    conn.fetch(...)
    conn.execute(...)
    conn.mv(...)
    conn.on_change(...)

# You can also use the existing connection held by the RisingWave object.
# The later sections use this style for simplicity.
rw.insert(...)
rw.fetch(...)
rw.execute(...)
rw.mv(...)
rw.on_change(...)
Ingestion into RisingWave
Load a Pandas DataFrame into RisingWave:
from datetime import datetime

import pandas as pd

df = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
        "ts": [datetime.strptime("2023-10-05 14:30:00", "%Y-%m-%d %H:%M:%S"),
               datetime.strptime("2023-10-05 14:31:20", "%Y-%m-%d %H:%M:%S")],
    }
)

# A table named "test" with the matching schema will be created in RisingWave if it does not exist
rw.insert(table_name="test", data=df)

# You can pass the optional force_flush parameter and set it to True
# if you want the inserted data to be visible to fetch queries immediately.
# Otherwise, data is inserted in batches asynchronously for better performance.
# rw.insert(table_name="test", data=df, force_flush=True)
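The batch-versus-flush behavior described in the comments above can be pictured with a minimal sketch. This is purely illustrative, not risingwave-py's actual implementation: rows accumulate in a buffer and become visible only when the buffer fills or a flush is forced.

```python
# Illustrative sketch of batch-then-flush semantics, NOT risingwave-py internals.
class BufferedInserter:
    def __init__(self, batch_size=1000):
        self.batch_size = batch_size
        self.buffer = []   # rows waiting to be written
        self.written = []  # rows already visible to queries

    def insert(self, rows, force_flush=False):
        self.buffer.extend(rows)
        # Flush when the buffer is full, or immediately when requested.
        if force_flush or len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        self.written.extend(self.buffer)
        self.buffer.clear()

ins = BufferedInserter(batch_size=3)
ins.insert([{"product": "foo"}])                    # buffered, not yet visible
ins.insert([{"product": "bar"}], force_flush=True)  # both rows become visible
```

This is why, without force_flush=True, a fetch issued immediately after an insert may not see the new rows yet.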
Load data into RisingWave from external systems:
# Create a table and load data from an upstream Kafka topic
rw.execute("""
    CREATE TABLE IF NOT EXISTS source_abc
    WITH (
        connector='kafka',
        properties.bootstrap.server='localhost:9092',
        topic='test_topic'
    )
    FORMAT UPSERT ENCODE AVRO (
        schema.registry = 'http://127.0.0.1:8081',
        schema.registry.username='your_schema_registry_username',
        schema.registry.password='your_schema_registry_password'
    )""")
For supported sources and the SQL syntax, see this topic.
Query from RisingWave
from risingwave import OutputFormat

result: pd.DataFrame = rw.fetch(
    """SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
       FROM tumble(test, ts, interval '10 seconds')
       GROUP BY window_start, window_end, product""",
    format=OutputFormat.DATAFRAME)
print(result)

# Output:
#           window_start          window_end product  avg_price
# 0  2023-10-05 14:31:20 2023-10-05 14:31:30     bar      457.0
# 1  2023-10-05 14:30:00 2023-10-05 14:30:10     foo      123.0

# You can also use OutputFormat.RAW to get the query result back as a list of tuples:
# rw.fetch("...", format=OutputFormat.RAW)
# [(datetime.datetime(2023, 10, 5, 14, 31, 20), datetime.datetime(2023, 10, 5, 14, 31, 30), 'bar', 457.0),
#  (datetime.datetime(2023, 10, 5, 14, 30), datetime.datetime(2023, 10, 5, 14, 30, 10), 'foo', 123.0)]
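If you have not used tumble() before: it assigns each row to exactly one fixed-size, non-overlapping time window based on its timestamp. A pure-Python sketch of the 10-second windowing and averaging used in the query above (illustrative only, not how RisingWave evaluates it):

```python
from collections import defaultdict
from datetime import datetime, timedelta

rows = [
    ("foo", 123.4, datetime(2023, 10, 5, 14, 30, 0)),
    ("bar", 456.7, datetime(2023, 10, 5, 14, 31, 20)),
]

def tumble_start(ts, size=timedelta(seconds=10)):
    # Floor the timestamp to the start of its tumbling window.
    return datetime.min + (ts - datetime.min) // size * size

# Group prices by (window_start, window_end, product), like the GROUP BY above.
groups = defaultdict(list)
for product, price, ts in rows:
    start = tumble_start(ts)
    groups[(start, start + timedelta(seconds=10), product)].append(price)

for (start, end, product), prices in sorted(groups.items()):
    print(start, end, product, round(sum(prices) / len(prices)))
```

Each row lands in the window [start, start + 10s), which matches the window_start/window_end pairs in the query output.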
Event-driven processing with RisingWave
Event-driven applications depend on real-time data processing to react to events as they occur. With risingwave-py, you can define materialized views using SQL and run them in RisingWave. Behind the scenes, events are processed continuously, and the results are incrementally maintained.
In the following example, test_mv is created to incrementally maintain the result of the defined SQL as events are ingested into the test table.
mv = rw.mv(
    name="test_mv",
    stmt="""SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
            FROM tumble(test, ts, interval '10 seconds')
            GROUP BY window_start, window_end, product""")
In addition to running ad-hoc SQL queries on tables and materialized views, risingwave-py lets you subscribe to changes from a table or materialized view and define handlers for those change events in your applications.
# Write your own handler.
# Each event contains all fields of the subscribed materialized view or table, plus two additional columns:
# - op: varchar. The change operation. Valid values: [Insert, UpdateInsert, Delete, UpdateDelete].
#   RisingWave emits UpdateInsert and UpdateDelete because it treats an UPDATE
#   as a delete of the old value followed by an insert of the new value.
# - rw_timestamp: bigint. The Unix timestamp in milliseconds when the data was written.
def simple_event_handler(event: pd.DataFrame):
    for _, row in event.iterrows():
        # Trigger an action (e.g., place an order via a REST API) when the avg_price exceeds 300
        if row["op"] in ("Insert", "UpdateInsert") and row["avg_price"] >= 300:
            print(
                f"{row['window_start']} - {row['window_end']}: "
                f"{row['product']} avg price {row['avg_price']} exceeds 300")
            # ...
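To see concretely why an UPDATE produces two events, here is a small standalone illustration with hypothetical event rows shaped like those the handler receives: the old value arrives as an UpdateDelete, the new value as an UpdateInsert, so a handler that only cares about current values can filter on Insert and UpdateInsert as simple_event_handler does.

```python
# Hypothetical change events for one UPDATE that moves avg_price from 123.0 to 310.0.
events = [
    {"op": "UpdateDelete", "product": "foo", "avg_price": 123.0},  # old value removed
    {"op": "UpdateInsert", "product": "foo", "avg_price": 310.0},  # new value added
]

# Keep only events that represent current values.
current = [e for e in events if e["op"] in ("Insert", "UpdateInsert")]
for e in current:
    print(f"{e['product']} avg price {e['avg_price']}")
```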
import threading

# Subscribe to changes of a materialized view and feed them to your own handler.
# Run on_change in a separate thread so it does not block the main thread.
threading.Thread(
    target=lambda: mv.on_change(
        # Pass your handler here
        handler=simple_event_handler,
        # DATAFRAME and RAW tuples are supported here
        output_format=OutputFormat.DATAFRAME,
        # If set to True, the subscription progress is persisted and can be
        # recovered if the Python application crashes
        persist_progress=True,
        # Maximum number of rows returned each time
        max_batch_size=10)
).start()

# Subscribe to changes of a table and print them to the console.
# Run on_change in a separate thread so it does not block the main thread.
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="test",
        handler=lambda data: print(data),
        output_format=OutputFormat.RAW,
        persist_progress=False,
        max_batch_size=5)
).start()
# Data inserted into the base table later will be reflected in the subscriptions:
# df = pd.DataFrame(
#     {
#         "product": ["foo", "bar"],
#         "price": [1000.2, 5000.4],
#         "ts": [datetime.strptime("2023-10-05 17:30:00", "%Y-%m-%d %H:%M:%S"),
#                datetime.strptime("2023-10-05 17:31:20", "%Y-%m-%d %H:%M:%S")],
#     }
# )
# rw.insert(table_name="test", data=df)
For more details, see the risingwave-py GitHub repo.