Overview

Analyzing social media messages is valuable because it helps businesses make more strategic decisions by understanding the values of their consumers and competitors. A streaming system is well suited to this task because it lets companies stay continuously up to date with trending topics.

To keep track of topics, users of social media platforms such as Twitter add hashtags to indicate what their posts are about. The number of times a hashtag is used is a measure of audience engagement: if a hashtag is used often, the topic is likely popular. By tracking how often a hashtag is used over time, we can determine whether audience engagement is increasing or decreasing.

In this tutorial, you will learn how to extract valuable insights from text data using RisingWave. We have set up a demo cluster for this tutorial, so you can easily try it out.

Prerequisites

  • Ensure you have Docker and Docker Compose installed in your environment. Note that Docker Compose is included in Docker Desktop for Windows and macOS. If you use Docker Desktop, ensure that it is running before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment. For detailed instructions, see Download PostgreSQL.

Step 1: Launch the demo cluster

The demo cluster packages RisingWave together with a workload generator. As soon as the cluster starts, the workload generator begins producing random traffic and feeding it into Kafka.

First, clone the risingwave repository to the environment.

git clone https://github.com/risingwavelabs/risingwave.git

Now navigate to the integration_tests/twitter directory and start the demo cluster from the Docker Compose file.

cd risingwave/integration_tests/twitter
docker compose up -d

COMMAND NOT FOUND?

The default command-line syntax in Compose V2 starts with docker compose. See details in the Docker docs.

If you’re using Compose V1, use docker-compose instead.

The necessary RisingWave components will be started, including the compute node, metadata node, and MinIO. The workload generator will start to generate random data and feed it into Kafka topics. In this demo cluster, the data of materialized views is stored in the MinIO instance.

Now connect to RisingWave to manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Step 2: Connect RisingWave to data streams

This tutorial will use RisingWave to consume data streams and perform data analysis. Tweets will be used as sample data so we can query the most popular hashtags on a given day to keep track of trending topics.

Below are the schemas for tweets and Twitter users. In the tweet schema, text contains the content of a tweet, and created_at contains the date and time when a tweet was posted. Hashtags will be extracted from text.

{
    "data": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty"
    }
}
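To make the nested layout concrete, here is a minimal Python sketch that parses the sample event above and reads the two fields this tutorial relies on. It only illustrates the shape of the data and is not part of the RisingWave setup. The helper name parse_timestamp is an assumption made for this example.

```python
import json
from datetime import datetime, timezone

# Sample event copied from the schema above (raw string keeps the JSON escapes intact).
raw_event = r'''
{
    "data": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty"
    }
}
'''


def parse_timestamp(value: str) -> datetime:
    """Parse a timestamp such as 2020-02-12T17:09:56.000Z into an aware UTC datetime."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


event = json.loads(raw_event)

# Nested access here plays the role of (data).text and (data).created_at in SQL.
tweet_text = event["data"]["text"]
tweet_time = parse_timestamp(event["data"]["created_at"])

print(tweet_time.isoformat())
print(event["author"]["username"])
```

Each top-level key ("data" and "author") holds a nested object, which is why the SQL in this tutorial reads individual fields through the enclosing column.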

Connect to the data stream with the following SQL statement.

CREATE SOURCE twitter (
    data STRUCT <
        created_at TIMESTAMP WITH TIME ZONE,
        id VARCHAR,
        text VARCHAR,
        lang VARCHAR
    >,
    author STRUCT <
        created_at TIMESTAMP WITH TIME ZONE,
        id VARCHAR,
        name VARCHAR,
        username VARCHAR,
        followers INT
    >
) WITH (
    connector = 'kafka',
    topic = 'twitter',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Note that the SQL statement uses the STRUCT data type. For details about the STRUCT data type, please see Data types.

Step 3: Define a materialized view and analyze data

In this step, you will create a materialized view that tracks how often each hashtag is used daily.

To do so, start by extracting all the hashtags in a tweet with the regexp_matches function. For instance, consider the following tweet:

Struggling with the high cost of scaling? Fret not! #RisingWave cares about performance and cost-efficiency. We use a tiered architecture that fully utilizes the #cloud resources to give the users fine-grained control over cost and performance.

The regexp_matches function finds every substring of the tweet that matches the regular expression #\w+, that is, a # followed by one or more word characters. This extracts all the hashtags from the tweet and stores them in an array.

       hashtag        |        created_at
----------------------+--------------------------
[#RisingWave, #cloud] | 2022-05-18 17:00:00+00:00
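The same pattern can be tried outside RisingWave. The sketch below uses Python's re module as a stand-in for regexp_matches with the 'g' flag. It is an illustration of the pattern's behavior, not RisingWave's implementation.

```python
import re

# Same pattern as in the tutorial: a '#' followed by one or more word characters.
HASHTAG_PATTERN = re.compile(r"#\w+")

tweet = (
    "Struggling with the high cost of scaling? Fret not! #RisingWave cares "
    "about performance and cost-efficiency. We use a tiered architecture that "
    "fully utilizes the #cloud resources to give the users fine-grained "
    "control over cost and performance."
)

# findall returns every non-overlapping match, like a global regex search.
hashtags = HASHTAG_PATTERN.findall(tweet)
print(hashtags)  # ['#RisingWave', '#cloud']

# \w does not match a hyphen, so a hyphenated tag is cut at the hyphen.
print(HASHTAG_PATTERN.findall("#cost-efficiency"))  # ['#cost']
```

Because \w covers only letters, digits, and underscores, a hashtag that contains a hyphen or other punctuation is captured only up to that character.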

Then the unnest function expands the array so that each item gets its own row.

   hashtag   |        created_at
-------------+--------------------------
 #RisingWave | 2022-05-18 17:00:00+00:00
   #cloud    | 2022-05-18 17:00:00+00:00
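As a rough illustration of what this expansion does, the following Python sketch turns one row holding a list of hashtags into one row per hashtag. The function name unnest_hashtags is an assumption made for this example. Timestamps are kept as plain strings for brevity.

```python
def unnest_hashtags(rows):
    """Expand (list_of_hashtags, created_at) rows into one (hashtag, created_at) row per item."""
    return [
        (hashtag, created_at)
        for hashtags, created_at in rows
        for hashtag in hashtags
    ]


# One input row holding an array of two hashtags, as in the first table above.
rows = [(["#RisingWave", "#cloud"], "2022-05-18 17:00:00+00:00")]

flat_rows = unnest_hashtags(rows)
for hashtag, created_at in flat_rows:
    print(hashtag, "|", created_at)
```

In this sketch, a tweet with no hashtags contributes no output rows, because there is nothing in its list to expand.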

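Finally, the TUMBLE function assigns each row to a one-day window based on created_at and exposes the start of that window as window_start. Grouping by hashtag and window_start then counts how many times each hashtag was used each day.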

CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS (
    SELECT
        unnest(regexp_matches((data).text, '#\w+', 'g')) AS hashtag,
        (data).created_at AS created_at
    FROM
        twitter
)
SELECT
    hashtag,
    COUNT(*) AS hashtag_occurrences,
    window_start
FROM
    TUMBLE(tags, created_at, INTERVAL '1 day')
GROUP BY
    hashtag,
    window_start;
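The windowed count can be pictured with a small Python sketch. It assumes that windows are aligned to the Unix epoch, so one-day windows start at midnight UTC. The function names tumble_start and count_hashtags are made up for this example, and the input rows are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Return the start of the fixed-size, non-overlapping window that contains ts."""
    return ts - (ts - EPOCH) % size


def count_hashtags(rows, size=timedelta(days=1)):
    """Count rows per (hashtag, window_start), like GROUP BY hashtag, window_start."""
    counts = Counter()
    for hashtag, created_at in rows:
        counts[(hashtag, tumble_start(created_at, size))] += 1
    return counts


# Hypothetical (hashtag, created_at) rows, already unnested.
rows = [
    ("#RisingWave", datetime(2022, 5, 18, 17, 0, tzinfo=timezone.utc)),
    ("#cloud", datetime(2022, 5, 18, 17, 0, tzinfo=timezone.utc)),
    ("#RisingWave", datetime(2022, 5, 18, 23, 59, tzinfo=timezone.utc)),
    ("#RisingWave", datetime(2022, 5, 19, 0, 0, tzinfo=timezone.utc)),
]

counts = count_hashtags(rows)
for (hashtag, window_start), occurrences in sorted(counts.items()):
    print(hashtag, occurrences, window_start.isoformat())
```

Each timestamp belongs to exactly one window, so a tweet posted at 23:59 and one posted a minute later at 00:00 are counted under different days.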

Step 4: Query the results

We can now query the ten most frequently used hashtags.

SELECT * FROM hot_hashtags
ORDER BY hashtag_occurrences DESC
LIMIT 10;

The results may look like this:

  hashtag   | hashtag_occurrences |       window_start
------------+---------------------+--------------------------
   #Multi   |         262         | 2022-08-18 00:00:00+00:00
   #zero    |         198         | 2022-08-18 00:00:00+00:00
 #knowledge |         150         | 2022-08-18 00:00:00+00:00
   #Open    |         148         | 2022-08-18 00:00:00+00:00
   #User    |         142         | 2022-08-18 00:00:00+00:00
   #Cross   |         141         | 2022-08-18 00:00:00+00:00
   #local   |         139         | 2022-08-18 00:00:00+00:00
  #client   |         138         | 2022-08-18 00:00:00+00:00
  #system   |         135         | 2022-08-18 00:00:00+00:00
    #Re     |         132         | 2022-08-18 00:00:00+00:00

If the workload generator runs for multiple days, the results will include the most-used hashtags from different dates.
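Once results span several days, it can help to rank hashtags within a single day. The Python sketch below mirrors the ORDER BY ... DESC LIMIT logic and adds an optional day filter. The function name top_hashtags is an assumption made for this example. The first three rows come from the sample results above, and the last row is hypothetical.

```python
from datetime import date

# (hashtag, hashtag_occurrences, day of window_start)
rows = [
    ("#Multi", 262, date(2022, 8, 18)),
    ("#zero", 198, date(2022, 8, 18)),
    ("#Open", 148, date(2022, 8, 18)),
    ("#Multi", 300, date(2022, 8, 19)),  # hypothetical row for a second day
]


def top_hashtags(rows, n, day=None):
    """Return the n rows with the highest counts, optionally restricted to one day."""
    selected = [row for row in rows if day is None or row[2] == day]
    # Sort by count descending, then by hashtag so that ties come out in a fixed order.
    return sorted(selected, key=lambda row: (-row[1], row[0]))[:n]


overall = top_hashtags(rows, 2)
one_day = top_hashtags(rows, 2, day=date(2022, 8, 18))

print(overall)
print(one_day)
```

The secondary sort key is a design choice for this sketch: ordering by count alone leaves the relative order of tied rows unspecified, so a tiebreaker keeps the output stable between runs.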

When you finish, run the following command to disconnect from RisingWave.

\q

Optional: To remove the containers and the data generated, use the following command.

docker compose down -v

Summary

In this tutorial, you learned:

  • How to define a nested table with RisingWave.
  • How to extract character combinations from a string using regular expressions.