You can ingest data from Pulsar into RisingWave by using the Pulsar source connector in RisingWave.
CREATE TABLE
instead of CREATE SOURCE
and specifying the connection settings and data format.
schema_definition
in the CREATE SOURCE
or CREATE TABLE
statement. The schema should be provided in a Web location in the option schema.location
in ENCODE properties
section.Field | Notes |
---|---|
topic | Required. Address of the Pulsar topic. One source can only correspond to one topic. |
service.url | Required. Address of the Pulsar service. Typically in the format pulsar:// or pulsar+ssl://<host>:<port> |
scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (earliest offset) and latest (latest offset). If not specified, the default value earliest will be used. |
scan.startup.timestamp.millis. | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). |
auth.token | Optional. A token for auth. If both auth.token and oauth are set, only oauth authorization is effective. |
oauth.issuer.url | Optional. The issuer url for OAuth2. This field must be filled if other oauth fields are specified. |
oauth.credentials.url | Optional. The path for credential files, starts with file:// . This field must be filled if other oauth fields are specified. |
oauth.audience | Optional. The audience for OAuth2. This field must be filled if other oauth fields are specified. |
oauth.scope | Optional. The scope for OAuth2. |
Field | Notes |
---|---|
data_format | Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN. |
data_encode | Supported encodes: JSON, AVRO, PROTOBUF, CSV, BYTES. |
message | Message name of the main Message in schema definition. Required when data_encode is PROTOBUF. |
location | Web location of the schema file in http://... , https://... , or S3://... format. Required when data_encode is AVRO or PROTOBUF . Examples:https://\<example\_host>/risingwave/proto-simple-schema.proto ,s3://risingwave-demo/schema-location |
aws.credentials.access_key_id | Optional. The AWS access key for loading from S3. This field does not need to be filled if oauth.credentials.url is specified to a local path. |
aws.credentials.secret_access_key | Optional. The AWS secret access key for loading from S3. This field does not need to be filled if oauth.credentials.url is specified to a local path. |
region | Required if loading descriptors from S3. The AWS service region. |
aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
aws.credentials.role.external_id | Optional. The external id used to authorize access to third-party resources. |
http://...
, https://...
, or S3://...
format for Pulsar data in Avro or Protobuf format.
For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet
, which can be compiled from a .proto
file with a command like this:
CREATE SOURCE
statement.