Overview
This demo shows you how to build an AI agent that connects RisingWave with Anthropic’s LLM. The agent processes natural language queries and returns clean, readable results to users.
The agent includes several key components:
- Table extraction from SQL-like queries
- Dynamic table schema fetching
- MCP tool invocation
- Result parsing and formatting
- Chat loop management between RisingWave and Anthropic
In this tutorial, you’ll learn how to:
- Implement a custom Anthropic agent
- Integrate it with RisingWave MCP
- Perform simple stream processing with RisingWave
For a complete project reference, clone the awesome-stream-processing repository.
Prerequisites
Before you begin, you need:
- A running RisingWave instance you can connect to (the defaults below assume a local instance on port 4566).
- Python 3 with the `fastmcp`, `anthropic`, and `python-dotenv` packages installed (for example, `pip install fastmcp anthropic python-dotenv`).
- An Anthropic API key, exported as `ANTHROPIC_API_KEY` or added to the `.env` file created in Step 1.
- A local copy of the `risingwave-mcp` server; you pass its path to the client in Step 4.
Step 1: Set up your environment
Create a `.env` file with your RisingWave connection configuration:
RISINGWAVE_HOST=0.0.0.0
RISINGWAVE_USER=root
RISINGWAVE_PASSWORD=root
RISINGWAVE_PORT=4566
RISINGWAVE_DATABASE=dev
RISINGWAVE_SSLMODE=disable
RISINGWAVE_TIMEOUT=30
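If you want to confirm these values are picked up before wiring them into the agent, an optional quick check with `python-dotenv` looks like this:

```python
# Optional sanity check: confirm the .env file is loaded by python-dotenv.
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
print("RisingWave target:",
      os.getenv("RISINGWAVE_HOST"), os.getenv("RISINGWAVE_PORT"),
      os.getenv("RISINGWAVE_DATABASE"))
```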
Step 2: Create the agent
Import the required dependencies
Import these modules for your agent:
"""Client for interacting with RisingWave via MCP and Anthropic."""
import os
import sys
import asyncio
import json
import re
from fastmcp import Client
from anthropic import Anthropic
from dotenv import load_dotenv
Create helper functions
Add this helper function to extract table names from queries:
def extract_table_names(query):
"""Extract table names from a SQL query string."""
cleaned = re.sub(r"\b(the|a|an)\b", "", query, flags=re.IGNORECASE)
return re.findall(r"(?:from|in|of|table|view|into)\s+([a-zA-Z0-9_]+)", cleaned, re.IGNORECASE)
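For a quick illustration, here is a made-up query run through the helper above: filler words are stripped first, then identifiers that follow keywords such as `from`, `of`, or `table` are captured.

```python
# Hypothetical input: filler words are removed, then identifiers following
# FROM/IN/OF/TABLE/VIEW/INTO are captured (case-insensitive).
print(extract_table_names("show the rows from users and the schema of high_risk_users"))
# -> ['users', 'high_risk_users']
```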
Build the main agent class
The `RisingWaveMCPAgent` class manages connections and caches. Adjust the environment variables if you’re connecting to a remote RisingWave instance.
class RisingWaveMCPAgent:
"""Agent for interacting with RisingWave via MCP and Anthropic."""
def __init__(self, server_script_path: str):
"""Initialize the agent and set up environment variables."""
self.client = Client(server_script_path)
env = {
"RISINGWAVE_HOST": os.getenv("RISINGWAVE_HOST", "0.0.0.0"),
"RISINGWAVE_USER": os.getenv("RISINGWAVE_USER", "root"),
"RISINGWAVE_PASSWORD": os.getenv("RISINGWAVE_PASSWORD", "root"),
"RISINGWAVE_PORT": os.getenv("RISINGWAVE_PORT", "4566"),
"RISINGWAVE_DATABASE": os.getenv("RISINGWAVE_DATABASE", "dev"),
"RISINGWAVE_SSLMODE": os.getenv("RISINGWAVE_SSLMODE", "disable")
}
self.client.transport.env = env
self.anthropic = Anthropic()
self.conversation = []
self._tools_cache = None
self.table_cache = set()
self.mv_cache = set()
self.table_descriptions = {}
self.mv_descriptions = {}
self._init_done = False
async def initialize_caches(self):
"""Initialize and cache table and materialized view names and their descriptions."""
if self._init_done:
return
# Get table names
try:
tables_result = await self.client.call_tool("show_tables", {})
tables = json.loads(tables_result) if isinstance(tables_result, str) else tables_result
if isinstance(tables, list):
for t in tables:
name = t[0] if isinstance(t, list) else t.get("table_name") or t.get("name")
if name:
self.table_cache.add(name)
# Cache description
try:
desc = await self.client.call_tool(
"describe_table", {"table_name": name}
)
self.table_descriptions[name] = desc
except Exception:
pass
except Exception:
pass
# Get materialized view names
try:
mvs_result = await self.client.call_tool("list_materialized_views", {})
mvs = json.loads(mvs_result) if isinstance(mvs_result, str) else mvs_result
if isinstance(mvs, list):
for mv in mvs:
name = mv[0] if isinstance(mv, list) else mv.get("name") or mv.get("mv_name")
if name:
self.mv_cache.add(name)
# Cache description
try:
desc = await self.client.call_tool(
"describe_materialized_view", {"mv_name": name}
)
self.mv_descriptions[name] = desc
except Exception:
pass
except Exception:
pass
self._init_done = True
async def list_tools(self):
"""List available tools from the client."""
if self._tools_cache is None:
tools = await self.client.list_tools()
self._tools_cache = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
} for tool in tools]
return self._tools_cache
async def __aenter__(self):
"""Async context manager entry."""
await self.client.__aenter__()
return self
async def __aexit__(self, exc_type, exc, tb):
"""Async context manager exit."""
await self.client.__aexit__(exc_type, exc, tb)
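Because the agent simply forwards to the FastMCP client’s context manager, you normally construct it with `async with`. A minimal sketch (it assumes the `.env` file and your Anthropic API key are already set up, and uses the server path from Step 4):

```python
# Minimal usage sketch: the async context manager opens and closes the
# underlying MCP client connection around whatever runs inside it.
async def demo():
    load_dotenv()  # assumes the .env file from Step 1 (and your Anthropic key) exists
    async with RisingWaveMCPAgent("risingwave-mcp/src/main.py") as agent:
        tools = await agent.list_tools()
        print([tool["name"] for tool in tools])

asyncio.run(demo())
```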
Add core functionality
Add these functions to handle tool usage and provide table context:
async def call_tool(self, tool_name, args):
"""Call a tool by name with arguments."""
return await self.client.call_tool(tool_name, args)
async def fetch_schema_context(self, table_names):
"""Fetch schema context for a list of table names."""
async def fetch(table):
try:
schema = await self.call_tool("describe_table", {"table_name": table})
return f"Schema for table '{table}':\n{schema}"
except Exception as exc:
return f"Could not fetch schema for table '{table}': {exc}"
return await asyncio.gather(*(fetch(table) for table in set(table_names)))
async def handle_tool_use(self, content, final_text):
"""Handle tool use content from the LLM."""
tool_name = content.name
tool_args = content.input
result = await self.call_tool(tool_name, tool_args)
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
final_text.append(result)
self.conversation.append({"role": "user", "content": result})
response = self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=self.conversation,
)
self.handle_llm_response(response, final_text)
def handle_llm_response(self, response, final_text):
"""Handle the LLM response and update conversation."""
if response.content and response.content[0].type == 'text':
final_text.append(response.content[0].text)
self.conversation.append({"role": "assistant", "content": response.content[0].text})
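As a side note, `fetch_schema_context` describes each table concurrently with `asyncio.gather` and turns failures into readable messages instead of raising. A small usage sketch (the table names are just examples from later steps):

```python
# Example usage sketch: fetch schema text for a few tables concurrently;
# errors come back as "Could not fetch schema..." strings rather than raising.
async def show_schemas(agent):
    for ctx in await agent.fetch_schema_context(["users", "high_risk_users"]):
        print(ctx)
```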
Add these final methods to handle query processing and manage the chat loop:
async def process_query(self, query: str) -> str:
"""Process a user query and return the response."""
await self.initialize_caches()
self.conversation.append({"role": "user", "content": query})
# Use cached table/mv names for extraction
def extract_known_names(q):
"""Extract known table or materialized view names from the query."""
words = set(re.findall(r"\b([a-zA-Z_][a-zA-Z0-9_]*)\b", q))
found = [w for w in words if w in self.table_cache or w in self.mv_cache]
return found
table_names = extract_known_names(query)
schema_contexts = []
for t in table_names:
if t in self.table_descriptions:
schema_contexts.append(
f"Schema for table '{t}':\n{self.table_descriptions[t]}"
)
elif t in self.mv_descriptions:
schema_contexts.append(
f"Schema for materialized view '{t}':\n{self.mv_descriptions[t]}"
)
messages = self.conversation.copy()
for schema in schema_contexts:
messages.append({"role": "user", "content": schema})
tools = await self.list_tools()
final_text = []
while True:
response = self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=4096,
messages=messages,
tools=tools
)
tool_used = False
for content in response.content:
if content.type == 'text':
final_text.append(content.text)
self.conversation.append({"role": "assistant", "content": content.text})
elif content.type == 'tool_use':
tool_used = True
await self.handle_tool_use(content, final_text)
messages = self.conversation.copy()
if not tool_used:
break
return "\n".join(final_text)
async def chat_loop(self):
"""Main chat loop for user interaction."""
print("Rising Wave MCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as exc:
print(f"\nError: {str(exc)}")
Add the main function
Add the entry point to start the program:
async def main():
"""Entry point for the client."""
load_dotenv()
if len(sys.argv) < 2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)
server_script_path = sys.argv[1]
async with RisingWaveMCPAgent(server_script_path) as client:
await client.chat_loop()
if __name__ == "__main__":
asyncio.run(main())
Step 3: Create test data
Create test data using the load generator tutorial or run this SQL statement:
CREATE TABLE users (
id int,
risk int,
cost int,
time timestamp
)
WITH (
connector = 'datagen',
fields.id.length = '1',
fields.id.kind = 'sequence',
fields.id.start = '1000',
fields.risk.length = '1',
fields.risk.kind = 'random',
fields.risk.min = '0',
fields.risk.max = '10',
fields.risk.seed = '5',
fields.cost.length = '1',
fields.cost.kind = 'random',
fields.cost.min = '1',
fields.cost.max = '10000',
fields.cost.seed = '8',
fields.time.kind = 'random',
fields.time.max_past = '5h',
fields.time.max_past_mode = 'relative',
datagen.rows.per.second = '10'
)
FORMAT PLAIN ENCODE JSON;
Step 4: Run and test the agent
Start the agent
Run this command to start the agent:
python client.py risingwave-mcp/src/main.py
Test the agent
Try these example commands:
Get database information:
- Give me the database version
I'll help you get the RisingWave database version information using the `get_database_version` function.
[Calling tool get_database_version with args {}]
The database is running PostgreSQL 13.14.0 with RisingWave version 2.2.0 (Homebrew build).
Explore table schemas:
- Show me the users table structure
Columns:
1. id (integer)
2. risk (integer)
3. cost (integer)
4. time (timestamp without time zone)
5. _row_id (serial) - hidden, primary key
6. _rw_timestamp (timestamp with time zone) - hidden
Additional Information:
- Primary Key: _row_id
- Distribution Key: _row_id
- Table Name: users
Create and query materialized views:
- Track the highest risk scores
I'll help you create a materialized view to track the highest risk scores from the users table. I'll create a view that orders by risk score in descending order.
[Calling tool create_materialized_view with args {'name': 'high_risk_users', 'sql_statement': 'SELECT id, risk, cost, time \nFROM users \nWHERE risk >= 7 \nORDER BY risk DESC'}]
Great! The materialized view 'high_risk_users' has been created. Let me show you the results from this view to see the users with the highest risk scores.
- Sort that mv by highest spending and display
I'll query the materialized view 'high_risk_users' and sort it by cost in descending order (highest spending) using the run_select_query function.
[Calling tool run_select_query with args {'query': 'SELECT * FROM high_risk_users ORDER BY cost DESC;'}]
Here's an analysis of the high-risk users sorted by spending:
Top 5 highest spending high-risk users:
1. ID: 1398 - Risk: 8, Cost: $9,999
2. ID: 1021 - Risk: 10, Cost: $9,967
3. ID: 1277 - Risk: 8, Cost: $9,949
4. ID: 1642 - Risk: 8, Cost: $9,913
5. ID: 2048 - Risk: 10, Cost: $9,855
Key observations:
- There are 451 users classified as high-risk
- Spending ranges from $11 (lowest) to $9,999 (highest)
- Some users have maximum risk score (10) but varying spending levels
- The view includes users with risk scores 7 and above
Experiment with your own prompts and data! You can also use any of the demos in the awesome-stream-processing repository to test the agent.
Clean up
When you finish testing:
- Stop the agent: Type `quit` in the agent interface.
- Clean up test data (optional): Ask the agent to “Drop the users table”.
- Stop RisingWave: Stop the RisingWave service if you started it for this demo.