Tracing Kafka with OpenTelemetry

Learn how to implement distributed tracing in Kafka with OpenTelemetry and the otelsarama library


Apache Kafka is an open source event streaming platform for capturing real-time data used by thousands of companies, including New Relic. It's distributed, highly scalable, and fault-tolerant, but it can be challenging to monitor Kafka clusters. Ideally, you should be using distributed tracing to trace requests through your system, but Kafka decouples producers and consumers, which means there are no direct transactions to trace between them. Kafka also uses asynchronous processes, which have implicit, not explicit, dependencies. That makes it challenging to understand how your microservices are working together.

However, it is possible to monitor your Kafka clusters with distributed tracing and OpenTelemetry. You can then analyze and visualize your traces in an open source distributed tracing tool like Jaeger or a full observability platform like New Relic. In this post, you'll learn about the benefits of monitoring Kafka with distributed tracing and how you can set it up yourself.


Why New Relic traces Kafka data

New Relic is a full stack observability platform that ingests more than 200 petabytes a month or approximately nine million data points per second. That’s a lot of data.

To ingest this data without data loss, New Relic uses Apache Kafka to save data coming from agents into durable storage.

New Relic uses Kafka to process telemetry data.

Once the data is written into Kafka, many different services aggregate, normalize, decorate, and transform it before it’s written into our database. There are a lot of things that can potentially go wrong during the process, so we need to monitor our Kafka installation closely and fix issues quickly.

That’s where distributed tracing comes in.

Engineering teams working on New Relic’s Telemetry Data Platform (TDP) use distributed tracing to optimize what we call time to glass. Time to glass measures how long it takes for ingested data to become queryable for our customers. 

Benefits of distributed tracing with Kafka

Let’s take a look at some of the benefits you’ll get from using distributed tracing with Kafka.

Pinpoint bottlenecks in your streaming pipeline

Without distributed tracing, you have to correlate logs manually across services and SaaS solutions. With distributed tracing, you can follow the journey of a request across your systems and see where it spends its time, which makes bottlenecks much easier to spot.
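As a rough illustration of what spotting a bottleneck in trace data means, here's a minimal Go sketch that finds the longest span in a trace. The span type, its offset-based timestamps, and the span names are hypothetical; in practice, Jaeger or New Relic shows this for you in the trace waterfall.

```go
package main

import (
	"fmt"
	"time"
)

// span is a hypothetical, simplified span record. Start and End are
// offsets from the beginning of the trace.
type span struct {
	Name       string
	Start, End time.Duration
}

// slowestSpan returns the span with the longest duration, a natural first
// place to look for a bottleneck. ok is false for an empty trace.
func slowestSpan(spans []span) (slowest span, ok bool) {
	for i, s := range spans {
		if i == 0 || s.End-s.Start > slowest.End-slowest.Start {
			slowest = s
		}
	}
	return slowest, len(spans) > 0
}

func main() {
	trace := []span{
		{"produce kafkademo", 0, 3 * time.Millisecond},
		{"consume message", 40 * time.Millisecond, 55 * time.Millisecond},
		{"write to database", 55 * time.Millisecond, 180 * time.Millisecond},
	}
	if s, ok := slowestSpan(trace); ok {
		fmt.Printf("slowest span: %s (%v)\n", s.Name, s.End-s.Start)
	}
}
```

Per-span durations are only part of the picture: the gap between the end of the produce span and the start of the consume span (37 ms in this made-up trace) is time the message spent waiting in Kafka, which a trace waterfall makes visible.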

Get visibility into downstream impacts of your services

Even small changes to a service in your streaming pipeline can disrupt its upstream and downstream dependencies. Distributed traces show you which services a change affects, so you can mitigate disruptions quickly.

Track data loss

One of the challenges of running Kafka at scale is tracking data loss. For example, what if a Kafka broker deletes data after the configured retention time has passed, before a consumer has read it? That data is gone, and it’s hard to know what happened and how much data is missing. With distributed tracing, you can measure how and where data loss is happening by looking for traces with orphan spans.
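A closely related check, sketched below in Go under stated assumptions, flags producer spans that never got a consumer child span: a message was produced, but no consumer ever recorded processing it. The spanRecord type and its Kind values are hypothetical; real span data would come from your tracing backend’s export or query API.

```go
package main

import "fmt"

// spanRecord is a hypothetical flattened span. Kind is "producer" or
// "consumer" in this sketch.
type spanRecord struct {
	TraceID, SpanID, ParentID, Kind string
}

// unconsumed returns producer spans with no consumer span as a direct child:
// messages that were produced but never recorded as consumed.
func unconsumed(spans []spanRecord) []spanRecord {
	consumed := make(map[string]bool) // keyed by traceID/parentSpanID
	for _, s := range spans {
		if s.Kind == "consumer" && s.ParentID != "" {
			consumed[s.TraceID+"/"+s.ParentID] = true
		}
	}
	var missing []spanRecord
	for _, s := range spans {
		if s.Kind == "producer" && !consumed[s.TraceID+"/"+s.SpanID] {
			missing = append(missing, s)
		}
	}
	return missing
}

func main() {
	spans := []spanRecord{
		{"t1", "p1", "", "producer"},
		{"t1", "c1", "p1", "consumer"},
		{"t2", "p2", "", "producer"}, // never consumed
	}
	for _, s := range unconsumed(spans) {
		fmt.Printf("possible data loss: trace %s, producer span %s\n", s.TraceID, s.SpanID)
	}
}
```

In practice you’d only flag producer spans older than your expected consumer lag, because a message that’s still waiting in the topic looks the same as one that was lost.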

Fine-tune consumer polling

Kafka consumers poll the brokers for new records when they are ready to process them. If consumers can’t keep up with the rate at which new messages arrive, records pile up in the topic’s partitions and consumer lag increases.

Distributed tracing can help you tune your consumer configuration to find the right balance between latency and throughput. You can set the maximum allowed delay between polls with max.poll.interval.ms and the maximum number of records returned by a single poll with max.poll.records, then use distributed tracing to see how your consumers are affected.
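One way to connect these two settings to trace data: if a consumer handles every record from one poll before polling again, the worst-case gap between polls is roughly max.poll.records times the per-record processing time, which you can read from your consume spans. If that gap exceeds max.poll.interval.ms, the consumer is considered failed and its group rebalances. The Go sketch below does that arithmetic. The 700 ms per-record figure is a made-up example, and the 300,000 ms limit is the Java consumer’s default; Go clients such as sarama expose different settings.

```go
package main

import (
	"fmt"
	"time"
)

// worstCasePollGap estimates the time between two polls when every record
// returned by one poll is processed before the next poll.
func worstCasePollGap(maxPollRecords int, perRecord time.Duration) time.Duration {
	return time.Duration(maxPollRecords) * perRecord
}

// exceedsPollInterval reports whether that gap would exceed max.poll.interval.ms,
// which would get the consumer removed from its group and trigger a rebalance.
func exceedsPollInterval(maxPollRecords int, perRecord, maxPollInterval time.Duration) bool {
	return worstCasePollGap(maxPollRecords, perRecord) > maxPollInterval
}

func main() {
	const maxPollInterval = 300000 * time.Millisecond // Java consumer default
	perRecord := 700 * time.Millisecond             // e.g. a slow "consume message" span
	for _, records := range []int{500, 100} {
		gap := worstCasePollGap(records, perRecord)
		fmt.Printf("max.poll.records=%d: worst-case gap %v, exceeds limit: %v\n",
			records, gap, exceedsPollInterval(records, perRecord, maxPollInterval))
	}
}
```

If the check fails, you can lower max.poll.records (smaller polls, more frequent) or raise max.poll.interval.ms (more tolerance for slow processing); traces then show whether latency or throughput suffered.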

Fine-tune producer batching

You can also use distributed tracing to optimize your producer batch configuration. Kafka producers try to batch records into fewer requests when multiple records are sent to the same partition. Larger batches usually compress better, because they are more likely to contain repeated chunks of data. The batch.size setting controls the size of a batch in bytes (not in number of messages): once a batch fills up to that limit, the producer sends it to the broker, compressed if compression.type is set.

A small batch size makes batching less effective and may reduce throughput (a batch size of zero disables batching entirely). A very large batch size may waste memory, because the producer allocates a buffer of the full batch size in anticipation of additional records.

The linger.ms setting is the maximum amount of time a Kafka producer waits for additional records to fill a batch before sending it to the broker. For example, setting linger.ms=5 reduces the number of requests sent, but adds up to 5 ms of latency to records sent when the producer isn’t under load.

Distributed tracing allows you to see the end-to-end impacts of changes you make to your batch.size and linger.ms configurations, helping you optimize compression and performance.
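To build intuition for how batch.size and linger.ms interact, here’s a deliberately simplified Go model of batching for a single partition: a batch is sent when it reaches batchSize bytes or when lingerMs has passed since its first record arrived, whichever comes first. It ignores buffer.memory, in-flight request limits, compression, and records larger than a batch, so treat it as an illustration rather than Kafka’s actual record accumulator. The sarama client uses its own settings (such as Producer.Flush.Bytes and Producer.Flush.Frequency) rather than these Java property names.

```go
package main

import "fmt"

// record is a hypothetical produced record: arrival time (ms) and size (bytes).
type record struct{ AtMs, Bytes int }

// batch summarizes one batch sent to the broker.
type batch struct{ Records, Bytes, SentMs int }

// simulateBatching assumes records are sorted by arrival time.
func simulateBatching(records []record, batchSize, lingerMs int) []batch {
	var sent []batch
	var cur batch
	firstAt, open := 0, false
	for _, r := range records {
		// The open batch's linger deadline passed before r arrived,
		// so it was sent at that deadline.
		if open && r.AtMs > firstAt+lingerMs {
			cur.SentMs = firstAt + lingerMs
			sent = append(sent, cur)
			cur, open = batch{}, false
		}
		if !open {
			firstAt, open = r.AtMs, true
		}
		cur.Records++
		cur.Bytes += r.Bytes
		if cur.Bytes >= batchSize { // batch is full: send immediately
			cur.SentMs = r.AtMs
			sent = append(sent, cur)
			cur, open = batch{}, false
		}
	}
	if open { // the last batch goes out when its linger timer fires
		cur.SentMs = firstAt + lingerMs
		sent = append(sent, cur)
	}
	return sent
}

func main() {
	records := []record{{0, 100}, {1, 100}, {2, 100}, {3, 100}}
	for _, linger := range []int{0, 5} {
		// 16384 bytes is the Java producer's default batch.size.
		batches := simulateBatching(records, 16384, linger)
		fmt.Printf("linger.ms=%d -> %d requests: %+v\n", linger, len(batches), batches)
	}
}
```

With linger.ms=0, each of the four records goes out in its own request; with linger.ms=5, they share one request, but the first record waits 5 ms. That added wait is the kind of latency that shows up in your produce and consume spans.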

How to trace Kafka with OpenTelemetry

Distributed tracing links spans into a trace by passing along trace context: identifiers such as the trace ID and the ID of the parent span. Kafka messages don’t carry this context automatically, so you need to add it yourself.

Fortunately, you can instruct your Kafka producers to inject context into message headers, which are key-value pairs that hold metadata, similar to HTTP headers. This post shows you how to inject context with otelsarama, the OpenTelemetry instrumentation library for Shopify’s sarama Go client, but you can also inject context with other Kafka client libraries. To get complete tracing data, you need span data both from when a message is created (at the producer level) and from when it’s consumed (at the consumer level).
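For a sense of what injecting context actually writes into a message, here’s a minimal Go sketch that formats a W3C Trace Context traceparent value (version, trace ID, parent span ID, and trace flags, all lowercase hex) and appends it as a Kafka-style header. The header type only mirrors the shape of a record header (a byte-slice key and value), and withTraceparent is a hypothetical helper; in the real code, the OpenTelemetry propagator generates and writes this value for you.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// header mirrors the shape of a Kafka record header; defined here so the
// sketch is self-contained.
type header struct{ Key, Value []byte }

// traceparent formats a version-00 W3C traceparent value:
// 00-<32 hex trace ID>-<16 hex parent span ID>-<2 hex flags>.
func traceparent(traceID [16]byte, spanID [8]byte, sampled bool) string {
	flags := "00"
	if sampled {
		flags = "01" // the "sampled" bit
	}
	return "00-" + hex.EncodeToString(traceID[:]) + "-" + hex.EncodeToString(spanID[:]) + "-" + flags
}

// withTraceparent returns headers with a traceparent entry appended.
func withTraceparent(headers []header, traceID [16]byte, spanID [8]byte, sampled bool) []header {
	value := traceparent(traceID, spanID, sampled)
	return append(headers, header{Key: []byte("traceparent"), Value: []byte(value)})
}

func main() {
	var traceID [16]byte
	var spanID [8]byte
	// Random IDs for the demo; the spec treats all-zero IDs as invalid.
	if _, err := rand.Read(traceID[:]); err != nil {
		panic(err)
	}
	if _, err := rand.Read(spanID[:]); err != nil {
		panic(err)
	}
	for _, h := range withTraceparent(nil, traceID, spanID, true) {
		fmt.Printf("%s: %s\n", h.Key, h.Value)
	}
}
```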

There’s also one other important part of the equation: you need to add code so your Kafka consumers understand the span context you add. You can use otelsarama to do that, too.

You can find all the code in this sample repo. The code in the main branch spins up a Kafka producer and consumer along with Jaeger for visualization. The new-relic branch includes the code for connecting the sample to New Relic. If you want to try out the code in the main branch yourself, you need Docker and Go installed. Clone the repo, then run the following command from the root directory of the project to spin up Apache ZooKeeper, Kafka, and Jaeger:

docker-compose up -d zoo kafka jaeger

Note that if you want to run this example with New Relic, there are a few additional environment variables to set up, which are covered later in this post. Otherwise, the code for both implementations is very similar.

Next, you need to start the Kafka producer and consumer with the following commands.

From inside the producer directory, run:

go run producer.go

Then, from inside the consumer directory, run:

go run consumer.go

Now let’s walk through what the code does.

Injecting context into messages from Kafka producer

Kafka producers don’t include span data in message headers on their own, but the otelsarama library provides the WrapAsyncProducer function, which does exactly what it sounds like: it wraps an async Kafka producer to add tracing. It relies on OpenTelemetry propagators, which serialize trace context so it can cross the boundaries between parts of a system that are otherwise independent. In this case, the wrapped producer starts a span for each message and writes that span’s context into the message headers. Downstream services can read that context and attach their own spans to the same trace, so the trace isn’t broken at the Kafka hop, which is exactly what you need for distributed tracing.
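The wrapping idea itself is a plain decorator. The Go sketch below shows it with hypothetical, cut-down types (an asyncProducer interface with just an Input channel, loosely modeled on sarama.AsyncProducer): the wrapper exposes its own input channel, stamps each message with a header, and forwards it to the inner producer. otelsarama’s real wrapper does more, starting a span per message and injecting that span’s context, but in both cases the code that sends messages doesn’t change.

```go
package main

import "fmt"

// message is a hypothetical, cut-down Kafka message.
type message struct {
	Topic, Value string
	Headers      map[string]string
}

// asyncProducer is a hypothetical interface loosely modeled on an async
// producer: callers hand messages to the channel returned by Input.
type asyncProducer interface {
	Input() chan<- *message
}

// chanProducer is a stand-in "real" producer that just exposes a channel.
type chanProducer struct{ in chan *message }

func (p chanProducer) Input() chan<- *message { return p.in }

// stampingProducer decorates another producer.
type stampingProducer struct{ in chan *message }

func (p stampingProducer) Input() chan<- *message { return p.in }

// wrap returns a producer that adds key=value to every message's headers
// before forwarding it to inner. Closing the wrapper's Input stops forwarding.
func wrap(inner asyncProducer, key, value string) asyncProducer {
	w := stampingProducer{in: make(chan *message)}
	go func() {
		for msg := range w.in {
			if msg.Headers == nil {
				msg.Headers = make(map[string]string)
			}
			msg.Headers[key] = value
			inner.Input() <- msg
		}
	}()
	return w
}

func main() {
	inner := chanProducer{in: make(chan *message, 1)}
	p := wrap(inner, "x-wrapped", "true")
	p.Input() <- &message{Topic: "kafkademo", Value: "hello"}
	got := <-inner.in
	fmt.Println(got.Topic, got.Headers["x-wrapped"])
	close(p.Input())
}
```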

Here’s the code that wraps an async Kafka producer and provides this functionality. The full producer code can be found in this file:

producer = otelsarama.WrapAsyncProducer(config, producer,
    otelsarama.WithTracerProvider(tracerProvider),
    otelsarama.WithPropagators(propagators))

This code wraps the Kafka producer so all messages it produces will be associated with a span, which allows downstream services to extract the parent span and create new child spans.

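otelsarama.WithPropagators is passed in explicitly so that the producer and consumer use the same propagator rather than the global one. OpenTelemetry Go’s global propagator is a no-op unless you register one with otel.SetTextMapPropagator, and if the two sides used different propagation formats, the consumer couldn’t extract the producer’s span context. In both cases, you wouldn’t get a continuous trace.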

Now that your producer creates a span for each message, you need to pass along the context that ties those spans into a larger trace. You can inject context into the headers of a Kafka message (msg in the following code) using the OTel Propagators API:

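// Create a W3C Trace Context propagator.
propagators := propagation.TraceContext{}
// Write the span context carried by ctx into msg's headers.
propagators.Inject(ctx, otelsarama.NewProducerMessageCarrier(&msg))
// Send the message through the wrapped async producer.
producer.Input() <- &msg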

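The first line creates an instance of OTel’s W3C TraceContext propagator. It doesn’t register the propagator globally; that would take a call to otel.SetTextMapPropagator. Next, the code calls the propagator’s Inject method with otelsarama.NewProducerMessageCarrier, which adapts the message’s headers to OTel’s carrier interface, so the span context is written directly into the headers of the Kafka message.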
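Carriers like the one returned by otelsarama.NewProducerMessageCarrier let a propagator read and write headers without knowing anything about Kafka. OpenTelemetry Go’s propagation.TextMapCarrier interface has three methods: Get, Set, and Keys. The following stdlib-only sketch implements that same method set over a slice of Kafka-style headers. recordHeader and headerCarrier are hypothetical names, and making Set replace an existing key is this sketch’s own choice, not necessarily how otelsarama behaves.

```go
package main

import "fmt"

// recordHeader mirrors the shape of a Kafka record header (byte-slice key and value).
type recordHeader struct{ Key, Value []byte }

// headerCarrier exposes record headers through the Get/Set/Keys method set
// that OpenTelemetry Go's propagation.TextMapCarrier interface defines.
type headerCarrier struct{ headers *[]recordHeader }

// Get returns the value of the first header named key, or "" if absent.
func (c headerCarrier) Get(key string) string {
	for _, h := range *c.headers {
		if string(h.Key) == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set removes any existing headers named key, then appends the new value,
// so re-injecting context never leaves a stale traceparent behind.
func (c headerCarrier) Set(key, value string) {
	kept := (*c.headers)[:0]
	for _, h := range *c.headers {
		if string(h.Key) != key {
			kept = append(kept, h)
		}
	}
	*c.headers = append(kept, recordHeader{Key: []byte(key), Value: []byte(value)})
}

// Keys lists all header names.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(*c.headers))
	for _, h := range *c.headers {
		keys = append(keys, string(h.Key))
	}
	return keys
}

func main() {
	var headers []recordHeader
	c := headerCarrier{headers: &headers}
	c.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	c.Set("tracestate", "vendor=value")
	fmt.Println(c.Keys())
	fmt.Println(c.Get("traceparent"))
}
```

Because the method set matches, a type like this should satisfy propagation.TextMapCarrier once you add the OpenTelemetry module, so it could be passed to Inject or Extract.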

After context is added to a message, the producer sends it to the Kafka broker. 

You can inspect the headers of your Kafka messages with kcat (formerly kafkacat), a command-line tool for producing and consuming Kafka messages.

If you've installed kcat and you're running the repo, you can run the following command:

kcat -b localhost:9092 -t kafkademo -C -e -f 'Headers: %h\nValue: %s\n\n'

Your messages should include the relevant headers. The next image shows that message headers now include tracestate and traceparent keys.
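On the consumer side, the propagator parses that traceparent header back into a span context. The Go sketch below shows roughly what that parsing involves for version 00 of the W3C Trace Context format: four dash-separated, lowercase-hex fields of fixed length, with all-zero IDs rejected. It’s an illustration only. The spanContext type is hypothetical, and OpenTelemetry’s TraceContext propagator does this for you, including handling tracestate, which this sketch ignores.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// spanContext holds the fields this sketch extracts from a traceparent value.
type spanContext struct {
	TraceID, SpanID string
	Sampled         bool
}

// parseTraceparent validates and parses a version-00 W3C traceparent value.
func parseTraceparent(v string) (spanContext, error) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return spanContext{}, errors.New("not a version-00 traceparent")
	}
	traceID, spanID, flags := parts[1], parts[2], parts[3]
	if len(traceID) != 32 || len(spanID) != 16 || len(flags) != 2 {
		return spanContext{}, errors.New("wrong field length")
	}
	for _, field := range parts[1:] {
		if _, err := hex.DecodeString(field); err != nil || strings.ToLower(field) != field {
			return spanContext{}, errors.New("fields must be lowercase hex")
		}
	}
	if traceID == strings.Repeat("0", 32) || spanID == strings.Repeat("0", 16) {
		return spanContext{}, errors.New("all-zero trace or span ID is invalid")
	}
	flagBits, _ := hex.DecodeString(flags) // already validated above
	return spanContext{TraceID: traceID, SpanID: spanID, Sampled: flagBits[0]&0x01 == 1}, nil
}

func main() {
	sc, err := parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		panic(err)
	}
	fmt.Printf("trace=%s parent span=%s sampled=%v\n", sc.TraceID, sc.SpanID, sc.Sampled)
}
```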

Enabling Kafka consumers to read incoming message context

You’re halfway there. The span created when a producer sends a message won’t give you a complete trace on its own; it only tells you that the message was produced. When a consumer processes the message, it should record its own span as a child of the producer’s span. That means the consumer needs to extract the span context from the message headers and start new spans from it.

The startConsumerGroup function in consumer.go uses the otelsarama library to wrap the consumer group handler so that it extracts span context from incoming messages. Then it uses Shopify’s sarama library to instantiate a new consumer group and start consuming messages. Here’s the line of code that wraps the handler:

handler := otelsarama.WrapConsumerGroupHandler(&consumerGroupHandler,
    otelsarama.WithPropagators(propagators))

Remember that parent span the producer creates for each message? Here, OpenTelemetry is extracting that parent span. The next step is to add a child span with new context:

_, span := openTelemetryTracer.Start(ctx, "consume message")
// ... process the message ...
span.End() // spans are only exported once they end

openTelemetryTracer is the instantiated OpenTelemetry tracer. Its Start method creates a new span named “consume message” as a child of the span context carried in ctx. Because context is propagated this way, you can add spans wherever you manually instrument your code, giving you detailed tracing data as messages move through each downstream service.
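Conceptually, starting a child span keeps the trace ID from the extracted parent, generates a new span ID, and records the parent’s span ID. The stdlib-only Go sketch below models just that bookkeeping with hypothetical spanContext and childSpan types and a hypothetical startChild helper; the real tracer’s Start method also handles sampling, timing, attributes, and export.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

type spanContext struct{ TraceID, SpanID string }

type childSpan struct {
	Name    string
	Context spanContext // this span's own IDs
	Parent  spanContext // the span it descends from
}

// startChild creates a span in the parent's trace with a fresh random,
// non-zero 8-byte span ID.
func startChild(parent spanContext, name string) (childSpan, error) {
	var id, zero [8]byte
	for id == zero {
		if _, err := rand.Read(id[:]); err != nil {
			return childSpan{}, err
		}
	}
	return childSpan{
		Name:    name,
		Context: spanContext{TraceID: parent.TraceID, SpanID: hex.EncodeToString(id[:])},
		Parent:  parent,
	}, nil
}

func main() {
	parent := spanContext{TraceID: "4bf92f3577b34da6a3ce929d0e0e4736", SpanID: "00f067aa0ba902b7"}
	child, err := startChild(parent, "consume message")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: trace=%s span=%s parent=%s\n",
		child.Name, child.Context.TraceID, child.Context.SpanID, child.Parent.SpanID)
}
```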

Visualizing the data in Jaeger

Finally, the code includes a Jaeger exporter so you can visualize the traces with Jaeger at localhost:16686. We won’t go into the details of adding a Jaeger exporter here. Just run the code and check out the dashboard yourself.

You can see the end-to-end journey of a request in a waterfall graph, with the latency associated with each request, as well as other metadata like destination topic, message id, and partition in Jaeger.

Visualizing the data in New Relic

If you want to visualize and analyze your Kafka distributed traces at scale, you should use an observability platform like New Relic. To try out this repository with New Relic, you need an account and your license key.

Before starting the containers from the new-relic branch, you need to define two additional environment variables in the consumer/.env and producer/.env files:

OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp.nr-data.net:4317
OTEL_EXPORTER_OTLP_HEADERS="api-key=<your_license_key>"

Replace <your_license_key> in the previous code snippet with your license key.
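You don’t need to parse OTEL_EXPORTER_OTLP_HEADERS yourself, because the OpenTelemetry SDK’s OTLP exporters read it. Still, the format trips people up: the OpenTelemetry specification describes it as a comma-separated list of key=value pairs in W3C Baggage style, with percent-encoded values. This Go sketch parses that format so you can sanity-check a value. parseOTLPHeaders is a hypothetical helper, and the demo prints only header names so a license key never ends up in logs.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// parseOTLPHeaders parses "k1=v1,k2=v2" into a map, trimming whitespace
// and percent-decoding values.
func parseOTLPHeaders(raw string) (map[string]string, error) {
	headers := make(map[string]string)
	for _, pair := range strings.Split(raw, ",") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue
		}
		key, value, ok := strings.Cut(pair, "=")
		key = strings.TrimSpace(key)
		if !ok || key == "" {
			// Don't echo the pair: it might contain a secret.
			return nil, errors.New("invalid header pair (expected key=value)")
		}
		decoded, err := url.PathUnescape(strings.TrimSpace(value))
		if err != nil {
			return nil, fmt.Errorf("header %q: %w", key, err)
		}
		headers[key] = decoded
	}
	return headers, nil
}

func main() {
	headers, err := parseOTLPHeaders("api-key=<your_license_key>")
	if err != nil {
		panic(err)
	}
	for name := range headers {
		fmt.Println("header configured:", name) // never print the value
	}
}
```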

This implementation also uses the otelsarama library, and the code is very similar. The key difference (other than the additional environment variables) is that the tracer provider is configured to use the OTLP trace exporter instead of the Jaeger exporter.

If you’re taking a close look at the code, you’ll also see that both consumer.go and producer.go include the following line in their tracerProvider() functions:

ctx := context.Background()

This is because the OTLP exporter’s constructor takes a context.Context argument (for example, otlptracegrpc.New(ctx, ...)).

Once you’ve added your environment variables, you can start the containers with the following command:

docker-compose up -d zoo kafka

The next image shows Kafka traces coming into New Relic.

Conclusion

Distributed tracing is a powerful tool that can help you quickly pinpoint performance bottlenecks and reduce your mean time to resolution (MTTR). However, it’s not easy to implement, especially at scale. Making sure tracing is implemented in every service, step, and stage of your streaming pipeline takes a lot of cross-functional work. Tracing also generates a lot of data: even if you sample only one in every 10,000, 50,000, or 100,000 requests, the volume can be large, and some of it may be duplicated.
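Sampling decisions also need to be consistent across the Kafka hop; otherwise you might keep the producer half of a trace and drop the consumer half. One common approach is to derive the decision from the trace ID itself, so every service reaches the same answer for the same trace. The Go sketch below keeps roughly a ratio fraction of traces (0.0001 for one in 10,000) by comparing the trace ID’s lower 64 bits against a threshold. It follows the idea behind OpenTelemetry’s TraceIDRatioBased sampler but is not that implementation, and in practice, services downstream of the producer usually just honor the sampled flag in the incoming traceparent header (a parent-based sampler).

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math/rand"
)

// sampled keeps roughly ratio of all traces, deciding purely from the
// trace ID so that every service makes the same choice for a given trace.
func sampled(traceID [16]byte, ratio float64) bool {
	if ratio >= 1 {
		return true
	}
	if ratio <= 0 {
		return false
	}
	threshold := uint64(ratio * (1 << 63))
	// Use the lower 8 bytes, shifted right so the value fits in [0, 2^63).
	x := binary.BigEndian.Uint64(traceID[8:]) >> 1
	return x < threshold
}

func main() {
	r := rand.New(rand.NewSource(1)) // deterministic demo IDs
	const total = 1_000_000
	kept := 0
	for i := 0; i < total; i++ {
		var id [16]byte
		binary.BigEndian.PutUint64(id[:8], r.Uint64())
		binary.BigEndian.PutUint64(id[8:], r.Uint64())
		if sampled(id, 1.0/10_000) {
			kept++
		}
	}
	fmt.Printf("kept %d of %d traces (expected about %d)\n", kept, total, total/10_000)
}
```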

When you do implement distributed tracing with Kafka, it gives you better visibility into issues and helps you optimize your data ingest pipeline. Even this simple code example gathers important data about when messages are produced and consumed, giving you valuable insight into how to tune your Kafka producers and consumers for peak performance.