Kafka mesh filter in Envoy

  1. Kafka producers (Java, Python, etc.) send records to Envoy. The filter contains enough code to establish communication: it first advertises itself as a Kafka server by answering api-versions and metadata requests.
  2. The Kafka-mesh filter instance receives the records, parses them, and passes them to the forwarding rules, which decide where the records should go.
  3. The records are forwarded to the producers dedicated to each upstream cluster.
  4. librdkafka producers send the records, receive the confirmations, and notify the filter that the response is ready to be sent.
Figure: entities involved in processing a single produce request

Kafka requests supported

To allow basic communication with downstream Kafka producers, the filter needs to support the following requests:

  • api-versions: to let the client know which request versions the “Envoy broker” is capable of understanding;
  • metadata: to tell downstream clients where the partition leaders are (in this case the Envoy instance is advertised as the partition leader for all partitions, because we want to receive all requests there);
  • produce: to capture the incoming records, forward them upstream to the correct clusters, and send the response offsets to downstream clients.
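
To make the metadata trick more concrete, here is a minimal Python sketch of the information such a response carries. It is only an illustration (the real filter is written in C++ and speaks the Kafka wire protocol); the function name, the dictionary layout, and the host and port values are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Broker:
    node_id: int
    host: str
    port: int


def build_metadata(advertised_host, advertised_port, topic_partitions):
    """Describe Envoy as the only broker and as the leader of every partition."""
    envoy = Broker(node_id=0, host=advertised_host, port=advertised_port)
    topics = {
        topic: [{"partition": p, "leader": envoy.node_id} for p in range(count)]
        for topic, count in topic_partitions.items()
    }
    return {"brokers": [envoy], "topics": topics}


# Hypothetical advertised address; partition counts would come from configuration.
metadata = build_metadata("envoy.example", 19092, {"apples": 1, "bananas": 5})
```

Because every partition names the same leader, a producer that trusts this metadata sends all of its produce requests to Envoy.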

Example deployment

Deploying the Kafka mesh filter is simple. We need to provide the following configuration pieces:

  • advertised address (through the advertised_host and advertised_port properties): analogous to the Kafka broker's advertised.listeners setting, it makes sure that clients come to Envoy with all their connections;
  • list of upstream Kafka clusters (upstream_clusters): each entry has a bootstrap_servers property telling the filter where to connect, a partition_count giving the number of partitions for each topic hosted in that cluster (the filter needs to report it to downstream Kafka producers), and any custom producer configuration in producer_config;
  • forwarding rules (forwarding_rules): these bind topic name prefixes to the target clusters that records should be sent to.

In the example deployment there are three upstream clusters:

  • c1 that’s accessible via kafka1n* addresses, with all topics hosted there having 1 partition;
  • c2 that’s accessible via kafka2n* addresses, with all topics hosted there having 1 partition;
  • c3 that’s accessible via kafka3n* addresses, with all topics hosted there having 5 partitions; the producers that use this cluster also use custom values for the acks and linger.ms properties.

The forwarding rules are:

  • topics starting with app use cluster c1,
  • topics starting with a use cluster c2,
  • topics starting with b use cluster c3.

As a result:

  • records for topics apples and app-something will go to c1,
  • records for topics apricots and analogue will go to c2,
  • records for topic bananas will go to c3,
  • records for topic cherries will not be forwarded anywhere (the metadata for this topic is going to be empty, which should cause a producer exception; if records are received anyway, which should never happen, the connection will be closed).

Figure: example config
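
The routing described above can be sketched in a few lines of Python. This is an illustration rather than the filter's actual code: the names are hypothetical, and it assumes that rules are checked in the listed order and that the first matching prefix wins (which is why app has to come before a).

```python
from typing import Optional

# Partition counts per cluster, as in the example deployment.
PARTITION_COUNTS = {"c1": 1, "c2": 1, "c3": 5}

# (topic prefix, target cluster) pairs; assumption: the first match wins.
FORWARDING_RULES = [("app", "c1"), ("a", "c2"), ("b", "c3")]


def target_cluster(topic: str) -> Optional[str]:
    """Return the cluster for a topic, or None if no rule matches."""
    for prefix, cluster in FORWARDING_RULES:
        if topic.startswith(prefix):
            return cluster
    return None


def advertised_partitions(topic: str) -> int:
    """Partitions reported in metadata; 0 stands for 'empty metadata'."""
    cluster = target_cluster(topic)
    return PARTITION_COUNTS[cluster] if cluster is not None else 0


for topic in ["apples", "app-something", "apricots", "bananas", "cherries"]:
    print(topic, target_cluster(topic), advertised_partitions(topic))
```

If the a rule were listed before the app rule, apples would end up in c2, so with first-match semantics the more specific prefix has to be listed first.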

Future development — consumers

Right now, there’s no support for consumers. Kafka consumers by definition keep some state, so we already need to make a decision at this stage: how should multiple consumers that use the same filter be treated?

  • One choice is to have a shared consumer per upstream cluster: a single consumer instance would then service multiple FetchRequests. This implies that messages would be distributed across multiple connections from downstream (Kafka REST proxy does something similar).
    A single interaction from downstream (e.g. a request to fetch messages for topics ‘apples’ and ‘bananas’) would interact with the consumer dedicated to cluster 1 (which handles all topics named ‘app*’) and with its counterpart for the cluster that handles ‘bananas’.
    This behaviour would be somewhat similar to Kafka’s consumer groups: the messages would be distributed across all subscribers as they poll for new records, but without any partition-tracking or rebalance mechanism, as a trivial implementation might just serve the records on a first-come, first-served basis.
  • The alternative is to create consumer(s) per downstream connection, so that every connection from downstream owns its own Kafka consumer (effectively an attribute of the connection).
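
As a rough illustration of the first option, the Python sketch below models a shared consumer that serves whichever downstream connection polls first. The class and its in-memory buffer are hypothetical stand-ins; no real Kafka consumer is involved.

```python
from collections import deque


class SharedConsumer:
    """Stand-in for one upstream-facing consumer shared by all connections."""

    def __init__(self, records):
        self._buffer = deque(records)

    def fetch(self, max_records):
        """Hand out up to max_records to whichever caller asks first."""
        batch = []
        while self._buffer and len(batch) < max_records:
            batch.append(self._buffer.popleft())
        return batch


shared = SharedConsumer(["r0", "r1", "r2", "r3", "r4"])
connection_a = shared.fetch(2)   # first poll takes r0 and r1
connection_b = shared.fetch(2)   # next poll takes r2 and r3
connection_a += shared.fetch(2)  # only r4 is left
```

Each record is delivered to exactly one connection, but which one depends purely on polling order. That is the missing partition tracking mentioned above.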

Future development — producers

Currently, to provide mesh capability, we decompose the record batch received from a downstream producer into separate records, and then submit them to the upstream-facing producers. As Kafka brokers append whole batches of records, this could create a race condition in which the original batches of multiple producers get interwoven.
The work here would be to dig into librdkafka’s internals and find out how to submit a whole pre-formatted batch instead of using the typical send API.
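
The toy Python simulation below shows what "interwoven" means here. It assumes, purely for illustration, that records from two downstream batches are submitted in strict alternation; in practice the order would depend on timing. The function name is hypothetical.

```python
from itertools import zip_longest


def submit_record_by_record(batches):
    """Simulate per-record submission from several batches, in alternation."""
    log = []
    for group in zip_longest(*batches):
        log.extend(record for record in group if record is not None)
    return log


batch_a = ["a0", "a1", "a2"]
batch_b = ["b0", "b1", "b2"]

interleaved = submit_record_by_record([batch_a, batch_b])
whole_batches = batch_a + batch_b  # what appending whole batches would give
```

Both logs contain the same records, but only whole_batches keeps each producer's batch contiguous.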

Future development — state tracking

In the current version, we need to pass configuration properties to Envoy to tell it how many partitions a topic has (this is used when handling metadata requests). Instead, this information could be obtained by embedding a Kafka admin client inside Envoy and periodically refreshing its state.
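
One possible shape for such state tracking is sketched below in Python. It is a simplification: instead of a background timer it refreshes lazily once a time-to-live has expired, and the admin client is replaced by a hypothetical fetch_counts callable, so no real Kafka API is used.

```python
import time
from typing import Callable, Dict


class PartitionCountCache:
    """Caches topic partition counts and refetches them after ttl_seconds."""

    def __init__(self, fetch_counts: Callable[[], Dict[str, int]],
                 ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch_counts  # hypothetical stand-in for an admin client call
        self._ttl = ttl_seconds
        self._clock = clock
        self._counts: Dict[str, int] = {}
        self._expires_at = float("-inf")  # forces a fetch on first use

    def partition_count(self, topic: str) -> int:
        now = self._clock()
        if now >= self._expires_at:
            self._counts = dict(self._fetch())  # copy, so later changes are not seen early
            self._expires_at = now + self._ttl
        return self._counts.get(topic, 0)


# Usage with a fake cluster state and a fake clock.
cluster_state = {"bananas": 5}
fake_now = [0.0]
cache = PartitionCountCache(lambda: cluster_state, ttl_seconds=30,
                            clock=lambda: fake_now[0])
first = cache.partition_count("bananas")
cluster_state["bananas"] = 8          # the topic gets more partitions upstream
stale = cache.partition_count("bananas")
fake_now[0] = 31.0                    # the time-to-live has passed
fresh = cache.partition_count("bananas")
```

The trade-off is the usual one for caches: a longer time-to-live means fewer upstream calls, but downstream producers see partition changes later.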
