Redis is an open-source, in-memory data structure store, often referred to as a data structure server. RisingWave sinks data to Redis in the form of strings storing key-value pairs in the specified format (JSON or TEMPLATE), so a primary key must always be provided.

You can test out this process on your own device by using the redis-sink demo in the integration_test directory of the RisingWave repository.

Prerequisites

Before sinking data from RisingWave to Redis, please ensure the following:

  • The Redis database you want to sink to is accessible from RisingWave.
  • Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='redis',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
    key = 'value' ) ]
[KEY ENCODE key_encode [(...)]]
;

Parameters

NameDescription
redis.urlRequired. Choose either the Redis cluster address or a non-cluster Redis address.
  • If the address is a cluster address, it should be in the form of a JSON array, like redis.url= '["redis://redis-server:6379/"]'.
  • If the address is a non-cluster address, it should be in the form of a string, like redis.url= 'redis://redis-server:6379/'.
primary_keyRequired. The primary keys of the sink. If necessary, use , to delimit the primary key columns.

FORMAT and ENCODE options

These options should be set in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause.

FieldNotes
data_formatData format. Allowed formats:
  • PLAIN: Output data with insert operations.
  • UPSERT: Output data as a changelog stream.
data_encodeData encoding. Supported encodings:
  • JSON:
    • date: number of days since the Common Era (CE).
    • interval: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S format string.
    • time without time zone: number of milliseconds past the last midnight.
    • timestamp: number of milliseconds since the Epoch.
  • TEMPLATE: converts data to the string specified by key_format/value_format.
force_append_onlyIf true, forces the sink to be PLAIN (also known as append-only), even if it cannot be.
key_formatSpecify the format for the key as a string.
value_formatSpecify the format for the value as a string.
key_encodeOptional.
  • When specified, the key encode can only be TEXT, and the primary key should be one and only one of the following types: varchar, bool, smallint, int, and bigint;
  • When absent, both key and value will use the same setting of ENCODE data_encode ( ... ).
redis_value_typeOptional. Controls how data is written to Redis. For details, see examples below.
  • string (Default): Key-value storage. key_format and value_format are required.
  • pubsub: Publishes messages to Redis Pub/Sub channels. value_format is required.
  • geospatial: Stores data using Redis’ geospatial index. key_format is required.
channelOptional. Required if redis_value_type = 'pubsub' and channel_column is not set. The name of the Redis Pub/Sub channel to publish messages to.
channel_columnOptional. Required if redis_value_type = 'pubsub' and channel is not set. The values from this column will be used as the names of the Redis Pub/Sub channels to publish messages to. Must be type of VARCHAR.
longitudeOptional. Required if redis_value_type = 'geospatial'. Contains the longitude value. Must be of type FLOAT, REAL, or VARCHAR.
latitudeOptional. Required if redis_value_type = 'geospatial'. Contains the latitude value. Must be of type FLOAT, REAL, or VARCHAR.
memberOptional. Required when redis_value_type = 'geospatial'. Contains the member names for the geospatial set. Must be of type VARCHAR and part of the primary key.

Example

This section provides examples for sinking data from RisingWave to Redis, covering key-value storage, geospatial data storage, and Pub/Sub messaging.

Key-value storage

This approach is suitable when you want to use Redis as a fast data store or cache, storing data as key-value pairs. Specify redis_value_type = 'string' or omit this parameter, as string is the default.

Assume we create a materialized view, bhv_mv, from a source.

CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    source_1;

We can sink data from bhv_mv to Redis by creating a sink. Here, data_encode is JSON.

CREATE SINK redis_sink
FROM bhv_mv WITH (
    connector = 'redis',
    primary_key = 'user_id',
    redis.url= 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true'
);

We can sink data from bhv_mv to Redis by creating a sink. Here, data_encode is TEMPLATE, so key_format and value_format must be defined.

CREATE SINK redis_sink_2
FROM bhv_mv WITH (
    primary_key = 'user_id',
    connector = 'redis',
    redis.url= 'redis://127.0.0.1:6379/',
) FORMAT PLAIN ENCODE TEMPLATE (
    force_append_only='true',
    key_format = 'UserID:{user_id}',
    value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'
);

Geospatial data storage

This approach allows RisingWave to store geospatial data (longitude, latitude, member) in Redis. Make sure to specify FORMAT UPSERT ENCODE TEMPLATE, redis_value_type = 'geospatial', key_format, longitude, latitude, and member.

Assume we have a table t1:

CREATE TABLE t1(v1 float, v2 float, v3 varchar, v4 varchar);

We can sink geospatial data to Redis:

CREATE SINK s1
FROM
    t1 WITH (
    primary_key = 'v3,v4',
    connector = 'redis',
    redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='geospatial', 
longitude = 'v1', 
latitude = 'v2', 
member = 'v3',
key_format = 'abcd3:{v4}'
);

Now insert the geospatial data into the table t1:

INSERT INTO t1 VALUES (1.1, 1.1, 'test','test');

You can retrieve the geospatial information from Redis:

GEOPOS abcd3:test test
------
1) 1) "1.10000044298171997"
   2) "1.10000000482478555"

For more information, see Geospatial indexing.

Pub/Sub messaging

This approach allows RisingWave to publish messages to Redis channels. Make sure to specify FORMAT UPSERT ENCODE TEMPLATE, redis_value_type = 'pubsub', and value_format. At least one of channel or channel_column must also be specified.

Assume we have a table t1:

CREATE TABLE t1(v1 float, v2 float, v3 varchar, v4 varchar);

We can sink data from t1 as messages to a predefined Redis Pub/Sub channel:

CREATE SINK s1
FROM
    t1 WITH (
    primary_key = 'v3,v4',
    connector = 'redis',
    redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='pubsub', 
channel= 'test123',
value_format = 'abcd3:{v4}'
);

Alternatively, you can publish to a dynamic channel using channel_column:

CREATE SINK s1
FROM
    t1 WITH (
    primary_key = 'v3,v4',
    connector = 'redis',
    redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='pubsub', 
channel_column = 'v3',
value_format = 'abcd3:{v4}'
);