Overview

This demo shows you how to build an AI agent that connects RisingWave with Anthropic’s Claude models through the Model Context Protocol (MCP). The agent accepts natural language queries, calls RisingWave MCP tools as needed, and returns readable results.

The agent includes several key components:

  • Table extraction from SQL-like queries
  • Dynamic table schema fetching
  • MCP tool invocation
  • Result parsing and formatting
  • Chat loop management between RisingWave and Anthropic

In this tutorial, you’ll learn how to:

  • Implement a custom Anthropic agent
  • Integrate it with RisingWave MCP
  • Perform simple stream processing with RisingWave

For a complete project reference, clone the awesome-stream-processing repository.

Prerequisites

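Before you begin, you need:

  • A running RisingWave instance that you can reach with the connection settings in Step 1
  • Python with the fastmcp, anthropic, and python-dotenv packages installed
  • An Anthropic API key in the ANTHROPIC_API_KEY environment variable, which the Anthropic() client reads by default
  • The RisingWave MCP server script (risingwave-mcp/src/main.py in Step 4)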

Step 1: Set up your environment

Create a .env file with your RisingWave connection configuration:

RISINGWAVE_HOST=0.0.0.0
RISINGWAVE_USER=root
RISINGWAVE_PASSWORD=root
RISINGWAVE_PORT=4566
RISINGWAVE_DATABASE=dev
RISINGWAVE_SSLMODE=disable
RISINGWAVE_TIMEOUT=30
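
Before wiring these settings into the agent, it can help to confirm that they resolve to sensible values. The following is a minimal sketch using only the standard library; `load_risingwave_config` and `DEFAULTS` are hypothetical names that are not part of the demo, and the defaults simply mirror the .env values above.

```python
import os

# Defaults mirror the .env values shown above.
DEFAULTS = {
    "RISINGWAVE_HOST": "0.0.0.0",
    "RISINGWAVE_USER": "root",
    "RISINGWAVE_PASSWORD": "root",
    "RISINGWAVE_PORT": "4566",
    "RISINGWAVE_DATABASE": "dev",
    "RISINGWAVE_SSLMODE": "disable",
    "RISINGWAVE_TIMEOUT": "30",
}


def load_risingwave_config(environ=None):
    """Return the RisingWave settings, falling back to DEFAULTS.

    Hypothetical helper for illustration; the demo reads os.getenv directly.
    """
    environ = os.environ if environ is None else environ
    config = {key: environ.get(key, default) for key, default in DEFAULTS.items()}
    # Fail early on values that must be numeric.
    for key in ("RISINGWAVE_PORT", "RISINGWAVE_TIMEOUT"):
        if not config[key].isdigit():
            raise ValueError(f"{key} must be a non-negative integer, got {config[key]!r}")
    return config


if __name__ == "__main__":
    for key, value in load_risingwave_config().items():
        # Avoid echoing the password to the terminal.
        shown = "***" if key == "RISINGWAVE_PASSWORD" else value
        print(f"{key}={shown}")
```

Passing a plain dict instead of `os.environ` makes the helper easy to check without touching the real environment.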

Step 2: Create the agent

Import the required dependencies

Import these modules for your agent:

"""Client for interacting with RisingWave via MCP and Anthropic."""

import os
import sys
import asyncio
import json
import re
from fastmcp import Client
from anthropic import Anthropic
from dotenv import load_dotenv

Create helper functions

Add this helper function to extract table names from queries:

def extract_table_names(query):
    """Extract candidate table names from a SQL or natural language query."""
    cleaned = re.sub(r"\b(the|a|an)\b", "", query, flags=re.IGNORECASE)
    return re.findall(r"(?:from|in|of|table|view|into)\s+([a-zA-Z0-9_]+)", cleaned, re.IGNORECASE)
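
To see what this helper returns in practice, the snippet below repeats the function so it runs on its own and applies it to a few sample inputs. The sample queries are made up for illustration. The last one shows a quirk worth knowing: the keywords are not anchored to word boundaries, so the "in" at the end of "join" also counts as a keyword.

```python
import re


def extract_table_names(query):
    """Same helper as above, repeated so this snippet runs on its own."""
    cleaned = re.sub(r"\b(the|a|an)\b", "", query, flags=re.IGNORECASE)
    return re.findall(
        r"(?:from|in|of|table|view|into)\s+([a-zA-Z0-9_]+)", cleaned, re.IGNORECASE
    )


# A SQL statement: the name after FROM is captured.
print(extract_table_names("SELECT * FROM users WHERE risk > 7"))  # ['users']

# Natural language: the article "the" is stripped before matching.
print(extract_table_names("show me the rows from the users table"))  # ['users']

# "join" ends with "in", so the word after it is captured too.
print(extract_table_names("join orders"))  # ['orders']
```

Because of such false positives, the results are best treated as candidates to check against the tables that actually exist.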

Build the main agent class

The RisingWaveMCPAgent class manages connections and caches. Adjust the environment variables if you’re connecting to a remote RisingWave instance.

class RisingWaveMCPAgent:
    """Agent for interacting with RisingWave via MCP and Anthropic."""

    def __init__(self, server_script_path: str):
        """Initialize the agent and set up environment variables."""
        self.client = Client(server_script_path)
        env = {
            "RISINGWAVE_HOST": os.getenv("RISINGWAVE_HOST", "0.0.0.0"),
            "RISINGWAVE_USER": os.getenv("RISINGWAVE_USER", "root"),
            "RISINGWAVE_PASSWORD": os.getenv("RISINGWAVE_PASSWORD", "root"),
            "RISINGWAVE_PORT": os.getenv("RISINGWAVE_PORT", "4566"),
            "RISINGWAVE_DATABASE": os.getenv("RISINGWAVE_DATABASE", "dev"),
            "RISINGWAVE_SSLMODE": os.getenv("RISINGWAVE_SSLMODE", "disable"),
            # Forward the timeout from .env as well (default matches Step 1).
            "RISINGWAVE_TIMEOUT": os.getenv("RISINGWAVE_TIMEOUT", "30"),
        }
        self.client.transport.env = env
        self.anthropic = Anthropic()
        self.conversation = []
        self._tools_cache = None
        self.table_cache = set()
        self.mv_cache = set()
        self.table_descriptions = {}
        self.mv_descriptions = {}
        self._init_done = False

    async def initialize_caches(self):
        """Initialize and cache table and materialized view names and their descriptions."""
        if self._init_done:
            return
        # Get table names
        try:
            tables_result = await self.client.call_tool("show_tables", {})
            tables = json.loads(tables_result) if isinstance(tables_result, str) else tables_result
            if isinstance(tables, list):
                for t in tables:
                    name = t[0] if isinstance(t, list) else t.get("table_name") or t.get("name")
                    if name:
                        self.table_cache.add(name)
                        # Cache description
                        try:
                            desc = await self.client.call_tool(
                                "describe_table", {"table_name": name}
                            )
                            self.table_descriptions[name] = desc
                        except Exception:
                            pass
        except Exception:
            pass
        # Get materialized view names
        try:
            mvs_result = await self.client.call_tool("list_materialized_views", {})
            mvs = json.loads(mvs_result) if isinstance(mvs_result, str) else mvs_result
            if isinstance(mvs, list):
                for mv in mvs:
                    name = mv[0] if isinstance(mv, list) else mv.get("name") or mv.get("mv_name")
                    if name:
                        self.mv_cache.add(name)
                        # Cache description
                        try:
                            desc = await self.client.call_tool(
                                "describe_materialized_view", {"mv_name": name}
                            )
                            self.mv_descriptions[name] = desc
                        except Exception:
                            pass
        except Exception:
            pass
        self._init_done = True

    async def list_tools(self):
        """List available tools from the client."""
        if self._tools_cache is None:
            tools = await self.client.list_tools()
            self._tools_cache = [{
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema
            } for tool in tools]
        return self._tools_cache

    async def __aenter__(self):
        """Async context manager entry."""
        await self.client.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Async context manager exit."""
        await self.client.__aexit__(exc_type, exc, tb)
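
The cache initialization above accepts result rows either as lists (name in the first column) or as dicts (name under one of a few keys). The sketch below isolates that normalization step so it can be tried without an MCP server. `row_name` is a hypothetical helper, and the sample rows are invented. Unlike the original expression, it also tolerates tuples and bare strings; in the class above, a bare string would raise on `.get`, and the surrounding `except` would silently skip the remaining rows.

```python
def row_name(row, keys):
    """Return the object name from one result row, or None if absent.

    Hypothetical helper: rows may be lists or tuples (name first),
    dicts (name under one of `keys`), or bare strings.
    """
    if isinstance(row, (list, tuple)):
        return row[0] if row else None
    if isinstance(row, dict):
        for key in keys:
            if row.get(key):
                return row[key]
        return None
    if isinstance(row, str):
        return row or None
    return None


# Invented sample rows covering each accepted shape, plus two empty ones.
rows = [["users"], {"table_name": "orders"}, {"name": "events"}, "plain_name", {}, []]
names = {name for row in rows if (name := row_name(row, ("table_name", "name")))}
print(sorted(names))  # ['events', 'orders', 'plain_name', 'users']
```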

Add core functionality

Add these functions to handle tool usage and provide table context:

    async def call_tool(self, tool_name, args):
        """Call a tool by name with arguments."""
        return await self.client.call_tool(tool_name, args)

    async def fetch_schema_context(self, table_names):
        """Fetch schema context for a list of table names."""
        async def fetch(table):
            try:
                schema = await self.call_tool("describe_table", {"table_name": table})
                return f"Schema for table '{table}':\n{schema}"
            except Exception as exc:
                return f"Could not fetch schema for table '{table}': {exc}"
        return await asyncio.gather(*(fetch(table) for table in set(table_names)))

    async def handle_tool_use(self, content, final_text):
        """Handle tool use content from the LLM."""
        tool_name = content.name
        tool_args = content.input
        result = await self.call_tool(tool_name, tool_args)
        final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
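        # The tool result may not be a plain string, so convert it before
        # it is joined into the final text.
        final_text.append(str(result))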
        self.conversation.append({"role": "user", "content": result})

        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=self.conversation,
        )
        self.handle_llm_response(response, final_text)

    def handle_llm_response(self, response, final_text):
        """Handle the LLM response and update conversation."""
        if response.content and response.content[0].type == 'text':
            final_text.append(response.content[0].text)
            self.conversation.append({"role": "assistant", "content": response.content[0].text})
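
The fetch_schema_context method above describes several tables concurrently and turns each failure into a message instead of aborting the whole batch. The sketch below shows that pattern in isolation. `fake_call_tool` is a stand-in invented for this example, not the MCP client, and `sorted()` is added only so the output order is predictable.

```python
import asyncio


async def fake_call_tool(tool_name, args):
    """Stand-in for the MCP call; fails for one table to show error handling."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    if args["table_name"] == "missing":
        raise RuntimeError("table not found")
    return f"{args['table_name']}(id int)"


async def fetch_schema_context(table_names):
    """Describe each distinct table concurrently; errors become messages."""
    async def fetch(table):
        try:
            schema = await fake_call_tool("describe_table", {"table_name": table})
            return f"Schema for table '{table}':\n{schema}"
        except Exception as exc:
            return f"Could not fetch schema for table '{table}': {exc}"

    # gather() returns results in the same order as the awaitables it is given.
    return await asyncio.gather(*(fetch(t) for t in sorted(set(table_names))))


results = asyncio.run(fetch_schema_context(["users", "missing", "users"]))
for line in results:
    print(line)
```

Catching the exception inside `fetch` is what keeps one missing table from cancelling the other lookups.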

Add these final methods to handle query processing and manage the chat loop:

    async def process_query(self, query: str) -> str:
        """Process a user query and return the response."""
        await self.initialize_caches()
        self.conversation.append({"role": "user", "content": query})

        # Use cached table/mv names for extraction
        def extract_known_names(q):
            """Extract known table or materialized view names from the query."""
            words = set(re.findall(r"\b([a-zA-Z_][a-zA-Z0-9_]*)\b", q))
            found = [w for w in words if w in self.table_cache or w in self.mv_cache]
            return found
        table_names = extract_known_names(query)
        schema_contexts = []
        for t in table_names:
            if t in self.table_descriptions:
                schema_contexts.append(
                    f"Schema for table '{t}':\n{self.table_descriptions[t]}"
                )
            elif t in self.mv_descriptions:
                schema_contexts.append(
                    f"Schema for materialized view '{t}':\n{self.mv_descriptions[t]}"
                )

        messages = self.conversation.copy()
        for schema in schema_contexts:
            messages.append({"role": "user", "content": schema})

        tools = await self.list_tools()

        final_text = []
        while True:
            response = self.anthropic.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=4096,
                messages=messages,
                tools=tools
            )

            tool_used = False
            for content in response.content:
                if content.type == 'text':
                    final_text.append(content.text)
                    self.conversation.append({"role": "assistant", "content": content.text})
                elif content.type == 'tool_use':
                    tool_used = True
                    await self.handle_tool_use(content, final_text)
                    messages = self.conversation.copy()
            if not tool_used:
                break

        return "\n".join(final_text)

    async def chat_loop(self):
        """Main chat loop for user interaction."""
        print("RisingWave MCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as exc:
                print(f"\nError: {str(exc)}")
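
The process_query method decides which schemas to attach by matching identifiers in the user's text against the cached table and materialized view names. The sketch below shows that lookup as a standalone function. The `known` set is hypothetical cache content, and this variant keeps first-seen order, whereas the method above collects the words into a set first and so does not guarantee any order.

```python
import re


def extract_known_names(query, known_names):
    """Return identifiers in `query` that are in `known_names`, in first-seen order."""
    seen = []
    for word in re.findall(r"\b[a-zA-Z_][a-zA-Z0-9_]*\b", query):
        if word in known_names and word not in seen:
            seen.append(word)
    return seen


# Hypothetical cache contents for illustration.
known = {"users", "high_risk_users"}

print(extract_known_names("Sort high_risk_users by cost, then compare with users", known))
# ['high_risk_users', 'users']

print(extract_known_names("Show me the Users table", known))
# [] because the match is case-sensitive
```

The second call shows that a capitalized name is not recognized, so prompts need to spell table names exactly as they are stored.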

Add the main function

Add the entry point to start the program:

async def main():
    """Entry point for the client."""
    load_dotenv()
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    server_script_path = sys.argv[1]
    async with RisingWaveMCPAgent(server_script_path) as client:
        await client.chat_loop()


if __name__ == "__main__":
    asyncio.run(main())

Step 3: Create test data

Create test data using the load generator tutorial, or run this SQL statement in RisingWave:

CREATE TABLE users (
  id int,
  risk int,
  cost int,
  time timestamp
)
WITH (
  connector = 'datagen',

  fields.id.length = '1',
  fields.id.kind = 'sequence',
  fields.id.start = '1000',

  fields.risk.length = '1',
  fields.risk.kind = 'random',
  fields.risk.min = '0',
  fields.risk.max = '10',
  fields.risk.seed = '5',

  fields.cost.length = '1',
  fields.cost.kind = 'random',
  fields.cost.min = '1',
  fields.cost.max = '10000',
  fields.cost.seed = '8',

  fields.time.kind = 'random',
  fields.time.max_past = '5h',
  fields.time.max_past_mode = 'relative',

  datagen.rows.per.second = '10'
)
FORMAT PLAIN ENCODE JSON;

Step 4: Run and test the agent

Start the agent

Save the code from Step 2 as client.py, then run this command to start the agent, passing the path to the MCP server script:

python client.py risingwave-mcp/src/main.py

Test the agent

Try these example commands:

Get database information:

  • Give me the database version
I'll help you get the RisingWave database version information using the `get_database_version` function.
[Calling tool get_database_version with args {}]
The database is running PostgreSQL 13.14.0 with RisingWave version 2.2.0 (Homebrew build).

Explore table schemas:

  • Show me the users table structure
Columns:
1. id (integer)
2. risk (integer)
3. cost (integer)
4. time (timestamp without time zone)
5. _row_id (serial) - hidden, primary key
6. _rw_timestamp (timestamp with time zone) - hidden

Additional Information:
- Primary Key: _row_id
- Distribution Key: _row_id
- Table Name: users

Create and query materialized views:

  • Track the highest risk scores
I'll help you create a materialized view to track the highest risk scores from the users table. I'll create a view that orders by risk score in descending order.
[Calling tool create_materialized_view with args {'name': 'high_risk_users', 'sql_statement': 'SELECT id, risk, cost, time \nFROM users \nWHERE risk >= 7 \nORDER BY risk DESC'}]
Great! The materialized view 'high_risk_users' has been created. Let me show you the results from this view to see the users with the highest risk scores.
  • Sort that mv by highest spending and display
I'll query the materialized view 'high_risk_users' and sort it by cost in descending order (highest spending) using the run_select_query function.
[Calling tool run_select_query with args {'query': 'SELECT * FROM high_risk_users ORDER BY cost DESC;'}]
Here's an analysis of the high-risk users sorted by spending:

Top 5 highest spending high-risk users:
1. ID: 1398 - Risk: 8, Cost: $9,999
2. ID: 1021 - Risk: 10, Cost: $9,967
3. ID: 1277 - Risk: 8, Cost: $9,949
4. ID: 1642 - Risk: 8, Cost: $9,913
5. ID: 2048 - Risk: 10, Cost: $9,855

Key observations:
- There are 451 users classified as high-risk
- Spending ranges from $11 (lowest) to $9,999 (highest)
- Some users have maximum risk score (10) but varying spending levels
- The view includes users with risk scores 7 and above
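
To make the two SQL steps in this exchange concrete, the sketch below applies the same logic in plain Python: the view's filter (risk >= 7) followed by the query's ordering (cost descending). The first five rows are the ones listed in the transcript above; the two low-risk rows are hypothetical, added only so the filter has something to exclude.

```python
# (id, risk, cost): five rows from the transcript above, plus two
# hypothetical low-risk rows (ids 1500 and 1501) to exercise the filter.
rows = [
    (1277, 8, 9949),
    (2048, 10, 9855),
    (1398, 8, 9999),
    (1500, 3, 9990),
    (1642, 8, 9913),
    (1021, 10, 9967),
    (1501, 6, 120),
]

# Equivalent of the view's filter: WHERE risk >= 7
high_risk = [row for row in rows if row[1] >= 7]

# Equivalent of the follow-up query: ORDER BY cost DESC
by_cost = sorted(high_risk, key=lambda row: row[2], reverse=True)

for user_id, risk, cost in by_cost:
    print(f"ID: {user_id} - Risk: {risk}, Cost: ${cost:,}")
```

In RisingWave the materialized view keeps the filtered set up to date as new rows arrive, so only the ordering runs at query time.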

Experiment with your own prompts and data! You can also use any of the demos to test the agent.

Clean up

When you finish testing:

  1. Stop the agent: Type quit in the agent interface.
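  2. Clean up test data (optional): Ask the agent to “Drop the high_risk_users materialized view”, then “Drop the users table”. The view depends on the table, so drop the view first.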
  3. Stop RisingWave: Stop the RisingWave service if you started it for this demo.