
Sink data from RisingWave to Elasticsearch

You can deliver data that has been ingested and transformed in RisingWave to Elasticsearch to serve search and analytics workloads.

This guide describes how to sink data from RisingWave to Elasticsearch using the Elasticsearch sink connector in RisingWave.

Elasticsearch is a distributed, RESTful search and analytics engine capable of addressing a growing number of use cases. It centrally stores your data for lightning-fast search, fine-tuned relevancy, and powerful analytics that scale with ease.

Beta Feature

The Elasticsearch sink connector in RisingWave is currently in Beta. Please contact us if you encounter any issues or have feedback.

Note

The Elasticsearch sink connector in RisingWave provides at-least-once delivery semantics. Events may be redelivered in case of failures.

Prerequisites

  • Ensure the Elasticsearch cluster (version 7.x or 8.x) is accessible from RisingWave.

  • If you are running RisingWave locally from binaries, make sure that JDK 11 or later is installed in your environment.

Create an Elasticsearch sink

Use the following syntax to create an Elasticsearch sink. Once the sink is created, inserts and updates to its upstream data (the table, materialized view, or query result it reads from) are streamed to the specified Elasticsearch endpoint.

CREATE SINK sink_name
[ FROM sink_from | AS select_query ]
WITH (
    connector = 'elasticsearch',
    type = '<type>',
    index = '<your Elasticsearch index>',
    url = 'http://<ES hostname>:<ES port>',
    username = '<your ES username>',
    password = '<your password>',
    delimiter = '<delimiter>'
);

Parameters

Parameter | Description
--- | ---
sink_name | Name of the sink to be created.
sink_from | A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified.
AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command.
index | Required. Name of the Elasticsearch index that you want to write data to.
url | Required. URL of the Elasticsearch REST API endpoint.
username | Optional. Elasticsearch username for accessing the Elasticsearch endpoint. Must be used together with password.
password | Optional. Password for accessing the Elasticsearch endpoint. Must be used together with username.
delimiter | Optional. Delimiter used to join the column values into the Elasticsearch ID when the sink's primary key has multiple columns.
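The rules in the parameter table can be summarized as a small checker. This is a hypothetical helper (`validate_es_sink_options` is not a RisingWave API) that takes the WITH options as a Python dict. It encodes only the constraints stated above: index and url are required, and username and password must be supplied together. It does not check the type option, whose values are not described here.

```python
def validate_es_sink_options(options: dict) -> list:
    """Return the problems found in a WITH (...) option map for the ES sink.

    Hypothetical helper: it encodes only the rules in the parameter table,
    not RisingWave's own validation.
    """
    problems = []
    if options.get("connector") != "elasticsearch":
        problems.append("connector must be 'elasticsearch'")
    # index and url are the two required parameters.
    for key in ("index", "url"):
        if not options.get(key):
            problems.append(f"{key} is required")
    # username and password are optional, but only as a pair.
    if ("username" in options) != ("password" in options):
        problems.append("username and password must be used together")
    return problems


# Hypothetical options: a password is missing for the given username.
options = {
    "connector": "elasticsearch",
    "index": "orders",
    "url": "http://localhost:9200",
    "username": "elastic",
}
print(validate_es_sink_options(options))
```

Run as written, the checker reports that username and password must be used together, because only the username is set.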

Notes about Elasticsearch ID

If the sink has a primary key (usually inherited from the upstream materialized view), RisingWave uses the primary key as the Elasticsearch document ID. When the primary key has multiple columns, their values are combined using the delimiter parameter. If the sink has no primary key (for example, when the materialized view is append-only), RisingWave uses the first column in the sink definition as the Elasticsearch ID.
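The ID rule above can be illustrated with a short sketch. The function `es_document_id` is hypothetical and is not RisingWave's implementation. It assumes each key value is rendered with Python's `str()` and that multi-column keys are joined with the configured delimiter. RisingWave's actual text formatting of values such as timestamps may differ. The delimiter is passed explicitly because its default value is not stated here.

```python
from typing import Any, Mapping, Sequence


def es_document_id(
    row: Mapping[str, Any],
    columns: Sequence[str],
    primary_key: Sequence[str],
    delimiter: str,
) -> str:
    """Illustrate how an Elasticsearch document ID could be derived for a row.

    row         -- column name -> value for one sink row
    columns     -- column names in sink-definition order
    primary_key -- primary key column names (empty if the sink has none)
    delimiter   -- separator for multi-column keys (the `delimiter` option)
    """
    if primary_key:
        # Primary key present: use its value(s); multi-column keys are joined.
        return delimiter.join(str(row[col]) for col in primary_key)
    # No primary key (e.g. an append-only source): use the first column.
    return str(row[columns[0]])


# Hypothetical row from a sink keyed on (customer_id, order_date).
row = {"customer_id": 42, "order_date": "2024-01-15", "total": 99.5}
columns = ["customer_id", "order_date", "total"]
print(es_document_id(row, columns, ["customer_id", "order_date"], "_"))  # 42_2024-01-15
print(es_document_id(row, columns, [], "_"))  # 42
```

Because the ID is derived deterministically from the row, writing the same row again targets the same Elasticsearch document rather than creating a new one.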

Data type mapping

Elasticsearch uses a mechanism called dynamic field mapping to create fields and infer their types automatically. Under dynamic mapping, all integer types are mapped to long and all floating-point types to float. To ensure that RisingWave data types map to the intended Elasticsearch field types, we recommend that you define the mapping with index templates or dynamic templates before creating the sink.

RisingWave data type | Elasticsearch field type
--- | ---
boolean | boolean
smallint | long
integer | long
bigint | long
numeric | float
real | float
double precision | float
character varying | text
bytea | text
date | date
time without time zone | text
timestamp without time zone | text
timestamp with time zone | text
interval | text
struct | Not supported
array | array
JSONB | text
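As an example of defining field types ahead of time, the sketch below builds the body of a composable index template. It is sent with `PUT _index_template/<name>`, which is available in Elasticsearch 7.8 and later. Earlier 7.x releases use the legacy `_template` API, which has a different body layout. The index pattern `orders*`, the column names, and the chosen field types are hypothetical. For instance, mapping a double precision column to `double` instead of the dynamic default `float` avoids losing precision. Mapping character varying to `keyword` gives exact-match behavior instead of full-text `text`.

```python
import json


def build_index_template(index_pattern: str, field_types: dict) -> dict:
    """Return a composable index template body with explicit field mappings.

    field_types maps each sink column name to an Elasticsearch field type.
    """
    return {
        "index_patterns": [index_pattern],
        "template": {
            "mappings": {
                "properties": {
                    column: {"type": es_type}
                    for column, es_type in field_types.items()
                }
            }
        },
    }


# Hypothetical sink columns: id (bigint), name (character varying),
# amount (double precision), and created_at (date).
body = build_index_template(
    "orders*",
    {"id": "long", "name": "keyword", "amount": "double", "created_at": "date"},
)
print(json.dumps(body, indent=2))
```

Index templates are applied when an index is created. Register the template before creating the sink so that the index created on the first write uses these mappings.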
