Kafka onboarding primer
This article quickly summarizes what is Kafka and what features it brings.
It is intended as an initial onboarding document for teams that might be interested in Kafka as their data streaming / messaging solution.
Kafka is a streaming solution for sending and receiving records. The records are stored in Kafka topics that are hosted by Kafka clusters. The topics are divided into partitions that are the basic unit of work division for producers and consumers.
On this level, Kafka is nothing special — it’s yet another solution for sending some messages upstream and then receiving them downstream — things that could be expected of any messaging solution.
However Kafka provides some features that lean towards higher adoption in the industry, such as:
- high performance — achieved by Kafka’s zero-copy implementation and no need for record translation (when a broker receives the records they are mem-copied as-is, without any transformation);
- re-reading capability — being closer to a remote record array than a pure messaging solution, a consumer can re-read the same record multiple times;
- durability — a single partition can be replicated across multiple clusters in an installation (replication factor); given that records do not disappear after being consumed (unlike “pure” messaging like AMQP) there are cluster-level time- and size-based policies responsible for deleting the old data.
To follow up with what is said on the official site, Kafka can be used in various ways.
The first obvious one is messaging — where records are messages, queues are topics/partitions, and the concept of having a message consumed is expressed by a consumer’s offset.
The following usecases lean towards a thick data pipeline, making good use of Kafka’s high throughput capabilities:
- website activity tracking — where we might want to use separate activity types;
- metrics pipeline — all services in our stack would use a Kafka producer to send metrics/health/topology information to Kafka, which would then be consumed by aggregation/plotting services;
- log aggregation — instead of working with files and having them collected by sidecar services, the services themselves could emit the logs directly to Kafka; also, as Kafka is durable, the logs could be persisted there — with e.g. various services / log types being stored in different topics, optimizing access;
- external commit/audit log — a transaction log of deltas applied to some external system — Kafka’s durability, persistence and seek capability allow for easy access to history based on an offset, timestamp or some other partitionable attribute.
Given that Kafka is so successful, it has spawned a rich ecosystem around itself:
- official Apache Kafka clients — Java client, librdkafka (C & C++ client);
- wrappers / frameworks: spring-kafka, quarkus-kafka, alpakka-kafka, smallrye connector;
- Kafka-streams: Kafka Streams library provides an easy way of performing typical stream operations like map, filter, join, group-by, being capable of emitting the results to Kafka again or to another sink;
- Kafka Connect: a framework service for defining source and sink connectors that allows us to move data into and out of Kafka — the example connectors are Redis or Postgres, and it is possible to write custom ones.
Kafka installation (called a cluster) is composed of 1+ Kafka brokers that store the partitions.
A single Kafka topic is composed of 1+ partitions. The partitions can be replicated across the brokers in the cluster (what gives us increased reliability in case of broker loss).
In the above diagram, we see a very simple Kafka cluster composed of 3 brokers. The partitions are distributed across the brokers, with partition leaders (that are the replication sources) marked in yellow. The replication from implemented by the following broker continuously requesting the data from the leader.
Kafka topic and partition
A Kafka topic is composed of 1+ partitions, with partitions being the primary work-assignment unit for Kafka producers and consumer.
Kafka topics can be created manually (by using the shell tools or Admin client), or automatically when producers send records to partitions or consumers start consuming from them (impl detail: when they request the partition metadata the server can create them).
As mentioned in my other article, a Kafka partition can be treated as an append-only remote record array:
- producers append records only at the end of partition,
- consumers can read records from any offset thanks to seek API (which is similar to any file-based seek operation).
A Kafka record is basically a tuple of key (sometimes used by Producers to select a topic partition during send operations), value and a collection of headers.
In case of received records, we also receive timestamp information.
Kafka records are received and stored by Kafka brokers as-is, what improves the performance, especially with zero-copy implementation.
Java Kafka API provides some simple serialization support for keys and values (e.g. String or ByteArray serializers).
There are also more complex solutions such as support for Avro or Protobuf — the users can also decide to write their own implementation (what just requires to implement the corresponding Serializer and Deserializer interfaces).
Topic compaction is a cluster-side mechanism that makes sure that for a given single partition, only the most recent record with a given key persists.
As it is invoked periodically on the server-side, the application might still receive the “old” messages that have not yet been compacted — so using compaction as a mechanism to simulate “already consumed records” by resending message with the same id might need to be aware of this.
A Kafka producer is appends the records to the end of partitions.
The default Partitioner implementation uses a record key in that case:
- if partition provided explicitly — use the partition provided,
- if key present — use hash(key) % partition count,
- if no partition nor key present — use the same partition for a single batch.
With a partitioner we can make sure that all messages with specific characteristics (such as a key) always end up in a particular partition, and then use this knowledge while setting up consumers.
It is important to note that the partitioning schemas that depend on number of partitions might be impacted if the number of partitions changes — some migration steps might be implemented on the consumer side if that is the case.
During the send process, producer communicates only with a partition leader, but it is possible to customize the guarantees received with acks property:
- leader replica only (acks = 1) — the record gets appended only to leader’s log, with replication happening in parallel;
- all replicas (acks = all / -1) — producer returns only after all required replicas have received the record;
- no confirmation (acks = 0) — with no guarantees, in this particular case the producer does not receive the offset information.
In case of acks=all, there is also a broker/topic property min.insync.replicas that can used to fine-tune the guarantees.
Kafka producer is capable of idempotent production of messages, which is achieved by it keeping an internal sequence number that gets reconciled with the broker to prevent double-writes.
A Kafka consumer is responsible for polling records from the partitions it is assigned to.
Partition assignment can be either explicit (assign API) or can use consumer group management (subscribe API, with the requirement that consumer specifies the group.id property).
When the consumer is constructed, we can specify the offset that should be initially used if there is nothing stored in Kafka (see Consumer offset storage in Kafka section) — auto.offset.reset: to start reading from the beginning, end, or throw.
The consumer provides a simple seek API (which is quite similar to any file-based one) — this allows the consumer to receive records from any position in a partition, allowing for possible re-reading of the same record. It is possible to seek to a specific offset, beginning/end of partition; consumer also provides a way to transform a record timestamp to an offset).
Internally, the consumer just uses this offset with every Fetch request it sends to the upstream while requesting the records.
It is important to note that Consumers do not have any key-awareness — it is not possible to easily get e.g. “all records with key X” — a typical implementation would be to get all records from partition (or partitions, depending on the partitioning schema) that contains these records and then filter out the non-matching ones.
Consumer groups is a Kafka mechanism that allows for automatic distribution of partitions across consumer group members.
To use this feature, we need to pass the group.id parameter to the consumer and use subscribe API. As a result, the topic partitions will be distributed among all consumer group members (obviously, if there are more members than partitions then some consumers will get nothing — increasing number of partitions or decreasing number of consumers might be warranted then).
Kafka provides auto-balancing if group members join or die (this is driven by internal heartbeat process). The consumers must periodically invoke poll() (max.poll.interval.ms) to let the cluster know that they are still alive — this might be relevant in case of very long processing scenarios.
The partition assignment strategy is configurable with partition.assignment.strategy property (impl detail: this means that all members of the group should have the same strategy, as any of them could be requested to compute the assignments — the assignments are not computed by brokers).
In general, this feature is excellent when the records produced to a topic can be processed by any consumer — as we can easily scale out the usecase by adding more partitions and consumers.
Having some kind of affinity might warrant extra attention to partitioning / assignment strategies.
It should be noted that it is not necessary to use subscription API — a consumer with a group id can use assign API just as well, and then use Kafka’s offset storage capability (see below).
Consumer offset storage in Kafka
A consumer that is a member of a consumer group can store its offsets in Kafka (in an internal topic __consumer_offsets). What is stored is effectively a tuple
(group.id, partition, offset) that can be read e.g. during consumer initialization.
Storing the offset has an influence on record delivery semantics:
- at least once — if we commit offset after processing (as our consumer could fail before committing),
- at most once — if we commit offset before processing (as our consumer could fail after committing but before we do processing).
If we need exactly-once delivery semantics, we might need to store the consumer offset in an external system — the same that keeps the transaction for record processing.
A special case of this is when the target system is also Kafka (i.e. we receive a record from Kafka, process it, and send records to Kafka) — see the below Transactions section for details.
Apart from idempotence, Kafka producer is capable of transaction support, allowing for sending multiple records to multiple partitions in the scope of single transaction.
Additionally, there is an API to make the producer store consumer group offsets, what allows us to create consume-process-produce pipelines, where we consume with a Kafka consumer, do transformations, and then send to Kafka — all within a scope of a single transaction.
On the consumer side, consumers can also specify whether they want to receive only committed records (isolation.level, by default reading uncommitted messages) — it should be noted that due to returning messages in offset order, this can block a consumer until transactions finish.
Kafka Java library also provides an Admin object that can be used for performing management operations on Kafka, such as:
- creating, altering and deleting topics,
- deleting records from a partition,
- altering consumer group offsets.
Some of these APIs might require a cluster to be running a particular version of Kafka.
Admin/Producer/Consumer have a metrics() method that provides access to low-level metrics (e.g. underlying request rate, bytes-in, bytes-out, connection-rate).