Managing Kafka Offsets when using the Kafka Consumer input adapter

Article ID: KB0076903

Products: TIBCO Streaming
Versions: 10

Description

How can I guarantee the Kafka Consumer adapter reads from the last offset and does not skip any messages?

Issue/Introduction

Adapter settings and considerations

Resolution

Summary:

Have each Kafka Consumer adapter store the lowest fully processed topic, partition, and offset in a persistent store such as a disk-based Query Table, a Query Table in Transactional Memory, or a JDBC database. When subscribing to the Kafka topic, set the command tuple fields to these values (a plain-Java equivalent is sketched after this list):
  command = subscribe
  topic = topic-name
  pattern = null
  partition = null
  time = null
  offset = lowest remembered offset value across all partitions
  clientId = null (instead set "client.id" in the adapter's Advanced Config settings)
  brokers = null
  advancedConfig = null (instead configure these properties on the adapter's Advanced Options tab)
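
In plain Apache Kafka Java client terms, subscribing at a specific offset corresponds to assign() plus seek(). The following minimal sketch shows that equivalent; the broker address, partition number, and remembered offset value are assumptions for illustration, not the adapter's internal implementation:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SubscribeAtOffset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            // Assumption: lowest fully processed offset, loaded from the persistent store.
            long rememberedOffset = 1040L;

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("topic-name", 0);
                consumer.assign(Collections.singletonList(tp));
                // Start at the remembered offset; this may re-deliver the last
                // processed message, which downstream duplicate checks discard.
                consumer.seek(tp, rememberedOffset);
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
                }
            }
        }
    }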


It is not possible to guarantee at-least-once message processing by letting Kafka commit messages automatically on its own schedule.

Discussion:

The Kafka bus by default sends up to 500 messages at a time to a client, then automatically commits those messages as "read" and will not send them again unless specifically asked. A Kafka client may receive such a group of 500 messages and then encounter an error before all of them are processed. With "enable.auto.commit=true" the unprocessed messages are then lost. Since the Kafka broker cannot know which messages the client has fully processed, the client must be responsible for commits.
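
For reference, the Apache Kafka consumer client defaults behind this behavior are:
    max.poll.records = 500          (maximum messages returned per poll)
    enable.auto.commit = true       (client commits automatically)
    auto.commit.interval.ms = 5000  (automatic commit every 5 seconds)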

A Kafka Consumer client may use the setting "enable.auto.commit=false" and perform commits itself at intervals, or may let Kafka commit at will but remember the offsets it needs and specify them when re-subscribing.

There is a problem when the Kafka client commits too frequently. When using "enable.auto.commit=false" the client must commit the messages it has read, but in practice we've seen that committing every message (for example, when using the Kafka Consumer Commit adapter) overloads the Kafka broker, which then disconnects the client. The alternative is to commit after several tens or hundreds of messages, or once or twice within any one-second interval. If an error occurs and the client reconnects, it will receive duplicates of messages that were already processed but left uncommitted. This is often acceptable, since either the client keeps a record of what has already been processed, or duplicate processing does not corrupt the business result (for example, updating a database row with identical data so none of the values change).
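
As an illustration of committing at intervals rather than per message, the following plain-Java sketch commits once per poll batch; in a Streaming application the Kafka Consumer Commit adapter plays this role, and the broker address and topic name here are assumptions:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class BatchedCommit {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("group.id", "StreamingClients");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("topic-name"));
                while (true) {
                    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                    for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                        process(r);
                        // The committed position is the offset of the NEXT message to read.
                        toCommit.put(new TopicPartition(r.topic(), r.partition()),
                                new OffsetAndMetadata(r.offset() + 1));
                    }
                    if (!toCommit.isEmpty()) {
                        consumer.commitSync(toCommit); // one commit per batch, not per message
                    }
                }
            }
        }

        private static void process(ConsumerRecord<String, String> r) {
            // Application-specific processing goes here.
        }
    }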

The "subscribe" command summarized above is the alternative to have the client both remember what message offsets it processed, and when subscribing use the Kafka Consumer adapter control-stream to both subscribe to the Kafka topic and specify the "offset" where to start reading in the topic. Remembering across application errors is accomplished using a Query Table which is either disk-based or stored in a cluster's Transactional Memory so that the offset value can be retrieved when the client restarts.

Typically the Kafka topic is partitioned, so when a client subscribes it is assigned one or more partitions, and each partition has its own offset. A client remembering the last read offset needs to remember the partition as well, and may need to subscribe from the lowest offset of any partition previously read so that no message on any partition is missed. In a well-functioning Kafka bus, the offsets of a topic's partitions will be very close to each other, often differing only by one.
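
A minimal sketch of choosing the subscribe offset from per-partition bookkeeping, with an in-memory map standing in for the persistent Query Table and the offset values invented for illustration:

    import java.util.HashMap;
    import java.util.Map;

    public class LowestOffset {
        public static void main(String[] args) {
            // Hypothetical remembered state: partition number -> last fully
            // processed offset. In the real application this comes from the
            // persistent Query Table.
            Map<Integer, Long> lastProcessed = new HashMap<>();
            lastProcessed.put(0, 1041L);
            lastProcessed.put(1, 1040L);
            lastProcessed.put(2, 1041L);

            // Subscribe from the lowest offset of any partition previously
            // read, so no partition can have a missed message.
            long subscribeOffset = lastProcessed.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(0L);

            System.out.println("offset field of the subscribe command = " + subscribeOffset);
        }
    }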

When a client runs in a cluster, it is likely also taking advantage of load balancing. In this case every client instance runs concurrently and has the same Kafka "group.id" as the rest of the clients. When a new Kafka client subscribes to a topic, the broker pauses all subscriptions and performs a "rebalance", which results in all clients receiving a new assignment of partitions. A client is not guaranteed to be given the set of partitions it had before the new client joined. This means each client must not only remember the offset it last fully processed for each partition, but also share that information with all the other clients. This information can be shared continuously using a Query Table in Transactional Memory. A client can then look up the lowest offset read by all clients and subscribe from that offset, which produces some duplicates (detected and dropped by querying the same Query Table) but ensures no messages are missed, regardless of which client instance previously processed messages from that Kafka partition.
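
A sketch of the duplicate check described above, with a ConcurrentHashMap standing in for the shared Query Table in Transactional Memory (a real implementation would make the check-and-update transactional):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.common.TopicPartition;

    public class DuplicateFilter {
        // Hypothetical stand-in for the shared Query Table in Transactional
        // Memory: (topic, partition) -> last fully processed offset.
        private static final Map<TopicPartition, Long> shared = new ConcurrentHashMap<>();

        // Returns true if the message should be processed, false if it is a
        // duplicate already handled by this or another client instance.
        static boolean firstTimeSeen(String topic, int partition, long offset) {
            TopicPartition tp = new TopicPartition(topic, partition);
            Long last = shared.get(tp);
            if (last != null && offset <= last) {
                return false; // duplicate: some client already processed this offset
            }
            shared.put(tp, offset); // record progress for the whole group
            return true;
        }

        public static void main(String[] args) {
            System.out.println(firstTimeSeen("topic-name", 0, 100)); // true
            System.out.println(firstTimeSeen("topic-name", 0, 100)); // false (duplicate)
            System.out.println(firstTimeSeen("topic-name", 0, 101)); // true
        }
    }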

When using this strategy in a cluster, instead of relying on committing offsets back to Kafka, the clients record offsets into the Query Table in Transactional Memory. The clients still need to use the Kafka Consumer Commit adapter, but infrequently.

A typical set of Advanced Config settings on the Kafka Consumer adapter's Advanced Options tab is (example):
    auto.offset.reset = latest
    client.id = Streaming + getNodeName()
    enable.auto.commit = false
    group.id = StreamingClients

All clients in the same group must use the same Advanced Config, differing only in the value used for "client.id".
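
As an illustration of the per-client difference, here is a sketch that builds these settings programmatically; the method name and example node name are assumptions, and getNodeName() in the table above is the Streaming expression the adapter evaluates:

    import java.util.Properties;

    public class GroupConfig {
        // Builds the per-client consumer settings. Every client in the group
        // uses identical values except for a stable, node-specific client.id.
        static Properties consumerConfig(String nodeName) {
            Properties props = new Properties();
            props.put("auto.offset.reset", "latest");
            props.put("enable.auto.commit", "false");
            props.put("group.id", "StreamingClients");
            props.put("client.id", "Streaming" + nodeName); // the only per-client difference
            return props;
        }

        public static void main(String[] args) {
            // Assumption: example Streaming node name.
            System.out.println(consumerConfig("A.mycluster"));
        }
    }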

With these settings, the first client in the group will read from the latest available message in the topic onward, and will initially be assigned all topic partitions. The second client will read from the committed offset of any partitions it is assigned after the Kafka rebalance, since once a group has subscribed the "auto.offset.reset" setting is ignored. Because all clients share the same "group.id", Kafka assigns a subset of the partitions to each client. Each client is named after its Streaming cluster node name, allowing the Kafka clients to identify themselves to the Kafka server and typically receive a stable set of partitions before and after each rebalance.
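
In plain Kafka Java client terms, a client that wants to resume from its own remembered offsets after each rebalance can seek from a ConsumerRebalanceListener; a minimal sketch, with lookupStoredOffset() as a hypothetical stand-in for a read of the shared offset table:

    import java.util.Collection;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceSeeker implements ConsumerRebalanceListener {
        private final KafkaConsumer<String, String> consumer;

        RebalanceSeeker(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Partitions are being taken away (another client joined or left);
            // nothing to do here when offsets are tracked in the Query Table.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // After every rebalance, resume each newly assigned partition from
            // the offset this group has recorded as fully processed.
            for (TopicPartition tp : partitions) {
                Long stored = lookupStoredOffset(tp); // hypothetical Query Table read
                if (stored != null) {
                    consumer.seek(tp, stored + 1);
                }
            }
        }

        private Long lookupStoredOffset(TopicPartition tp) {
            return null; // placeholder: query the shared offset table here
        }

        // Usage: register the listener when subscribing.
        static void subscribe(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(Collections.singletonList("topic-name"),
                    new RebalanceSeeker(consumer));
        }
    }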

When a Kafka client disconnects or unsubscribes, Kafka performs another rebalance to re-assign all partitions to the remaining clients. In this case, Kafka will proceed to deliver messages starting from the last committed offset, re-sending any messages that were delivered but not yet committed, which leads to some duplicates but no missed messages. For this reason it is still important to use the Kafka Consumer Commit adapter to commit offsets at intervals even when the clients remember offsets themselves.

Note that it is possible for a given Kafka Consumer to subscribe to multiple topics, each with their own partitioning and offsets. In this case, keep track of the topic, partition, and offset together if self-managing offsets and commits. It will not be possible to guarantee at-least-once processing if allowing Kafka to manage the commits itself.
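
When self-managing offsets across several topics, the natural bookkeeping key is the (topic, partition) pair; a minimal sketch using Kafka's own TopicPartition class, which implements equals() and hashCode(), as the map key (the topic names are invented for illustration):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    public class MultiTopicOffsets {
        // (topic, partition) -> last fully processed offset.
        private final Map<TopicPartition, Long> lastProcessed = new HashMap<>();

        void recordProcessed(String topic, int partition, long offset) {
            lastProcessed.put(new TopicPartition(topic, partition), offset);
        }

        public static void main(String[] args) {
            MultiTopicOffsets tracker = new MultiTopicOffsets();
            tracker.recordProcessed("orders", 0, 1041L);   // assumption: example topics
            tracker.recordProcessed("payments", 2, 9005L);
            System.out.println(tracker.lastProcessed);
        }
    }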