You can ingest data from NATS JetStream into RisingWave by using the NATS source connector in RisingWave.
subscribe
permission for the subject.CREATE TABLE
instead of CREATE SOURCE
and specifying the connection settings and data format.
.
), greater than (>
) or asterisks (*
).NUL
or LPT1
.Foo
and foo
would collide on Windows or macOS systems.Field | Notes |
---|---|
server_url | Required. URLs of the NATS JetStream server, in the format of address:port. If multiple addresses are specified, use commas to separate them. |
subject | Required. NATS subject that you want to ingest data from. To specify more than one subjects, use a comma. |
stream | Required. NATS stream that you want to ingest data from. |
connect_mode | Required. Authentication mode for the connection. Allowed values: plain: No authentication. user_and_password: Use user name and password for authentication. For this option, username and password must be specified. credential: Use JSON Web Token (JWT) and NKeys for authentication. For this option, jwt and nkey must be specified. |
jwt and nkey | JWT and NKEY for authentication. For details, see JWT and NKeys. |
username and password | Conditional. The client user name and password. Required when connect_mode is user_and_password. |
scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The supported modes are:
earliest will be used. |
scan.startup.timestamp.millis | Conditional. Required when scan.startup.mode is timestamp. RisingWave will start to consume data from the specified UNIX timestamp. |
data_encode | Supported encodes: JSON, PROTOBUF, BYTES. |
consumer.deliver_subject | Optional. Subject to deliver messages to. |
consumer.durable_name | Required. Durable name for the consumer. |
consumer.name | Optional. Name of the consumer. |
consumer.description | Optional. Description of the consumer. |
consumer.deliver_policy | Optional. Policy on how messages are delivered. |
consumer.ack_policy | Optional. Acknowledgment policy for message processing (e.g., None, All, Explicit). |
consumer.ack_wait.sec | Optional. Time to wait for acknowledgment before considering a message as undelivered. |
consumer.max_deliver | Optional. Maximum number of times a message will be delivered. |
consumer.filter_subject | Optional. Filter for subjects that the consumer will process. |
consumer.filter_subjects | Optional. List of subjects that the consumer will filter on. |
consumer.replay_policy | Optional. Policy for replaying messages (e.g., Instant, Original). |
consumer.rate_limit | Optional. Rate limit for message delivery in bits per second. |
consumer.sample_frequency | Optional. Frequency for sampling messages, ranging from 0 to 100. |
consumer.max_waiting | Optional. Maximum number of messages that can be waiting for acknowledgment. |
consumer.max_ack_pending | Optional. Maximum number of acknowledgments that can be pending. |
consumer.headers_only | Optional. If true, only message headers will be delivered. |
consumer.max_batch | Optional. Maximum number of messages to process in a single batch. |
consumer.max_bytes | Optional. Maximum number of bytes to receive in a single batch. |
consumer.max_expires.sec | Optional. Maximum expiration time for a message in seconds. |
consumer.inactive_threshold.sec | Optional. Time in seconds before a consumer is considered inactive. |
consumer.num.replicas | Optional. Number of replicas for the consumer. |
consumer.memory_storage | Optional. If true, messages will be stored in memory. |
consumer.backoff.sec | Optional. Backoff time in seconds for retrying message delivery. |