New Relic runs a large streaming platform that ingests and processes several petabytes of observability data per day. Core to this platform is Apache Kafka, a powerful open-source distributed event streaming technology. New Relic has been using Apache Kafka for over 10 years. During that time the team has learned how to maximize efficiency and reliability while also ensuring data is processed at low latency.

One of the keys to efficiently running Apache Kafka is appropriately tuning the Consumers, the pieces of your stream processing applications that receive data. To understand how to tune your Kafka Consumers you must first ensure you have the right instrumentation.

This article shows how to instrument Kafka Consumers, and then explains how to interpret those metrics to detect and remediate a few common issues.

Instrumentation

Before tuning your Kafka Consumers, you need high-quality instrumentation to diagnose their current performance and resource usage. This article focuses on tuning the Kafka Consumer provided by the official Apache Kafka Java Client, because that's the client primarily used at New Relic, but the insights and analysis apply to Kafka Clients written in other languages too. The Apache Kafka Java Client tracks many key Kafka Consumer metrics, and with the New Relic Java agent you can automatically collect those metrics. To make things even easier, those metrics are now visible in a curated New Relic Kafka UI for any services that use Kafka Clients. Here’s what that looks like:

New Relic Kafka UI screenshot

Note that for Apache Kafka clients in other languages that are based on librdkafka, you will need to follow that language's documentation on how to harvest the internal metrics. The librdkafka metrics also don’t match those produced by the Java client. Throughout the rest of this guide, we’ll refer only to the Java client metrics and the New Relic Kafka UI above to inform tuning decisions.

Understanding Kafka Consumer fetch batching

The fetch rate of a Kafka Consumer depends largely on how records are grouped together in a fetch and how often you fetch, so let’s take some time to explain that mechanism. For a gentler introduction to Kafka, try the Confluent documentation.

Kafka Consumers send fetch requests to Kafka Brokers, asking for records from one or more partitions of one or more topics. Each fetch response can contain one or more record batches for each topic-partition in the request. A Kafka Consumer may have fetches to several brokers in flight at once, but only one fetch at a time to each broker that holds data for partitions it is subscribed or assigned to. Here's a diagram that shows the structure of a fetch response and includes some of the configurations that we'll talk about next:

Structure of a Kafka Consumer Fetch response

There are four key configuration settings that control the size and latency of fetch responses:

  • fetch.max.bytes
  • max.partition.fetch.bytes
  • fetch.min.bytes
  • fetch.max.wait.ms

The descriptions of these settings in the Apache Kafka consumer configuration documentation are worth a read. Here’s a simplified summary:

The fetch.max.bytes setting controls the maximum amount of data in a fetch response, assuming you don't have individual record batches larger than this value. The max.partition.fetch.bytes setting controls the maximum amount of data returned per partition within a fetch, again assuming you don't have individual record batches larger than this value. The key points here are that a single fetch response can contain data from multiple partitions, and that these values aren't absolute maximums: a single record batch larger than the limit will still be returned so that the consumer can make progress.
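To make the two upper bounds concrete, here is a deliberately simplified model of how much data a single fetch response can carry. It is a sketch, not the broker's actual algorithm: it assumes record batches are small relative to both limits (so it ignores the oversized-batch exception) and that partitions are filled in list order. The class and method names are hypothetical.

```java
import java.util.List;

// Simplified model of how max.partition.fetch.bytes and fetch.max.bytes bound one
// fetch response. Assumption: batches are small relative to the limits, so the
// "oversized batch is still returned" exception is not modeled.
final class FetchSizeModel {
    static long responseBytes(List<Long> availableBytesPerPartition,
                              long maxPartitionFetchBytes,
                              long fetchMaxBytes) {
        long total = 0;
        for (long available : availableBytesPerPartition) {
            long remaining = fetchMaxBytes - total;
            if (remaining <= 0) {
                break; // response is full; remaining partitions get nothing this round
            }
            // Each partition contributes at most max.partition.fetch.bytes,
            // and never more than what is left of fetch.max.bytes.
            total += Math.min(available, Math.min(maxPartitionFetchBytes, remaining));
        }
        return total;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Three partitions with 10 MiB waiting each, default limits (1 MiB and 50 MiB).
        long bytes = responseBytes(List.of(10 * mib, 10 * mib, 10 * mib), mib, 50 * mib);
        System.out.println(bytes / mib); // prints 3
    }
}
```

With the default limits, three partitions that each have 10 MiB waiting yield only 3 MiB in one response, even though fetch.max.bytes allows 50 MiB.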

The fetch.min.bytes setting controls how much data must be in a fetch response before it will be returned by the broker. And finally, the fetch.max.wait.ms determines how long a broker will hold onto a fetch request waiting to reach the fetch.min.bytes threshold before responding.
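For reference, here is a sketch that gathers the four settings into a java.util.Properties object, filled in with what we understand to be the Java client's default values; check the documentation for your client version. The class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: the four fetch settings with the defaults we understand
// the Apache Kafka Java client to use (verify against your client version).
final class FetchDefaults {
    static Properties defaultFetchSettings() {
        Properties props = new Properties();
        // Upper bound on one fetch response: 50 MiB.
        props.setProperty("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
        // Upper bound on data returned per partition within a fetch: 1 MiB.
        props.setProperty("max.partition.fetch.bytes", String.valueOf(1024 * 1024));
        // The broker may answer as soon as a single byte is available.
        props.setProperty("fetch.min.bytes", "1");
        // Longest time the broker holds a fetch while waiting for fetch.min.bytes.
        props.setProperty("fetch.max.wait.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        defaultFetchSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real application these entries would be merged into the properties passed to the KafkaConsumer constructor, alongside bootstrap.servers and the deserializers. The ConsumerConfig class also exposes constants for these keys, for example ConsumerConfig.FETCH_MIN_BYTES_CONFIG.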

It’s important to tune these settings to avoid unintended scaling, throughput, and cost implications. For example:

  • An application may underutilize CPU resources if throughput is limited by the Kafka Consumer.
  • An application may scale out faster than necessary if throughput is limited by the Kafka Consumer.
  • An application may put excess load on the Kafka Brokers, and waste Kafka Broker CPU, if it polls for data too frequently.

Monitoring and tuning Kafka Consumer fetch batching

Now that we know the settings that control fetch size and latency, let’s take a look at the metrics that show the batching performance of a Kafka Consumer. Using the New Relic Kafka UI mentioned above and clicking on the Consumer tab, then scrolling down a bit, we come to a section titled Fetching with five metrics: average fetch latency, max fetch latency, average fetch request rate, average fetch size, and max fetch size.

Fetching metrics in the New Relic Kafka UI

Let's talk about two common scenarios that you can quickly detect from these metrics.

Leaving data on the table: Hitting max partition fetch size

If the average fetch size (fetch-size-avg) metric is consistently around 1 MiB * the number of partitions per fetch request (we explain how to calculate partitions per fetch request later on), it’s likely your consumer is hitting the max.partition.fetch.bytes limit. The default value for max.partition.fetch.bytes is 1 MiB, which means the broker will send you a response with at most 1 MiB per partition even if it has more data for the partition(s) you’re asking for. As a result, you end up issuing significantly more requests, and your throughput can be bound by the round-trip time to your broker, because Kafka Consumers only allow one fetch in flight (or buffered) per broker.

Another way to confirm this scenario is to look at your average fetch request rate (fetch-rate). Note that fetch-rate is measured across all brokers your consumer is fetching from, not per broker. A reasonable fetch rate is somewhere around 1 to 5 requests per second per broker, which corresponds to 200 to 1,000 milliseconds of fetch latency. If your average fetch size is consistently hitting a limit and your average fetch request rate is higher than five requests per second per broker, you’re likely creating many small fetch requests instead of fewer large ones. This not only limits the throughput of your application, it can also very quickly consume a large amount of your Kafka Broker capacity due to the high request rate, which translates directly into significant costs.
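The two signals can be combined into a rough check. This is a hypothetical heuristic, not a Kafka or New Relic feature: the inputs are the fetch-size-avg and fetch-rate values read from your dashboards, while the 10% tolerance for "consistently around" and the cutoff of five requests per second per broker are our assumptions drawn from the guidance above. It also converts a per-broker fetch rate into the fetch latency it implies, assuming one fetch in flight per broker.

```java
// Hypothetical heuristic for the "hitting max.partition.fetch.bytes" scenario.
final class PartitionLimitCheck {
    // Assumption: "consistently around" the ceiling means within 10% of it.
    static final double NEAR_CEILING_FRACTION = 0.9;
    // Upper end of the 1 to 5 requests per second per broker guidance.
    static final double MAX_HEALTHY_FETCHES_PER_BROKER = 5.0;

    // fetch-rate is summed across all brokers, so normalize it per broker.
    static double fetchRatePerBroker(double fetchRate, int brokersFetchedFrom) {
        if (brokersFetchedFrom <= 0) {
            throw new IllegalArgumentException("brokersFetchedFrom must be positive");
        }
        return fetchRate / brokersFetchedFrom;
    }

    // With one fetch in flight per broker, a per-broker rate implies an average latency.
    static double impliedFetchLatencyMs(double fetchRatePerBroker) {
        return 1000.0 / fetchRatePerBroker;
    }

    static boolean likelyHittingPartitionLimit(double fetchSizeAvg, double fetchRate,
                                               int brokersFetchedFrom, double partitionsPerFetch,
                                               long maxPartitionFetchBytes) {
        double ceiling = maxPartitionFetchBytes * partitionsPerFetch;
        boolean nearCeiling = fetchSizeAvg >= NEAR_CEILING_FRACTION * ceiling;
        boolean highRate = fetchRatePerBroker(fetchRate, brokersFetchedFrom) > MAX_HEALTHY_FETCHES_PER_BROKER;
        return nearCeiling && highRate;
    }

    public static void main(String[] args) {
        // Hypothetical readings: ~4 MiB per fetch, 60 fetches/s across 6 brokers, 4 partitions per fetch.
        System.out.println(likelyHittingPartitionLimit(4.0 * 1024 * 1024, 60, 6, 4, 1024 * 1024)); // prints true
        System.out.println(impliedFetchLatencyMs(fetchRatePerBroker(60, 6))); // prints 100.0
    }
}
```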

Increasing max.partition.fetch.bytes

The obvious solution here is to increase the value of max.partition.fetch.bytes. But what should we set it to? Note that the default value for fetch.max.bytes is 50 MiB! That gives a good perspective on what’s considered a safe size for an entire fetch response. Here’s another way to think of it: to fully utilize a 50 MiB fetch with the default 1 MiB max.partition.fetch.bytes, you would need to be consuming 50 partitions per fetch, which is a lot.

If you know the number of partitions per fetch request, you can increase max.partition.fetch.bytes to a value that gets you much closer to 50 MiB per fetch:

max.partition.fetch.bytes = 50 MiB / partitions per fetch

Note that the number of partitions per fetch is likely not constant: as your group membership or the number of brokers in the cluster changes, this value can fluctuate, which is OK. If you know the average number of assigned partitions per consumer (use the assigned-partitions metric) and how many brokers are in your cluster, you can estimate the number of partitions per fetch:

average partitions per fetch = min(ceil(assigned-partitions / broker count), assigned-partitions)

This is an estimate because it assumes partitions are evenly distributed around your cluster and that’s something worth verifying.

Now that we know the average partitions per fetch, we can use it to calculate our desired max.partition.fetch.bytes:

max.partition.fetch.bytes = fetch.max.bytes / average partitions per fetch

Increasing max.partition.fetch.bytes to the above value can help bring down your request rate and prevent your consumer from being the bottleneck in your application. This saves cost and resource usage on your critical Kafka Broker infrastructure and increases the performance of your application. If your request rate is still high, consider increasing fetch.max.bytes too. As with any configuration change, make sure to properly game day and load test its impact.
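The partitions-per-fetch estimate and the max.partition.fetch.bytes formula above can be combined in a small helper. This sketch makes the same assumption as the estimate, that assigned partitions are spread evenly across brokers; the class and method names are hypothetical.

```java
// Hypothetical helper implementing the two estimates above.
final class PartitionFetchSizing {
    // average partitions per fetch = min(ceil(assigned-partitions / broker count), assigned-partitions)
    // Assumes assigned partitions are spread evenly across brokers.
    static int averagePartitionsPerFetch(int assignedPartitions, int brokerCount) {
        if (brokerCount <= 0) {
            throw new IllegalArgumentException("brokerCount must be positive");
        }
        int perBroker = (assignedPartitions + brokerCount - 1) / brokerCount; // integer ceiling division
        return Math.min(perBroker, assignedPartitions);
    }

    // max.partition.fetch.bytes = fetch.max.bytes / average partitions per fetch
    static long maxPartitionFetchBytes(long fetchMaxBytes, int assignedPartitions, int brokerCount) {
        // Guard against zero assigned partitions so we never divide by zero.
        int partitionsPerFetch = Math.max(1, averagePartitionsPerFetch(assignedPartitions, brokerCount));
        return fetchMaxBytes / partitionsPerFetch;
    }

    public static void main(String[] args) {
        long fetchMaxBytes = 50L * 1024 * 1024; // default fetch.max.bytes (50 MiB)
        // Hypothetical consumer: 12 assigned partitions on a 6-broker cluster.
        System.out.println(averagePartitionsPerFetch(12, 6));             // prints 2
        System.out.println(maxPartitionFetchBytes(fetchMaxBytes, 12, 6)); // prints 26214400, i.e. 25 MiB
    }
}
```

As a sanity check, 50 partitions per fetch with the default 50 MiB fetch.max.bytes gives back exactly the 1 MiB default, matching the arithmetic earlier in this section.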

Memory check!

Increasing max.partition.fetch.bytes or fetch.max.bytes can drastically increase the amount of data held in memory by your consumer, especially when your business logic is applying back pressure. This is a good thing for maximizing throughput, but we need to make sure we allocate enough memory to our Java virtual machine (JVM) to avoid running out of memory. To estimate how much memory an application needs for fetched records, we can combine the consumer configuration with a few consumer metrics.

In the worst case, your Kafka Consumer can hold fetched data from every partition you’re assigned in memory simultaneously. Note that the Apache Kafka Consumer applies natural back pressure here by limiting the number of in-flight (and unconsumed) fetches from each broker to one. You can estimate your memory use with the following:

fetched records max memory bytes = max.partition.fetch.bytes * assigned-partitions / compression-rate

Unfortunately, Kafka Consumers don’t track a compression-rate metric like Kafka Producers do. If you have metrics from the upstream producers, you can use those to get the compression rate. Alternatively, the incoming-byte-rate and bytes-consumed-rate of your Kafka Consumer can be used to estimate the average compression rate:

compression-rate = incoming-byte-rate / bytes-consumed-rate

Note that incoming-byte-rate is a global metric and bytes-consumed-rate is a per-topic metric. If you consume from multiple topics you will need to sum each topic bytes-consumed-rate to calculate the global average compression rate.
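Here is a sketch of the memory estimate, including summing bytes-consumed-rate across topics. The metric values are inputs you would read from your dashboards, and the class, method names, and example numbers are hypothetical. We assume incoming-byte-rate counts everything read off the consumer's sockets, protocol overhead included, so treat the resulting compression rate as approximate.

```java
import java.util.Map;

// Hypothetical helper for the fetched-records memory estimate.
final class FetchMemoryEstimate {
    // compression-rate = incoming-byte-rate / bytes-consumed-rate, where bytes-consumed-rate
    // is reported per topic and must be summed over every topic the consumer reads.
    static double compressionRate(double incomingByteRate, Map<String, Double> bytesConsumedRateByTopic) {
        double totalConsumed = 0.0;
        for (double rate : bytesConsumedRateByTopic.values()) {
            totalConsumed += rate;
        }
        if (totalConsumed <= 0.0) {
            throw new IllegalArgumentException("no consumed bytes to compare against");
        }
        return incomingByteRate / totalConsumed;
    }

    // fetched records max memory bytes = max.partition.fetch.bytes * assigned-partitions / compression-rate
    static double maxFetchedMemoryBytes(long maxPartitionFetchBytes, int assignedPartitions, double compressionRate) {
        return (double) maxPartitionFetchBytes * assignedPartitions / compressionRate;
    }

    public static void main(String[] args) {
        // Hypothetical readings: 2 MB/s off the wire, 5 MB/s and 3 MB/s consumed from two topics.
        double rate = compressionRate(2_000_000, Map.of("events", 5_000_000.0, "logs", 3_000_000.0));
        // Hypothetical tuning: 4 MiB max.partition.fetch.bytes and 20 assigned partitions.
        double bytes = maxFetchedMemoryBytes(4L * 1024 * 1024, 20, rate);
        System.out.println(rate);                      // prints 0.25
        System.out.println(bytes / (1024.0 * 1024.0)); // prints 320.0 (MiB)
    }
}
```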

Make sure to test and analyze your consumer memory use under the heaviest load conditions to ensure you don't run out of memory.

Not waiting for more data: Low min bytes fetch size

The previous example covered a scenario where you have a high throughput topic and you're not asking for as much data as you could be. But what if you have a low throughput topic? It turns out you can waste even more Kafka resources with low throughput topics. If the average fetch size for your topic is very low (< 100 KB) and the average fetch latency is very low (< 100 ms), your consumer is likely doing a very poor job of batching and is simply returning a response on the first record batch it receives. Again, take a look at your average request rate: if it is significantly higher than 1 to 5 requests per second per broker, it is putting a lot of load on the Kafka Brokers.
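As with the previous scenario, these thresholds can be expressed as a rough check. The 100 KB, 100 ms, and five-requests-per-second-per-broker cutoffs come from the text, while combining them this way is our assumption; the inputs are the fetch-size-avg, fetch-latency-avg, and fetch-rate values from your dashboards, and the names are hypothetical.

```java
// Hypothetical heuristic for the "low fetch.min.bytes" scenario.
final class LowMinBytesCheck {
    static final double SMALL_FETCH_BYTES = 100_000;             // fetch-size-avg below ~100 KB
    static final double FAST_FETCH_MS = 100;                     // fetch-latency-avg below ~100 ms
    static final double MAX_HEALTHY_FETCHES_PER_BROKER = 5.0;    // upper end of 1 to 5 req/s per broker

    static boolean likelyUnderBatching(double fetchSizeAvg, double fetchLatencyAvgMs,
                                       double fetchRate, int brokersFetchedFrom) {
        if (brokersFetchedFrom <= 0) {
            throw new IllegalArgumentException("brokersFetchedFrom must be positive");
        }
        boolean smallFetches = fetchSizeAvg < SMALL_FETCH_BYTES;
        boolean fastFetches = fetchLatencyAvgMs < FAST_FETCH_MS;
        // fetch-rate covers all brokers, so compare the per-broker rate.
        boolean highRate = fetchRate / brokersFetchedFrom > MAX_HEALTHY_FETCHES_PER_BROKER;
        return smallFetches && fastFetches && highRate;
    }

    public static void main(String[] args) {
        // Hypothetical readings: 2 KB fetches returning in 5 ms, 120 fetches/s across 6 brokers.
        System.out.println(likelyUnderBatching(2_000, 5, 120, 6)); // prints true
    }
}
```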

Why is this happening though? It turns out the default value for fetch.min.bytes is 1 byte! That setting is good if your primary concern is millisecond latency but not good if you’re willing to sacrifice some latency to save big on cost. The default value of 1 byte means the first record batch to arrive on a low throughput topic is likely to be the only one returned in the response.

What’s a good setting for fetch.min.bytes to bring down the request rate and increase the size of your batches?

The first thing to do is decide what your latency tolerance is when consuming. By default fetch.max.wait.ms is configured to be 500 ms, but let's imagine you wanted to limit the latency introduced by your consumer to 250 ms. Step 1 would be to change our fetch.max.wait.ms to be 250ms.

Next, we need to calculate our current consumption throughput per broker for a single application instance. If you have node-level metrics enabled in your agent configuration, you can use the incoming-byte-rate in the consumer-node-metrics scope to see the bytes consumed per second per broker, filtered to a single host. Here's an example New Relic Query Language (NRQL) timeslice query:

SELECT average(newrelic.timeslice.value) FROM Metric
WHERE appName = 'my-stream-processor'
AND clientId = 'my-stream-processor.event-consumer'
AND host = 'my-stream-processor-5597b54c6d-n5gm9'
WITH METRIC_FORMAT 'MessageBroker/Kafka/Internal/consumer-node-metrics/node/{nodeId}/client/{clientId}/incoming-byte-rate'
FACET nodeId

Example incoming-byte-rate query faceted by nodeId

In the screenshot above we're consuming at most 1 KiB/s from each broker for this one application instance. Now that we know our throughput and our latency tolerance, we can pick a reasonable value for the fetch.min.bytes.

fetch.min.bytes >= average throughput per second * max latency seconds

For the scenario described above, that results in setting fetch.min.bytes to 256 or more:

256 B = 1 KiB/s * 0.25 s
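The calculation can be sketched as a helper that derives fetch.min.bytes from a measured per-broker throughput and a latency budget, and pairs it with the matching fetch.max.wait.ms. It rounds up so the value never falls below the formula's lower bound. The class and method names are hypothetical, and the throughput input is assumed to come from a query like the incoming-byte-rate one shown earlier.

```java
import java.util.Properties;

// Hypothetical helper for sizing fetch.min.bytes on low throughput topics.
final class MinBytesSizing {
    // fetch.min.bytes >= average throughput per second * max latency seconds
    static long minFetchBytes(double bytesPerSecondPerBroker, long maxWaitMs) {
        return (long) Math.ceil(bytesPerSecondPerBroker * maxWaitMs / 1000.0);
    }

    // The latency budget becomes fetch.max.wait.ms; fetch.min.bytes follows from it.
    static Properties lowThroughputFetchSettings(double bytesPerSecondPerBroker, long maxWaitMs) {
        Properties props = new Properties();
        props.setProperty("fetch.max.wait.ms", Long.toString(maxWaitMs));
        props.setProperty("fetch.min.bytes", Long.toString(minFetchBytes(bytesPerSecondPerBroker, maxWaitMs)));
        return props;
    }

    public static void main(String[] args) {
        // 1 KiB/s per broker and a 250 ms latency budget.
        System.out.println(minFetchBytes(1024, 250)); // prints 256
        System.out.println(lowThroughputFetchSettings(1024, 250));
    }
}
```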

In many scenarios it might be easier to pick a very high value for fetch.min.bytes and simply rely on fetch.max.wait.ms to control the latency tolerance. The calculation above is useful, though, since it gives you an idea of how high you would need to set fetch.min.bytes.

By increasing fetch.min.bytes for consumers of low throughput topics and tuning your value of fetch.max.wait.ms you can greatly reduce the request rate load on your Kafka Brokers with a minimal latency impact.

Conclusion

This article shows why high-quality instrumentation is necessary for tuning Kafka Consumers to ultimately maximize performance and minimize costs. The low default values of max.partition.fetch.bytes and fetch.min.bytes are just two examples of inefficiencies that may exist in your Kafka Consumer configuration. Continue exploring the New Relic Kafka UI to understand your Kafka Consumers and Kafka Producers at a deeper level.

For deployments with a large number of stream processors or very dynamic workloads it can be untenable to hand tune each Kafka Consumer in the manner described above. New Relic is investigating ways to have more dynamic and automatic Kafka Consumer tuning with a simpler set of constraints. In the meantime, remember that setting up alerts on some of the Kafka Consumer metrics we reviewed is another powerful tool to help find clients that are wasting cluster or compute resources.