Redis

This page describes the usage of the Stream Reactor Redis Sink Connector.

Connector Class

io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector

Example

For more examples see the tutorials.

name=redis
connector.class=io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=redis
connect.redis.host=redis
connect.redis.port=6379
connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM

KCQL support

You can specify multiple KCQL statements, separated by ;, to have a single connector sink multiple topics. The connector properties topics or topics.regex must be set to match the topics referenced in the KCQL statements.
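For example, a minimal sketch (the topic and target names here are hypothetical) of one connector sinking two topics, one in cache mode and one as a Redis Stream:

topics=fx-topic,metrics-topic
connect.redis.kcql=SELECT price FROM fx-topic PK symbol;INSERT INTO metrics_stream SELECT * FROM metrics-topic STOREAS STREAM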

The following KCQL is supported:

[INSERT INTO <redis-cache>]
SELECT FIELD, ...
FROM <kafka-topic>
[PK FIELD]
[STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]

Cache mode

The purpose of this mode is to cache [Key-Value] pairs in Redis. Imagine a Kafka topic with currency foreign exchange rate messages:

{ "symbol": "USDGBP" , "price": 0.7943 }
{ "symbol": "EURGBP" , "price": 0.8597 }

You may want to store the symbol as the Key and the price as the Value in Redis. This effectively makes Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that with this Kafka Redis Sink Connector, specify the KCQL as:

SELECT price from yahoo-fx PK symbol

This will update the keys USDGBP and EURGBP with the relevant price using the (default) JSON format:

Key=USDGBP  Value={ "price": 0.7943 }

Composite keys are supported with the PK clause; the delimiter between the key parts can be set with the optional configuration property connect.redis.pk.delimiter.
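For example, a minimal sketch assuming the messages also carry a venue field (hypothetical), keying each entry by symbol and venue joined with -:

connect.redis.kcql=SELECT price FROM yahoo-fx PK symbol, venue
connect.redis.pk.delimiter=-

This would produce keys such as USDGBP-LDN.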

Sorted Sets

To insert messages from a Kafka topic into a single Sorted Set, use the following KCQL syntax:

INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60

This will create and add entries to the sorted set named cpu_stats. The entries will be ordered in the Redis set based on the score, which we define to be the value of the timestamp field of the Avro message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.

The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.

Multiple Sorted Sets

The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into one Sorted Set and selecting which values to store in the sorted sets. Use the PK (primary key) KCQL clause to define the field:

SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)

Notice we have dropped the INSERT clause.

The connector can also prefix the Key names using the INSERT statement when writing to multiple sorted sets:

INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60

This will create keys with names FX-USDGBP, FX-EURGBP, etc.

The TTL statement allows setting a time to live on the sorted set. If not specified, no TTL is set.

Geospatial add

To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:

INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD
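A minimal sketch of a more concrete statement, assuming (this is an assumption, not defined on this page) that each message carries the coordinates in longitude and latitude fields and a member identifier in sensorID:

INSERT INTO sensor_locations SELECT * from sensorsTopic PK sensorID STOREAS GEOADD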

Streams

To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:

INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM

PubSub

To publish messages from a Kafka topic to a Redis PubSub channel, use the following KCQL syntax:

SELECT * FROM topic STOREAS PubSub (channel=myfield)

The Redis channel to publish to is determined by a field in the payload of the Kafka message, as set in the KCQL statement; in this case, a field called myfield.
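For example (an illustrative sketch; the temperature field is hypothetical), with the KCQL above the following message would be published to the Redis channel alerts:

{ "myfield": "alerts", "temperature": 22.5 }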

Kafka payload support

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)

  • Schema.Struct and JSON

  • No Schema and JSON
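The payload format is negotiated by the standard Kafka Connect converter properties rather than by the connector itself. As an illustrative sketch for a schemaless JSON topic, the worker or connector configuration could include:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false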

Error policies

The connector supports Error policies. The policy is set with connect.redis.error.policy: NOOP swallows the error, THROW allows it to propagate (failing the task), and RETRY asks the Connect framework to retry the message, governed by connect.redis.max.retries and connect.redis.retry.interval.
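For example, a sketch of a retrying configuration using the options from the reference below:

connect.redis.error.policy=RETRY
connect.redis.max.retries=20
connect.redis.retry.interval=60000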

Option Reference

| Name | Description | Type | Default Value |
|------|-------------|------|---------------|
| connect.redis.pk.delimiter | Specifies the Redis primary key delimiter. | string | . |
| ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string | |
| ssl.protocol | The SSL protocol used to generate the SSLContext. The default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS |
| ssl.truststore.location | The location of the trust store file. | string | |
| ssl.keystore.password | The store password for the key store file. Optional for the client; only needed if ssl.keystore.location is configured. | password | |
| ssl.keystore.location | The location of the key store file. Optional for the client; can be used for two-way client authentication. | string | |
| ssl.truststore.password | The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled. | password | |
| ssl.keymanager.algorithm | The algorithm used by the key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 |
| ssl.trustmanager.algorithm | The algorithm used by the trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX |
| ssl.keystore.type | The file format of the key store file. Optional for the client. | string | JKS |
| ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all available cipher suites are supported. | list | |
| ssl.endpoint.identification.algorithm | The endpoint identification algorithm used to validate the server hostname against the server certificate. | string | https |
| ssl.truststore.type | The file format of the trust store file. | string | JKS |
| ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1] |
| ssl.key.password | The password of the private key in the key store file. Optional for the client. | password | |
| ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string | |
| connect.redis.kcql | KCQL expression describing field selection and routes. | string | |
| connect.redis.host | Specifies the Redis server host. | string | |
| connect.redis.port | Specifies the Redis connection port. | int | |
| connect.redis.password | Provides the password for the Redis connection. | password | |
| connect.redis.ssl.enabled | Enables SSL for the Redis connection. | boolean | false |
| connect.redis.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message; the number of retries is based on connect.redis.max.retries). The error will be logged automatically. | string | THROW |
| connect.redis.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.redis.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.progress.enabled | Enables the output of how many records have been processed. | boolean | false |
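To connect to an SSL-secured Redis, a minimal sketch (the truststore path and password below are placeholders, not values from this page):

connect.redis.ssl.enabled=true
ssl.truststore.location=/var/private/ssl/redis.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.truststore.type=JKS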
