Kafka consumer proxy filter in Envoy
Following up on the ideas discussed in the original Kafka-Envoy protocol filter issue, I have finished the initial development of a record-distributing Kafka consumer proxy filter in Envoy. It is now available in contrib builds.
Right now, we allow downstream Kafka consumers to connect to Envoy, which then acts as a proxy for a collection of Kafka clusters.
Envoy receives records and then distributes them among the downstream connections (which implies that it is stateful).
How the filter works is pretty simple:
- Native Kafka consumers request data from Envoy, by reaching out to the mesh-filter (the communication here is the same as if the client had been reaching out to an ordinary Kafka cluster).
- Fetch requests, which represent the clients’ interest in data, are matched against the records that have been received from upstream Kafka clusters; if there are no matching records, the requests are kept until they time out.
- In parallel, Envoy consumes records from multiple upstream Kafka clusters, and distributes them among all the awaiting Fetch requests.
Which cluster is being consumed from depends on the filter configuration (forwarding rules, analogous to the producer case).
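As an illustration of how forwarding rules can pick the upstream cluster for a topic, here is a minimal Python sketch. It assumes prefix-based rules where the first matching rule wins; the rule contents, the cluster names and the resolve_cluster function are hypothetical, and this is not the filter's actual C++ code.

```python
from typing import List, Optional, Tuple

# Hypothetical forwarding rules: (topic prefix, target cluster name).
FORWARDING_RULES: List[Tuple[str, str]] = [
    ("apples", "kafka_c1"),
    ("bananas", "kafka_c2"),
    ("cherries", "kafka_c3"),
]


def resolve_cluster(
    topic: str, rules: List[Tuple[str, str]] = FORWARDING_RULES
) -> Optional[str]:
    """Return the target cluster of the first rule whose prefix matches."""
    for prefix, cluster in rules:
        if topic.startswith(prefix):
            return cluster
    # No rule matched: in this sketch the topic is simply not served.
    return None
```

With these example rules, a Fetch for a topic named apples-orders would be served from kafka_c1, while a topic that matches no prefix resolves to no cluster at all.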
Kafka consumer — downstream perspective
A Kafka consumer finds out which broker should receive its requests by sending Metadata requests. These are used at the beginning of the consumer’s operation (and again whenever the cluster topology changes, e.g. due to partition leader migrations).
Next, the consumer needs to know its position, so it may send a ListOffsets request for the partitions it is interested in.
Kafka consumers operate in a pull model, continuously sending Fetch requests to the partition leaders that hold the records (this implies that Kafka brokers do not notify clients that new records are available; all of the communication is just continuous requests for more data).
The Fetch request contains the partitions we want to consume from, the start offset (which increases as we keep reading, and can be modified by e.g. the consumer’s offset API), min/max limits on the number of bytes received (allowing Kafka to tune its performance) and a timeout (which allows a Kafka broker to optimise traffic by sending more records in a single response).
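To make the shape of the request concrete, here is a simplified Python model of the fields listed above. It is a sketch only and not the Kafka wire format: the class names and the advance helper are made up for illustration, and only the field names (max_wait_ms, min_bytes, max_bytes, fetch_offset) mirror the ones used in the Kafka protocol.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class FetchPartition:
    topic: str
    partition: int
    fetch_offset: int  # Offset the consumer wants to start reading from.


@dataclass
class FetchRequest:
    max_wait_ms: int  # How long the broker may hold the request (timeout).
    min_bytes: int    # Respond once at least this much data is available.
    max_bytes: int    # Upper limit on the size of the response.
    partitions: List[FetchPartition] = field(default_factory=list)


def advance(
    request: FetchRequest, last_offsets: Dict[Tuple[str, int], int]
) -> FetchRequest:
    """Build the follow-up request, resuming after the last record seen.

    Partitions that returned no records keep their previous start offset.
    """
    partitions = []
    for p in request.partitions:
        last = last_offsets.get((p.topic, p.partition))
        next_offset = p.fetch_offset if last is None else last + 1
        partitions.append(FetchPartition(p.topic, p.partition, next_offset))
    return FetchRequest(
        request.max_wait_ms, request.min_bytes, request.max_bytes, partitions
    )
```

The advance helper shows the pull model in miniature: each new request repeats the same limits and only moves the start offsets forward.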
Envoy — upstream perspective
The initial communication from the downstream consumer follows the same model as with the producers: the Envoy instance pretends that it actually is a Kafka broker capable of handling consumer-related requests. This is done with ApiVersions and Metadata requests.
When a ListOffsets request is received for any partition, Envoy is always going to respond with a dummy value of 0.
As the filter distributes the records across multiple downstream connections, there is currently no point in pretending that consumers have any kind of control over which records are going to be received.
The consumer code is resilient enough to handle the situation when records received do not start with the offset requested, as the same situation could have happened even during normal operation (e.g. due to partition segments being finally removed).
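The ListOffsets behaviour described above amounts to very little logic. A minimal sketch, assuming partitions are identified by (topic, partition) pairs; the function and constant names are hypothetical:

```python
from typing import Dict, Iterable, Tuple

TopicPartition = Tuple[str, int]

# Placeholder offset returned for every partition, whatever was asked for.
DUMMY_OFFSET = 0


def list_offsets_response(
    requested: Iterable[TopicPartition],
) -> Dict[TopicPartition, int]:
    """Answer every requested partition with the same placeholder offset."""
    return {tp: DUMMY_OFFSET for tp in requested}
```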
Handling of the Fetch request provides the core of the filter functionality.
The request contains the partitions that the client wants to receive data from, the timeout, and the maximum response size. There is also offset information for each of the partitions, but we ignore it, as the record distribution is done by the filter instance (so the downstream’s requests for particular offsets are not honoured).
When the Fetch request arrives, one of the following can happen:
- there are stored records that match the request’s specification, which allows the filter to send a Fetch response immediately;
- there is nothing in the buffer, so the Fetch request is stored until records appear or the timeout happens (just like it would have happened in an ordinary deployment when reading from the end of a partition).
To get the records from the upstream clusters, the Envoy consumer proxy uses librdkafka to create embedded consumers.
The embedded consumers get created when we receive a Fetch request for a partition we are not consuming from — the extra configuration for the librdkafka consumer is processed in an analogous way to the producer code.
The embedded consumers keep poll()-ing for new records as long as there is interest in those records, which in practice means as long as there are Fetch requests waiting for records from these partitions.
In the degenerate scenario where a record is received but there are no Fetch requests waiting for it (e.g. because they timed out before the record arrived), the record is stored and later passed to the next matching inbound Fetch request.
This scenario is easy to replicate if the embedded consumer can receive multiple records in one go (e.g. 100), while the downstream consumers are interested in a few records only.
In short, the following invariant always holds true: for a given partition, it either has Fetch callbacks waiting for records, or it has records to be passed to future Fetch callbacks — but never both at the same time.
As every new connection from downstream clients creates a new mesh filter instance, the entity maintaining Fetch callbacks and unconsumed records is a singleton.
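The matching logic and the invariant can be sketched in a few lines of Python. This is an illustration under simplifying assumptions, not the filter's C++ implementation: every Fetch callback takes exactly one record, there is no locking (a real instance shared across connections would need synchronization), and the class and method names are hypothetical.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple

TopicPartition = Tuple[str, int]
Record = bytes
Callback = Callable[[TopicPartition, Record], None]


class RecordDistributor:
    """Shared registry of waiting Fetch callbacks and undelivered records."""

    def __init__(self) -> None:
        self._waiting: Dict[TopicPartition, Deque[Callback]] = defaultdict(deque)
        self._stored: Dict[TopicPartition, Deque[Record]] = defaultdict(deque)

    def on_fetch(self, tp: TopicPartition, callback: Callback) -> None:
        """Downstream Fetch arrived: serve a stored record or start waiting."""
        if self._stored[tp]:
            callback(tp, self._stored[tp].popleft())
        else:
            self._waiting[tp].append(callback)
        self._check_invariant(tp)

    def on_record(self, tp: TopicPartition, record: Record) -> None:
        """Upstream record arrived: give it to the oldest waiter or store it."""
        if self._waiting[tp]:
            self._waiting[tp].popleft()(tp, record)
        else:
            self._stored[tp].append(record)
        self._check_invariant(tp)

    def on_timeout(self, tp: TopicPartition, callback: Callback) -> None:
        """A waiting Fetch timed out: it no longer receives records."""
        try:
            self._waiting[tp].remove(callback)
        except ValueError:
            pass  # The callback was already served or never registered.

    def _check_invariant(self, tp: TopicPartition) -> None:
        # A partition never has waiting callbacks and stored records at once.
        assert not (self._waiting[tp] and self._stored[tp])
```

Because both on_fetch and on_record first try to drain the opposite queue, the two queues for a partition can never be non-empty together, which is exactly the invariant stated above.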
Future development
The current implementation described above is pretty basic, and could be improved along the following lines:
- Fetch request’s data handling: right now the decision on when the response is ready to be sent downstream is very inflexible, with a constant number of records to be collected;
- the same applies to Fetch timeout handling (fetch.max.wait.ms) and other consumer configuration such as max.partition.fetch.bytes: the parameters that are present in the incoming request could be used to improve the filter’s behaviour;
- when a record is received from an upstream Kafka cluster, we select the matching Fetch request by iterating over the vector of callbacks, which results in records being delivered first to the oldest request; a different kind of structure (e.g. one mapping each callback to the number of records it has already received) could be used to make this more configurable (and/or fair, if wanted).
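One possible shape for the fairer selection mentioned in the last point is sketched below in Python. It assumes that callbacks can be used as dictionary keys and that a per-callback delivery count is tracked somewhere; the function name and both parameters are hypothetical, and this is only one of many policies that could be plugged in.

```python
from typing import Dict, Hashable, Iterable, Optional


def pick_least_served(
    waiting: Iterable[Hashable], delivered: Dict[Hashable, int]
) -> Optional[Hashable]:
    """Pick the waiting callback that has received the fewest records so far.

    Ties go to the callback that appears earliest in `waiting`, so passing
    the callbacks in arrival order keeps oldest-first as the tie-breaker.
    """
    best = None
    for callback in waiting:
        if best is None or delivered.get(callback, 0) < delivered.get(best, 0):
            best = callback
    return best
```

With all counts equal this degrades to the current oldest-first behaviour, so such a policy could be introduced without changing results for the simple case.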
Also, this functionality is a very opinionated take on how Kafka could be accessed. An alternative would be to develop a proper upstream connection (instead of relying on the embedded librdkafka producers/consumers) and do the upstream communication as needed.
In a trivial approach, a single downstream Fetch would map to one or more Fetches going to upstream clusters, with their responses combined in a fashion similar to what SharedConsumerManagerImpl does.
Obviously, before this could be done, we’d need to get the upstream connections ready, to the correct nodes — so the proper implementation of ApiVersions / Metadata requests would need to follow at a minimum (ignoring encryption / authentication features).
So long story short, there is still a lot that can be developed!