SQLMesh provides a structured and maintainable approach to managing end-to-end streaming pipelines. As the control plane for RisingWave, it simplifies the workflow by enabling local definition of transformations, planning and reviewing changes before deployment, and running optional audits to ensure data quality.

This guide explains how to use RisingWave for streaming data and SQLMesh for managing transformations to aggregate website click events. Each event contains a timestamp, event_type, and value.

It aims to calculate a rolling 5-minute count and sum of value for each event_type using a tumbling window. Then, the logic will be modified to calculate an average, showcasing how SQLMesh handles the change.

Set up RisingWave and ingest sample data

Begin by starting RisingWave and manually inserting sample data into a base table.

  1. Start RisingWave via Docker.

    docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest single_node
    

    The message of RisingWave standalone mode is ready means that RisingWave has started successfully.

  2. Connect via psql (or another Postgres-compatible SQL client) and prepare the data.

    # Use localhost when connecting from your machine to the mapped port
    psql -h localhost -p 4566 -d dev -U root
    

    Inside psql, run the following:

    -- Create the source table to hold incoming data
    CREATE TABLE click_events (
        event_id INT PRIMARY KEY,
        event_type VARCHAR,
        event_value INT,
        event_timestamp TIMESTAMPTZ
    );
    
    -- Insert seven rows of sample data
    INSERT INTO click_events (event_id, event_type, event_value, event_timestamp) VALUES
    (1, 'page_view', 10, '2023-10-27 10:00:30+00'),
    (2, 'add_to_cart', 150, '2023-10-27 10:01:15+00'),
    (3, 'page_view', 12, '2023-10-27 10:02:05+00'),
    (4, 'page_view', 11, '2023-10-27 10:05:40+00'),
    (5, 'purchase', 250, '2023-10-27 10:06:20+00'),
    (6, 'page_view', 15, '2023-10-27 10:11:00+00'),
    (7, 'add_to_cart', 99, '2023-10-27 10:12:30+00');
    
    -- Optional: Verify inserts
    SELECT COUNT(*) FROM click_events;
    
    -- Exit psql
    \q
    

Set up SQLMesh project

Set up the SQLMesh environment to manage our RisingWave transformations.

  1. Create a project directory and Python virtual environment.

    mkdir sqlmesh-risingwave-demo
    cd sqlmesh-risingwave-demo
    python3 -m venv venv
    source venv/bin/activate # Adjust activation for your OS/shell
    
  2. Install the SQLMesh RisingWave adapter:

    pip install "sqlmesh[risingwave]"
    
  3. Initialize the SQLMesh Project specifically for RisingWave:

    sqlmesh init risingwave
    

    This creates necessary folders and configuration files.

  4. Configure the connection in config.yaml file (created by init). Ensure the following matches or update your config.yaml.

    gateways:
    risingwave: # Gateway name used by default
        connection:
        type: risingwave
        host: localhost
        user: root
        port: 4566 # Match the Docker port mapping
        database: dev
    
    default_gateway: risingwave
    
    model_defaults:
    dialect: risingwave
    # start: <YYYY-MM-DD> # Optional: Default start date for backfills if needed
    

Define and deploy a streaming query (v1)

