Creating & managing a connector

This page describes managing a basic connector instance in your Connect cluster.

Creating your Kafka Connector Configuration

To deploy a connector into the Kafka Connect Cluster, you must follow the steps below:

  1. You need to have the connector JARs available in your Kafka Connect Cluster's plugin.path.

  2. Each connector has mandatory configuration properties that must be set and validated; other properties are optional. Always read the connector documentation first.

  3. You can deploy the connector using the Kafka Connect REST API, or let Lenses manage it for you (a deployment sketch follows the sample configuration below).

Sample connector configuration, the AWS S3 Sink Connector from Stream Reactor:

# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders

# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=insert into `owshq-topics:stream` select * from orders STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
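Below is a minimal sketch of step 3, deploying this configuration through the Kafka Connect REST API with Python. The worker address (http://localhost:8083) is an assumption; adjust it to your cluster and replace the masked credential values.

# Minimal sketch: create the connector through the Kafka Connect REST API.
# Assumes a Connect worker reachable at http://localhost:8083 (adjust as needed).
import requests

connect_url = "http://localhost:8083"  # assumed worker address

config = {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "connect.s3.aws.auth.mode": "Credentials",
    "connect.s3.aws.access.key": "****",
    "connect.s3.aws.secret.key": "****",
    "connect.s3.aws.region": "****",
    "connect.s3.kcql": (
        "insert into `owshq-topics:stream` select * from orders "
        "STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 "
        "WITH_FLUSH_INTERVAL = 1000"
    ),
}

# POST /connectors creates the connector; the name must be unique in the cluster.
resp = requests.post(
    f"{connect_url}/connectors",
    json={"name": "sink-s3-orders-stream-reactor", "config": config},
)
resp.raise_for_status()
print(resp.json())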

Let's drill down into this connector configuration and what will happen when we deploy it:

# connector configuration
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=1
name=sink-s3-orders-stream-reactor
topics=orders
  1. connector.class is the plugin we will use.

  2. tasks.max is how many tasks will be executed in the Kafka Connect Cluster. In this example we have 1 task; the topic has 9 partitions, so the connector's consumer group will have a single consumer instance, and that one task will consume all 9 partitions. To scale out, simply increase the number of tasks (see the sketch after this list).

  3. name is the connector's name on the Kafka Connect Cluster; it must be unique within each Kafka Connect Cluster.

  4. topics is the topic that will be consumed; all of its data will be written to AWS S3, as our example describes.
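To make the scaling point above concrete, here is a minimal sketch that raises tasks.max through the Kafka Connect REST API; the worker address is again an assumption.

# Minimal sketch: scale the connector by raising tasks.max.
# GET /connectors/{name}/config returns the current configuration and
# PUT /connectors/{name}/config replaces it.
import requests

connect_url = "http://localhost:8083"  # assumed worker address
name = "sink-s3-orders-stream-reactor"

current = requests.get(f"{connect_url}/connectors/{name}/config").json()
current["tasks.max"] = "3"  # going beyond the number of topic partitions only adds idle tasks
requests.put(f"{connect_url}/connectors/{name}/config", json=current).raise_for_status()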

# converter configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
  1. value.converter is the converter used to serialize or deserialize the record value. In this case, our value will be in JSON format.

  2. key.converter is the converter used to serialize or deserialize the record key. In this case, our key will be in string format.

  3. key.converter.schemas.enable is where we tell Kafka Connect whether to include the key schema in the message. In our example it is false, so we don't include the key schema.

  4. value.converter.schemas.enable is where we tell Kafka Connect whether to include the value schema in the message. In our example it is false, so we don't include the value schema (see the illustration after this list).
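To illustrate what the schemas.enable flags change, the sketch below shows the two shapes the JsonConverter works with for a record value; the order fields are made up for the example.

# Illustration only: the two JSON shapes JsonConverter handles for the value.

# With value.converter.schemas.enable=false the message value is just the payload:
value_without_schema = {"order_id": 1, "amount": 42.5}  # hypothetical order fields

# With value.converter.schemas.enable=true the converter wraps the payload in an
# envelope that carries the schema alongside it:
value_with_schema = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "order_id", "type": "int32", "optional": False},
            {"field": "amount", "type": "double", "optional": False},
        ],
    },
    "payload": {"order_id": 1, "amount": 42.5},
}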

# specific connector configuration for S3
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=****
connect.s3.aws.secret.key=****
connect.s3.aws.region=****
connect.s3.kcql=insert into `owshq-topics:stream` select * from orders STOREAS `json` WITH_FLUSH_COUNT = 200 WITH_FLUSH_SIZE = 200 WITH_FLUSH_INTERVAL = 1000
  1. connect.s3.aws.auth.mode is the type of authentication we will use to connect to the AWS S3 bucket.

  2. connect.s3.aws.access.key is the access key used to authenticate against the AWS S3 bucket.

  3. connect.s3.aws.secret.key is the secret key used to authenticate against the AWS S3 bucket.

  4. connect.s3.aws.region is the region where the AWS S3 bucket is deployed.

  5. connect.s3.kcql uses the Kafka Connect Query Language (KCQL) to configure the bucket name, folder, the format the data will be stored as, and how frequently new files are added to the bucket.
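Once the connector starts flushing, a quick way to confirm data is arriving is to list the objects under the KCQL target. This sketch uses boto3 and assumes the owshq-topics bucket and stream prefix from the example, with AWS credentials available in the environment.

# Minimal sketch: list the objects the sink has written so far.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="owshq-topics", Prefix="stream/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])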

Managing your Connector

After you deploy your Kafka Connector, the Kafka Connect Cluster manages it for you.

To better show how Kafka Connect manages your connectors, we will use the Lenses UI.

The image below is the Lenses Connectors list:

In the following image, we will delve into the Kafka Connector details.

  • Consumer Group: when we use Lenses, we can see which consumer group is reading and consuming that topic.

  • Connector tasks: we can see which tasks are running, their status, and how many records go in and out of that topic.
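Outside the Lenses UI, the same task information is available from the Kafka Connect REST API. Here is a minimal sketch, again assuming the worker runs at http://localhost:8083.

# Minimal sketch: inspect the connector and its tasks.
# GET /connectors/{name}/status reports the connector state and each task's state.
import requests

connect_url = "http://localhost:8083"  # assumed worker address
name = "sink-s3-orders-stream-reactor"

status = requests.get(f"{connect_url}/connectors/{name}/status").json()
print("connector state:", status["connector"]["state"])
for task in status["tasks"]:
    print(f"task {task['id']}: {task['state']} on worker {task['worker_id']}")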


For the full configuration options of the AWS S3 Sink connector, see the AWS S3 sink documentation.

Simple flow of our connector reading from Kafka and writing into S3