
# Market data enrichment and transformation

> Transform raw market data in real-time to provide insights into market trends, asset health, and trade opportunities.

## Overview

Understanding a fast-moving market is pivotal to making informed trading decisions. The market is constantly influenced by external factors such as geopolitical events, economic indicators, and industry-specific news. These factors often cause rapid fluctuations whose causes are not apparent in raw market data alone. To make sense of these shifts, traders need external data that explains the context behind the price movements.

Compiling and analyzing this information in real time is key to understanding the market and making better trading decisions. With real-time data analysis, traders can process and join raw market data and external signals as they arrive. By combining these data streams, traders gain a comprehensive, up-to-the-second view of market conditions. This allows them to act quickly and confidently, making timely adjustments to maximize profits and mitigate risks.

In this tutorial, you will learn how to join market data with external data to gain a holistic view of each asset.

## Prerequisites

* Ensure that the [PostgreSQL](https://www.postgresql.org/docs/current/app-psql.html) interactive terminal, `psql`, is installed in your environment. For detailed instructions, see [Download PostgreSQL](https://www.postgresql.org/download/).
* Install and run RisingWave. For detailed instructions on how to quickly get started, see the [Quick start](/get-started/quickstart/) guide.
* Ensure that a Python environment is set up and install the [psycopg2](https://pypi.org/project/psycopg2/) library.

## Step 1: Set up the data source tables

Once RisingWave is installed and running, execute the two SQL statements below to set up the tables. You will insert data into these tables to simulate live data streams.

1. The table `raw_market_data` tracks raw market data of each asset, such as the price, volume, and bid-ask price.

   ```sql theme={null}
   CREATE TABLE raw_market_data (
       asset_id INT,
       timestamp TIMESTAMPTZ,
       price NUMERIC,
       volume INT,
       bid_price NUMERIC,
       ask_price NUMERIC
   );
   ```
2. The table `enrichment_data` contains external data that adds context to the raw market data. It includes additional metrics such as historical volatility, sector performance, and sentiment scores.

   ```sql theme={null}
   CREATE TABLE enrichment_data (
       asset_id INT,
       sector VARCHAR,
       historical_volatility NUMERIC,
       sector_performance NUMERIC,
       sentiment_score NUMERIC,
       timestamp TIMESTAMPTZ
   );
   ```

## Step 2: Run the data generator

To keep this demo simple, a Python script is used to generate and insert data into the tables created above.

Clone the [awesome-stream-processing](https://github.com/risingwavelabs/awesome-stream-processing) repository.

```bash theme={null}
git clone https://github.com/risingwavelabs/awesome-stream-processing.git
```

Navigate to the [market\_data\_enrichment](https://github.com/risingwavelabs/awesome-stream-processing/tree/main/02-simple-demos/capital_markets/market_data_enrichment) folder.

```bash theme={null}
cd awesome-stream-processing/02-simple-demos/capital_markets/market_data_enrichment
```

Run the `data_generator.py` file. This Python script uses the `psycopg2` library to connect to RisingWave, then generates synthetic data and inserts it into the tables `raw_market_data` and `enrichment_data`.

If you are not running RisingWave locally or using default credentials, update the connection parameters accordingly:

```python theme={null}
default_params = {
    "dbname": "dev",
    "user": "root",
    "password": "",
    "host": "localhost",
    "port": "4566"
}
```
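
As a rough illustration of what such a generator does, the sketch below builds one synthetic row matching the `raw_market_data` columns and inserts it with `psycopg2`. It is not the repository's `data_generator.py`: the helper names `connection_params`, `make_market_row`, and `insert_market_row`, as well as the value ranges, are assumptions made for this example.

```python
import random
from datetime import datetime, timezone

# Defaults copied from the snippet above; override only what differs.
default_params = {
    "dbname": "dev",
    "user": "root",
    "password": "",
    "host": "localhost",
    "port": "4566",
}


def connection_params(**overrides):
    """Return the defaults with any caller-supplied overrides applied."""
    return {**default_params, **overrides}


def make_market_row(asset_id, now=None, rng=random):
    """Build one synthetic row in raw_market_data column order.

    The value ranges are arbitrary choices for this sketch.
    """
    now = now or datetime.now(timezone.utc)
    price = round(rng.uniform(50, 150), 2)
    spread = round(rng.uniform(0.2, 1.0), 2)
    bid = round(price - spread / 2, 2)
    ask = round(bid + spread, 2)
    volume = rng.randint(100, 5000)
    return (asset_id, now, price, volume, bid, ask)


def insert_market_row(row, **overrides):
    """Insert one row into raw_market_data (needs a running RisingWave)."""
    import psycopg2  # imported lazily so the helpers above work without it

    conn = psycopg2.connect(**connection_params(**overrides))
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            # Values are listed in the same order as the table's columns.
            cur.execute(
                "INSERT INTO raw_market_data VALUES (%s, %s, %s, %s, %s, %s)",
                row,
            )
    finally:
        conn.close()
```

With RisingWave running, `insert_market_row(make_market_row(1))` would insert a single row using the default credentials. Passing keyword arguments such as `host` or `port` overrides the corresponding defaults.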

## Step 3: Create materialized views

In this demo, you will create three materialized views to better understand the market.

A materialized view stores the results of its defining query in the RisingWave database. The results are computed incrementally and updated whenever new events arrive, so the view never needs to be refreshed manually. Querying a materialized view returns the most up-to-date results.
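
To build intuition for incremental computation, the sketch below keeps an average current by folding in one event at a time instead of rescanning all earlier rows. It is only a conceptual illustration, not RisingWave's actual implementation, and the `RunningAverage` class is a name invented for this example.

```python
class RunningAverage:
    """Keeps an average current by folding in one event at a time."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    def add(self, value):
        # Constant work per event: update the stored state only.
        self.count += 1
        self.total += value

    @property
    def value(self):
        return self.total / self.count if self.count else None


avg_price = RunningAverage()
for price in (100.0, 102.0, 98.0):
    avg_price.add(price)
    print(avg_price.value)  # the result is up to date after every event
```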

### Calculate bid-ask spread

The `avg_price_bid_ask_spread` materialized view calculates the average price and average bid-ask spread for each asset in five-minute time windows by using `TUMBLE()` and grouping by the `asset_id` and the time window.

To learn more about `TUMBLE()`, see [Time windows](/processing/sql/time-windows/).

```sql theme={null}
CREATE MATERIALIZED VIEW avg_price_bid_ask_spread AS
SELECT
    asset_id,
    ROUND(AVG(price), 2) AS average_price,
    ROUND(AVG(ask_price - bid_price), 2) AS bid_ask_spread,
    window_end
FROM
    TUMBLE(raw_market_data, timestamp, '5 minutes')
GROUP BY asset_id, window_end;
```

You can query from `avg_price_bid_ask_spread` to see the results.

```sql theme={null}
SELECT * FROM avg_price_bid_ask_spread LIMIT 5;
```

```
 asset_id | average_price | bid_ask_spread |     window_end      
----------+---------------+----------------+---------------------
        2 |        106.55 |           0.58 | 2024-11-19 16:20:00
        5 |         98.08 |           0.60 | 2024-11-19 16:25:00
        1 |         93.39 |           0.61 | 2024-11-19 16:20:00
        3 |        100.96 |           0.60 | 2024-11-19 16:25:00
        4 |         99.56 |           0.64 | 2024-11-19 16:20:00
```
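
The windowing logic above can be sketched in plain Python to show how each row is assigned to exactly one five-minute window before averaging. This sketch assumes windows are aligned to the Unix epoch, and the function names `window_end` and `avg_price_and_spread` are hypothetical helpers rather than RisingWave APIs.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_end(ts, size=WINDOW):
    """Return the exclusive end of the tumbling window that contains ts."""
    start = ts - (ts - EPOCH) % size  # round down to the window boundary
    return start + size


def avg_price_and_spread(rows):
    """Average price and bid-ask spread per (asset_id, window_end).

    rows: iterable of (asset_id, ts, price, bid_price, ask_price).
    """
    groups = defaultdict(list)
    for asset_id, ts, price, bid, ask in rows:
        groups[(asset_id, window_end(ts))].append((price, ask - bid))
    return {
        key: (
            round(sum(p for p, _ in vals) / len(vals), 2),
            round(sum(s for _, s in vals) / len(vals), 2),
        )
        for key, vals in groups.items()
    }
```

Because tumbling windows do not overlap, every row contributes to exactly one `(asset_id, window_end)` group.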

### Calculate rolling price volatility

The `rolling_volatility` materialized view calculates the price volatility of each asset in five-minute time windows. It applies the `stddev_samp` function over `TUMBLE()` windows, grouping by the `asset_id` and the time window.

```sql theme={null}
CREATE MATERIALIZED VIEW rolling_volatility AS
SELECT
    asset_id,
    ROUND(stddev_samp(price), 2) AS rolling_volatility,
    window_end
FROM
    TUMBLE(raw_market_data, timestamp, '5 minutes')
GROUP BY asset_id, window_end;
```

You can query from `rolling_volatility` to see the results.

```sql theme={null}
SELECT * FROM rolling_volatility LIMIT 5;
```

```
 asset_id | rolling_volatility |     window_end      
----------+--------------------+---------------------
        1 |              27.98 | 2024-11-19 16:35:00
        4 |              29.55 | 2024-11-19 16:35:00
        5 |              28.78 | 2024-11-19 16:30:00
        2 |              28.76 | 2024-11-19 16:20:00
        5 |              27.60 | 2024-11-19 16:25:00
```
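
For reference, the sample standard deviation divides the sum of squared deviations by `n - 1` rather than `n`. The sketch below reimplements that formula in Python for one window's prices. The function name mirrors the SQL aggregate for readability only, and returning `None` for fewer than two values follows PostgreSQL's behavior, which is assumed to carry over to RisingWave.

```python
import math
import statistics


def stddev_samp(values):
    """Sample standard deviation (n - 1 denominator).

    Returns None for fewer than two values, where the result is undefined.
    """
    n = len(values)
    if n < 2:
        return None
    mean = sum(values) / n
    return math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))


prices = [100.0, 102.0, 98.0, 104.0]  # illustrative prices in one window
print(round(stddev_samp(prices), 2))  # 2.58
# Cross-check against the standard library's sample standard deviation.
print(math.isclose(stddev_samp(prices), statistics.stdev(prices)))  # True
```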

### Obtain a comprehensive view of each asset

The `enriched_market_data` materialized view combines the transformed market data with the enrichment data. `TUMBLE()` is used to group the data from `enrichment_data` into five-minute time windows for each asset. Then it is joined with the volatility and bid-ask spread data.

By combining these data sources, you can obtain a more holistic view of each asset, empowering you to make more informed market decisions.

```sql theme={null}
CREATE MATERIALIZED VIEW enriched_market_data AS
SELECT
    bas.asset_id,
    ed.sector,
    bas.average_price,
    bas.bid_ask_spread,
    rv.rolling_volatility,
    ed.avg_historical_volatility,
    ed.avg_sector_performance,
    ed.avg_sentiment_score,
    rv.window_end
FROM
    avg_price_bid_ask_spread AS bas
JOIN
    rolling_volatility AS rv
ON
    bas.asset_id = rv.asset_id AND
    bas.window_end = rv.window_end
JOIN (
    SELECT asset_id,
        sector,
        window_end, 
        AVG(historical_volatility) AS avg_historical_volatility,
        AVG(sector_performance) AS avg_sector_performance,
        AVG(sentiment_score) AS avg_sentiment_score
    FROM TUMBLE(enrichment_data, timestamp, '5 minutes')
    GROUP BY asset_id, sector, window_end
) AS ed
ON bas.asset_id = ed.asset_id AND
   bas.window_end = ed.window_end;
```

You can query from `enriched_market_data` to see the results.

```sql theme={null}
SELECT * FROM enriched_market_data LIMIT 5;
```

```
 asset_id |   sector   | average_price | bid_ask_spread | rolling_volatility |   avg_historical_volatility    |     avg_sector_performance      |       avg_sentiment_score       |     window_end      
----------+------------+---------------+----------------+--------------------+--------------------------------+---------------------------------+---------------------------------+---------------------
        1 | Energy     |         99.75 |           0.61 |              27.83 |                      0.2940625 |                        -0.00375 |                       0.0940625 | 2024-11-19 16:30:00
        4 | Technology |        100.62 |           0.60 |              30.52 | 0.3102702702702702702702702703 |  0.0045945945945945945945945946 | -0.0683783783783783783783783784 | 2024-11-19 16:30:00
        5 | Energy     |        100.24 |           0.60 |              28.80 | 0.2890697674418604651162790698 |   0.004186046511627906976744186 |  0.1609302325581395348837209302 | 2024-11-19 16:35:00
        2 | Energy     |        106.55 |           0.58 |              28.76 | 0.2922222222222222222222222222 |                           -0.01 | -0.2955555555555555555555555556 | 2024-11-19 16:20:00
        3 | Energy     |         98.77 |           0.64 |              29.45 | 0.2894594594594594594594594595 |  0.0035135135135135135135135135 |                           -0.10 | 2024-11-19 16:30:00
```
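
The join above keeps only the `(asset_id, window_end)` pairs that appear in all three inputs. A minimal Python sketch of that inner-join behavior follows. The `enrich` function is a hypothetical helper, and the sketch assumes each asset belongs to a single sector, so each key maps to one enrichment row.

```python
def enrich(bas, rv, ed):
    """Inner-join three dicts keyed by (asset_id, window_end)."""
    out = {}
    # Only keys present in all three inputs survive an inner join.
    for key in bas.keys() & rv.keys() & ed.keys():
        out[key] = {**bas[key], **rv[key], **ed[key]}
    return out


bas = {
    (1, "2024-11-19 16:30:00"): {"average_price": 99.75, "bid_ask_spread": 0.61},
    (2, "2024-11-19 16:20:00"): {"average_price": 106.55, "bid_ask_spread": 0.58},
}
rv = {(1, "2024-11-19 16:30:00"): {"rolling_volatility": 27.83}}
ed = {(1, "2024-11-19 16:30:00"): {"sector": "Energy", "avg_sentiment_score": 0.0940625}}

enriched = enrich(bas, rv, ed)
print(enriched)
```

In this sketch, the asset 2 row is dropped because it has no matching entry in `rv` or `ed` for its window.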

When finished, press `Ctrl+C` in the terminal running `data_generator.py` to stop the script and close its `psycopg2` connection to RisingWave.

## Summary

In this tutorial, you learned:

* How to get time-windowed aggregate results by using the tumble time window function.
* How to join data sources with materialized views.
