Subscription
Subscription is used to pull data change records for a specific table or materialized view (MV).
The data from a subscription includes both the existing data in the table at the time of subscription creation and the incremental change records in the table after the subscription is created. You can use the method of creating a subscription cursor to retrieve the full data set or the incremental data set after a specified starting point.
This feature allows you to monitor all data changes without relying on external event stores like Kafka. Compared to the Kafka sink or other event store sinks, a subscription requires fewer components and thus, less maintenance.
PUBLIC PREVIEW
This feature is in the public preview stage, meaning it’s nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.
Manage subscription
Use the syntax below to create, drop or alter subscription.
Create subscription
To create a subscription, use the syntax below:
The FROM
clause must specify either a table or a materialized view (mv).
The retention
parameter should be provided as a string in the format of an interval. It represents the duration for which incremental data will be retained. Any incremental data that exceeds the specified retention duration will be automatically deleted and will no longer be accessible.
Drop subscription
To drop a subscription, use the syntax below:
Alter subscription
To rename a subscription, change the owner, or set a new schema, use the syntax below:
Subscription cursor
A subscription cursor is a unit used to consume data from a subscription. In RisingWave, it’s a tool specifically designed to work in conjunction with a subscription, differing from the general cursor.
In RisingWave, the subscription cursor allows you to specify a specific starting point within the data of the subscription. Once the subscription cursor is created, you can use a loop to fetch and consume the data starting from that point onwards. A subscription can have multiple subscription cursors, which can be used to consume different ranges or intervals of data from the subscription.
Syntax
The syntax of creating a subscription cursor is as follows:
The since_clause
is used to specify the starting point for reading data. By setting this clause, you can control the range of data that is returned, allowing you to retrieve only the incremental data or data starting from a specific time or event.
Below are the available choices for since_clause
. If you don’t specify the since_clause
, the returned data will just include the incremental data after declaration, which equals to the first choice below.
since now()/proctime()
: The returned data will include only the incremental data starting from the time of declaration.since begin()
: The returned data will include the oldest incremental data available, typically starting from the beginning of the subscription’s retention period.since unix_ms
: Starts reading from the first time point greater than or equal to the specifiedunix_ms
value. It’s important to note that theunix_ms
value should fall within the range ofnow() - subscription's retention
andnow
.
If you specify FULL
instead of the since_clause
, the subscription cursor starts consuming data from stock.
Fetch from cursor
FETCH from cursor function is supported in the PSQL simple query mode and extended mode.
Non-blocking data fetch
Fetch the next row or up to N rows from the cursor. If fewer than N rows are available, it will return whatever is available immediately without waiting. This also means that if there are no rows available (i.e., the latest data has been reached), an empty result will be returned immediately.
In the example above, the op
column in the result indicates the type of change operations. There are four options: Insert
, UpdateInsert
, Delete
, and UpdateDelete
. For a single UPDATE statement, the subscription log will contain two separate rows: one with UpdateInsert
and another with UpdateDelete
. This is because RisingWave treats an UPDATE as a delete of the old value followed by an insert of the new value. As for rw_timestamp
, it corresponds to the Unix timestamp in milliseconds when the data was written.
Blocking data fetch
Fetch up to N rows from the cursor with a specified timeout. The timeout
value should be a string in the interval format. In this case, the fetch statement will return when either N rows have been fetched or the timeout occurs. If the timeout occurs, whatever has been read up to that point will be returned. Here are two scenarios to trigger the timeout:
-
The cursor has reached the latest data and has been waiting too long for new data to arrive.
-
At least N rows are available for the cursor to read, but retrieving all of them takes an extended period.
To avoid polling for new data frequently with the non-blocking FETCH
, you can set a longer timeout to simulate a scenario where you want the FETCH
to block until new data arrives.
Order of the fetched data
- For data with different
rw_timestamp
, values are returned in the order the events occurred. - For data with the same
rw_timestamp
, the order matches the event sequence if the data belongs to the same primary key in the subscribed materialized view or table. - For data with the same
rw_timestamp
but different primary keys, the order may not reflect the exact event sequence.
Show subscription cursors
To show all subscription cursors in the current session, use the syntax below:
Examples
Let’s create a table t1
and subscribe this table, then create a cursor for this subscription.
After creation, we can use the FETCH NEXT FROM cursor_name
statement to fetch data from this cursor:
Then we can update table t1
and fetch again to view the changes:
We can also create another subscription cursor to specify since_clause
. Let’s use since unix_ms
to rebuild the cursor:
Subscribing via Postgres driver
For this feature, you only need to use the Postgres driver, and no extra dependencies are required.
Here’s an example using Python and psycopg2.
Example output:
Exactly-once delivery
The persistent nature of subscriptions allows the subscriber to resume from a specific point in time (rw_timestamp
) without data loss after a failure recovery. We also guarantee no duplicates in subscriptions, thus ensuring exactly-once delivery.
Persisting the consumption progress
To achieve exactly-once delivery, it’s required to periodically persist the timestamp in storage. We recommend using RisingWave as the store, as no extra component is needed.
First, we need to create a table for storing the progress.
Here’s an example python code for retrieving and updating the consumption progress:
The client needs to retrieve the last progress during bootstrapping and periodically store the progress.
Use case
Potential use cases for subscriptions are as follows. If you have explored more use cases, feel free to share them with us in our Slack channel.
- Real-time alerting/notification: Subscribers can employ sophisticated alerting rules to detect abnormal events and notify downstream applications.
- Event-driven architectures: Develop event-driven systems that react to changes based on specific business logic, such as synchronizing data to microservices.
Was this page helpful?