Ingest data from Pulsar

You can ingest data from Pulsar into RisingWave by using the Pulsar source connector in RisingWave.

Public Preview

This feature is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.

When creating a source, you can choose to persist the data from the source in RisingWave by using CREATE TABLE instead of CREATE SOURCE, and by specifying the connection settings and data format.

Syntax

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name 
[ schema_definition ]
[INCLUDE { header | key | offset | partition | timestamp } [AS <column_name>]]
WITH (
connector='pulsar',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
info

For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE or CREATE TABLE statement. The schema should instead be provided via a Web location in the schema.location option (or via a schema registry in the schema.registry option) in the ENCODE properties section.

note

RisingWave performs primary key constraint checks on tables with connector settings but not on regular sources. If you need the checks to be performed, please create a table with connector settings.

For a table with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
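For instance, the overwrite behavior can be illustrated with a hypothetical table; the table name, columns, and topic below are illustrative, not part of any fixed schema:

```sql
-- Hypothetical table backed by a Pulsar topic; names are placeholders.
CREATE TABLE IF NOT EXISTS orders (
    order_id INT PRIMARY KEY,
    status VARCHAR
) WITH (
    connector = 'pulsar',
    topic = 'orders_topic',
    service.url = 'pulsar://localhost:6650'
) FORMAT PLAIN ENCODE JSON;

-- If two messages arrive with the same order_id, the record that
-- arrives later overwrites the earlier row in the table.
```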

Connector parameters

| Field | Notes |
| --- | --- |
| topic | Required. Address of the Pulsar topic. One source can only correspond to one topic. |
| service.url | Required. Address of the Pulsar service, typically in the format pulsar://&lt;host&gt;:&lt;port&gt; or pulsar+ssl://&lt;host&gt;:&lt;port&gt;. |
| scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (earliest offset) and latest (latest offset). If not specified, the default value earliest will be used. |
| scan.startup.timestamp.millis | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). |
| auth.token | Optional. A token for authentication. If both auth.token and the oauth parameters are set, only OAuth authorization takes effect. |
| oauth.issuer.url | Optional. The issuer URL for OAuth2. This field must be filled if other oauth fields are specified. |
| oauth.credentials.url | Optional. The path to the credentials file, starting with file://. This field must be filled if other oauth fields are specified. |
| oauth.audience | Optional. The audience for OAuth2. This field must be filled if other oauth fields are specified. |
| oauth.scope | Optional. The scope for OAuth2. |
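As a sketch of the simpler token-based authentication, a source definition might look like the following; the topic, broker address, and token value are placeholders:

```sql
-- Minimal sketch using token authentication; all values are placeholders.
CREATE SOURCE IF NOT EXISTS pulsar_token_source (
    id INT,
    name VARCHAR
) WITH (
    connector = 'pulsar',
    topic = 'demo_topic',
    service.url = 'pulsar+ssl://broker.example.com:6651',
    auth.token = 'your-pulsar-jwt-token'
) FORMAT PLAIN ENCODE JSON;
```

Note that if any of the oauth parameters are also set, the OAuth credentials take precedence over auth.token.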

Other parameters

| Field | Notes |
| --- | --- |
| data_format | Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN. |
| data_encode | Supported encodes: JSON, AVRO, PROTOBUF, CSV, BYTES. |
| message | Message name of the main Message in the schema definition. Required when data_encode is PROTOBUF. |
| location | Web location of the schema file in http://..., https://..., or s3://... format. Required when data_encode is AVRO or PROTOBUF. Examples: https://&lt;example_host&gt;/risingwave/proto-simple-schema.proto, s3://risingwave-demo/schema-location |
| aws.credentials.access_key_id | Optional. The AWS access key ID for loading from S3. This field does not need to be filled if oauth.credentials.url is specified as a local path. |
| aws.credentials.secret_access_key | Optional. The AWS secret access key for loading from S3. This field does not need to be filled if oauth.credentials.url is specified as a local path. |
| region | Required if loading descriptors from S3. The AWS service region. |
| aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
| aws.credentials.role.external_id | Optional. The external ID used to authorize access to third-party resources. |
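For example, a source that loads a Protobuf descriptor from S3 with explicit AWS credentials might be sketched as follows; the bucket, keys, region, and message name are placeholders:

```sql
-- Sketch of loading a Protobuf FileDescriptorSet from S3;
-- bucket name, credentials, and message name are placeholders.
CREATE SOURCE IF NOT EXISTS pulsar_proto_source
WITH (
    connector = 'pulsar',
    topic = 'proto_topic',
    service.url = 'pulsar://localhost:6650',
    aws.credentials.access_key_id = 'YOUR_ACCESS_KEY_ID',
    aws.credentials.secret_access_key = 'YOUR_SECRET_ACCESS_KEY',
    region = 'us-west-2'
) FORMAT PLAIN ENCODE PROTOBUF (
    message = 'example.ExampleMessage',
    schema.location = 's3://example-bucket/schema-descriptor.pb'
);
```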

Read schemas from locations

RisingWave supports reading schemas from a Web location in http://..., https://..., or s3://... format for Pulsar data in Avro or Protobuf format.

For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with a command like this:

protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto

To specify a schema location, add this clause to a CREATE SOURCE statement.

ROW SCHEMA LOCATION 'location'

If a primary key also needs to be defined, use the table constraint syntax.

CREATE TABLE table1 (PRIMARY KEY(id)) 
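A fuller sketch of the table-constraint form might look like this; the table name, columns, topic, and broker address are illustrative:

```sql
-- Sketch of defining a primary key with the table constraint syntax;
-- all names and connection values are placeholders.
CREATE TABLE table1 (
    id INT,
    value VARCHAR,
    PRIMARY KEY (id)
) WITH (
    connector = 'pulsar',
    topic = 'demo_topic',
    service.url = 'pulsar://localhost:6650'
) FORMAT PLAIN ENCODE JSON;
```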

Example

Here is an example of connecting RisingWave to a Pulsar broker to read data from a single topic.

CREATE {TABLE | SOURCE} IF NOT EXISTS source_abc 
WITH (
connector='pulsar',
topic='demo_topic',
service.url='pulsar://localhost:6650/',
oauth.issuer.url='https://auth.streamnative.cloud/',
oauth.credentials.url='s3://bucket_name/your_key_file.file',
oauth.audience='urn:sn:pulsar:o-d6fgh:instance-0',
aws.credentials.access_key_id='aws.credentials.access_key_id',
aws.credentials.secret_access_key='aws.credentials.secret_access_key',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message',
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
