CREATE TABLE instead of CREATE SOURCE and specifying the connection settings and data format.
Syntax
For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE or CREATE TABLE statement. Instead, provide the schema at a Web location through the schema.location option in the ENCODE properties section.
Connector parameters
| Field | Notes |
|---|---|
| topic | Required. Address of the Pulsar topic. One source can only correspond to one topic. |
| service.url | Required. Address of the Pulsar service. Typically in the format pulsar://<host>:<port> or pulsar+ssl://<host>:<port>. |
| scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (earliest offset) and latest (latest offset). If not specified, the default value earliest will be used. |
| scan.startup.timestamp.millis | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). |
| auth.token | Optional. A token for authentication. If both auth.token and the oauth fields are set, only OAuth authorization takes effect. |
| oauth.issuer.url | Optional. The issuer URL for OAuth2. This field must be filled if other oauth fields are specified. |
| oauth.credentials.url | Optional. The path to the credential file, starting with file://. This field must be filled if other oauth fields are specified. |
| oauth.audience | Optional. The audience for OAuth2. This field must be filled if other oauth fields are specified. |
| oauth.scope | Optional. The scope for OAuth2. |
| subscription.unacked.resend.delay | Optional. Specifies the delay duration after which the broker will resend unacknowledged messages. Accepts duration values such as '1s', '5m', or '1h'. Defaults to None, meaning the broker will not resend unacknowledged messages. |
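As a rough illustration of how the connector parameters above fit together, the following sketch creates a table that authenticates with OAuth2 and starts consuming from the latest offset. The table name, columns, topic, host, issuer URL, credential path, and audience are placeholder values assumed for illustration, not values from a real deployment.

```sql
-- Sketch only: every name, path, and URL below is a hypothetical placeholder.
CREATE TABLE IF NOT EXISTS pulsar_orders (
    order_id BIGINT,
    amount DOUBLE PRECISION,
    created_at TIMESTAMP
)
WITH (
    connector = 'pulsar',
    -- One source or table reads from exactly one topic.
    topic = 'persistent://public/default/orders',
    service.url = 'pulsar+ssl://pulsar.example.com:6651',
    -- 'earliest' is the default; 'latest' skips messages already in the topic.
    scan.startup.mode = 'latest',
    -- Once any oauth field is set, the issuer URL, credentials URL,
    -- and audience must all be provided.
    oauth.issuer.url = 'https://auth.example.com/',
    oauth.credentials.url = 'file:///opt/risingwave/pulsar-credentials.json',
    oauth.audience = 'urn:example:pulsar:instance-0'
) FORMAT PLAIN ENCODE JSON;
```

Because the encoding is JSON, the columns are declared explicitly in the statement.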
Other parameters
| Field | Notes |
|---|---|
| data_format | Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN. |
| data_encode | Supported encodes: JSON, AVRO, PROTOBUF, CSV, BYTES. |
| message | Message name of the main Message in schema definition. Required when data_encode is PROTOBUF. |
| location | Web location of the schema file in http://..., https://..., or s3://... format. Required when data_encode is AVRO or PROTOBUF. Examples: https://<example_host>/risingwave/proto-simple-schema.proto, s3://risingwave-demo/schema-location |
| aws.credentials.access_key_id | Optional. The AWS access key for loading from S3. This field does not need to be filled if oauth.credentials.url is specified to a local path. |
| aws.credentials.secret_access_key | Optional. The AWS secret access key for loading from S3. This field does not need to be filled if oauth.credentials.url is specified to a local path. |
| region | Required if loading descriptors from S3. The AWS service region. |
| aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
| aws.credentials.role.external_id | Optional. The external id used to authorize access to third-party resources. |
Message acknowledgment
Added in v2.7.0.
Use the subscription.unacked.resend.delay parameter to control when the broker resends unacknowledged messages.
Example
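A minimal sketch of a source that asks the broker to resend unacknowledged messages after 30 seconds might look like the following. The source name, column, topic, and service address are assumptions made for illustration.

```sql
-- Sketch only: source name, column, topic, and host are hypothetical.
CREATE SOURCE IF NOT EXISTS pulsar_events (
    payload VARCHAR
)
WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/events',
    service.url = 'pulsar://localhost:6650',
    -- Accepts duration values such as '1s', '5m', or '1h'.
    -- When omitted, the broker does not resend unacknowledged messages.
    subscription.unacked.resend.delay = '30s'
) FORMAT PLAIN ENCODE JSON;
```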
Read schemas from locations
RisingWave supports reading schemas from a Web location in http://..., https://..., or s3://... format for Pulsar data in Avro or Protobuf format.
For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with protoc by passing the --include_imports and --descriptor_set_out options.
To specify a schema location, set the schema.location option in the ENCODE clause of the CREATE SOURCE statement.
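As a sketch of where the schema options go, the statement below reads Protobuf data using a compiled FileDescriptorSet fetched over HTTPS. The source name, topic, message name, and schema URL are hypothetical and would need to match your own descriptor file.

```sql
-- Sketch only: names and URLs are hypothetical placeholders.
-- No column list is given; the schema comes from schema.location.
CREATE SOURCE IF NOT EXISTS pulsar_proto_source
WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/proto-topic',
    service.url = 'pulsar://localhost:6650'
) FORMAT PLAIN ENCODE PROTOBUF (
    -- Name of the main message inside the descriptor set.
    message = 'demo.OrderEvent',
    -- Must point to a FileDescriptorSet, not a raw .proto file.
    schema.location = 'https://example.com/schemas/order_event.pb'
);
```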
Example
Here is an example of connecting RisingWave to a Pulsar broker to read data from individual topics.
- Avro
- JSON
- Protobuf
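The three encodings differ mainly in how the schema is supplied. The statements below are a hedged sketch of one source per encoding. All source names, topics, columns, message names, and schema URLs are assumptions chosen for illustration.

```sql
-- Sketch only: every identifier and URL here is a hypothetical placeholder.

-- Avro: the schema is loaded from schema.location, so no columns are declared.
CREATE SOURCE IF NOT EXISTS pulsar_avro_source
WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/avro-topic',
    service.url = 'pulsar://localhost:6650',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (
    schema.location = 'https://example.com/schemas/user.avsc'
);

-- JSON: there is no external schema, so columns are declared in the statement.
CREATE SOURCE IF NOT EXISTS pulsar_json_source (
    user_id INT,
    user_name VARCHAR
)
WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/json-topic',
    service.url = 'pulsar://localhost:6650',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Protobuf: both the message name and the schema location are required.
CREATE SOURCE IF NOT EXISTS pulsar_protobuf_source
WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/protobuf-topic',
    service.url = 'pulsar://localhost:6650',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE PROTOBUF (
    message = 'demo.User',
    schema.location = 'https://example.com/schemas/user.pb'
);
```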