Kafka mesh filter in Envoy

Following up on the ideas discussed in the original Kafka-Envoy protocol filter issue, I have finished the initial development of the Kafka-mesh filter in Envoy. It is now available in contrib builds.

Right now, we allow downstream producers to send records to an Envoy instance, which acts as a proxy for a collection of Kafka clusters.
The received records are then forwarded to the upstream Kafka clusters according to the filter's configuration.

From the downstream producers' point of view, the flow is pretty simple:

  1. Kafka producers (Java, Python, etc.) send records to Envoy (the filter contains enough code to establish communication: first, it needs to advertise itself as a Kafka server through api-versions + metadata requests).
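To make the handshake order concrete, here is a minimal Python sketch of how a broker-like proxy could route decoded requests by their Kafka API key. The key numbers (Produce = 0, Metadata = 3, ApiVersions = 18) come from the Kafka wire protocol; the `Request` type and the handler names are hypothetical and do not mirror the filter's actual C++ code.

```python
from dataclasses import dataclass
from typing import Dict

# API keys from the Kafka wire protocol.
PRODUCE = 0
METADATA = 3
API_VERSIONS = 18

# Hypothetical handler names for the three request types a producer needs.
HANDLERS: Dict[int, str] = {
    API_VERSIONS: "api_versions",  # tell the client which request versions are understood
    METADATA: "metadata",          # present Envoy as the broker for the requested topics
    PRODUCE: "produce",            # forward the records to the upstream clusters
}


@dataclass
class Request:
    """Simplified, already-decoded request: only the header fields used here."""
    api_key: int
    api_version: int


def dispatch(request: Request) -> str:
    """Return the name of the handler responsible for this request."""
    handler = HANDLERS.get(request.api_key)
    if handler is None:
        raise NotImplementedError(f"unsupported api_key {request.api_key}")
    return handler


# A typical producer session: handshake first, then the actual writes.
session = [Request(API_VERSIONS, 3), Request(METADATA, 9), Request(PRODUCE, 8)]
print([dispatch(r) for r in session])  # ['api_versions', 'metadata', 'produce']
```

Any other request type (e.g. Fetch, key 1) is rejected, which matches the producer-only scope described here.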

Each of Envoy's worker threads maintains a collection of Kafka producers connected to the upstream Kafka clusters. This means that records from downstream producers that are handled by the same Envoy worker will be processed by the same upstream-facing producer (for a given upstream cluster).

[Figure: entities involved in processing a single produce request]

If the producer has not been initialized yet, it is created with the custom configuration provided to the filter. This allows us to set custom attributes, e.g. to increase the linger time so that the upstream-facing producer can achieve better throughput.
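The per-worker, lazily created producers can be sketched in Python as follows. This is an illustration under assumptions: `WorkerProducers` and the factory callback are hypothetical stand-ins (the filter itself creates librdkafka producers in C++), and the `bootstrap.servers` value is made up; `linger.ms` is the standard Kafka producer property behind the "linger time" mentioned above.

```python
from typing import Callable, Dict

Config = Dict[str, str]


class WorkerProducers:
    """Producers owned by a single worker: at most one per upstream cluster.

    A producer is created lazily, the first time this worker needs to send
    a record to a given cluster, using that cluster's custom configuration.
    """

    def __init__(self, cluster_configs: Dict[str, Config],
                 factory: Callable[[Config], object]) -> None:
        self._cluster_configs = cluster_configs
        self._factory = factory  # stands in for creating a real Kafka producer
        self._producers: Dict[str, object] = {}

    def producer_for(self, cluster: str) -> object:
        if cluster not in self._producers:
            # Copy so that later changes to the shared config do not leak in.
            config = dict(self._cluster_configs[cluster])
            self._producers[cluster] = self._factory(config)
        return self._producers[cluster]


# Hypothetical settings: a higher linger.ms trades latency for larger batches.
configs = {"c1": {"bootstrap.servers": "kafka1n1:9092", "linger.ms": "50"}}
created = []


def fake_factory(config: Config) -> object:
    created.append(config)
    return object()


worker_a = WorkerProducers(configs, fake_factory)
worker_b = WorkerProducers(configs, fake_factory)
assert worker_a.producer_for("c1") is worker_a.producer_for("c1")      # reused within a worker
assert worker_a.producer_for("c1") is not worker_b.producer_for("c1")  # separate per worker
print(len(created))  # 2
```

Keeping producers worker-local means no locking is needed on the send path, at the cost of one upstream producer per (worker, cluster) pair.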

The forwarding rules present in the configuration let the filter decide which upstream cluster should receive the records. Right now, for each received record we look at its target topic name and use the cluster referenced by the first rule with a matching prefix.
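The first-matching-prefix rule can be written down in a few lines. In this Python sketch the `ForwardingRule` field names loosely follow the filter's configuration but should be treated as assumptions, and returning `None` for an unmatched topic is this sketch's choice, not a statement about how the filter reports that case.

```python
from typing import List, NamedTuple, Optional


class ForwardingRule(NamedTuple):
    topic_prefix: str
    target_cluster: str


def route(topic: str, rules: List[ForwardingRule]) -> Optional[str]:
    """Return the cluster named by the first rule whose prefix matches the topic."""
    for rule in rules:  # rules are checked in order; the first match wins
        if topic.startswith(rule.topic_prefix):
            return rule.target_cluster
    return None  # no rule matched; error handling is left to the caller


# "app" -> c1 follows the example in this post; the "a" -> c2 rule is hypothetical
# and shows that rule order decides between overlapping prefixes.
rules = [ForwardingRule("app", "c1"), ForwardingRule("a", "c2")]
print(route("apples", rules))         # c1
print(route("app-something", rules))  # c1
print(route("avocados", rules))       # c2
print(route("bananas", rules))        # None
```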

Kafka requests supported

To allow basic communication with downstream Kafka producers, we need to implement support for the following requests:

  • api-versions — to let the client know which request versions the “Envoy broker” is capable of understanding;
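The api-versions exchange boils down to comparing version ranges: for each request type the broker advertises a (min, max) range, and Kafka clients generally pick the highest version that both sides support. A minimal Python sketch of that selection follows; the concrete ranges below are hypothetical and are not the ones the filter actually advertises.

```python
from typing import Dict, Optional, Tuple

VersionRange = Tuple[int, int]  # (min_version, max_version), both inclusive


def negotiate(api_key: int,
              broker: Dict[int, VersionRange],
              client: Dict[int, VersionRange]) -> Optional[int]:
    """Pick the highest version both sides support for api_key, or None."""
    if api_key not in broker or api_key not in client:
        return None  # the request type is not supported by one of the sides
    low = max(broker[api_key][0], client[api_key][0])
    high = min(broker[api_key][1], client[api_key][1])
    return high if low <= high else None


# Hypothetical ranges, keyed by API key (0 = Produce, 1 = Fetch, 3 = Metadata, 18 = ApiVersions).
envoy_supports = {0: (0, 8), 3: (0, 9), 18: (0, 3)}
client_supports = {0: (3, 9), 1: (0, 13), 3: (0, 12), 18: (0, 3)}

print(negotiate(0, envoy_supports, client_supports))  # 8
print(negotiate(3, envoy_supports, client_supports))  # 9
print(negotiate(1, envoy_supports, client_supports))  # None: Fetch is not advertised
```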

Example deployment

Deploying the Kafka mesh filter is very simple: we need to provide the following configuration pieces:

  • advertised address (through the advertised_host and advertised_port properties): analogous to the Kafka broker configuration (advertised.listeners), we need to make sure that clients will come to Envoy with all their connections;
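Why the advertised address matters becomes clearer when we look at what a metadata response could contain. In this Python sketch (the types, the node id 0, the host name and the port are all hypothetical), the whole mesh is described as a single broker located at the advertised address, and that broker leads every partition, so the client sends all of its produce requests to Envoy. The partition counts are passed in, standing in for the per-cluster partition count from the filter's configuration.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Broker:
    node_id: int
    host: str
    port: int


@dataclass(frozen=True)
class PartitionMetadata:
    partition: int
    leader: int  # node id of the partition leader


def build_metadata(advertised_host: str, advertised_port: int,
                   topic_partitions: Dict[str, int]
                   ) -> Tuple[List[Broker], Dict[str, List[PartitionMetadata]]]:
    """Describe the mesh as one broker (Envoy itself) leading every partition."""
    envoy = Broker(node_id=0, host=advertised_host, port=advertised_port)
    topics = {
        topic: [PartitionMetadata(p, leader=envoy.node_id) for p in range(count)]
        for topic, count in topic_partitions.items()
    }
    return [envoy], topics


brokers, topics = build_metadata("envoy.example.internal", 19092, {"apples": 1})
print(brokers)           # [Broker(node_id=0, host='envoy.example.internal', port=19092)]
print(topics["apples"])  # [PartitionMetadata(partition=0, leader=0)]
```

If the advertised address were not reachable by clients (the same pitfall as a misconfigured advertised.listeners), clients would fail to connect after the metadata step.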

In the example below, the configuration references three upstream clusters:

  • c1, accessible via kafka1n* addresses, with all topics hosted there having 1 partition

To decide which upstream cluster to use, we simply look at the records' topic names and pick the first cluster that has a matching topic prefix.

  • topics starting with app use cluster c1,

We are going to get these results:

  • records for topics apples, app-something will go to c1,
[Figure: example config]

Also, note that in this example we put the Kafka-broker-filter in front of the Kafka-mesh-filter. As the Kafka-mesh-filter performs the role of a Kafka broker in this deployment, the broker filter placed in front of it can capture request/response metrics.

The Kafka clusters do not require any custom configuration, apart from ensuring that the topics hosted in these clusters have the number of partitions matching the Envoy configuration. An easy way to achieve this is to use the num.partitions property (the broker's default partition count for automatically created topics).

Future development — consumers

Right now, there's no support for consumers. Kafka consumers by definition keep some state, so we already need to make a decision at this stage: how should multiple consumers connecting through the same filter be treated?

  • One choice is to have a shared consumer per upstream cluster: in that case a single consumer instance would service multiple FetchRequests. This implies that messages would be distributed across multiple downstream connections (the Kafka REST proxy does something similar).
    A single interaction from downstream (e.g. a request to fetch messages for topics ‘apples’ and ‘bananas’) would interact with the consumer dedicated to cluster 1 (which handles all topics named ‘app*’) and its analogue for cluster 2.
    This behaviour would be somewhat similar to Kafka's consumer groups, as the messages would be distributed across all subscribers when they poll for new records, but without any partition-tracking or rebalance mechanism, since a trivial implementation might just serve the records on a first-come, first-served basis.

Given that both choices make sense (they just handle different use cases), it might be preferable to implement both and leave the behaviour up to configuration. In both cases, downstream users will need to take part in the discussion about Envoy's consumer configuration (for example, consumer groups could be configured in the filter).
In any case, a configurable buffer might be necessary to keep some messages in Envoy, so that they can be served faster, or simply to hold the surplus when we receive N messages from upstream but downstream asked for only one.
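The shared-consumer option and the buffer could fit together roughly as in this Python sketch. Everything here is hypothetical (no such class exists in the filter): one upstream consumer per cluster pushes polled records into a bounded buffer, and downstream fetch requests take the oldest matching records on a first-come, first-served basis, with no partition tracking or rebalancing. Surplus records stay buffered for the next fetch.

```python
from collections import deque
from typing import Deque, List, Set, Tuple

Record = Tuple[str, bytes]  # (topic, value)


class SharedClusterBuffer:
    """Records polled by one shared upstream consumer, served to any downstream client."""

    def __init__(self, capacity: int) -> None:
        self._records: Deque[Record] = deque()
        self._capacity = capacity  # the configurable buffer size

    def on_upstream_poll(self, records: List[Record]) -> int:
        """Buffer polled records; return how many did not fit.

        The caller could use a non-zero result to pause upstream polling.
        """
        accepted = records[: self._capacity - len(self._records)]
        self._records.extend(accepted)
        return len(records) - len(accepted)

    def serve_fetch(self, topics: Set[str], max_records: int) -> List[Record]:
        """Hand out up to max_records for the requested topics, oldest first."""
        served: List[Record] = []
        kept: Deque[Record] = deque()
        while self._records:
            record = self._records.popleft()
            if record[0] in topics and len(served) < max_records:
                served.append(record)
            else:
                kept.append(record)  # not requested (or over the limit): keep for later
        self._records = kept
        return served


buffer = SharedClusterBuffer(capacity=4)
buffer.on_upstream_poll([("apples", b"1"), ("apples", b"2"), ("apricots", b"3")])
print(buffer.serve_fetch({"apples"}, max_records=1))  # [('apples', b'1')]
print(buffer.serve_fetch({"apples", "apricots"}, max_records=10))
# [('apples', b'2'), ('apricots', b'3')]
```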

Future development — producers

Currently, to provide mesh capability, we decompose the record batches received from downstream producers into separate records, and then submit them to the upstream-facing producers. As Kafka brokers append whole batches of records, this means that we could have a race condition in which multiple producers' original batches get interwoven.
The work here could involve studying librdkafka's internals to find out how to submit a whole pre-formatted batch, instead of using the typical send API.

Another issue is that record headers need to be captured: right now we forward only the record key and value.
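The decomposition step, and the header gap, can be illustrated with a small Python sketch. The dataclasses are hypothetical simplifications of a Kafka record batch; `keep_headers=False` mimics the current key-and-value-only forwarding, while `keep_headers=True` shows what capturing headers would add.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Header = Tuple[str, bytes]


@dataclass
class Record:
    key: Optional[bytes]
    value: Optional[bytes]
    headers: List[Header] = field(default_factory=list)


@dataclass
class RecordBatch:
    topic: str
    partition: int
    records: List[Record]


@dataclass
class OutboundRecord:
    """One record handed to the upstream-facing producer."""
    topic: str
    partition: int
    key: Optional[bytes]
    value: Optional[bytes]
    headers: List[Header]


def decompose(batch: RecordBatch, keep_headers: bool = True) -> List[OutboundRecord]:
    """Split one downstream batch into individual records for upstream sending."""
    return [
        OutboundRecord(batch.topic, batch.partition, r.key, r.value,
                       list(r.headers) if keep_headers else [])
        for r in batch.records
    ]


batch = RecordBatch("apples", 0, [Record(b"k1", b"v1", [("trace-id", b"abc")]),
                                   Record(b"k2", b"v2")])
print(decompose(batch, keep_headers=False)[0].headers)  # []
print(decompose(batch)[0].headers)                       # [('trace-id', b'abc')]
```

Because each batch becomes independent sends, two batches decomposed at the same time may reach the broker interleaved, which is the race described above.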

Future development — state tracking

In the current version, we need to pass configuration properties to Envoy to let it know how many partitions are present in a topic (used when handling metadata requests). This could be avoided by embedding a Kafka admin client inside Envoy and periodically refreshing its state.
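A periodically refreshed partition-count cache might look like the following Python sketch. The `fetch` callback is a hypothetical stand-in for an admin-client call that describes the topics of one upstream cluster, and the injectable clock exists only to make the sketch testable.

```python
import time
from typing import Callable, Dict, Optional


class PartitionCountCache:
    """Partition counts per topic, refreshed from upstream at most every `ttl` seconds."""

    def __init__(self, fetch: Callable[[], Dict[str, int]], ttl: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch  # hypothetical admin-client call: topic -> partition count
        self._ttl = ttl
        self._clock = clock
        self._counts: Dict[str, int] = {}
        self._refreshed_at: Optional[float] = None

    def partitions(self, topic: str) -> Optional[int]:
        now = self._clock()
        if self._refreshed_at is None or now - self._refreshed_at >= self._ttl:
            self._counts = self._fetch()  # refresh the whole snapshot at once
            self._refreshed_at = now
        return self._counts.get(topic)


# Simulated upstream state and clock.
upstream = {"apples": 1}
now = [0.0]
cache = PartitionCountCache(lambda: dict(upstream), ttl=30.0, clock=lambda: now[0])
print(cache.partitions("apples"))  # 1
upstream["apples"] = 3             # someone adds partitions upstream
now[0] = 10.0
print(cache.partitions("apples"))  # 1 (cached value still fresh)
now[0] = 31.0
print(cache.partitions("apples"))  # 3 (refreshed after the TTL expired)
```

This sketch refreshes lazily on lookup, which keeps it single-threaded; a background refresh on a timer is the other obvious option.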
