MQTT
This page describes the usage of the Stream Reactor MQTT Sink Connector.
You can specify multiple KCQL statements separated by `;` to have a single connector sink multiple topics. The connector property `topics` or `topics.regex` must be set to a value that matches the KCQL statements.
The following KCQL is supported:
Examples:
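The KCQL grammar and examples did not survive extraction here. As a sketch (topic names are hypothetical, and the optional clauses depend on the connector version), sink statements generally take this shape:

```sql
-- General shape (v9+): Kafka source topic on the right, MQTT target on the left
INSERT INTO <mqtt-target> SELECT * FROM <kafka-topic> [PROPERTIES( ... )]

-- Multiple statements in one connector, separated by ';'
INSERT INTO /alerts SELECT * FROM alerts;
INSERT INTO /metrics SELECT * FROM metrics
```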
The connector can route messages to specific MQTT targets. These are the possible options:

- **Constant topic** – route messages to a specified MQTT topic using a constant value.
- **Field value as topic** – direct messages to an MQTT topic based on a specified field's value, e.g. the expression `fieldA.fieldB.fieldC`.
- **Message key as topic** – determine the MQTT topic from the incoming message key, which is expected to be a string.
- **Kafka message topic** – set the MQTT topic using the incoming Kafka message's original topic.
The sink publishes each Kafka record to MQTT according to the type of the record’s value:
| Record value type | Published as |
| --- | --- |
| Binary (`byte[]`) | Forwarded unchanged ("pass-through"). |
| String | Its UTF-8 byte sequence. |
| Connect `Struct` (produced by Avro, Protobuf, or JSON Schema converters) | Converted to JSON, then published as the JSON string's UTF-8 bytes. |
| `java.util.Map` or other Java `Collection` | Serialized to JSON and published as the JSON string's UTF-8 bytes. |
In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.
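A minimal Python sketch of these conversion rules (this mimics the behaviour described above; it is not the connector's own code):

```python
import json

def to_mqtt_payload(value):
    """Mimic the sink's value handling: bytes pass through unchanged,
    strings become UTF-8, structured values become JSON then UTF-8."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)               # binary: pass-through
    if isinstance(value, str):
        return value.encode("utf-8")      # string: UTF-8 byte sequence
    # Structs / Maps / Collections: serialize to JSON, then UTF-8
    return json.dumps(value).encode("utf-8")

# Every record value ultimately reaches MQTT as a sequence of bytes
print(to_mqtt_payload(b"\x00\x01"))
print(to_mqtt_payload("hello"))
print(to_mqtt_payload({"id": 1, "temp": 21.5}))
```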
| Name | Description | Type | Default |
| --- | --- | --- | --- |
| `connect.mqtt.hosts` | Contains the MQTT connection endpoints. | string | |
| `connect.mqtt.username` | Contains the MQTT connection username. | string | |
| `connect.mqtt.password` | Contains the MQTT connection password. | password | |
| `connect.mqtt.service.quality` | Specifies the MQTT quality of service. | int | |
| `connect.mqtt.timeout` | Provides the time interval to establish the MQTT connection. | int | 3000 |
| `connect.mqtt.clean` | Sets the MQTT clean session flag. | boolean | true |
| `connect.mqtt.keep.alive` | The keep-alive functionality ensures the connection is still open and that broker and client remain connected. The interval is the longest period of time the broker and client can endure without sending a message. | int | 5000 |
| `connect.mqtt.client.id` | Contains the MQTT session client id. | string | |
| `connect.mqtt.error.policy` | Specifies the action to take if an error occurs while inserting the data. There are three options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message; the number of retries is based on `connect.mqtt.max.retries`). The error is logged automatically. | string | THROW |
| `connect.mqtt.retry.interval` | The time in milliseconds between retries. | int | 60000 |
| `connect.mqtt.max.retries` | The maximum number of times to retry the write. | int | 20 |
| `connect.mqtt.retained.messages` | Specifies the MQTT retained flag. | boolean | false |
| `connect.mqtt.converter.throw.on.error` | If set to false, a conversion exception is swallowed and processing carries on, but the message is lost. If set to true, the exception is thrown. | boolean | false |
| `connect.converter.avro.schemas` | If the AvroConverter is used you need to provide an Avro schema to read and translate the raw bytes into an Avro record. The format is `$MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE` for a source converter, or `$KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE` for a sink converter. | string | |
| `connect.mqtt.kcql` | Contains the Kafka Connect Query Language describing the flow from the source Kafka topics to the target MQTT topics. | string | |
| `connect.progress.enabled` | Enables logging of how many records have been processed. | boolean | false |
| `connect.mqtt.ssl.ca.cert` | Provides the path to the CA certificate file to use with the MQTT connection. | string | |
| `connect.mqtt.ssl.cert` | Provides the path to the certificate file to use with the MQTT connection. | string | |
| `connect.mqtt.ssl.key` | Certificate private key file path. | string | |
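Tying the settings together, a minimal sink configuration might look like the following sketch. The broker address, topic names, and credentials are hypothetical, and the connector class name varies between Stream Reactor releases, so check the class shipped with your version:

```properties
name=mqtt-sink
# Class name is release-dependent; verify against your Stream Reactor distribution
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=sensor_readings
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.username=connect-user
connect.mqtt.password=secret
connect.mqtt.service.quality=1
connect.mqtt.clean=true
connect.mqtt.error.policy=THROW
connect.mqtt.kcql=INSERT INTO /sensors/out SELECT * FROM sensor_readings
```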
Version 9 introduces two breaking changes that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.
WITHTARGET is no longer used

Before (v < 9.0.0):

```sql
INSERT INTO SELECT * FROM control.boxes.test WITHTARGET ${path}
```

After (v ≥ 9.0.0):

```sql
INSERT INTO ${path} SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')
```

Migration steps:

1. Delete every `WITHTARGET …` clause.
2. Move the placeholder (or literal) that held the target path into the `INSERT INTO` expression.
3. Add `mqtt.target.from.field=true` to the KCQL `PROPERTIES` list.
Field projection and aliasing in KCQL are no longer supported

Before (v < 9.0.0): handled directly in the KCQL `SELECT` list:

```sql
SELECT id, temp AS temperature …
```

After (v ≥ 9.0.0): the connector passes the full record. Any projection, renaming, or value transformation must be done with a Kafka Connect Single Message Transform (SMT), Kafka Streams, or another preprocessing step.

Migration steps:

1. Remove field lists and aliases from KCQL.
2. Attach an SMT such as `org.apache.kafka.connect.transforms.ExtractField$Value` or `org.apache.kafka.connect.transforms.MaskField$Value`, or your own custom SMT, to perform the same logic.
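For instance, the `temp AS temperature` rename above can be reproduced with the built-in `ReplaceField` SMT. A sketch (the connector name is hypothetical; only the transform-related keys are shown):

```json
{
  "name": "mqtt-sink",
  "config": {
    "transforms": "rename",
    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": "temp:temperature"
  }
}
```

The `renames` option takes a comma-separated list of `old:new` pairs and rewrites those fields in each record's value before it reaches the sink.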