Create the first transformation model and deploy it.

  1. Create the SQLMesh model file models/event_summary_tumbling.sql:

    -- models/event_summary_tumbling.sql
    MODEL (
      name reporting.event_summary_tumbling,
      kind VIEW (materialized = true), -- This makes it a RisingWave MV
      owner data_team,
      description 'Summarizes event counts and total value in 5-minute tumbling windows.'
    );
    
    SELECT
        window_start,
        window_end,
        event_type,
        COUNT(*) AS event_count,
        SUM(event_value) AS total_value
    FROM TUMBLE(
        click_events,         -- Read from the base table
        event_timestamp,            -- Time column for windowing
        INTERVAL '5 minutes'        -- Tumbling window size
    )
    GROUP BY
        window_start,
        window_end,
        event_type;
    
  2. Plan and apply the model using SQLMesh:

    sqlmesh plan
    

    SQLMesh will detect the new model (reporting.event_summary_tumbling) and show a plan to create it in the prod environment (which maps to a schema like sqlmesh_prod or prod in RisingWave, often including the model name itself, e.g., sqlmesh__reporting). It will also detect the default models created by init; you can choose to apply changes only for your new model if desired, or apply all.

    # Example Output Snippet:
    # Summary of differences:
    # Models:
    # └── Added:
    #     └── reporting.event_summary_tumbling
    #     └── sqlmesh_example.full_model (if applying init models)
    #     └── ... (other init models)
    #
    # Apply - Apply the plan. [y/n]: y  <-- Type 'y' and press Enter
    

    SQLMesh executes the CREATE MATERIALIZED VIEW statement(s) in RisingWave.

  3. Verify the MV creation and content via psql .

    The exact schema and MV name includes a hash for versioning. You can use SHOW to find it. Since reporting is used as model schema, the materialized view should be in the sqlmesh__reporting schema.

    show materialized views from sqlmesh__reporting;
    -- Result below
                        Name                      
    -----------------------------------------------
    reporting__event_summary_tumbling__3036268106
    (1 row)
    

    You should see the aggregated results similar to below.

    SQLMesh aggregated results

Modify the MV schema with SQLMesh (v2)

Now we can change the aggregation logic and see how SQLMesh manages the update safely.

  1. Modify the model file models/event_summary_tumbling.sql to calculate the average value and distinct count:

    -- models/event_summary_tumbling.sql
    MODEL (
    name reporting.event_summary_tumbling, -- Keep the same name
    kind VIEW (materialized true),
    owner data_team,
    -- Update description to reflect change
    description 'Calculates average event value and distinct event count in 5-minute tumbling windows.'
    );
    
    SELECT
        window_start,
        window_end,
        event_type,
        COUNT(DISTINCT event_id) AS distinct_event_count, -- Changed aggregation
        AVG(event_value) AS average_value          -- Changed aggregation
    FROM TUMBLE(
        click_events,
        event_timestamp,
        INTERVAL '5 minutes'
    )
    GROUP BY
        window_start,
        window_end,
        event_type;
    
  2. Run sqlmesh plan again to see the impact.

    SQLMesh detects the change in the definition of reporting.event_summary_tumbling. Because the logic changed, it plans to:

    • Create a new version of the Materialized View in RisingWave with a different physical name (a new hash suffix).
    • It does not immediately drop the old version, allowing for validation or zero-downtime promotion strategies (though in this simple apply, the old one might eventually be cleaned up depending on settings).

    Modified results

  3. Verify the new MV version via psql:

    -- List MVs again in the schema
    SHOW MATERIALIZED VIEWS FROM sqlmesh__reporting; -- Replace schema if needed
    -- You should now see TWO MVs related to event_summary_tumbling,
    -- one with the old hash and one with a NEW hash.
    
    -- Query the NEWER MV version (find its exact name)
    SELECT * FROM sqlmesh__reporting."reporting__event_summary_tumbling__<new_hash>" ORDER BY window_start, event_type;
    

    The results should show the distinct_event_count and average_value columns, reflecting the v2 logic. The original v1 MV still exists momentarily. This demonstrates SQLMesh’s safe, versioned deployment approach.

Integrate sink and source management

This guide manually created the click_events table in RisingWave for simplicity. However, SQLMesh provides pre- and post-statements within model definitions, allowing you to embed DDL statements that run before or after the main model logic executes.

For example, you could place a CREATE SOURCE IF NOT EXISTS ... statement in pre-statement (that is, an SQL query prior to the SELECT statement), and / or a CREATE SINK IF NOT EXISTS ... in a post-statement. This approach keeps the setup/teardown logic closer to the relevant transformation step within your SQLMesh project.

SQLMesh’s core focus is managing the transformation logic (the SELECT statements) and dependencies across your entire pipeline (often multiple models). While pre- and post-statements offer integration points, managing complex CREATE SOURCE/SINK statements with intricate connector configurations might still be cleaner handled outside the model definitions (e.g., separate setup scripts or specialized infrastructure tools) to maintain clarity.

Additional resources