Subscription
A subscription is used to pull data change records from a specific table or materialized view (MV).
The data from a subscription includes both the data that exists in the table when the subscription is created and the incremental change records produced afterwards. By creating a subscription cursor, you can retrieve either the full data set or the incremental data from a specified starting point.
This feature allows you to monitor all data changes without relying on external event stores like Kafka. Compared to the Kafka sink or other event store sinks, a subscription requires fewer components and therefore less maintenance.
PUBLIC PREVIEW
This feature is in the public preview stage, meaning it’s nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.
Manage subscription
Use the syntax below to create, drop, or alter a subscription.
Create subscription
To create a subscription, use the syntax below:
The FROM clause must specify either a table or a materialized view (MV).
The retention parameter should be provided as a string in interval format. It specifies how long incremental data is retained: any incremental data older than the retention period is automatically deleted and can no longer be accessed.
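The helper below is a small sketch of how the statement is put together, assuming the form CREATE SUBSCRIPTION <name> FROM <table_or_mv> WITH (retention = '<interval>'). The function name and the sample values sub, t1, and '1D' are illustrative assumptions; check the exact syntax against the RisingWave version you run.

```python
def create_subscription_sql(name: str, source: str, retention: str) -> str:
    """Build a CREATE SUBSCRIPTION statement (hypothetical helper).

    Assumed shape: CREATE SUBSCRIPTION <name> FROM <table_or_mv>
    WITH (retention = '<interval>');
    """
    if not name or not source:
        raise ValueError("a subscription name and a table or MV are required")
    if not retention:
        raise ValueError("retention must be a non-empty interval string")
    # Double any single quotes so the interval stays a valid SQL string literal.
    literal = retention.replace("'", "''")
    return f"CREATE SUBSCRIPTION {name} FROM {source} WITH (retention = '{literal}');"


print(create_subscription_sql("sub", "t1", "1D"))
```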
Drop subscription
To drop a subscription, use the syntax below:
Alter subscription
To rename a subscription, change the owner, or set a new schema, use the syntax below:
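As a sketch, the three ALTER forms can be generated as shown below. It assumes the statements take the shape ALTER SUBSCRIPTION <name> RENAME TO <new_name>, ALTER SUBSCRIPTION <name> OWNER TO <new_owner>, and ALTER SUBSCRIPTION <name> SET SCHEMA <new_schema>. The helper and its parameter names are hypothetical.

```python
from typing import Optional


def alter_subscription_sql(name: str, rename_to: Optional[str] = None,
                           owner_to: Optional[str] = None,
                           set_schema: Optional[str] = None) -> str:
    """Build one ALTER SUBSCRIPTION statement (hypothetical helper).

    Exactly one of rename_to, owner_to, or set_schema must be given.
    """
    actions = [
        (rename_to, "RENAME TO"),
        (owner_to, "OWNER TO"),
        (set_schema, "SET SCHEMA"),
    ]
    chosen = [(value, clause) for value, clause in actions if value]
    if len(chosen) != 1:
        raise ValueError("pass exactly one of rename_to, owner_to, set_schema")
    value, clause = chosen[0]
    return f"ALTER SUBSCRIPTION {name} {clause} {value};"


print(alter_subscription_sql("sub", rename_to="sub_v2"))
```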
Subscription cursor
A subscription cursor is used to consume data from a subscription. In RisingWave, it is designed specifically to work with subscriptions and is distinct from a general cursor.
A subscription cursor lets you specify a starting point within the subscription's data. Once the cursor is created, you can fetch and consume data from that point onward in a loop. A subscription can have multiple subscription cursors, each consuming a different range or interval of the subscription's data.
Syntax
The syntax of creating a subscription cursor is as follows:
The since_clause specifies the starting point for reading data. It controls the range of data returned, so you can retrieve only the incremental data or the data starting from a specific point in time.
The available options for since_clause are listed below. If you omit the since_clause, the cursor returns only the incremental data produced after the cursor is declared, which is equivalent to the first option.
- since now()/proctime(): The returned data includes only the incremental data produced after the cursor is declared.
- since begin(): The returned data starts from the oldest incremental data still available, typically the beginning of the subscription’s retention period.
- since unix_ms: Reading starts from the first time point greater than or equal to the specified unix_ms value. Note that unix_ms should fall within the range from now() - subscription’s retention to now().
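To make the three starting points concrete, the following toy model treats the retained change log as a sorted list of rw_timestamp values and computes where a cursor would begin reading. It is a conceptual illustration only. The mode names and the function are hypothetical, and this is not how RisingWave implements cursors.

```python
from bisect import bisect_left
from typing import List, Optional


def cursor_start_index(log_ts: List[int], mode: str, now_ms: int,
                       retention_ms: int, unix_ms: Optional[int] = None) -> int:
    """Return the index into a sorted list of rw_timestamps where reading starts.

    Conceptual model only: log_ts holds one millisecond timestamp per retained
    incremental record, in ascending order.
    """
    oldest_allowed = now_ms - retention_ms
    if mode in ("now", "proctime"):
        # Only changes written after the declaration are returned.
        return len(log_ts)
    if mode == "begin":
        # Oldest incremental data still inside the retention window.
        return bisect_left(log_ts, oldest_allowed)
    if mode == "unix_ms":
        if unix_ms is None or not oldest_allowed <= unix_ms <= now_ms:
            raise ValueError("unix_ms must lie between now - retention and now")
        # First time point greater than or equal to unix_ms.
        return bisect_left(log_ts, unix_ms)
    raise ValueError(f"unknown mode: {mode!r}")


log = [1000, 2000, 3000, 4000]
print(log[cursor_start_index(log, "unix_ms", now_ms=4500, retention_ms=3000, unix_ms=2500):])
```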
If you specify FULL instead of a since_clause, the subscription cursor starts consuming from the existing (stock) data, that is, the full data set rather than only the incremental changes.
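The options above, together with FULL, map onto the cursor declaration roughly as in this sketch. It assumes the statement shape DECLARE <cursor> SUBSCRIPTION CURSOR FOR <subscription> [since_clause | FULL]. The helper name is hypothetical, and checking that a unix_ms value falls inside the retention window is left to the server.

```python
from typing import Optional, Union


def declare_cursor_sql(cursor: str, subscription: str,
                       since: Optional[Union[str, int]] = None,
                       full: bool = False) -> str:
    """Build a subscription cursor declaration (hypothetical helper)."""
    if full and since is not None:
        raise ValueError("FULL replaces the since_clause; pass only one of them")
    stmt = f"DECLARE {cursor} SUBSCRIPTION CURSOR FOR {subscription}"
    if full:
        stmt += " FULL"  # start from the existing (stock) data
    elif isinstance(since, int) and not isinstance(since, bool):
        stmt += f" SINCE {since}"  # a unix_ms starting point
    elif since in ("now()", "proctime()", "begin()"):
        stmt += f" SINCE {since}"
    elif since is not None:
        raise ValueError(f"unsupported since value: {since!r}")
    # With no since_clause, the cursor behaves like SINCE now().
    return stmt + ";"


print(declare_cursor_sql("cur", "sub", since="begin()"))
```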
Fetch from cursor
NOTE
Fetching from a subscription cursor is currently supported only in the PostgreSQL simple query mode. If you use a client such as JDBC that defaults to the extended query mode, manually switch it to the simple query mode.
FETCH NEXT FROM cursor
After creating a subscription cursor, you can fetch data with the FETCH NEXT FROM cursor_name command. The result looks like the following:
The op column in the result indicates the change operation. It has four possible values: insert, update_insert, delete, and update_delete. For each row changed by an UPDATE statement, the subscription log contains two separate rows: an update_delete row carrying the old value and an update_insert row carrying the new value. This is because RisingWave treats an UPDATE as a delete of the old value followed by an insert of the new value. The rw_timestamp column is the Unix timestamp, in milliseconds, at which the data was written.
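One way a consumer might use the op column is to maintain an in-memory copy of the subscribed table. The sketch below assumes each fetched row has already been converted into a (primary_key, row, op) tuple. That shape and the apply_changes helper are hypothetical. Deletes are applied only when the stored row still matches the old value, so the result does not depend on which half of an update arrives first.

```python
from typing import Any, Dict, Iterable, Tuple

Row = Tuple[Any, ...]


def apply_changes(replica: Dict[Any, Row],
                  changes: Iterable[Tuple[Any, Row, str]]) -> Dict[Any, Row]:
    """Apply (primary_key, row, op) change records to an in-memory replica."""
    for key, row, op in changes:
        if op in ("insert", "update_insert"):
            # A new row, or the new value of an updated row.
            replica[key] = row
        elif op in ("delete", "update_delete"):
            # A removed row, or the old value of an updated row. Drop the entry
            # only if it still holds that old value, so an already-applied new
            # value is never erased.
            if replica.get(key) == row:
                del replica[key]
        else:
            raise ValueError(f"unexpected op: {op!r}")
    return replica


state = apply_changes({}, [
    (1, (1, "a"), "insert"),
    (1, (1, "a"), "update_delete"),
    (1, (1, "b"), "update_insert"),
])
print(state)
```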
Each call to FETCH NEXT FROM cursor_name returns one row of incremental data from the subscribed table. It does not return all incremental data at once, so you need to call the statement repeatedly to fetch more data.
This method is non-blocking. If the table has no new incremental data, FETCH NEXT FROM cursor_name does not wait; it returns an empty result instead. Once new incremental data is generated, calling the statement again returns the next row.
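Because FETCH NEXT returns immediately, a consumer typically polls in a loop and pauses when nothing comes back. This is a minimal sketch over any DB-API cursor, such as a psycopg2 cursor. poll_cursor, its parameters, and the fixed sleep interval are illustrative choices, and max_polls exists only so the loop can be bounded, for example in tests.

```python
import time
from typing import Any, Callable, Optional, Sequence


def poll_cursor(cur, cursor_name: str, handle: Callable[[Sequence[Any]], None],
                idle_sleep_s: float = 0.5, max_polls: Optional[int] = None) -> int:
    """Poll a subscription cursor with FETCH NEXT; return how many rows were handled."""
    handled = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        cur.execute(f"FETCH NEXT FROM {cursor_name}")
        rows = cur.fetchall()
        if not rows:
            # No new change yet. FETCH NEXT did not block, so back off briefly
            # instead of spinning.
            time.sleep(idle_sleep_s)
            continue
        for row in rows:
            handle(row)
            handled += 1
    return handled
```

For example, poll_cursor(pg_cursor, "cur", print) would print each change row as it arrives, where pg_cursor and cur are placeholder names.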
FETCH n FROM cursor
You can also fetch multiple rows at once with the FETCH n FROM cursor_name command, where n is the number of rows to fetch.
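When catching up on a backlog, fetching in batches reduces round trips. The sketch below keeps issuing FETCH n until a batch comes back shorter than n. It assumes, by analogy with FETCH NEXT, that FETCH n returns immediately with fewer than n rows (possibly none) when fewer are available. The drain helper is hypothetical.

```python
from typing import Any, List


def drain(cur, cursor_name: str, n: int = 100) -> List[Any]:
    """Fetch in batches of n until a short batch suggests the cursor has caught up."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    rows: List[Any] = []
    while True:
        cur.execute(f"FETCH {n} FROM {cursor_name}")
        batch = cur.fetchall()
        rows.extend(batch)
        if len(batch) < n:
            # Fewer than n rows: nothing more is available right now.
            return rows
```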
Order of the fetched data
- For data with different rw_timestamp values, rows are returned in the order in which the events occurred.
- For data with the same rw_timestamp, the order matches the event sequence if the rows belong to the same primary key in the subscribed table or materialized view.
- For data with the same rw_timestamp but different primary keys, the order may not reflect the exact event sequence.
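Given these rules, a consumer that needs a deterministic order can treat each run of rows sharing an rw_timestamp as one batch and rely only on per-key order within it. The sketch assumes rows have been converted to dicts with an rw_timestamp field and a primary-key field. Both helpers and the field names are hypothetical.

```python
from itertools import groupby
from typing import Any, Dict, List, Tuple

Change = Dict[str, Any]


def batches_by_timestamp(rows: List[Change]) -> List[Tuple[int, List[Change]]]:
    """Split fetched rows into consecutive runs that share an rw_timestamp."""
    return [(ts, list(run)) for ts, run in groupby(rows, key=lambda r: r["rw_timestamp"])]


def per_key(batch: List[Change], key_field: str) -> Dict[Any, List[Change]]:
    """Within one timestamp batch, keep each primary key's rows in fetch order."""
    out: Dict[Any, List[Change]] = {}
    for row in batch:
        out.setdefault(row[key_field], []).append(row)
    return out
```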
Show subscription cursors
To show all subscription cursors in the current session, use the syntax below:
Examples
Let’s create a table t1, create a subscription on it, and then declare a cursor for the subscription.
After the cursor is created, we can use the FETCH NEXT FROM cursor_name statement to fetch data from it:
Then we can update table t1 and fetch again to view the changes:
We can also create another subscription cursor with a since_clause. Let’s use since unix_ms to rebuild the cursor:
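The SQL for this walkthrough is not reproduced here, so the following is a hedged reconstruction of the same steps, driven from Python. Several details are assumptions:

- the table definition and the names sub, cur, and cur2
- the '1D' retention
- using FLUSH to make writes visible before fetching
- the connection settings (RisingWave's usual local defaults: port 4566, user root, database dev)

The since_ms value is taken when the script starts, on the assumption that any value inside the retention window is accepted.

```python
import time
from typing import List


def walkthrough_statements(since_ms: int) -> List[str]:
    """SQL for the walkthrough; all object names here are illustrative."""
    return [
        "CREATE TABLE t1 (v1 INT PRIMARY KEY, v2 INT);",
        "CREATE SUBSCRIPTION sub FROM t1 WITH (retention = '1D');",
        "DECLARE cur SUBSCRIPTION CURSOR FOR sub;",  # no since_clause: same as since now()
        "INSERT INTO t1 VALUES (1, 1);",
        "FLUSH;",  # make the write visible before fetching
        "FETCH NEXT FROM cur;",  # the insert
        "UPDATE t1 SET v2 = 10 WHERE v1 = 1;",
        "FLUSH;",
        "FETCH NEXT FROM cur;",  # one half of the update
        "FETCH NEXT FROM cur;",  # the other half of the update
        # A second cursor that replays changes from since_ms onward.
        f"DECLARE cur2 SUBSCRIPTION CURSOR FOR sub SINCE {since_ms};",
        "FETCH NEXT FROM cur2;",
    ]


def run(conn, statements: List[str]) -> None:
    """Execute each statement and print the rows returned by FETCH."""
    cur = conn.cursor()
    try:
        for sql in statements:
            cur.execute(sql)
            if sql.startswith("FETCH"):
                print(sql, "->", cur.fetchall())
    finally:
        cur.close()


if __name__ == "__main__":
    import psycopg2  # requires the psycopg2 (or psycopg2-binary) package

    since_ms = int(time.time() * 1000)  # taken before the subscription exists
    conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
    conn.autocommit = True
    run(conn, walkthrough_statements(since_ms))
    conn.close()
```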
Subscribing via Postgres driver
To consume a subscription from an application, you only need a Postgres driver; no extra dependencies are required.
Here’s an example using Python and psycopg2.
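A minimal psycopg2 consumer could look like the sketch below. psycopg2 sends queries through the simple query protocol, which FETCH requires. The connection parameters, the subscription name sub, the batch size, and the sleep interval are assumptions. Rows are returned as dicts keyed by column name, so the op and rw_timestamp columns can be read by name.

```python
import time
from typing import Any, Dict, Iterator, List


def rows_as_dicts(cur) -> List[Dict[str, Any]]:
    """Turn the last result set into dicts keyed by column name."""
    if cur.description is None:
        return []
    names = [col[0] for col in cur.description]
    return [dict(zip(names, row)) for row in cur.fetchall()]


def subscribe(conn, subscription: str, cursor_name: str = "cur",
              batch: int = 10, idle_sleep_s: float = 1.0) -> Iterator[Dict[str, Any]]:
    """Yield change rows from a subscription cursor indefinitely."""
    cur = conn.cursor()
    cur.execute(f"DECLARE {cursor_name} SUBSCRIPTION CURSOR FOR {subscription};")
    while True:
        cur.execute(f"FETCH {batch} FROM {cursor_name};")
        rows = rows_as_dicts(cur)
        if not rows:
            time.sleep(idle_sleep_s)  # FETCH does not block; avoid a busy loop
            continue
        yield from rows


if __name__ == "__main__":
    import psycopg2  # requires the psycopg2 (or psycopg2-binary) package

    conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
    conn.autocommit = True
    for change in subscribe(conn, "sub"):
        print(change)
```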
Example output:
Exactly-once delivery
Because subscriptions are persistent, a subscriber can resume from a specific point in time (an rw_timestamp) without data loss after recovering from a failure. We also guarantee no duplicates in subscriptions, which ensures exactly-once delivery.
Persisting the consumption progress
To achieve exactly-once delivery, the consumer must periodically persist its consumption progress (the rw_timestamp it has processed up to) in storage. We recommend using RisingWave itself as the store, since no extra component is needed.
First, we need to create a table for storing the progress.
Here’s example Python code for retrieving and updating the consumption progress:
The client needs to retrieve the last progress during bootstrapping and periodically store the progress.
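The following sketch shows one way to store and reuse progress. Its assumptions are:

- The table subscription_progress and its columns are hypothetical.
- The upsert relies on an INSERT with an existing primary key overwriting the old row, which is RisingWave's default for tables with a primary key.
- FLUSH makes the write durable.

The sketch is deliberately conservative. Rows sharing an rw_timestamp may span several fetches, so a timestamp is committed only after a newer one has been seen. As a result, rows at the newest timestamp can be delivered again after a crash, and their handling should tolerate a replay. The saved value must also still be inside the retention window when the consumer resumes.

```python
from typing import Optional

# Hypothetical progress table; names are illustrative only.
CREATE_PROGRESS_TABLE = (
    "CREATE TABLE IF NOT EXISTS subscription_progress ("
    "sub_name VARCHAR PRIMARY KEY, last_ts BIGINT);"
)


def load_progress(cur, sub_name: str) -> Optional[int]:
    """Return the last committed rw_timestamp for sub_name, or None."""
    cur.execute("SELECT last_ts FROM subscription_progress WHERE sub_name = %s;", (sub_name,))
    row = cur.fetchone()
    return None if row is None else int(row[0])


def save_progress(cur, sub_name: str, last_ts: int) -> None:
    # Assumes an INSERT on an existing primary key overwrites the old row.
    cur.execute("INSERT INTO subscription_progress VALUES (%s, %s);", (sub_name, last_ts))
    cur.execute("FLUSH;")  # persist the write before continuing


def resume_clause(last_ts: Optional[int]) -> str:
    # Resume just after the last fully processed timestamp; with no saved
    # progress, start from the oldest retained data.
    return "SINCE begin()" if last_ts is None else f"SINCE {last_ts + 1}"


class ProgressTracker:
    """Tracks the newest rw_timestamp whose rows have all been processed."""

    def __init__(self, committed: Optional[int] = None) -> None:
        self.committed = committed
        self._pending: Optional[int] = None

    def observe(self, rw_timestamp: int) -> None:
        # A timestamp counts as complete only once a strictly newer one appears.
        if self._pending is not None and rw_timestamp > self._pending:
            self.committed = self._pending
        self._pending = rw_timestamp
```

During bootstrapping, the client would call load_progress, declare its cursor with resume_clause(...), feed each processed row's rw_timestamp to ProgressTracker.observe, and periodically call save_progress with tracker.committed whenever it is not None.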
Use case
Potential use cases for subscriptions are as follows. If you have explored more use cases, feel free to share them with us in our Slack channel.
- Real-time alerting/notification: Subscribers can employ sophisticated alerting rules to detect abnormal events and notify downstream applications.
- Event-driven architectures: Develop event-driven systems that react to changes based on specific business logic, such as synchronizing data to microservices.