New Relic runs a large streaming platform that ingests and processes several petabytes of observability data per day. Core to this platform is Apache Kafka, a powerful open-source distributed event streaming technology. New Relic has been using Apache Kafka for over 10 years. During that time the team has learned how to maximize efficiency and reliability while also ensuring data is processed at low latency.
One of the keys to efficiently running Apache Kafka is appropriately tuning the Consumers, the pieces of your stream processing applications that receive data. To understand how to tune your Kafka Consumers you must first ensure you have the right instrumentation.
This article shows how to instrument Kafka Consumers, and then explains how to interpret those metrics to detect and remediate a few common issues.
Instrumentation
Before tuning your Kafka Consumers you need high-quality instrumentation to diagnose their current performance and resource usage. This article focuses on tuning the Kafka Consumer provided by the official Apache Kafka Java Client because that’s what’s primarily used at New Relic, but the insights and analysis apply to Kafka Clients written in other languages too. The Apache Kafka Java Client tracks many key Kafka Consumer metrics, and combined with the New Relic Java agent you can collect those Kafka Consumer metrics automatically. To make things even easier, those metrics are now visible in a curated New Relic Kafka UI for any services that use Kafka Clients. Here’s what that looks like:
Note that for the Apache Kafka clients in other languages based on librdkafka, you will need to follow that language's documentation on how to harvest the internal metrics. The librdkafka metrics also don’t match those produced by the Java client. Through the rest of this guide we’ll refer only to the Java client metrics and the New Relic Kafka UI above to inform tuning decisions.
Understanding Kafka Consumer fetch batching
The fetch rate of a Kafka Consumer is largely based on how records are grouped together in a fetch and how often you fetch, so let’s take some time to explain that mechanism. For a more gentle introduction to Kafka, try the Confluent documentation.
Kafka Consumers send fetch requests to Kafka Brokers, asking for records from one or more topics and one or more partitions within each topic. Each fetch response can contain a batch of record batches for each topic and partition tuple that was in the request. Kafka Consumers may send multiple fetches in parallel, but only one in-flight fetch to each broker that hosts partitions they’re subscribed or assigned to. Here's a diagram that shows the structure of a fetch response and includes some of the configurations that we'll talk about next:
There are four key configuration settings that control the size and latency of fetch responses:
- fetch.max.bytes
- max.partition.fetch.bytes
- fetch.min.bytes
- fetch.max.wait.ms
The full descriptions of these settings in the Apache Kafka documentation are worth a read. Here’s a simplified summary of those descriptions:
The fetch.max.bytes setting controls the maximum amount of data in a fetch response, assuming you don't have individual record batches larger than this value. The max.partition.fetch.bytes setting controls the maximum amount of data returned per partition within a fetch, again assuming you don't have individual record batches that are larger than this value. The key points here are: a single fetch response can have data returned from multiple partitions in the response, and these values aren't absolute maximums, since a single record batch larger than the max will still be consumed to ensure progress is made.
The fetch.min.bytes setting controls how much data must be in a fetch response before the broker will return it. And finally, fetch.max.wait.ms determines how long a broker will hold onto a fetch request waiting to reach the fetch.min.bytes threshold before responding.
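To make the four settings easier to place, here’s a minimal sketch of where they live in a Java client consumer configuration. The values shown are simply the defaults discussed in this article, and the bootstrap server, group ID, and deserializers are placeholders rather than recommendations:
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// The four fetch batching settings, spelled out at their default values:
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024); // fetch.max.bytes: 50 MiB
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); // max.partition.fetch.bytes: 1 MiB
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); // fetch.min.bytes: 1 byte
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // fetch.max.wait.ms: 500 ms
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);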
It’s important to tune these settings to avoid unintended scaling, throughput, and cost implications. For example:
- An application may underutilize CPU resources if throughput is limited by the Kafka Consumer.
- An application may scale out faster than necessary if throughput is limited by the Kafka Consumer.
- An application may put excess load on the Kafka Brokers, and waste Kafka Broker CPU, if it polls for data too frequently.
Monitoring and tuning Kafka Consumer fetch batching
Now that we know the settings that control fetch size and latency, let’s take a look at the metrics that show the batching performance of a Kafka Consumer. Using the New Relic Kafka UI mentioned above, clicking on the Consumer tab, and then scrolling down a bit, we come to a section titled Fetching with five metrics: average fetch latency, max fetch latency, average fetch request rate, average fetch size, and max fetch size.
Let's talk about two common scenarios that you can quickly detect from these metrics.
Leaving data on the table: Hitting max partition fetch size
If the average fetch size (fetch-size-avg) metric is consistently around 1 MiB * number of partitions per fetch request (we explain how to calculate partitions per fetch request later on), it’s likely your consumer is hitting the max.partition.fetch.bytes limit. The default value for max.partition.fetch.bytes is 1 MiB, which means the broker will send you a response with at most 1 MiB per partition even if it has more data for the partition(s) you’re asking for. The result is that you end up issuing significantly more requests, and your throughput can be bound by the round trip time to your broker, because Kafka Consumers only allow one fetch in-flight (or buffered) per broker.
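To make that bound concrete, here's a rough, hypothetical illustration (the partition count and round trip time are invented for the example, not measured values): suppose a consumer is assigned a single partition on a given broker and the round trip time to that broker is 50 ms. With the 1 MiB per-partition cap, the most it can consume from that broker is roughly:
max throughput from that broker ≈ 1 MiB per fetch / 0.05 s per round trip = 20 MiB/s
Any data beyond the cap waits for another full round trip, no matter how much the broker already has buffered for that partition.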
Another way to confirm this scenario is to take a look at your average fetch request rate (fetch-rate). (Note: the value of fetch-rate is across all brokers your consumer is fetching from, not per broker.) A reasonable fetch rate is somewhere around 1 to 5 requests per second per broker because it corresponds to 200 to 1,000 milliseconds of fetch latency. If your average fetch size is consistently hitting a limit and your average fetch request rate is higher than five requests per second per broker, you’re likely creating many small fetch requests instead of fewer large ones. This not only limits the throughput of your application, it can very quickly consume a large amount of your Kafka Broker capacity due to the high request volume, which directly translates to significant costs.
Increasing max.partition.fetch.bytes
The obvious solution here is to increase the value of max.partition.fetch.bytes. But what should we set it to? One thing to notice is that the default value for fetch.max.bytes is 50 MiB! That’s a good perspective on what’s considered a safe size for the entire fetch response. Here’s another way to think of this: to fully utilize a 50 MiB fetch with the default configuration you would need to be consuming 50 partitions per fetch, which is a lot.
If you know the number of partitions per fetch request you can increase max.partition.fetch.bytes to a value that will get you much closer to 50 MiB per fetch:
max.partition.fetch.bytes = 50 MiB / partitions per fetch
Note that the number of partitions per fetch is likely not constant: as your group membership changes or the number of brokers in the cluster changes, this value can fluctuate, which is OK. If you know the average number of assigned partitions per consumer (use the assigned-partitions metric) and you know how many brokers are in your cluster, you can estimate the number of partitions per fetch:
average partitions per fetch = min(ceil(assigned-partitions / broker count), assigned-partitions)
This is an estimate because it assumes partitions are evenly distributed around your cluster and that’s something worth verifying.
Now that we know the average partitions per fetch, we can use it to calculate our desired max.partition.fetch.bytes:
max.partition.fetch.bytes = fetch.max.bytes / average partitions per fetch
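As a hypothetical worked example (the partition and broker counts here are invented for illustration), suppose each consumer reports assigned-partitions = 30 and the cluster has 6 brokers:
average partitions per fetch = min(ceil(30 / 6), 30) = 5
max.partition.fetch.bytes = 50 MiB / 5 = 10 MiB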
Increasing max.partition.fetch.bytes to the above value can help bring down your request rate and prevent your consumer from being the bottleneck in your application. This will save costs and resource usage on your critical Kafka Broker infrastructure and increase the performance of your application. If your request rate is still high, consider increasing fetch.max.bytes too. As with any configuration change, make sure to properly game day and load test the impact.
Memory check!
Increasing max.partition.fetch.bytes or fetch.max.bytes can drastically increase the amount of data held in memory by your consumer, especially when your business logic is back pressuring. This is a good thing for maximizing throughput, but we need to make sure we allocate enough memory to our Java virtual machine (JVM) to avoid running out of memory. To estimate how much memory an application needs for fetched records we can use the following metrics:
- assigned-partitions
- compression-rate (for upstream producers)
- incoming-byte-rate
- bytes-consumed-rate
In the worst case your Kafka Consumer can have a fetch response with data from every partition you’re assigned held in memory simultaneously. Note that the Apache Kafka Consumer has natural back pressuring here by limiting the number of in-flight (and unconsumed) fetches from a broker to one. You can estimate your memory use with the following:
fetched records max memory bytes = max.partition.fetch.bytes * assigned-partitions / compression-rate
Unfortunately, Kafka Consumers don’t track a compression-rate metric like Kafka Producers do. If you have metrics from the upstream producers you can use those to get the compression rate. Alternatively the incoming-byte-rate and bytes-consumed-rate of your Kafka Consumer can be used to estimate the average compression rate:
compression-rate = incoming-byte-rate / bytes-consumed-rate
Note that incoming-byte-rate is a global metric and bytes-consumed-rate is a per-topic metric. If you consume from multiple topics, you will need to sum each topic’s bytes-consumed-rate to calculate the global average compression rate.
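Carrying forward the hypothetical numbers from the earlier example (30 assigned partitions and a 10 MiB max.partition.fetch.bytes), and assuming an observed compression rate of 0.25, the worst-case estimate would be:
fetched records max memory bytes = 10 MiB * 30 / 0.25 = 1,200 MiB
That’s over a gigabyte of heap for fetched records alone, which is why it’s worth revisiting your JVM heap settings whenever you raise these limits.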
Make sure to test and analyze your consumer memory use under the heaviest load conditions to ensure you don't run out of memory.
Not waiting for more data: Low min bytes fetch size
The previous example covered a scenario where you have a high throughput topic and you're not asking for as much data as you could be. But what if you have a low throughput topic? It turns out you can waste even more Kafka resources with low throughput topics. If the average fetch size for your topic is very low (< 100 KB) and the average fetch latency is very low (< 100 ms), your consumer is likely doing a very poor job of batching and just returning a response on the first record batch it receives. Again, take a look at your average request rate; if it is significantly higher than 1 to 5 requests per second per broker, it is putting a lot of load onto the Kafka Brokers.
Why is this happening though? It turns out the default value for fetch.min.bytes is 1 byte! That setting is good if your primary concern is millisecond latency, but not good if you’re willing to sacrifice some latency to save big on cost. The default value of 1 byte means the first record batch to arrive on a low throughput topic is likely to be the only one returned in the response.
What’s a good setting for fetch.min.bytes to bring down the request rate and increase the size of your batches?
The first thing to do is decide what your latency tolerance is when consuming. By default fetch.max.wait.ms is configured to be 500 ms, but let's imagine you wanted to limit the latency introduced by your consumer to 250 ms. Step 1 would be to change our fetch.max.wait.ms to 250 ms.
Next we need to calculate what our current consumption throughput is per broker for a single application instance. If you have node-level metrics enabled in your agent configuration, you can use the incoming-byte-rate in the consumer-node-metrics scope to see the bytes consumed per second per broker, filtered to a single host. Here's an example New Relic Query Language (NRQL) timeslice query:
SELECT average(newrelic.timeslice.value) FROM Metric
WHERE appName = 'my-stream-processor'
AND clientId = 'my-stream-processor.event-consumer'
AND host = 'my-stream-processor-5597b54c6d-n5gm9'
WITH METRIC_FORMAT 'MessageBroker/Kafka/Internal/consumer-node-metrics/node/{nodeId}/client/{clientId}/incoming-byte-rate'
FACET nodeId
In the screenshot above we're consuming at most 1 KiB/s from each broker for this one application instance. Now that we know our throughput and our latency tolerance, we can pick a reasonable value for fetch.min.bytes.
fetch.min.bytes >= average throughput per second * max latency seconds
For the scenario I described, that results in setting fetch.min.bytes to 256 or more:
256B = 1KiB/s * .25s
In many scenarios it might be easier to pick a very high value for fetch.min.bytes and simply rely on fetch.max.wait.ms to control the latency tolerance. The calculation above is useful, though, since it gives you an idea of how high you would need to set fetch.min.bytes.
By increasing fetch.min.bytes for consumers of low throughput topics and tuning your value of fetch.max.wait.ms, you can greatly reduce the request rate load on your Kafka Brokers with minimal latency impact.
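Putting this section’s numbers together, here’s a sketch of the resulting change, continuing the Java configuration example from earlier. The exact fetch.min.bytes value is an illustrative choice above the 256-byte floor we calculated, not a recommendation:
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 250); // our 250 ms latency tolerance
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // comfortably above the calculated 256 B floor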
Conclusion
This article shows why high-quality instrumentation is necessary for tuning Kafka Consumers to ultimately maximize performance and minimize costs. The low default values of max.partition.fetch.bytes and fetch.min.bytes are just two examples of inefficiencies that may exist in your Kafka Consumer configuration. Continue exploring the New Relic Kafka UI to understand your Kafka Consumers and Kafka Producers at a deeper level.
For deployments with a large number of stream processors or very dynamic workloads it can be untenable to hand tune each Kafka Consumer in the manner described above. New Relic is investigating ways to have more dynamic and automatic Kafka Consumer tuning with a simpler set of constraints. In the meantime, remember that setting up alerts on some of the Kafka Consumer metrics we reviewed is another powerful tool to help find clients that are wasting cluster or compute resources.
Next steps
To fully optimize your Apache Kafka Consumers and ensure your streaming platform runs efficiently, sign up with New Relic today. Leverage our comprehensive tools and insights to monitor, tune, and enhance your Kafka infrastructure.
Look out for more blog posts coming from New Relic on tuning other key Apache Kafka components, including Kafka Producers and Kafka Topics.