(deprecated) Optimizing KafkaMirrorMaker2 translated offset resolution for cluster failover scenarios

Adam Kotwasinski
3 min read · Nov 24, 2020


This article is deprecated as MM2 provides this feature as of Kafka 2.7 (KIP-545).

KafkaMirrorMaker2 (KMM2) is a tool that replicates messages, as well as consumer group offsets, from a source cluster to a target cluster.

When it comes to consumer group offsets, KMM2 periodically writes translated offsets to a checkpoints topic (named ${prefix}.checkpoints.internal).
By using RemoteClusterUtils.translateOffsets, consumers running in the target cluster can resolve the offsets that were saved by consumers in the source cluster and continue from that point. This allows fine-grained control over where the consumers should seek to, and because this information is kept separately from __consumer_offsets, it never gets overwritten by consumers.
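
As an illustration of that lookup, here is a minimal sketch of a consumer in the target cluster seeking to its translated offsets. The class name, method name and parameters are hypothetical, the 30-second timeout is arbitrary, and the code assumes the connect-mirror-client library is on the classpath and that the given bootstrap servers belong to the target cluster (where the checkpoints topic lives).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslatedOffsetSeek {

    // Hypothetical helper: positions the given consumer at the offsets that
    // KMM2 translated for `groupId` from the cluster known as `sourceAlias`.
    static void seekToTranslatedOffsets(KafkaConsumer<?, ?> consumer,
                                        String targetBootstrapServers,
                                        String sourceAlias,
                                        String groupId) throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", targetBootstrapServers);

        // Reads the checkpoints topic and returns the latest translated
        // offset for each replicated partition of this group.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                        props, sourceAlias, groupId, Duration.ofSeconds(30));

        // Manual assignment, so that the seek positions are not affected
        // by a group rebalance.
        consumer.assign(translated.keySet());
        translated.forEach((partition, offset) -> consumer.seek(partition, offset.offset()));
    }
}
```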

However, there are some minor drawbacks with this approach:

  • application code needs to contain custom logic that decides whether the consumer should use the (normal) saved group offsets or the data from the checkpoint topic;
  • to resolve the translated offsets, the application needs to pull in the connect-mirror-client dependency;
  • figuring out the offset for a group-partition pair involves reading all messages in the checkpoint topic, which might be wasteful in environments where there are multiple consumers with different group ids (as we would be reading the same messages over and over).

Cluster failover
If we want to use KMM2 for a cluster failover scenario, we can optimize away some of these problems.

Let’s assume we are going to use DefaultReplicationPolicy, or any other policy that keeps replicated messages in a separate, dedicated topic (we do this to avoid impacting offsets on other topics).

During the failover process we could perform the following steps:

  1. stop all the consumers in the source cluster,
  2. perform our work: rewrite the data stored in the checkpoint topics into __consumer_offsets in the target cluster,
  3. start the consumers in the target cluster, using the same group ids, consuming from the replica partitions.

Because the consumers in the target cluster use the same group ids as the consumers in the source cluster, they continue from the offsets that we wrote in step 2.

Implementation pseudocode
The snippet below is not production-quality (it does not handle checkpoint records expiring while they are being read, or connectivity problems), but it could act as a base for a tool to be executed during the failover:

  1. in computeConsumerGroupOffsets, we read all messages present in the checkpoint topic, so we have the most recent offsets for each consumer group-partition pair;
  2. in updateConsumerGroupOffsets, we update the target cluster’s consumer group offsets using AdminClient’s alterConsumerGroupOffsets.

We only need to connect to the target cluster, as it stores all the necessary information (which was written there by KMM2).
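
A minimal sketch of the two functions described above might look like the following. It is an illustration and not the article's original snippet: the class name, the command-line arguments and the reachedEnd helper are hypothetical, and the code assumes kafka-clients and connect-mirror-client are on the classpath. Like the approach it illustrates, it fetches the end offsets only once and does no error handling.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.mirror.Checkpoint;

public class CheckpointsToGroupOffsets {

    public static void main(String[] args) throws Exception {
        String targetBootstrapServers = args[0]; // target cluster only
        String checkpointTopic = args[1];        // the checkpoints topic written by KMM2
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets =
                computeConsumerGroupOffsets(targetBootstrapServers, checkpointTopic);
        updateConsumerGroupOffsets(targetBootstrapServers, offsets);
    }

    // Reads the whole checkpoint topic; later records overwrite earlier ones,
    // so the result holds the latest offset per group-partition pair.
    static Map<String, Map<TopicPartition, OffsetAndMetadata>> computeConsumerGroupOffsets(
            String bootstrapServers, String checkpointTopic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("enable.auto.commit", "false");

        Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(checkpointTopic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            // End offsets are fetched once; anything written later is not guaranteed to be read.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            while (!reachedEnd(consumer, endOffsets)) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
                    result.computeIfAbsent(checkpoint.consumerGroupId(), group -> new HashMap<>())
                            .put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
                }
            }
        }
        return result;
    }

    // True once the consumer's position has reached the recorded end offset
    // of every checkpoint partition.
    static boolean reachedEnd(KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(entry -> consumer.position(entry.getKey()) >= entry.getValue());
    }

    // Writes the collected offsets as regular committed offsets of each group.
    // The call fails for groups that still have active members.
    static void updateConsumerGroupOffsets(
            String bootstrapServers,
            Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadata>> entry : offsets.entrySet()) {
                admin.alterConsumerGroupOffsets(entry.getKey(), entry.getValue()).all().get();
            }
        }
    }
}
```

The consumer uses manual partition assignment and no group id, so reading the checkpoint topic does not itself create or modify any consumer group.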

Disclaimers
It should be noted that this solution is based on multiple assumptions:

  • the consumers in the target cluster are NOT active when the translation is happening (alterConsumerGroupOffsets requires groups to be empty to succeed);
  • the producers have stopped producing to the topics and all messages have been replicated before we run our tool (as we get endOffsets only once);
  • the replication policy ensures that replicated records end up in their own dedicated topics (otherwise, by modifying consumer group offsets we could also skip over locally-produced messages);
  • as a good practice, the consumers should configure the auto.offset.reset property: we have inserted the offset data, but the records in the replica partitions might have been deleted in the meantime (e.g. by user action or due to Kafka’s time/volume-based retention policies).
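
For the last point, a minimal sketch of the consumer configuration could look like this. The class name, the method and the choice of "earliest" and String deserializers are assumptions made for illustration; "latest" or "none" may suit other applications better.

```java
import java.util.Properties;

public class FailoverConsumerConfig {

    // Builds consumer properties for the target cluster. The group id should
    // be the same one the consumers used in the source cluster.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Applied when the committed offset points at records that no longer
        // exist: "earliest" resumes from the oldest retained record.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The resulting Properties object can be passed directly to the KafkaConsumer constructor.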

Possible further work
Putting the above code into a fork of KMM2, so that it alters consumer groups’ offsets directly (instead of using the checkpoint topic as an intermediary), should be relatively simple. MirrorCheckpointTask.poll could act as a starting point for an investigation: instead of storing records in Kafka, we would need to perform an action for each of them. Batching could be discussed as well, as we would otherwise be making an alter request per commit in the source cluster.
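
To make the batching idea concrete, here is a minimal sketch of a buffer that coalesces offset updates so that only the latest value per group-partition pair is sent. It is a hypothetical helper, not part of KMM2, and it uses plain strings and longs in place of Kafka's TopicPartition and OffsetAndMetadata to stay self-contained.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetUpdateBatcher {

    // group id -> (partition name, e.g. "source.orders-0" -> latest translated offset)
    private final Map<String, Map<String, Long>> pending = new HashMap<>();

    // Remembers an update; a newer offset for the same group and partition
    // replaces the older one.
    public synchronized void record(String groupId, String partition, long offset) {
        pending.computeIfAbsent(groupId, group -> new HashMap<>()).put(partition, offset);
    }

    // Returns everything collected since the previous call and empties the buffer.
    public synchronized Map<String, Map<String, Long>> drain() {
        Map<String, Map<String, Long>> batch = new HashMap<>(pending);
        pending.clear();
        return batch;
    }
}
```

A periodic task could call drain() and issue one alter request per group in the returned map, rather than one request per commit.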
