(deprecated) Optimizing KafkaMirrorMaker2 translated offset resolution for cluster failover scenarios

This article is deprecated as MM2 provides this feature as of Kafka 2.7 (KIP-545).

KafkaMirrorMaker2 (KMM2) replicates messages, as well as consumer group offsets, from a source cluster to a target cluster.

When it comes to consumer group offsets, KMM2 periodically writes translated offsets to a checkpoints topic (named ${prefix}.checkpoints.internal).
By using RemoteClusterUtils.translateOffsets, consumers running in the target cluster can resolve the offsets that were committed by consumers in the source cluster, and continue from that point. This allows fine-grained control over where the consumers should seek to and, because this information is kept independently from __consumer_offsets, it never gets overwritten by consumers.

However, there are some minor drawbacks with this approach:

  • application code needs to contain custom logic that decides whether the consumer should use the (normal) saved group offsets or the data from the checkpoint topic;
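The kind of custom logic this drawback refers to might look roughly like the sketch below. It is only an illustration under an assumed policy: prefer an offset already committed in the target cluster, fall back to the translated checkpoint offset, and otherwise leave the decision to auto.offset.reset. The class and method names are hypothetical, and plain Java types stand in for Kafka's.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration; not part of Kafka or KMM2.
final class StartOffsetChooser {

    /**
     * Picks the offset a consumer in the target cluster should start from.
     *
     * @param committedInTarget        offset already committed by this group in the target cluster, if any
     * @param translatedFromCheckpoint offset resolved from the checkpoint topic, if any
     * @return the chosen offset, or empty when the consumer should rely on auto.offset.reset
     */
    static OptionalLong choose(OptionalLong committedInTarget, OptionalLong translatedFromCheckpoint) {
        // Assumed policy: a commit made in the target cluster means the group has
        // already been running there, so it takes precedence over the checkpoint.
        if (committedInTarget.isPresent()) {
            return committedInTarget;
        }
        // Otherwise use the translated offset; this may itself be empty.
        return translatedFromCheckpoint;
    }

    public static void main(String[] args) {
        // No commit in the target cluster yet, so the checkpoint value is used.
        System.out.println(choose(OptionalLong.empty(), OptionalLong.of(42L)));
    }
}
```

Every consuming application would need some variant of this branch, which is exactly the duplication the failover approach tries to avoid.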

Cluster failover
If we want to use KMM2 for a cluster failover scenario, we can avoid some of these problems.

Let’s assume we are going to use DefaultReplicationPolicy, or any other policy that keeps replicated messages in a separate, dedicated topic (we do this to avoid impacting offsets on other topics).

During the failover process we could perform the following steps:

  1. stop all the consumers in the source cluster,

As the consumers in the target cluster are going to have the same group ids as the consumers in the source cluster, they are going to continue from the offsets that we have written in step 2.

Implementation pseudocode
The snippet below is not production-quality (it does not handle checkpoint records expiring while they are being read, or connectivity problems), but it could act as a base for a tool to be executed during the failover:

  1. in computeConsumerGroupOffsets we read all messages present in the checkpoint topic, so we have the most recent offsets for each consumer group-partition pair;
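The "most recent offset per consumer group-partition pair" part of that step can be sketched in isolation as below. This is a minimal in-memory illustration, not the original tool: CheckpointRecord and GroupPartition are hypothetical, simplified stand-ins for the data carried by checkpoint messages. It assumes the records are supplied in the order they appear in the checkpoint topic, so a later record replaces an earlier one for the same key. Reading the topic itself is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of checkpoint data; these are not Kafka classes.
final class CheckpointFold {

    record GroupPartition(String group, String topic, int partition) {}

    record CheckpointRecord(String group, String topic, int partition, long downstreamOffset) {}

    // Keeps only the last seen offset for every (group, topic, partition) key.
    // Assumes the input list preserves the order of the checkpoint topic.
    static Map<GroupPartition, Long> latestOffsets(List<CheckpointRecord> recordsInLogOrder) {
        Map<GroupPartition, Long> latest = new HashMap<>();
        for (CheckpointRecord r : recordsInLogOrder) {
            latest.put(new GroupPartition(r.group(), r.topic(), r.partition()), r.downstreamOffset());
        }
        return latest;
    }

    public static void main(String[] args) {
        List<CheckpointRecord> records = List.of(
                new CheckpointRecord("billing", "source.orders", 0, 10L),
                new CheckpointRecord("billing", "source.orders", 1, 5L),
                new CheckpointRecord("billing", "source.orders", 0, 25L)); // newer value for partition 0
        latestOffsets(records).forEach((key, offset) -> System.out.println(key + " -> " + offset));
    }
}
```

The resulting map has the shape needed to alter each group's offsets afterwards: one final offset per partition, grouped by consumer group id.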

We only need to connect to the target cluster, as it stores all the necessary information (KMM2 has already written it there).

It should be noted that this solution is based on multiple assumptions:

  • the consumers in the target cluster are NOT active while the translation is happening (alterConsumerGroupOffsets requires the groups to be empty to succeed);

Possible further work
Putting the above code into a fork of KMM2, so that it alters the consumer groups’ offsets directly (instead of using the checkpoint topic as an intermediary), should be relatively simple. MirrorCheckpointTask.poll could act as a starting point for an investigation: this time, instead of storing records in Kafka, we would need to perform an action for each of them. Batching could be discussed as well, as we would otherwise be making an alter request per commit in the source cluster.
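One way the batching idea could work is to collapse all commits seen in one polling round into a single offset map per consumer group, so that each group needs one alter request per round rather than one per commit. The sketch below illustrates only that grouping step; the Commit type, the class name and the "topic-partition" string keys are hypothetical simplifications, and nothing here is taken from KMM2's code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batching helper; plain Java types stand in for Kafka's.
final class AlterRequestBatcher {

    record Commit(String group, String topic, int partition, long offset) {}

    // Returns one entry per consumer group. Each inner map holds the newest offset
    // per "topic-partition" key and corresponds to a single alter request.
    static Map<String, Map<String, Long>> batch(List<Commit> commitsInOrder) {
        Map<String, Map<String, Long>> perGroup = new LinkedHashMap<>();
        for (Commit c : commitsInOrder) {
            perGroup.computeIfAbsent(c.group(), g -> new LinkedHashMap<>())
                    .put(c.topic() + "-" + c.partition(), c.offset());
        }
        return perGroup;
    }

    public static void main(String[] args) {
        List<Commit> commits = List.of(
                new Commit("billing", "source.orders", 0, 10L),
                new Commit("billing", "source.orders", 0, 12L),
                new Commit("audit", "source.orders", 0, 4L));
        // Three commits collapse into two per-group requests.
        batch(commits).forEach((group, offsets) -> System.out.println(group + " " + offsets));
    }
}
```

The trade-off is latency: offsets in the target cluster would lag by up to one polling round, which seems acceptable given that checkpoints are already emitted periodically.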


