Don’t miss part one in this series: Using Apache Kafka for Real-Time Event Processing at New Relic. This blog series was originally published in March 2018.
If you’re a recent adopter of Apache Kafka, you’re undoubtedly trying to determine how to handle all the data streaming through your system. The Events Pipeline team at New Relic processes a huge amount of “event data” on an hourly basis, so we think about Kafka monitoring and this question a lot. Unless you’re processing only a small amount of data, you need to distribute your data onto separate partitions.
In part one of this series—Using Apache Kafka for Real-Time Event Processing at New Relic—we explained how we built some of the underlying architecture of our event processing streams using Kafka. In this post, we explain how the partitioning strategy for your producers depends on what your consumers will do with the data.
Why partition your data in Kafka?
If you have so much load that you need more than a single instance of your application, you need to partition your data. How you partition serves as your load balancing for the downstream application. The producer clients decide which topic partition the data ends up in, but it’s what the consumer applications do with that data that drives the decision logic. If possible, the best partitioning strategy to use is uncorrelated/random.
However, you may need to partition on an attribute of the data if:
- The consumers of the topic need to aggregate by some attribute of the data
- The consumers need some sort of ordering guarantee
- Another resource is a bottleneck and you need to shard data
- You want to concentrate data for efficiency of storage and/or indexing
In part one, we used the following diagram to illustrate a simplification of a system we run for processing ongoing queries on event data:
Random partitioning of Kafka data
We use random partitioning on the input topic for our most CPU-intensive application, the match service. This means that all instances of the match service must know about all registered queries to be able to match any event. While the event volume is large, the number of registered queries is relatively small, and thus a single application instance can hold all of them in memory, for now at least.
The following diagram uses colored squares to represent events that match to the same query. It shows messages randomly allocated to partitions:
Random partitioning results in the most even spread of load for consumers, and thus makes scaling the consumers easier. It is particularly suited for stateless or “embarrassingly parallel” services.
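To make the even-spread claim concrete, here is a small Python simulation. It is not Kafka client code. It assigns events to partitions uniformly at random. The workload is hypothetical: 80% of the events belong to one hot query. Because the partition choice ignores what is in the event, each partition still receives roughly the same share.

```python
import random
from collections import Counter


def random_partition(num_partitions: int, rng: random.Random) -> int:
    """Pick a partition uniformly at random, ignoring the event's contents."""
    return rng.randrange(num_partitions)


def partition_counts(events, num_partitions: int, seed: int = 42) -> list:
    """Count how many events land on each partition under random partitioning."""
    rng = random.Random(seed)  # seeded only so the sketch is reproducible
    counts = Counter(random_partition(num_partitions, rng) for _ in events)
    return [counts[p] for p in range(num_partitions)]


# Hypothetical skewed workload: 80% of 100,000 events match a single "hot" query.
events = ["hot-query" if i % 5 else f"query-{i}" for i in range(100_000)]
counts = partition_counts(events, num_partitions=8)
print(counts)                     # each partition gets roughly 12,500 events
print(max(counts) / min(counts))  # close to 1.0: load stays even despite the skew
```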
Random partitioning is effectively what you get when you use the default partitioner and specify neither a partition nor a message key. For better batching efficiency, the default partitioner in Kafka 2.4 and later uses a “sticky” algorithm. It sends keyless messages to one randomly chosen partition until the current batch is complete, then switches to another partition for the next batch.
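The sticky behavior can be modeled in a few lines. This is a simplified Python sketch, not the producer's actual implementation. It treats a batch as "full" after a fixed number of messages (a hypothetical `batch_size`). The real producer closes batches based on the `batch.size` byte limit and `linger.ms`. The sketch shows the key property: consecutive keyless messages share a partition so they can be batched together, but over time they still spread randomly.

```python
import random


class StickyPartitionerSketch:
    """Simplified model of sticky partitioning for messages without a key.

    Assumption: a batch is "full" after `batch_size` messages; the real
    producer uses the batch.size (bytes) and linger.ms settings instead.
    """

    def __init__(self, num_partitions: int, batch_size: int, seed: int = 0):
        self.num_partitions = num_partitions
        self.batch_size = batch_size
        self.rng = random.Random(seed)
        self.current = self.rng.randrange(num_partitions)  # initial random choice
        self.in_batch = 0

    def _switch_partition(self) -> None:
        # Start the next batch on a different random partition when possible.
        if self.num_partitions > 1:
            others = [p for p in range(self.num_partitions) if p != self.current]
            self.current = self.rng.choice(others)
        self.in_batch = 0

    def partition(self) -> int:
        """Return the partition for the next keyless message."""
        if self.in_batch == self.batch_size:
            self._switch_partition()
        self.in_batch += 1
        return self.current


sticky = StickyPartitionerSketch(num_partitions=6, batch_size=4)
assignments = [sticky.partition() for _ in range(12)]
print(assignments)  # three runs of four identical partition numbers
```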
Partition by aggregate
On the topic consumed by the service that does the query aggregation, however, we must partition according to the query identifier since we need all of the events that we’re aggregating to end up at the same place.
This diagram shows that events matching to the same query are all co-located on the same partition. The colors represent which query each event matches to:
After releasing the original version of the service, we discovered that the top 1.5% of queries accounted for approximately 90% of the events processed for aggregation. As you can imagine, this resulted in some pretty bad hot spots on the unlucky partitions.
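Partitioning by an attribute means hashing the key, so identical keys always map to the same partition. The Python sketch below shows both sides of that property: co-location and hot spots. Kafka's default partitioner hashes the serialized key with murmur2. Here `zlib.crc32` stands in as the stable hash. The workload, in which one query produces 90% of the events, is hypothetical.

```python
import zlib
from collections import Counter


def partition_for_key(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition.

    Kafka's default partitioner uses murmur2 on the serialized key; crc32 is a
    stand-in here. What matters is the same property: equal keys always land
    on the same partition.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical skewed workload: one hot query accounts for 90% of 10,000 events.
events = ["query-hot"] * 9_000 + [f"query-{i % 100}" for i in range(1_000)]
load = Counter(partition_for_key(query_id, 8) for query_id in events)
hot_partition = partition_for_key("query-hot", 8)
print(hot_partition, load[hot_partition])  # the unlucky partition holds at least 9,000 events
```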
To address this, we broke up the aggregation service into two pieces. Now we can randomly partition the first stage, where we partially aggregate the data, and then partition by query ID for the second stage, which aggregates the final results per window. This approach allows us to greatly condense the larger streams at the first aggregation stage, so they are manageable to load balance at the second stage.
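Here is a minimal Python sketch of the two-stage idea. Counting stands in for whatever aggregation the real service performs. Python lists and `Counter`s stand in for Kafka partitions. In stage one, events are spread randomly and pre-aggregated per query ID within each partition. In stage two, the much smaller partial results are keyed by query ID, so every partial for a given query meets on one partition.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 4


def stage_one(events, rng: random.Random) -> list:
    """Stage 1: events are randomly partitioned, and each partition
    pre-aggregates (here: counts) per query ID."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for query_id in events:
        partitions[rng.randrange(NUM_PARTITIONS)].append(query_id)
    # Each partition emits one partial result per query it saw.
    return [Counter(partition) for partition in partitions]


def stage_two(partials) -> list:
    """Stage 2: partial results are keyed by query ID, so all partials for a
    query meet on one partition, where the final total is computed."""
    partitions = [Counter() for _ in range(NUM_PARTITIONS)]
    for partial in partials:
        for query_id, count in partial.items():
            target = zlib.crc32(query_id.encode("utf-8")) % NUM_PARTITIONS
            partitions[target][query_id] += count
    return partitions


# Hypothetical workload: one hot query dominates.
events = ["hot"] * 9_000 + ["cold-a"] * 500 + ["cold-b"] * 500
partials = stage_one(events, random.Random(7))
finals = stage_two(partials)
records_to_stage_two = sum(len(p) for p in partials)
print(records_to_stage_two)  # at most 3 queries x 4 partitions = 12 records, not 10,000
```

The hot query still lands on a single second-stage partition. By then, however, it arrives as a handful of partial records rather than thousands of raw events.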
Of course, this approach comes with a resource cost trade-off. Adding an extra hop through Kafka and splitting the service in two means that we spend more on network and service costs.
In this example, co-locating all the data for a query on a single client also sets us up to be able to make better ordering guarantees.
We partition our final results by the query identifier, as the clients that consume from the results topic expect the windows to be provided in order:
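Kafka guarantees ordering only within a partition. Keying the results by query ID therefore gives in-order windows per query. The Python sketch below models each partition as an append-only list. Every record for a query lives in one partition, so a consumer sees that query's windows in production order, whatever order it reads the partitions in. Across different queries, no ordering is implied.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3


def produce(results) -> dict:
    """Append (query_id, window_start) records to per-partition logs.

    Records are keyed by query ID. A Python list models a Kafka partition,
    which preserves the order in which records were appended.
    """
    logs = defaultdict(list)
    for query_id, window_start in results:
        partition = zlib.crc32(query_id.encode("utf-8")) % NUM_PARTITIONS
        logs[partition].append((query_id, window_start))
    return logs


def windows_by_query(logs) -> dict:
    """Read each partition in turn, as a consumer might, and collect windows per query."""
    seen = defaultdict(list)
    for partition in sorted(logs, reverse=True):  # partition read order doesn't matter
        for query_id, window_start in logs[partition]:
            seen[query_id].append(window_start)
    return seen


# Window results emitted in time order, interleaved across four queries.
results = [(q, w) for w in (0, 60, 120, 180) for q in ("q1", "q2", "q3", "q4")]
seen = windows_by_query(produce(results))
print(dict(seen))  # every query sees its windows as [0, 60, 120, 180]
```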
Planning for resource bottlenecks and storage efficiency
When choosing a partition strategy, it’s important to plan for resource bottlenecks and storage efficiency.
(Note that the examples in this section reference other services that are not a part of the streaming query system I’ve been discussing.)
Resource bottleneck: We have another service that has a dependency on some databases that have been split into shards. We partition its topic according to how the shards are split in the databases. This approach produces a result similar to the diagram in our partition by aggregate example. Each consumer depends only on the database shard it is linked with, so issues with other database shards will not affect the instance or its ability to keep consuming from its partition. Also, if the application needs to keep state in memory related to the database, that state will be smaller, because it covers only one shard. Of course, this method of partitioning data is also prone to hot spots.
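One way to align partitions with shards is to have the producer reuse the database's own sharding function. The Python sketch below assumes one partition per shard. The `shard_for` function is hypothetical, standing in for whatever the database layer actually uses. With the Java producer, the computed number could be passed explicitly through the `ProducerRecord(topic, partition, key, value)` constructor, or wrapped in a custom `Partitioner` implementation.

```python
import zlib

NUM_SHARDS = 4  # assumption: the topic has exactly one partition per database shard


def shard_for(entity_id: str) -> int:
    """Hypothetical sharding function used by the database layer."""
    return zlib.crc32(entity_id.encode("utf-8")) % NUM_SHARDS


def partition_for(entity_id: str) -> int:
    """Producer side: reuse the database's sharding function, so partition N
    only ever carries records for shard N."""
    return shard_for(entity_id)


def shards_needed(entity_ids, partition: int) -> set:
    """Which shards the consumer of `partition` would have to talk to."""
    return {shard_for(e) for e in entity_ids if partition_for(e) == partition}


entities = [f"entity-{i}" for i in range(1_000)]
for p in range(NUM_SHARDS):
    print(p, shards_needed(entities, p))  # each partition depends on at most one shard
```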
Storage efficiency: The source topic in our query processing system is shared with the system that permanently stores the event data. That system reads in all the same data using a separate consumer group. The data on this topic is partitioned by which customer account the data belongs to. For efficiency of storage and access, we concentrate an account’s data into as few nodes as possible. While many accounts are small enough to fit on a single node, some accounts must be spread across multiple nodes. If an account becomes too large, we have custom logic to spread it across nodes, and, when needed, we can shrink the node count back down.
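The custom logic itself isn't described here. As a purely hypothetical illustration of the general idea, here is one way a producer could concentrate each account while letting large accounts fan out. Each account gets a base partition from a hash of its ID. A per-account "spread" setting lets it use that many consecutive partitions. The `ACCOUNT_SPREAD` table and the consecutive-partition scheme are assumptions of this sketch. Lowering an account's spread shrinks it back onto fewer partitions for new data.

```python
import zlib

NUM_PARTITIONS = 16

# Hypothetical per-account setting: how many partitions an account may use.
# Accounts that are not listed stay on a single partition.
ACCOUNT_SPREAD = {"big-account": 4}


def _stable_hash(value: str) -> int:
    return zlib.crc32(value.encode("utf-8"))


def partition_for_event(account_id: str, event_id: str) -> int:
    """Concentrate each account on as few partitions as its spread allows."""
    spread = ACCOUNT_SPREAD.get(account_id, 1)
    base = _stable_hash(account_id) % NUM_PARTITIONS
    # Large accounts fan out over `spread` consecutive partitions starting at
    # `base`; small accounts (spread == 1) always land on `base`.
    offset = _stable_hash(event_id) % spread
    return (base + offset) % NUM_PARTITIONS


small = {partition_for_event("small-account", f"e{i}") for i in range(1_000)}
big = {partition_for_event("big-account", f"e{i}") for i in range(1_000)}
print(len(small), len(big))  # 1 partition for the small account, up to 4 for the big one
```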
Consumer partition assignment
Whenever a consumer enters or leaves a consumer group, the brokers rebalance the partitions across consumers, meaning Kafka handles load balancing with respect to the number of partitions per application instance for you. This is great—it’s a major feature of Kafka. We use consumer groups on nearly all our services.
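To see what a rebalance does to assignments, here is a Python sketch of a round-robin style assignment over a single topic. It is similar in spirit to Kafka's RoundRobinAssignor, but it is not that class. With the classic group protocol, the assignment is computed by one consumer acting as the group leader. The sketch recomputes the assignment after member "c" leaves.

```python
def round_robin_assign(partitions, members) -> dict:
    """Hand out partitions to group members in turn (single topic)."""
    members = sorted(members)
    assignment = {member: [] for member in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment


before = round_robin_assign(range(6), ["a", "b", "c"])
after = round_robin_assign(range(6), ["a", "b"])  # member "c" left the group
print(before)  # {'a': [0, 3], 'b': [1, 4], 'c': [2, 5]}
print(after)   # {'a': [0, 2, 4], 'b': [1, 3, 5]}
```

Note that member "b" loses partition 4, even though only "c" left the group. That kind of shuffling is what hurts stateful consumers.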
By default, when a rebalance happens, all consumers drop their partitions and are reassigned new ones (which is called the “eager” protocol). If your application keeps state associated with the consumed data, like our aggregator service, it needs to drop that state and start fresh with data from the new partition.
To reduce this partition shuffling on stateful services, you can use the StickyAssignor. This assignor makes some attempt to keep partition numbers assigned to the same instance, as long as they remain in the group, while still evenly distributing the partitions across members. Because partitions are always revoked at the start of a rebalance, the consumer client code must track whether it has kept/lost/gained partitions, if partition moves are important to the logic of the application. This is the approach we use for our aggregator service.
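The kept/lost/gained bookkeeping can be sketched as follows. The method names echo the Java client's `ConsumerRebalanceListener` callbacks (`onPartitionsRevoked` and `onPartitionsAssigned`). This is a plain Python model, though, not a Kafka client API, and the per-partition `state` dict is hypothetical. The key idea: under the eager protocol, the client shouldn't discard state when partitions are revoked. It waits until the new assignment arrives and drops state only for partitions it actually lost.

```python
class EagerRebalanceTracker:
    """Bookkeeping a stateful consumer needs under the eager protocol."""

    def __init__(self):
        self.state = {}              # hypothetical per-partition in-memory state
        self.pending_revoked = set()

    def on_partitions_revoked(self, revoked) -> None:
        # Everything is revoked at the start of an eager rebalance, so just
        # remember what we had; dropping state now would waste kept partitions.
        self.pending_revoked = set(revoked)

    def on_partitions_assigned(self, assigned):
        assigned = set(assigned)
        kept = self.pending_revoked & assigned
        lost = self.pending_revoked - assigned
        gained = assigned - self.pending_revoked
        for partition in lost:
            self.state.pop(partition, None)  # only lost partitions discard state
        for partition in gained:
            self.state[partition] = {}       # new partitions start fresh
        self.pending_revoked = set()
        return kept, lost, gained


tracker = EagerRebalanceTracker()
tracker.on_partitions_assigned([0, 1, 2])  # first assignment: all gained
tracker.state[0]["events_seen"] = 10
tracker.on_partitions_revoked([0, 1, 2])   # a rebalance starts
kept, lost, gained = tracker.on_partitions_assigned([0, 2, 5])
print(kept, lost, gained)  # {0, 2} {1} {5}
```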
I want to highlight a few other options. In Kafka release 2.4 and later, you can use the CooperativeStickyAssignor. Rather than revoking all partitions at the start of a rebalance, it revokes only the partitions that actually move. The consumer’s rebalance listener is therefore told only about the difference: the partitions revoked from it and the partitions assigned to it over the course of the rebalance. The rebalances as a whole do take longer, and in our application we need to optimize for shortening the time of rebalances when a partition does move. That’s why we stayed with the “eager” protocol under the StickyAssignor for our aggregator service. However, starting with Kafka release 2.5, we have the ability to keep consuming from partitions during a cooperative rebalance, so it might be worth revisiting.
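The difference between the two protocols shows up in how many partitions each member must give up. The Python sketch below compares them for one rebalance in which member "c" joins. The target assignment is assumed to come from a sticky-style assignor. In the Java client, the cooperative behavior is selected by setting `partition.assignment.strategy` to `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`.

```python
def revocations(old: dict, new: dict, protocol: str) -> dict:
    """Count partitions each existing member gives up during one rebalance.

    `old` and `new` map member -> set of partitions. Under "eager", every owned
    partition is revoked up front; under "cooperative", only partitions that
    move to another member are revoked.
    """
    counts = {}
    for member, owned in old.items():
        if protocol == "eager":
            counts[member] = len(owned)
        elif protocol == "cooperative":
            counts[member] = len(owned - new.get(member, set()))
        else:
            raise ValueError(f"unknown protocol: {protocol}")
    return counts


# Member "c" joins; a sticky-style target assignment moves one partition from each.
old = {"a": {0, 1, 2}, "b": {3, 4, 5}}
new = {"a": {0, 1}, "b": {3, 4}, "c": {2, 5}}
print(revocations(old, new, "eager"))        # {'a': 3, 'b': 3}
print(revocations(old, new, "cooperative"))  # {'a': 1, 'b': 1}
```

With the real cooperative protocol, partitions 2 and 5 reach "c" only in a follow-up rebalance, after "a" and "b" have released them. That second round is one reason a cooperative rebalance takes longer overall.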
Additionally, you might be able to take advantage of static membership, which can avoid triggering a rebalance altogether, if clients consistently ID themselves as the same member. This approach works even if the underlying container restarts, for example. (Both brokers and clients must be on Kafka release 2.3 or later.)
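A consumer opts into static membership by setting the `group.instance.id` property. The value must be unique within the group and stable across restarts of the same member. The Python sketch below builds such a configuration as a dictionary of Kafka consumer property names. Most clients accept these names, but check yours. The `STABLE_MEMBER_ID` and `KAFKA_BOOTSTRAP` environment variables are hypothetical conventions, for example a StatefulSet pod name exported as the member ID. The timeout value is illustrative only.

```python
import os


def static_member_config(group_id: str, env=None) -> dict:
    """Build consumer properties that opt into static membership."""
    env = os.environ if env is None else env
    member_id = env.get("STABLE_MEMBER_ID")  # hypothetical, e.g. a pod name
    if not member_id:
        raise RuntimeError("STABLE_MEMBER_ID must be set to use static membership")
    return {
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP", "localhost:9092"),
        "group.id": group_id,
        "group.instance.id": member_id,
        # A restarted member that rejoins within this timeout keeps its
        # partitions without triggering a rebalance.
        "session.timeout.ms": "45000",
    }


config = static_member_config("aggregator", {"STABLE_MEMBER_ID": "aggregator-0"})
print(config["group.instance.id"])  # aggregator-0
```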
Instead of using a consumer group, you can directly assign partitions through the consumer client, which does not trigger rebalances. Of course, in that case, you must balance the partitions yourself and also make sure that all partitions are consumed. We do this in situations where we’re using Kafka to snapshot state. We keep snapshot messages manually associated with the partitions of the input topic that our service reads from.
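With manual assignment, the balancing and coverage checks become your responsibility. One simple scheme, sketched below in Python as an assumption rather than a description of our setup, gives each instance a fixed index. Instance `i` owns every partition `p` where `p % num_instances == i`. In the Java client, the resulting list would be passed to `KafkaConsumer.assign` as a collection of `TopicPartition` objects. Nothing rebalances automatically. If instance 1 of 3 is down, partitions 1, 4, and 7 go unconsumed until it returns.

```python
def partitions_for_instance(num_partitions: int, instance_index: int,
                            num_instances: int) -> list:
    """Static split for manual assignment: instance i owns every partition p
    with p % num_instances == i."""
    if not 0 <= instance_index < num_instances:
        raise ValueError("instance_index must be in [0, num_instances)")
    return [p for p in range(num_partitions) if p % num_instances == instance_index]


def covers_all_partitions(num_partitions: int, num_instances: int) -> bool:
    """Check that the instances together consume every partition exactly once."""
    owned = []
    for i in range(num_instances):
        owned.extend(partitions_for_instance(num_partitions, i, num_instances))
    return sorted(owned) == list(range(num_partitions))


print(partitions_for_instance(10, 1, 3))  # [1, 4, 7]
print(covers_all_partitions(10, 3))       # True
```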
Your partitioning strategies will depend on the shape of your data and what type of processing your applications do. As you scale, you might need to adapt your strategies to handle new volumes and shapes of data. Consider what the resource bottlenecks are in your architecture, and spread load accordingly across your data pipelines. It might be CPU, database traffic, or disk space, but the principle is the same. Be efficient with your most limited/expensive resources.
To learn more tips for working with Kafka, see 20 Best Practices for Working with Kafka at Scale.
The views expressed on this blog are those of the author and do not necessarily reflect the views of New Relic. Any solutions offered by the author are environment-specific and not part of the commercial solutions or support offered by New Relic. Please join us exclusively at the Explorers Hub (discuss.newrelic.com) for questions and support related to this blog post. This blog may contain links to content on third-party sites. By providing such links, New Relic does not adopt, guarantee, approve or endorse the information, views or products available on such sites.