Kafka mesh filter in Envoy

Following up on the ideas discussed in the original Kafka-Envoy protocol filter issue, I have finished the initial development of the Kafka-mesh filter in Envoy. It is now available in contrib builds.

Right now, the filter allows downstream producers to send records to an Envoy instance, which acts as a proxy for a collection of Kafka clusters.
The received records are then forwarded to the upstream Kafka clusters according to the filter’s configuration.

From the downstream producers’ point of view, the flow is pretty simple:

  1. Kafka producers (Java, Python, etc.) send records to Envoy. The filter contains enough code to establish communication: first, it needs to advertise itself as a Kafka server by handling api-versions and metadata requests.
  2. The Kafka-mesh filter instance receives the records, parses them, and passes them to the forwarding rules, which decide where each record should go.
  3. The records are forwarded to the producers dedicated to each of the upstream clusters.
  4. The librdkafka producers send the records upstream, receive the confirmations, and notify the filter that the response is ready to be sent.
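Step 4 suggests that a downstream produce request can only be answered once every record in it has been confirmed upstream, possibly by several clusters and in any order. Assuming that, the bookkeeping could look like the minimal Python sketch below. It is not Envoy's actual C++ code: the class `PendingProduceRequest` and its `on_ready` callback are hypothetical, and failed deliveries are left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class PendingProduceRequest:
    """Tracks one downstream produce request until all its records are confirmed."""

    record_count: int
    on_ready: Callable[[Dict[int, int]], None]  # sends the downstream response
    offsets: Dict[int, int] = field(default_factory=dict)  # record index -> offset
    sent: bool = False

    def confirm(self, record_index: int, offset: int) -> None:
        # Invoked from an upstream producer's delivery callback (step 4).
        self.offsets[record_index] = offset
        if not self.sent and len(self.offsets) == self.record_count:
            # Every record has been acknowledged upstream, possibly by
            # different clusters and out of order: the response is ready.
            self.sent = True
            self.on_ready(dict(self.offsets))
```

The response is emitted exactly once, when the last outstanding record is confirmed, regardless of the order in which the upstream clusters answer.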

Each of Envoy’s worker threads maintains a collection of Kafka producers connected to the upstream Kafka clusters. This means that records from downstream producers that are handled by the same Envoy worker will be sent upstream by the same producer for a given cluster.

If the producer has not been initialized yet, it is created with the custom configuration provided to the filter. This allows us to set custom properties, e.g. to increase linger.ms so that the upstream-facing producer can batch more records and achieve better throughput.
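A minimal sketch of these per-worker, lazily created producers, assuming each worker owns one `WorkerProducers` object (a hypothetical name) and that the cluster's producer_config is layered over its bootstrap servers. The `factory` argument stands in for whatever builds a librdkafka producer; with the confluent-kafka Python client it could plausibly be `confluent_kafka.Producer`, which accepts a dict of librdkafka properties such as `bootstrap.servers` and `linger.ms`.

```python
from typing import Callable, Dict, Tuple

# cluster name -> (bootstrap_servers, producer_config)
ClusterSettings = Dict[str, Tuple[str, Dict[str, str]]]


class WorkerProducers:
    """Upstream-facing producers owned by a single worker thread.

    Each worker holds its own instance, so no locking is needed here.
    """

    def __init__(self, clusters: ClusterSettings,
                 factory: Callable[[Dict[str, str]], object]) -> None:
        self._clusters = clusters
        self._factory = factory  # builds a producer from a librdkafka-style config
        self._producers: Dict[str, object] = {}

    def get(self, cluster: str) -> object:
        # Create the producer lazily, the first time this worker needs it.
        if cluster not in self._producers:
            servers, custom = self._clusters[cluster]
            # Custom properties (e.g. a higher linger.ms) go on top of the basics.
            config = {"bootstrap.servers": servers, **custom}
            self._producers[cluster] = self._factory(config)
        return self._producers[cluster]
```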

The forwarding rules present in the configuration let the filter decide which upstream cluster should receive the records. Right now, for each received record we look at its target topic name and use the cluster referenced by the first rule whose prefix matches.
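The first-matching-prefix rule can be sketched in a few lines of Python. Rules are modelled here as hypothetical `(topic_prefix, target_cluster)` pairs checked in configuration order, so when prefixes overlap, the order of the rules decides the outcome.

```python
from typing import List, Optional, Tuple

# (topic_prefix, target_cluster) pairs, evaluated in configuration order.
Rule = Tuple[str, str]


def route(topic: str, rules: List[Rule]) -> Optional[str]:
    """Return the cluster of the first rule whose prefix matches the topic."""
    for prefix, cluster in rules:
        if topic.startswith(prefix):
            return cluster
    return None  # no rule matched: the record has nowhere to go


rules = [("orders-eu", "eu"), ("orders", "global")]
print(route("orders-eu-1", rules))  # eu
```

Because `orders-eu` is listed before `orders`, a topic like `orders-eu-1` goes to `eu`; reversing the list would send it to `global` instead.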

Kafka requests supported

To allow basic communication with downstream Kafka producers, we need to implement support for the following requests:

  • api-versions — to let the client know which request versions the “Envoy broker” is capable of understanding;
  • metadata — to provide downstream clients with information about where the partition leaders are (in this case, the Envoy instance is the partition leader for all of them, as we want to receive all requests there);
  • produce — to capture the incoming records, forward them upstream to the correct clusters, and send the resulting offsets back to downstream clients.
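The api-versions exchange can be pictured as a range intersection: for each request type, the client uses the highest version that both it and the “Envoy broker” understand. In this sketch the API keys (0 for Produce, 3 for Metadata, 18 for ApiVersions) come from the Kafka protocol, while the advertised version ranges are made-up placeholders, not the ranges Envoy actually supports.

```python
from typing import Dict, Optional, Tuple

# Kafka protocol API keys.
PRODUCE, METADATA, API_VERSIONS = 0, 3, 18

# Hypothetical (min, max) versions the "Envoy broker" advertises.
ENVOY_SUPPORTED: Dict[int, Tuple[int, int]] = {
    PRODUCE: (0, 8),
    METADATA: (0, 9),
    API_VERSIONS: (0, 3),
}


def negotiate(api_key: int, client_range: Tuple[int, int]) -> Optional[int]:
    """Highest version understood by both sides, or None if there is none."""
    if api_key not in ENVOY_SUPPORTED:
        return None  # request type not handled by the filter (e.g. Fetch)
    envoy_min, envoy_max = ENVOY_SUPPORTED[api_key]
    low = max(client_range[0], envoy_min)
    high = min(client_range[1], envoy_max)
    return high if low <= high else None
```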

Example deployment

Deploying the Kafka mesh filter is simple: we need to provide the following pieces of configuration:

  • advertised address (advertised_host and advertised_port properties) — analogous to the Kafka broker configuration (advertised.listeners), we need to make sure that clients send all their connections to Envoy;
  • list of upstream Kafka clusters (upstream_clusters) — with the bootstrap_servers property telling us where to connect, partition_count providing the number of partitions of each topic hosted in that cluster (as we need to pass that information to downstream Kafka producers), and any custom producer configuration in producer_config;
  • forwarding rules (forwarding_rules) — that bind topic name prefixes to the target clusters the records should be sent to.

In the example below, the configuration references three upstream clusters:

  • c1, accessible via kafka1n* addresses, with all topics hosted there having 1 partition;
  • c2, accessible via kafka2n* addresses, with all topics hosted there having 1 partition;
  • c3, accessible via kafka3n* addresses, with all topics hosted there having 5 partitions; the producers that use this cluster also set custom values for the acks and linger.ms properties.

To decide which upstream cluster to use, we simply look at each record’s topic name and pick the cluster referenced by the first rule with a matching topic prefix. In this example, the rules are:

  • topics starting with app use cluster c1,
  • topics starting with a use cluster c2,
  • topics starting with b use cluster c3.

With these rules, we get the following results:

  • records for topics apples and app-something will go to c1,
  • records for topics apricots and analogue will go to c2,
  • records for topic bananas will go to c3,
  • records for topic cherries will not be forwarded anywhere: the metadata for this topic will be empty, which should cause an exception in the producer; if records for it are received anyway (which should never happen), the connection will be closed.
[Figure: example config]
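To make the example concrete, the sketch below writes the deployment described above as plain Python data and derives the metadata a downstream producer would see for a topic: one entry per partition, every partition led by Envoy’s advertised address, and nothing at all for a topic that no rule matches. The key names `cluster_name`, `topic_prefix` and `target_cluster`, the addresses, and the acks/linger.ms values are assumptions made for illustration; the other field names come from the description above.

```python
from typing import Dict, List, Optional

CONFIG = {
    "advertised_host": "127.0.0.1",  # hypothetical
    "advertised_port": 19092,        # hypothetical
    "upstream_clusters": [
        {"cluster_name": "c1", "bootstrap_servers": "kafka1n1:9092", "partition_count": 1},
        {"cluster_name": "c2", "bootstrap_servers": "kafka2n1:9092", "partition_count": 1},
        {"cluster_name": "c3", "bootstrap_servers": "kafka3n1:9092", "partition_count": 5,
         "producer_config": {"acks": "1", "linger.ms": "500"}},  # hypothetical values
    ],
    "forwarding_rules": [
        {"topic_prefix": "app", "target_cluster": "c1"},
        {"topic_prefix": "a", "target_cluster": "c2"},
        {"topic_prefix": "b", "target_cluster": "c3"},
    ],
}


def topic_metadata(topic: str, config: dict) -> List[Dict]:
    # Pick the cluster of the first forwarding rule whose prefix matches.
    cluster: Optional[str] = next(
        (r["target_cluster"] for r in config["forwarding_rules"]
         if topic.startswith(r["topic_prefix"])),
        None,
    )
    if cluster is None:
        return []  # e.g. "cherries": empty metadata, so the producer should fail
    count = next(c["partition_count"] for c in config["upstream_clusters"]
                 if c["cluster_name"] == cluster)
    leader = (config["advertised_host"], config["advertised_port"])
    # Envoy presents itself as the leader of every partition.
    return [{"partition": p, "leader": leader} for p in range(count)]
```

Note that "apples" also matches the prefix "a", but the "app" rule comes first, so it resolves to c1 with a single partition.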

Note also that in this example we put the Kafka-broker-filter in front of the Kafka-mesh-filter. As the Kafka-mesh-filter performs the role of a Kafka broker in this deployment, the broker filter in front of it can capture request/response metrics.
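Conceptually, the broker filter wraps the mesh filter: it observes every request on the way in and every response on the way out, and passes them along. The following synchronous Python model illustrates only that layering; in Envoy both are C++ network filters working on the connection’s read and write paths, and `BrokerMetricsFilter` is a hypothetical name.

```python
from collections import Counter
from typing import Callable

# (api_key, request bytes) -> response bytes
Handler = Callable[[int, bytes], bytes]


class BrokerMetricsFilter:
    """Counts requests and responses per API key, then delegates to the next filter."""

    def __init__(self, next_filter: Handler) -> None:
        self.next_filter = next_filter
        self.requests: Counter = Counter()
        self.responses: Counter = Counter()

    def __call__(self, api_key: int, request: bytes) -> bytes:
        self.requests[api_key] += 1                    # seen on the way in
        response = self.next_filter(api_key, request)  # the mesh filter does the work
        self.responses[api_key] += 1                   # seen on the way out
        return response
```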

Kafka clusters do not require custom configuration, apart from ensuring that the topics hosted in these clusters have a number of partitions matching the Envoy configuration. An easy way to achieve this is to set the brokers’ num.partitions property, which is the default partition count for automatically created topics.

Future development — consumers

Right now, there is no support for consumers. Kafka consumers by definition keep some state, so a design decision is needed already at this stage: how should multiple consumers sending requests to the same filter be treated?

  • One choice is to have a shared consumer per upstream cluster: a single consumer instance would serve multiple FetchRequests. This implies that messages would be distributed across multiple downstream connections (the Kafka REST Proxy does something similar).
    A single interaction from downstream (e.g. a request to fetch messages for topics ‘apples’ and ‘bananas’) would involve the consumer dedicated to cluster 1 (which handles all topics named ‘app*’) and its counterpart for the cluster hosting ‘bananas’.
    This behaviour would be somewhat similar to Kafka’s consumer groups (the messages would be distributed across all subscribers as they poll for new data), but without any partition-tracking or rebalancing mechanism, as a trivial implementation might just serve the records on a first-come, first-served basis.
  • The alternative is to create consumer(s) per downstream connection, effectively making the Kafka consumer an attribute of the connection. Every downstream connection would then have its own Kafka consumer(s).
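The difference between the two options boils down to how a consumer is looked up for a given downstream connection and upstream cluster. A hypothetical registry supporting both modes might look like this; `ConsumerRegistry` and its `factory` are illustrative names, not part of the filter.

```python
from typing import Callable, Dict, Tuple


class ConsumerRegistry:
    """Resolves the upstream consumer serving a downstream fetch (hypothetical)."""

    def __init__(self, shared: bool, factory: Callable[[str], object]) -> None:
        self.shared = shared
        self.factory = factory  # cluster name -> new upstream consumer
        self._consumers: Dict[Tuple, object] = {}

    def consumer_for(self, connection_id: int, cluster: str) -> object:
        # Shared mode: one consumer per cluster, reused by every connection.
        # Per-connection mode: the consumer is effectively a connection attribute.
        key = (cluster,) if self.shared else (connection_id, cluster)
        if key not in self._consumers:
            self._consumers[key] = self.factory(cluster)
        return self._consumers[key]
```

In shared mode two connections polling the same cluster compete for the same stream of records; in per-connection mode each sees its own.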

Given that both choices make sense (they just handle different use cases), it might be preferable to implement both and make the behaviour configurable. In either case, downstream users will need to take part in the discussion about how Envoy’s consumers are configured (for example, consumer groups could be configured in the filter).
In any case, a configurable buffer might be necessary to keep some messages in Envoy, so that they can be served faster, or simply because we might receive N messages from upstream while the downstream client asked for only one.
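A minimal sketch of such a buffer, assuming a hypothetical `poll_upstream(n)` callable that returns up to `n` records from an upstream consumer: surplus records stay in Envoy and are handed out first-come, first-served, and the buffer never asks upstream for more than it can hold.

```python
from collections import deque
from typing import Callable, Deque, List


class FetchBuffer:
    """Keeps surplus upstream records so later fetches can be served from memory."""

    def __init__(self, capacity: int, poll_upstream: Callable[[int], List[str]]) -> None:
        self.capacity = capacity
        self.poll_upstream = poll_upstream  # hypothetical upstream poll
        self.buffer: Deque[str] = deque()

    def fetch(self, max_records: int) -> List[str]:
        # Top up from upstream only if the caller cannot be served from memory,
        # never asking for more than the remaining room in the buffer.
        room = self.capacity - len(self.buffer)
        if len(self.buffer) < max_records and room > 0:
            self.buffer.extend(self.poll_upstream(room))
        # Hand out records in arrival order; the rest stays buffered.
        n = min(max_records, len(self.buffer))
        return [self.buffer.popleft() for _ in range(n)]
```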

Future development — producers

Currently, to provide the mesh capability, we decompose each record batch received from downstream producers into separate records and then submit them to the upstream-facing producers. As Kafka brokers append whole batches of records, this means we can have a race condition in which the original batches of multiple downstream producers get interleaved.
The work here could involve investigating librdkafka’s internals to find out how to submit a whole pre-formatted batch instead of using the typical per-record produce API.
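The interleaving problem is easy to see in a toy model. Below, two downstream batches are decomposed into single records and reach the shared upstream producer in an interleaved order (modelled as round-robin, which is just one possible timing), whereas appending each original batch as a unit keeps it contiguous. This illustrates the ordering issue only; it does not model librdkafka’s behaviour.

```python
from itertools import chain, zip_longest
from typing import List


def decomposed_log(batches: List[List[str]]) -> List[str]:
    """Records submitted one by one; concurrent arrivals may interleave them
    (modelled here as round-robin across the downstream batches)."""
    return [r for group in zip_longest(*batches) for r in group if r is not None]


def batch_log(batches: List[List[str]]) -> List[str]:
    """Each original batch appended as a unit stays contiguous in the log."""
    return list(chain.from_iterable(batches))
```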

Another thing is that record headers need to be captured as well: right now we forward only the record key and value.

Future development — state tracking

In the current version, we need to pass configuration properties to Envoy to let it know how many partitions a topic has (for metadata request handling). This information could instead be obtained by embedding a Kafka admin client inside Envoy and periodically refreshing its state.
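One possible shape for that state tracking is a cache of partition counts that is refreshed after a fixed interval. In this sketch `fetch_counts` is a hypothetical stand-in for an embedded admin-client call returning `{topic: partition_count}` for one cluster, and the refresh happens lazily on lookup; a real implementation would more likely refresh on a timer, off the request path.

```python
import time
from typing import Callable, Dict, Optional


class PartitionCountCache:
    """Partition counts refreshed periodically instead of read from static config."""

    def __init__(self, fetch_counts: Callable[[], Dict[str, int]],
                 refresh_interval_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fetch_counts = fetch_counts  # hypothetical admin-client query
        self.refresh_interval_s = refresh_interval_s
        self.clock = clock
        self._counts: Dict[str, int] = {}
        self._last_refresh: Optional[float] = None

    def partition_count(self, topic: str) -> Optional[int]:
        now = self.clock()
        stale = (self._last_refresh is None
                 or now - self._last_refresh >= self.refresh_interval_s)
        if stale:
            self._counts = self.fetch_counts()
            self._last_refresh = now
        return self._counts.get(topic)  # None: unknown topic, empty metadata
```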

Software developer, Java mostly, C & C++ sometimes (https://www.linkedin.com/in/adam-kotwasinski/)