This guide describes how to sink data from RisingWave to an MQTT topic using the MQTT sink connector.
The example in this guide uses iot_sensor_data, a table in RisingWave that stores data from various IoT devices at a given timestamp, including temperature and humidity readings, along with a status field indicating whether each device is in a normal or abnormal state. To learn more about MQTT and how to get started with it, refer to the MQTT guide.
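A table matching the description above might be defined as in the sketch below. The column names, column types, and sample rows are illustrative assumptions; the text only specifies that the table holds a timestamp, temperature and humidity readings, and a device status.

```sql
-- Hypothetical schema: column names and types are assumptions for illustration.
CREATE TABLE iot_sensor_data (
    device_id VARCHAR,       -- identifier of the reporting device
    ts TIMESTAMP,            -- time of the reading
    temperature DECIMAL,     -- temperature reading
    humidity DECIMAL,        -- humidity reading
    device_status VARCHAR    -- 'normal' or 'abnormal'
);

-- Made-up sample rows, useful for checking the sink end to end.
INSERT INTO iot_sensor_data VALUES
    ('device_001', '2024-01-01 10:00:00', 22.5, 45.0, 'normal'),
    ('device_002', '2024-01-01 10:00:05', 41.3, 12.0, 'abnormal');
```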
Create a sink named mqtt_sink to forward data from iot_sensor_data to an MQTT server. The sink definition configures the MQTT connector, server URL, target topic, data type, message retention, quality of service, and JSON encoding.
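A sink definition along these lines could look like the following sketch. The broker address `tcp://mqtt-server` and the topic name `sink_iot_data` are placeholder assumptions. The `append-only` type and the `force_append_only` option follow RisingWave's general sink syntax and are assumed to apply here because the source table is not declared append-only.

```sql
-- Sketch only: the broker URL and topic name are placeholders.
CREATE SINK mqtt_sink
FROM iot_sensor_data
WITH (
    connector = 'mqtt',           -- use the MQTT sink connector
    url = 'tcp://mqtt-server',    -- assumed broker address
    topic = 'sink_iot_data',      -- assumed target topic
    type = 'append-only',         -- data type of the sink (assumed value)
    retain = 'true',              -- ask the broker to retain messages
    qos = 'at_least_once'         -- quality of service for publishing
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'    -- emit only inserts as JSON messages
);
```

With `qos = 'at_least_once'`, a subscriber may receive a message more than once, so downstream consumers should tolerate duplicates.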
Field | Notes |
---|---|
url | Required. The URL of the broker to connect to, e.g., tcp://localhost. Must be prefixed with tcp://, mqtt://, ssl://, or mqtts:// to denote the protocol. mqtts:// and ssl:// use native certificates if no CA is specified. |
qos | Optional. The quality of service for publishing messages. Defaults to at_most_once. Options include at_most_once, at_least_once, or exactly_once. |
username | Optional. Username for the MQTT broker. |
password | Optional. Password for the MQTT broker. |
client_prefix | Optional. Prefix for the MQTT client ID. Defaults to “risingwave”. |
clean_start | Optional. Determines if all states from queues are removed when the client disconnects. |
inflight_messages | Optional. Maximum number of inflight messages. Defaults to 100. |
tls.client_cert | Optional. Path to the client’s certificate file (PEM) or a string with the certificate content. Required for client authentication. Can use fs:// prefix for file paths. |
tls.client_key | Optional. Path to the client’s private key file (PEM) or a string with the private key content. Required for client authentication. Can use fs:// prefix for file paths. |
topic | Optional. The topic name to subscribe or publish to. Can include wildcard topics, e.g., /topic/#. |
topic.field | Optional. Enables dynamic writing to multiple topics based on the value of a specified column. If set, the target topic is the value of this column. |
retain | Optional. Whether the message should be retained by the broker. |
type | Required. Type identifier. |
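To illustrate how several of these parameters combine, the sketch below routes each row to a per-device topic through `topic.field` and authenticates with a username and password. The sink name, the derived `mqtt_topic` column, the `device_id` column, the broker host, and the credentials are all hypothetical. Writing the sink type as `type = 'append-only'` is likewise an assumption based on RisingWave's general sink syntax, and the exact behavior may vary by version.

```sql
-- Sketch only: names, host, and credentials are placeholders.
CREATE SINK mqtt_sink_per_device AS
SELECT
    device_id,
    ts,
    temperature,
    humidity,
    device_status,
    'sensors/' || device_id AS mqtt_topic   -- hypothetical per-row topic column
FROM iot_sensor_data
WITH (
    connector = 'mqtt',
    url = 'mqtts://broker.example.com:8883', -- TLS; native certificates if no CA is specified
    topic.field = 'mqtt_topic',              -- target topic is taken from this column
    type = 'append-only',
    qos = 'at_least_once',
    username = 'rw_user',                    -- placeholder credentials
    password = 'rw_password',
    client_prefix = 'risingwave',            -- the documented default, shown explicitly
    inflight_messages = '100'                -- the documented default, shown explicitly
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
```

Deriving the topic in the sink's query keeps the routing logic next to the sink definition, so the source table does not need an extra column.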