RisingWave provides a Python SDK, risingwave-py (currently in public preview), to help users develop event-driven applications.
This SDK provides a simple way to perform ad-hoc queries, subscribe to changes, and define event handlers for tables and materialized views, making it easier to integrate real-time data into applications.
Use risingwave-py to connect to RisingWave
risingwave-py is a RisingWave Python SDK that provides the following capabilities:
- Interact with RisingWave via pandas DataFrames.
- Subscribe to and process changes from RisingWave tables or materialized views.
- Run SQL commands supported in RisingWave.
Run RisingWave
To learn how to run RisingWave, see Run RisingWave.
Connect to RisingWave
To connect to RisingWave via risingwave-py:
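from risingwave import RisingWave, RisingWaveConnOptions

# Option 1: pass the connection parameters individually
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)

# Option 2 (equivalent): pass a PostgreSQL-style connection string
rw = RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))

# Run operations on a dedicated connection, released when the with block exits.
# The "..." placeholders stand for each method's arguments, covered in the sections below.
with rw.getconn() as conn:
    conn.insert(...)
    conn.fetch(...)
    conn.execute(...)
    conn.mv(...)
    conn.on_change(...)

# Alternatively, call the same methods directly on the RisingWave object
rw.insert(...)
rw.fetch(...)
rw.execute(...)
rw.mv(...)
rw.on_change(...)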
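Both constructors above describe the same connection. RisingWave speaks the PostgreSQL wire protocol (port 4566 by default), so the connection string is a standard postgresql:// URL. As a minimal sketch, the hypothetical helper build_dsn below (not part of risingwave-py) shows how the keyword arguments map onto that URL, percent-encoding credentials so that characters such as @ or : do not break it:

```python
from urllib.parse import quote


def build_dsn(host: str, port: int, user: str, password: str, database: str) -> str:
    """Hypothetical helper (not a risingwave-py API): build a postgresql:// URL.

    User name, password, and database are percent-encoded so reserved
    characters such as '@' or ':' cannot be mistaken for URL delimiters.
    """
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Same values as the from_connection_info() example above
print(build_dsn("localhost", 4566, "root", "root", "dev"))
# postgresql://root:root@localhost:4566/dev
```

The resulting string could then be passed to RisingWaveConnOptions(...) exactly like the literal URL in Option 2.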
Ingestion into RisingWave
Load a Pandas DataFrame into RisingWave:
from datetime import datetime
import pandas as pd

# Two sample rows: product name, price, and event timestamp
df = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
        "ts": [
            datetime.strptime("2023-10-05 14:30:00", "%Y-%m-%d %H:%M:%S"),
            datetime.strptime("2023-10-05 14:31:20", "%Y-%m-%d %H:%M:%S"),
        ],
    }
)

# Insert the rows into the "test" table
rw.insert(table_name="test", data=df)
Load data into RisingWave from external systems:
rw.execute("""
CREATE TABLE IF NOT EXISTS source_abc
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_topic'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
)""")
For supported sources and the SQL syntax, see this topic.
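Hardcoding schema registry credentials in source code is easy to leak. One option, sketched below under the assumption that the values live in environment variables (SCHEMA_REGISTRY_USER and SCHEMA_REGISTRY_PASSWORD are hypothetical names), is to assemble the same CREATE TABLE statement in Python and escape each value as a SQL string literal by doubling single quotes. The helper names are hypothetical, not part of risingwave-py:

```python
import os


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def kafka_upsert_avro_table_sql(table: str, bootstrap: str, topic: str,
                                registry_url: str, registry_user: str,
                                registry_password: str) -> str:
    """Hypothetical helper: build the Kafka UPSERT/AVRO CREATE TABLE statement.

    The table name is inserted as-is, so it must be a trusted identifier.
    """
    return (
        f"CREATE TABLE IF NOT EXISTS {table}\n"
        "WITH (\n"
        "    connector='kafka',\n"
        f"    properties.bootstrap.server={sql_literal(bootstrap)},\n"
        f"    topic={sql_literal(topic)}\n"
        ")\n"
        "FORMAT UPSERT ENCODE AVRO (\n"
        f"    schema.registry = {sql_literal(registry_url)},\n"
        f"    schema.registry.username={sql_literal(registry_user)},\n"
        f"    schema.registry.password={sql_literal(registry_password)}\n"
        ")"
    )


# Credentials come from (hypothetical) environment variables, with placeholder fallbacks
stmt = kafka_upsert_avro_table_sql(
    table="source_abc",
    bootstrap="localhost:9092",
    topic="test_topic",
    registry_url="http://127.0.0.1:8081",
    registry_user=os.environ.get("SCHEMA_REGISTRY_USER", "your_schema_registry_username"),
    registry_password=os.environ.get("SCHEMA_REGISTRY_PASSWORD", "your_schema_registry_password"),
)
print(stmt)
```

The generated string would then be passed to rw.execute(stmt) in place of the inline SQL.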
Query from RisingWave
Run an ad-hoc query and get the result back as a pandas DataFrame:
from risingwave import OutputFormat

# Average price per product in 10-second tumbling windows
result: pd.DataFrame = rw.fetch("""
    SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
    FROM tumble(test, ts, interval '10 seconds')
    GROUP BY window_start, window_end, product""",
    format=OutputFormat.DATAFRAME)
print(result)
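To see what this query computes, here is a small pure-Python sketch of the same tumbling-window aggregation applied to the two sample rows inserted earlier. It is a conceptual illustration only, not how RisingWave executes the query: tumble(test, ts, interval '10 seconds') assigns each row to the fixed, non-overlapping 10-second window that contains ts, and the GROUP BY averages price per window and product. The function name tumble_avg is hypothetical:

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW_SECONDS = 10  # must divide 60 for the alignment below to be correct


def tumble_avg(rows):
    """Average price per (window_start, window_end, product) over 10-second tumbling windows.

    rows is an iterable of (product, price, ts) tuples with naive datetimes.
    """
    sums = defaultdict(lambda: [0.0, 0])  # group key -> [sum of prices, row count]
    for product, price, ts in rows:
        # Align ts down to the start of the 10-second window that contains it
        start = ts.replace(second=ts.second - ts.second % WINDOW_SECONDS, microsecond=0)
        key = (start, start + timedelta(seconds=WINDOW_SECONDS), product)
        sums[key][0] += price
        sums[key][1] += 1
    # Python's round() breaks .5 ties to even; SQL ROUND may break ties differently
    return {key: round(total / count) for key, (total, count) in sums.items()}


rows = [
    ("foo", 123.4, datetime(2023, 10, 5, 14, 30, 0)),
    ("bar", 456.7, datetime(2023, 10, 5, 14, 31, 20)),
]
for (start, end, product), avg_price in tumble_avg(rows).items():
    print(start, end, product, avg_price)
# 2023-10-05 14:30:00 2023-10-05 14:30:10 foo 123
# 2023-10-05 14:31:20 2023-10-05 14:31:30 bar 457
```

Each sample row lands in its own window here; a second "foo" row with a timestamp between 14:30:00 and 14:30:09 would be averaged into the first window.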
Event-driven processing with RisingWave
Event-driven applications depend on real-time data processing to react to events as they occur. With risingwave-py, you can define materialized views using SQL and run them in RisingWave. Behind the scenes, events are processed continuously and the results are incrementally maintained.
In the following example, test_mv is created to incrementally maintain the result of the defined SQL query as events are ingested into the test table.
mv = rw.mv(name="test_mv",
           stmt="""SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
                   FROM tumble(test, ts, interval '10 seconds')
                   GROUP BY window_start, window_end, product""")
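Incremental maintenance means the view's result is updated from each change instead of being recomputed from the whole table. The toy class below sketches the idea for an average: it keeps a running sum and count per group, so each insert or delete adjusts the result in constant time. IncrementalAvg is a hypothetical name, and RisingWave's actual streaming operators are considerably more involved; the op values mirror the change types that appear in the handler example further down:

```python
class IncrementalAvg:
    """Toy model of an incrementally maintained AVG(price) ... GROUP BY key."""

    def __init__(self) -> None:
        self._state: dict = {}  # group key -> (sum of prices, row count)

    def apply(self, op: str, key, price: float) -> None:
        """Fold one change into the running state instead of rescanning all rows."""
        total, count = self._state.get(key, (0.0, 0))
        if op in ("Insert", "UpdateInsert"):
            total, count = total + price, count + 1
        elif op in ("Delete", "UpdateDelete"):
            total, count = total - price, count - 1
        else:
            raise ValueError(f"unknown op: {op}")
        if count == 0:
            self._state.pop(key, None)  # group became empty, drop it
        else:
            self._state[key] = (total, count)

    def avg(self, key):
        """Return the current average for key, or None if the group is empty."""
        if key not in self._state:
            return None
        total, count = self._state[key]
        return total / count


view = IncrementalAvg()
view.apply("Insert", "foo", 100.0)
view.apply("Insert", "foo", 300.0)
print(view.avg("foo"))  # 200.0
view.apply("Delete", "foo", 100.0)
print(view.avg("foo"))  # 300.0
```

Storing (sum, count) rather than the average itself is what makes deletes possible: an average alone cannot be "un-averaged" without knowing how many rows contributed to it.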
In addition to running ad-hoc SQL queries on tables and materialized views, you can use risingwave-py to subscribe to changes from a table or materialized view and define handlers that process those change events in your application.
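import threading

# Print an alert for each window whose new average price is 300 or more
def simple_event_handler(event: pd.DataFrame):
    for _, row in event.iterrows():
        # Only inserted rows and the new side of updates carry current values
        if (row["op"] == "UpdateInsert" or row["op"] == "Insert") and row["avg_price"] >= 300:
            print(
                f"{row['window_start']} - {row['window_end']}: {row['product']} avg price {row['avg_price']} is at least 300")

# Each on_change() call keeps running to process new changes,
# so start each subscription in its own thread.

# Subscribe to changes of test_mv and pass them to simple_event_handler as DataFrames
threading.Thread(
    target=lambda: mv.on_change(
        handler=simple_event_handler,
        output_format=OutputFormat.DATAFRAME,
        persist_progress=True,
        max_batch_size=10)
).start()

# Subscribe to changes of the test table and print them in raw format
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="test",
        handler=lambda data: print(data),
        output_format=OutputFormat.RAW,
        persist_progress=False,
        max_batch_size=5)
).start()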
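Because the handler contains only filtering logic, one way to test it without a running RisingWave instance is to factor the rule into a plain function and feed it hand-written change events. The sketch below assumes each event is a mapping with the same fields as the DataFrame rows above (op, window_start, window_end, product, avg_price); the function name price_alerts and the sample events are hypothetical:

```python
from typing import Iterable, List, Mapping

THRESHOLD = 300


def price_alerts(events: Iterable[Mapping]) -> List[str]:
    """Return one alert message per event whose new avg_price is at least THRESHOLD."""
    alerts = []
    for row in events:
        # Delete / UpdateDelete rows describe values that are no longer current,
        # so only Insert and UpdateInsert rows can trigger an alert.
        if row["op"] in ("Insert", "UpdateInsert") and row["avg_price"] >= THRESHOLD:
            alerts.append(
                f"{row['window_start']} - {row['window_end']}: "
                f"{row['product']} avg price {row['avg_price']} is at least {THRESHOLD}"
            )
    return alerts


# Hypothetical change events: a new window for "bar", then an update to "foo"
sample = [
    {"op": "Insert", "window_start": "14:31:20", "window_end": "14:31:30",
     "product": "bar", "avg_price": 457},
    {"op": "UpdateDelete", "window_start": "14:30:00", "window_end": "14:30:10",
     "product": "foo", "avg_price": 123},
    {"op": "UpdateInsert", "window_start": "14:30:00", "window_end": "14:30:10",
     "product": "foo", "avg_price": 350},
]
for line in price_alerts(sample):
    print(line)
```

Inside a DataFrame handler, the same function could be called as price_alerts(event.to_dict("records")), keeping the alerting rule in one testable place.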
For more details, see the risingwave-py GitHub repo.