Overview

In fast-paced financial markets, regulatory agencies and trading firms are constantly monitoring trades to flag irregular activity. Behaviors like spoofing, where traders place deceptive orders, or sudden large price spikes, are particularly concerning. As trades are happening every second, being able to detect and react instantly to suspicious behavior is crucial to maintain fair and transparent operations.

By monitoring bid-ask spreads and rolling volumes across assets and trades on the fly, firms can detect potential risks as they emerge. For example, a tight bid-ask spread combined with a sudden decrease in rolling volume can hint at spoofing, and a sharp price increase within a short time window indicates a spike in volatility.

In this tutorial, you will learn how to monitor market and trade activities in real time. You will enrich the raw trade and market data with calculated metrics like high trading volume and rapid price fluctuations.

Prerequisites

  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment. For detailed instructions, see Download PostgreSQL.
  • Install and run RisingWave. For detailed instructions on how to quickly get started, see the Quick start guide.
  • Ensure that a Python environment is set up and install the psycopg2 library.

Step 1: Set up the data source tables

Once RisingWave is installed and deployed, run the two SQL queries below to set up the tables. You will insert data into these tables to simulate live data streams.

  1. The table trade_data tracks key details about individual trades, such as the buyer, seller, volume, and price of the trade.

    CREATE TABLE trade_data (
        trade_id INT,
        asset_id INT,
        timestamp TIMESTAMPTZ,
        price NUMERIC,
        volume INT,
        buyer_id INT,
        seller_id INT
    );
    
  2. The market_data table tracks information related to financial assets, such as the bid price, ask price, and the trading volume over a rolling time period.

    CREATE TABLE market_data (
        asset_id INT,
        timestamp TIMESTAMPTZ,
        bid_price NUMERIC,
        ask_price NUMERIC,
        price NUMERIC,
        rolling_volume INT
    );
    

Step 2: Run the data generator

To keep this demo simple, a Python script is used to generate and insert data into the tables created above.

Clone the awesome-stream-processing repository.

git clone https://github.com/risingwavelabs/awesome-stream-processing.git

Navigate to the market_surveillance folder.

cd awesome-stream-processing/02-simple-demos/capital_markets/market_surveillance

Run the data_generator.py file. This Python script uses the psycopg2 library to connect to RisingWave, then generates synthetic data and inserts it into the trade_data and market_data tables.

If you are not running RisingWave locally or using default credentials, update the connection parameters accordingly:

default_params = {
    "dbname": "dev",
    "user": "root",
    "password": "",
    "host": "localhost",
    "port": "4566"
}
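The sketch below illustrates how a generator of this kind could use these parameters. It is not the contents of data_generator.py: the helper names make_trade, insert_trades, and run are hypothetical, and the value ranges are made up for illustration. Only the trade_data column list comes from Step 1.

```python
import random
from datetime import datetime, timezone

# Parameterized INSERT matching the trade_data columns from Step 1.
INSERT_TRADE = (
    "INSERT INTO trade_data "
    "(trade_id, asset_id, timestamp, price, volume, buyer_id, seller_id) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s)"
)


def make_trade(trade_id, rng):
    """Build one synthetic row in trade_data column order (ranges are made up)."""
    return (
        trade_id,
        rng.randint(1, 5),                # asset_id
        datetime.now(timezone.utc),       # timestamp
        round(rng.uniform(50, 150), 2),   # price
        rng.randint(1, 1000),             # volume
        rng.randint(1, 100),              # buyer_id
        rng.randint(1, 100),              # seller_id
    )


def insert_trades(cursor, count, rng=None):
    """Insert `count` synthetic trades through any DB-API style cursor."""
    rng = rng or random.Random()
    for trade_id in range(1, count + 1):
        cursor.execute(INSERT_TRADE, make_trade(trade_id, rng))


def run(params, count=100):
    """Connect with psycopg2 using a dict like default_params and insert rows."""
    import psycopg2  # imported here so the helpers above work without the driver

    conn = psycopg2.connect(**params)
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            insert_trades(cur, count)
    finally:
        conn.close()
```

Calling run(default_params) would insert 100 rows into a local RisingWave instance. The real script also populates market_data, which this sketch leaves out.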

Step 3: Create materialized views

In this demo, you will create multiple materialized views to help analyze market activity and flag suspicious trades.

Materialized views contain the results of a view expression and are stored in the RisingWave database. The results are computed incrementally and updated whenever new events arrive, so a materialized view never needs to be refreshed manually. When you query a materialized view, it returns the most up-to-date results.
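As a conceptual illustration of incremental computation, the sketch below keeps a per-key average current by updating a running sum and count for each new event, rather than rescanning all past events. It only shows the general idea; it is not a description of how RisingWave implements materialized views, and the RunningAverage class is a hypothetical name.

```python
from collections import defaultdict


class RunningAverage:
    """Per-key average that is updated one event at a time."""

    def __init__(self):
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def add(self, key, value):
        # Constant work per event: no earlier events are re-read.
        self._sum[key] += value
        self._count[key] += 1

    def get(self, key):
        # Return None for keys that have not received any events yet.
        count = self._count.get(key, 0)
        return self._sum[key] / count if count else None


avg_volume = RunningAverage()
avg_volume.add(1, 100)
avg_volume.add(1, 300)
avg_volume.add(2, 50)
```

After these three events, avg_volume.get(1) reflects both trades for asset 1 without any explicit refresh step.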

Identify unusual volume trades

The unusual_volume materialized view indicates whether a trade has a higher-than-average trading volume for its asset. TUMBLE() is used to split the trades into non-overlapping 10-minute windows. GROUP BY is used to group the data by the unique trade ID, asset ID, volume, and window start time. Then PARTITION BY is used to ensure that the average volume is calculated separately for each asset.

If the trade’s volume is more than 1.5 times the average volume of its asset, it is marked as an unusual trade.

CREATE MATERIALIZED VIEW unusual_volume AS
SELECT
    trade_id,
    asset_id,
    volume,
    CASE WHEN volume > avg_volume * 1.5 THEN TRUE ELSE FALSE END AS unusual_volume,
    window_start AS timestamp
FROM (
    SELECT
        trade_id,
        asset_id,
        volume,
        AVG(volume) OVER (PARTITION BY asset_id) as avg_volume,
        window_start
    FROM TUMBLE(trade_data, timestamp, INTERVAL '10 MINUTES')
    GROUP BY
        trade_id,
        asset_id,
        volume,
        window_start
);

You can query unusual_volume to see the results. High-volume trades are indicated in the unusual_volume column.

SELECT * FROM unusual_volume LIMIT 5;
 trade_id | asset_id | volume | unusual_volume |      timestamp      
----------+----------+--------+----------------+---------------------
    11633 |        1 |    943 | t              | 2024-11-26 15:20:00
    11880 |        1 |     93 | f              | 2024-11-26 15:20:00
    12972 |        1 |    604 | f              | 2024-11-26 15:20:00
    13964 |        1 |    181 | f              | 2024-11-26 15:20:00
    15789 |        1 |    127 | f              | 2024-11-26 15:20:00
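To make the windowing and threshold logic concrete, here is a plain-Python sketch of the same calculation on an in-memory list. It is an illustration only: the function names and the sample trades are hypothetical, and the window flooring assumes a window size that divides an hour evenly. As in the view, the average is taken per asset, mirroring PARTITION BY asset_id.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def window_start(ts, minutes):
    """Floor a timestamp to the start of its tumbling window.

    Assumes `minutes` divides 60 evenly (for example 5 or 10).
    """
    return ts - timedelta(
        minutes=ts.minute % minutes,
        seconds=ts.second,
        microseconds=ts.microsecond,
    )


def flag_unusual_volume(trades, factor=1.5):
    """trades: list of (trade_id, asset_id, timestamp, volume) tuples."""
    # Average volume per asset, like AVG(volume) OVER (PARTITION BY asset_id).
    volumes = defaultdict(list)
    for _, asset_id, _, volume in trades:
        volumes[asset_id].append(volume)
    avg = {asset: sum(v) / len(v) for asset, v in volumes.items()}

    return [
        (trade_id, asset_id, volume,
         volume > avg[asset_id] * factor,   # the unusual_volume flag
         window_start(ts, 10))              # start of the 10-minute window
        for trade_id, asset_id, ts, volume in trades
    ]


ts = datetime(2024, 11, 26, 15, 27, 30)
sample = [(1, 1, ts, 100), (2, 1, ts, 100), (3, 1, ts, 100), (4, 1, ts, 500)]
flags = flag_unusual_volume(sample)
```

In this made-up sample the average volume for asset 1 is 200, so only the trade with volume 500 exceeds the 1.5x threshold of 300.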

Monitor price spikes

The price_spike materialized view analyzes the price fluctuation of assets within a five-minute window to detect potential price spikes. For each asset, it calculates the percentage change between the highest and lowest prices within a five-minute window.

A price spike for the asset is detected if the percentage change exceeds 5%.

CREATE MATERIALIZED VIEW price_spike AS
SELECT
    asset_id,
    ROUND((MAX(price) - MIN(price)) / MIN(price) * 100, 2) AS price_change_pct,
    CASE 
        WHEN ROUND((MAX(price) - MIN(price)) / MIN(price) * 100, 2) > 5 THEN TRUE
        ELSE FALSE
    END AS price_spike,
    window_start AS timestamp
FROM
    TUMBLE(market_data, timestamp, INTERVAL '5 MINUTES')
GROUP BY
    asset_id,
    window_start;

You can query price_spike to see the results. The price_spike column denotes whether there was a price spike for the asset.

SELECT * FROM price_spike;
 asset_id | price_change_pct | price_spike |      timestamp      
----------+------------------+-------------+---------------------
        3 |           192.05 | t           | 2024-11-26 15:20:00
        5 |           185.41 | t           | 2024-11-26 15:20:00
        1 |           184.32 | t           | 2024-11-26 15:20:00
        4 |           193.79 | t           | 2024-11-26 15:20:00
        2 |           186.83 | t           | 2024-11-26 15:20:00
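The percentage-change formula can be checked by hand with a small Python sketch. The function names and sample prices below are hypothetical; Decimal is used to approximate SQL NUMERIC arithmetic, and half-up rounding is an assumption about how ROUND behaves on NUMERIC values.

```python
from decimal import Decimal, ROUND_HALF_UP


def price_change_pct(prices):
    """(MAX(price) - MIN(price)) / MIN(price) * 100, rounded to 2 places."""
    lowest, highest = min(prices), max(prices)
    pct = (highest - lowest) / lowest * 100
    return pct.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def is_price_spike(prices, threshold=Decimal("5")):
    """True when the change within the window exceeds the threshold (in percent)."""
    return price_change_pct(prices) > threshold


# Prices observed for one asset within a single five-minute window (made up).
window_prices = [Decimal("100"), Decimal("103.5"), Decimal("106")]
change = price_change_pct(window_prices)
spike = is_price_spike(window_prices)
```

Here the low is 100 and the high is 106, so the change is 6%, which exceeds the 5% threshold. A window whose change is exactly 5% would not be flagged, because the comparison is strict.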

Flag spoofing activity

The spoofing_detection materialized view detects potential spoofing activity by analyzing the bid-ask price difference and the trading volume.

The following two conditions must be met to flag spoofing activity:

  • The absolute difference between the average bid price and the average ask price in the window is less than 0.2.
  • The total rolling volume in the ten-minute window is less than 80% of the asset's average total across its ten-minute windows.

CREATE MATERIALIZED VIEW spoofing_detection AS
SELECT
    asset_id,
    window_start AS timestamp,
    CASE
        WHEN ABS(AVG(bid_price) - AVG(ask_price)) < 0.2
            AND SUM(rolling_volume) < AVG(SUM(rolling_volume)) OVER (PARTITION BY asset_id) * 0.8
        THEN TRUE
        ELSE FALSE
    END AS potential_spoofing
FROM TUMBLE(market_data, timestamp, INTERVAL '10 MINUTES')
GROUP BY
    asset_id,
    window_start;

You can query from spoofing_detection to see the results.

SELECT * FROM spoofing_detection LIMIT 5;
 asset_id |      timestamp      | potential_spoofing 
----------+---------------------+--------------------
        1 | 2024-11-26 15:20:00 | f
        5 | 2024-11-26 15:20:00 | f
        2 | 2024-11-26 15:20:00 | f
        4 | 2024-11-26 15:20:00 | f
        3 | 2024-11-26 15:20:00 | f
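The two spoofing conditions can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the view's implementation: the input is a hypothetical list of per-window aggregates for a single asset (average bid, average ask, and summed rolling volume), and flag_spoofing is a made-up name.

```python
def flag_spoofing(windows, max_spread=0.2, volume_ratio=0.8):
    """windows: list of (window_start, avg_bid, avg_ask, total_volume) for ONE asset.

    Returns a list of (window_start, potential_spoofing) pairs.
    """
    if not windows:
        return []
    # Average of the per-window totals, like
    # AVG(SUM(rolling_volume)) OVER (PARTITION BY asset_id).
    mean_total = sum(total for _, _, _, total in windows) / len(windows)
    return [
        (start,
         abs(bid - ask) < max_spread and total < mean_total * volume_ratio)
        for start, bid, ask, total in windows
    ]


# Made-up aggregates for one asset across four ten-minute windows.
sample_windows = [
    ("15:00", 100.0, 100.1, 1000),
    ("15:10", 100.0, 100.1, 1000),
    ("15:20", 100.0, 100.1, 400),   # tight spread and low volume
    ("15:30", 100.0, 100.5, 400),   # low volume but wide spread
]
spoofing_flags = flag_spoofing(sample_windows)
```

The average window total in this sample is 700, so the volume cutoff is 560. Only the 15:20 window satisfies both conditions; the 15:30 window has low volume but its spread of 0.5 is too wide.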

When finished, press Ctrl+C to stop the data generator and close its psycopg2 connection to RisingWave.

Summary

In this tutorial, you learned:

  • How to use PARTITION BY to calculate the average volume separately for each asset