Azure Event Hubs

This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.

A Kafka Connect source connector to read events from Azure Event Hubs and push them to Kafka.

To use the Kafka API with Event Hubs, your namespace must be on at least the Standard pricing tier.

Connector Class

io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector

Example

For more examples, see the tutorials.

The example below contains all the required parameters for the Event Hubs connector (and nothing optional, so feel free to tweak it to your needs):

name=AzureEventHubsSourceConnector
connector.class=io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
tasks.max=1
connect.eventhubs.kcql=INSERT INTO azureoutput SELECT * FROM inputhub;
connect.eventhubs.source.connection.settings.bootstrap.servers=MYNAMESPACE.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING;EntityPath=inputhub";
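
If your Connect cluster runs in distributed mode, the same configuration can be submitted through the Kafka Connect REST API. A minimal sketch, assuming a Connect worker listening on localhost:8083 (adjust the host, port and credentials to your environment):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "AzureEventHubsSourceConnector",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector",
    "tasks.max": "1",
    "connect.eventhubs.kcql": "INSERT INTO azureoutput SELECT * FROM inputhub;",
    "connect.eventhubs.source.connection.settings.bootstrap.servers": "MYNAMESPACE.servicebus.windows.net:9093",
    "connect.eventhubs.source.connection.settings.sasl.mechanism": "PLAIN",
    "connect.eventhubs.source.connection.settings.security.protocol": "SASL_SSL",
    "connect.eventhubs.source.connection.settings.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING;EntityPath=inputhub\";"
  }
}'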

KCQL support

You can specify multiple KCQL statements separated by ; to have the connector write to multiple Kafka topics. However, you cannot route the same source to different topics; for that, use a separate connector instance.

The following KCQL is supported:

INSERT INTO <your-kafka-topic>
SELECT *
FROM <your-event-hub>;

Selecting individual fields from the Event Hubs message is not supported.
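
To read two Event Hubs into two separate Kafka topics with a single connector instance, separate the KCQL statements with ; — a sketch, with purely illustrative hub and topic names:

connect.eventhubs.kcql=INSERT INTO orders-topic SELECT * FROM orders-hub; INSERT INTO payments-topic SELECT * FROM payments-hub;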

Payload support

For now, the Azure Event Hubs connector supports only raw-bytes passthrough from the source hub to the Kafka topic specified in the KCQL configuration.
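
Because the records carry raw bytes, you will typically want converters that do not try to reinterpret them. A minimal sketch (an assumption, not something mandated by the connector) using Kafka Connect's built-in byte-array converter on the connector configuration:

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter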

Authentication

You can connect to Azure Event Hubs by passing the appropriate SASL/JAAS parameters in the configuration:

connect.eventhubs.connection.settings.bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
connect.eventhubs.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
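
If you would rather not embed the connection string in plain text, Kafka Connect's standard configuration providers (or the secret providers documented elsewhere in this section) can supply it at runtime. A sketch assuming the built-in FileConfigProvider and a hypothetical secrets file at /secrets/eventhubs.properties containing a connection.string key:

# worker configuration: enable the file config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# connector configuration: reference the secret instead of embedding it
connect.eventhubs.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="${file:/secrets/eventhubs.properties:connection.string}";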

Fine-tuning the Kafka Connector

The Azure Event Hubs Connector uses the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the connector passes all properties with a specific prefix directly to the consumer. The prefix is connect.eventhubs.connection.settings; when a property is specified with it, it is automatically passed to the consumer.

For example, suppose you want to fine-tune how many records come over the network in a single poll. Specify the following property in the connector configuration before starting it:

connect.eventhubs.connection.settings.max.poll.records = 100

This means the internal Kafka consumer will poll at most 100 records at a time, as max.poll.records is passed directly to it.

There are a few exceptions to this rule, as a couple of properties are used internally to keep consumption running smoothly. The exceptions are listed below:

  • client.id - the connector sets it by itself

  • group.id - the connector sets it by itself

  • key.deserializer - the connector passes bytes through 1-to-1

  • value.deserializer - the connector passes bytes through 1-to-1

  • enable.auto.commit - the connector automatically sets it to false and checks which offsets have been committed to the output topic instead
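
Other standard consumer properties can be passed through the same prefix; the values below are illustrative, not recommendations:

connect.eventhubs.connection.settings.fetch.min.bytes=1048576
connect.eventhubs.connection.settings.fetch.max.wait.ms=500
connect.eventhubs.connection.settings.max.partition.fetch.bytes=2097152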

Option Reference

connect.eventhubs.source.connection.settings.bootstrap.servers
Specifies the Event Hubs server location.
Type: string

connect.eventhubs.source.close.timeout
Amount of time (in seconds) for the consumer to close.
Type: int. Default: 30

connect.eventhubs.source.default.offset
Specifies whether by default to consume from the earliest (default) or latest offset.
Type: string. Default: earliest

connect.eventhubs.kcql
Output KCQL queries defining the flow from Event Hubs to Kafka topics (multiple statements separated by ;).
Type: string

Learn more about the different methods of connecting to Event Hubs on the Azure website. The only caveat is to add the connector-specific prefix, as in the example above. See Fine-tuning the Kafka Connector for more info.
