MQTT

This page describes the usage of the Stream Reactor MQTT Sink Connector.


Connector Class

io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector

Example

For more examples, see the tutorials.

name=mqtt
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders

KCQL Support

You can specify multiple KCQL statements, separated by semicolons (;), to have a single connector sink multiple topics. The connector properties topics or topics.regex must be set to a value that matches the Kafka topics named in the KCQL statements.
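
For example, a single connector can fan out two Kafka topics to two MQTT topics. A minimal sketch (the topic names and MQTT targets here are illustrative):

topics=orders,invoices
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders;INSERT INTO /lenses/invoices SELECT * FROM invoices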

The following KCQL is supported:

INSERT
INTO <mqtt-topic>
SELECT * //no field projection supported
FROM <kafka-topic>
//no WHERE clause supported

Examples:

-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO `/landoop/demo` SELECT * FROM kafka_topicA

-- Insert all fields from control.boxes.test into an MQTT topic taken from a field of each message
INSERT INTO `<field path>` SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')

Dynamic targets

The connector can route messages to specific MQTT topics. The following options are available (a complete configuration sketch follows the list):

  1. Given Constant Topic:

    Route messages to a specified MQTT topic using a constant value:

    INSERT INTO lenses-io-demo ...
  2. Using a Field Value for Topic:

    Direct messages to an MQTT topic based on a specified field's value. Example expression: fieldA.fieldB.fieldC.

    INSERT INTO `<path to field>` 
    SELECT * FROM control.boxes.test 
    PROPERTIES('mqtt.target.from.field'='true')
  3. Utilizing Message Key as Topic:

    Determine the MQTT topic from the incoming message key, which is expected to be a string.

    INSERT INTO `_key` 
    SELECT ...
  4. Leveraging Kafka Message Topic:

    Set the MQTT topic using the incoming Kafka message’s original topic.

    INSERT INTO `_topic` 
    SELECT ...
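
A complete configuration sketch combining option 3 with the example settings from earlier on this page (the connector name, host, and topic values are illustrative):

name=mqtt-key-routed
connector.class=io.lenses.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.kcql=INSERT INTO `_key` SELECT * FROM orders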

Kafka payload support

The sink publishes each Kafka record to MQTT according to the type of the record’s value:

| Payload type | Action taken by the sink |
| --- | --- |
| Binary (`byte[]`) | Forwarded unchanged ("pass-through"). |
| String | Published as its UTF-8 byte sequence. |
| Connect `Struct` produced by the Avro, Protobuf, or JSON Schema converters | Converted to JSON, then published as the JSON string's UTF-8 bytes. |
| `java.util.Map` or other Java `Collection` | Serialized to JSON and published as the JSON string's UTF-8 bytes. |

In short, non-binary objects are first turned into a JSON string; everything that reaches MQTT is ultimately a sequence of bytes.
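
Which case applies is determined by the converter configured for the connector. A hedged sketch of two common settings (both converter classes ship with Apache Kafka; pick one):

# Pass raw bytes through unchanged
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Or hand the sink schemaless JSON maps, which it re-serializes to JSON bytes:
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=false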

Error policies

The connector supports Error policies. The behaviour on a failed write is controlled by the connect.mqtt.error.policy option, which accepts NOOP, THROW, or RETRY (see the Option Reference below).
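
For example, a sketch of a retry-based configuration (the retry count and interval shown are the defaults):

connect.mqtt.error.policy=RETRY
connect.mqtt.max.retries=20
connect.mqtt.retry.interval=60000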

Option Reference

| Name | Description | Type | Default Value |
| --- | --- | --- | --- |
| connect.mqtt.hosts | Contains the MQTT connection endpoints. | string | |
| connect.mqtt.username | Contains the MQTT connection user name. | string | |
| connect.mqtt.password | Contains the MQTT connection password. | password | |
| connect.mqtt.service.quality | Specifies the MQTT quality of service (QoS). | int | |
| connect.mqtt.timeout | Provides the time interval to establish the MQTT connection. | int | 3000 |
| connect.mqtt.clean | Sets the MQTT connection's clean session flag. | boolean | true |
| connect.mqtt.keep.alive | The keep-alive functionality ensures that the connection is still open and that broker and client remain connected. The interval is the longest period of time that broker and client can endure without sending a message. | int | 5000 |
| connect.mqtt.client.id | Contains the MQTT session client id. | string | |
| connect.mqtt.error.policy | Specifies the action to take if an error occurs while inserting the data. Three options are available: NOOP, the error is swallowed; THROW, the error is allowed to propagate; RETRY, the exception causes the Connect framework to retry the message, with the number of retries based on connect.mqtt.max.retries. The error is logged automatically. | string | THROW |
| connect.mqtt.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.mqtt.max.retries | The maximum number of times to retry the write. | int | 20 |
| connect.mqtt.retained.messages | Specifies the MQTT retained flag. | boolean | false |
| connect.mqtt.converter.throw.on.error | If set to false, a conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown. | boolean | false |
| connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an Avro schema so the raw bytes can be read and translated into an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a sink converter. | string | |
| connect.mqtt.kcql | Contains the Kafka Connect Query Language (KCQL) statements describing the mapping from the source Kafka topics to the target MQTT topics. | string | |
| connect.progress.enabled | Enables logging of how many records have been processed. | boolean | false |
| connect.mqtt.ssl.ca.cert | Provides the path to the CA certificate file to use with the MQTT connection. | string | |
| connect.mqtt.ssl.cert | Provides the path to the certificate file to use with the MQTT connection. | string | |
| connect.mqtt.ssl.key | Provides the path to the certificate's private key file. | string | |
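
As an illustration of the SSL options, a hedged sketch (the ssl:// endpoint, port, and file paths are assumptions; adjust them for your broker):

connect.mqtt.hosts=ssl://mqtt:8883
connect.mqtt.ssl.ca.cert=/etc/mqtt/certs/ca.crt
connect.mqtt.ssl.cert=/etc/mqtt/certs/client.crt
connect.mqtt.ssl.key=/etc/mqtt/certs/client.key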

Migration

Version 9 introduces two breaking changes that affect how you route data and project fields in KCQL. Review the sections below and update your configurations before restarting the connector.

WITHTARGET is no longer used

| Before (v < 9.0.0) | After (v ≥ 9.0.0) |
| --- | --- |
| `INSERT INTO SELECT * FROM control.boxes.test WITHTARGET ${path}` | `INSERT INTO ${path} SELECT * FROM control.boxes.test PROPERTIES('mqtt.target.from.field'='true')` |

Migration steps

  1. Delete every WITHTARGET … clause.

  2. Move the placeholder (or literal) that held the target path into the INSERT INTO expression.

  3. Add 'mqtt.target.from.field'='true' to the KCQL PROPERTIES list.

KCQL field projections are ignored

| Before (v < 9.0.0) | After (v ≥ 9.0.0) |
| --- | --- |
| Projections were handled directly in the KCQL SELECT list: `SELECT id, temp AS temperature …` | The connector passes the full record. Any projection, renaming, or value transformation must be done with a Kafka Connect Single Message Transform (SMT), Kafka Streams, or another preprocessing step. |

Migration steps

  • Remove field lists and aliases from KCQL.

  • Attach an SMT such as org.apache.kafka.connect.transforms.ExtractField$Value, org.apache.kafka.connect.transforms.MaskField$Value, or your own custom SMT to perform the same logic, as sketched below.
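
A minimal sketch of such an SMT configuration, assuming a hypothetical value field named temperature that the removed KCQL projection used to select:

# Replace each record's value with its "temperature" field (hypothetical field name)
transforms=projectValue
transforms.projectValue.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.projectValue.field=temperature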
