In a data-processing pipeline where the timeliness of data matters for functional correctness, how do you handle unexpected latency in the system?

Why pipeline latency matters

New Relic offers an observability platform that can store metrics, logs, events, and traces. Its users care a lot about the timeliness of their data. They want to see how their systems are behaving in real time and be notified of any issues as soon as possible. Late data or extreme lag in the pipeline can be problematic without proper lag detection. For example, if data were delayed in our alerting pipeline and we didn't have lag detection, that data could be missing from alerting query results. This can cause false-positive or false-negative alerts, including loss-of-signal detection.

To ingest, store, and alert on data quickly, New Relic runs a streaming extract, transform, and load (ETL) pipeline at scale. Customers send us data through agents or APIs and we asynchronously process and store that data. Different APIs or products may have domain-specific validation or processing before that data is persisted and evaluated by the alerting pipeline or other real-time stream products.

If you run a streaming pipeline, you'll want to follow reliability best practices and strategies (for example, load balancing, avoiding bottlenecks, and redundancy) to avoid latency in the critical path. However, even with the best reliability strategy, hardware can fail and you'll see latency in some of your paths. In fact, some of these strategies, such as reordering data to balance load better, can complicate your ability to handle latency. This blog post focuses on strategies you can apply for handling latency when it can't be avoided.

How our pipelines handle latency

We want to design a system in which latency is less impactful.

First off, we attempt to keep data in order. If data is strictly ordered, we can make assumptions about its completeness: the arrival of later time-stamped data implies that all earlier data has already arrived, so any processing associated with that time frame can be finalized. We use this concept with the event flow trigger in alerting configuration, which uses a supplied bound on how far out of order data may arrive to advance processing. A delay in the pipeline then affects all data in a timeseries by the same amount, so processing implicitly adjusts to latency.

[Figure: defining a bound for the leading minute to arrive out of order]
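
As a rough illustration of how an ordering bound can advance processing, here is a minimal Python sketch. It is not New Relic's event flow implementation; the class name `BoundedOrderAggregator`, the one-minute window, and the summing aggregation are all assumptions made for the example. A window is finalized once the newest timestamp seen is at least `bound_s` seconds past the end of that window.

```python
from collections import defaultdict


class BoundedOrderAggregator:
    """Hypothetical sketch: sums values per window and finalizes a window
    once the newest timestamp seen is bound_s seconds past the window's end."""

    def __init__(self, bound_s, window_s=60):
        self.bound_s = bound_s
        self.window_s = window_s
        self.max_ts = None               # newest event timestamp seen so far
        self.open = defaultdict(float)   # window start -> running sum

    def add(self, ts, value):
        """Add a point; return the (window_start, sum) pairs it finalizes."""
        start = ts - ts % self.window_s
        if self.max_ts is not None:
            watermark = self.max_ts - self.bound_s
            if start + self.window_s <= watermark:
                return []  # arrived outside the bound: window already closed
        self.open[start] += value
        self.max_ts = ts if self.max_ts is None else max(self.max_ts, ts)
        watermark = self.max_ts - self.bound_s
        done = sorted(s for s in self.open if s + self.window_s <= watermark)
        return [(s, self.open.pop(s)) for s in done]
```

Because the watermark is driven by the timestamps of the data itself rather than by wall-clock time, a uniform delay in the pipeline shifts everything by the same amount and windows still close correctly, only later.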

But maintaining strict order may not always be desirable or possible. There are practical scalability concerns necessitating load balancing and parallel processing. Complications arise as well when joining streams of data from a wide variety of different sources and APIs.

In this case we can buffer and reorder data. For example, we use this concept in our cumulative metric API when deriving delta values. How long should we wait while buffering data? In some simpler cases you may be able to use heuristics, for example based on the sampling interval of the data source.

[Figure: detected timeseries gap]
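
The buffering idea can be sketched as follows. This is a simplified Python example under stated assumptions, not the cumulative metric API itself: the `ReorderBuffer` name, the "wait two sampling intervals" heuristic, and the use of event timestamps (rather than wall-clock time) to decide when to release points are all choices made for illustration.

```python
import heapq


class ReorderBuffer:
    """Hypothetical sketch: holds cumulative points until they are older than
    a wait derived from the sampling interval, then emits ordered deltas."""

    def __init__(self, sampling_interval_s, intervals_to_wait=2):
        self.wait_s = sampling_interval_s * intervals_to_wait
        self.heap = []          # min-heap of (timestamp, cumulative value)
        self.newest = None      # newest timestamp seen so far
        self.last_ts = None     # timestamp of the last released point
        self.last_value = None  # cumulative value of the last released point

    def add(self, ts, cumulative):
        """Buffer a point; return the (timestamp, delta) pairs now released."""
        if self.last_ts is not None and ts <= self.last_ts:
            return []  # later than the buffer allows: drop rather than misorder
        heapq.heappush(self.heap, (ts, cumulative))
        self.newest = ts if self.newest is None else max(self.newest, ts)
        deltas = []
        while self.heap and self.heap[0][0] <= self.newest - self.wait_s:
            t, v = heapq.heappop(self.heap)
            if self.last_value is not None:
                deltas.append((t, v - self.last_value))
            self.last_ts, self.last_value = t, v
        return deltas
```

A fixed wait like this only covers the expected disorder of the source. It does not account for unexpected latency inside the pipeline.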

Perhaps more interesting is tracking and compensating for unexpected latency that may occur within the system.

Dynamically adapting to latency

We want to dynamically adapt the scope and amount of buffering we use. There are a few strategies to do this:

Traffic flow

We can use in-stream metadata to track ingest lag using all telemetry that flows through the data processing pipeline. We attach the ingest time to every piece of data at the earliest part of our pipelines. This information is passed along with the data through the pipeline, and at the end we subtract it from the time our processor receives the data to determine how long the data took to get there.
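
A minimal Python sketch of this idea follows. The field name `ingest_ts`, the helper names, and the choice to summarize recent lag as a maximum over a sliding window are assumptions for illustration, not details from our pipeline.

```python
import time
from collections import deque


def stamp(record, now=None):
    """At the first hop: return a copy of the record with its ingest time."""
    stamped = dict(record)
    stamped["ingest_ts"] = time.time() if now is None else now
    return stamped


def ingest_lag_s(record, now=None):
    """At the last hop: seconds between ingest and receipt (never negative)."""
    received = time.time() if now is None else now
    return max(0.0, received - record["ingest_ts"])


class LagTracker:
    """Keeps the lag of the most recent records and reports the worst one."""

    def __init__(self, size=1000):
        self.lags = deque(maxlen=size)

    def observe(self, record, now=None):
        self.lags.append(ingest_lag_s(record, now))

    def current(self):
        return max(self.lags, default=0.0)
```

The `now` parameter exists only so the functions can be exercised with fixed times. In a real pipeline, clock skew between the stamping and receiving hosts would also need to be considered.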

Time waves

Another method injects a known amount of synthetic data into the pipeline. We regularly write a set number of data points to our APIs, and we call each complete set a “time wave.” Every data point in a wave carries the same emitted time, which serves as the wave's “generation” ID and tells us when the wave was sent. We know how many data points to expect in each wave at the end of the pipeline, and can detect lag or data loss based on the arrival of these data points. The arrival of all data points for a generation forms the watermark of where processing is caught up with respect to ingest times.
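
The bookkeeping at the end of the pipeline might look like the Python sketch below. The names `make_wave` and `WaveTracker`, the `seq` field, and the choice to define the watermark as the newest fully arrived generation are assumptions for this example.

```python
def make_wave(emitted_ts, size):
    """Build one synthetic wave: every point shares the same generation ID."""
    return [{"generation": emitted_ts, "seq": i} for i in range(size)]


class WaveTracker:
    """Hypothetical sketch: counts arrivals per generation at the end of the
    pipeline; the newest complete generation is treated as the watermark."""

    def __init__(self, wave_size):
        self.wave_size = wave_size
        self.seen = {}          # generation -> set of sequence numbers seen
        self.watermark = None   # newest generation that fully arrived

    def arrive(self, point):
        seqs = self.seen.setdefault(point["generation"], set())
        seqs.add(point["seq"])  # a set ignores duplicate deliveries
        if len(seqs) == self.wave_size:
            gen = point["generation"]
            if self.watermark is None or gen > self.watermark:
                self.watermark = gen

    def lag_s(self, now):
        """Seconds between now and the watermark, or None before any wave."""
        return None if self.watermark is None else now - self.watermark

    def missing(self, generation):
        """Points of a generation not yet seen (possible lag or data loss)."""
        return self.wave_size - len(self.seen.get(generation, ()))
```

A generation that stays incomplete while newer ones complete suggests that one path is lagging or losing data rather than the whole pipeline.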

External observers

We can also monitor parts of the pipeline separately, and then collate and sum the individual latencies. Concretely, these are typically service and queue latencies, such as Kafka consumer lag. We must either emit this data to the same place in the same format, or else have some service that knows what monitoring to pull and how to arrange the data.
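
Collating the individual measurements could be as simple as the Python sketch below. The stage names and the `path_latency_s` helper are hypothetical. The point is that a path's latency is the sum of its stages, and that any stage without a measurement should be reported as a gap rather than silently treated as zero.

```python
def path_latency_s(path, stage_latency_s):
    """Sum the latest latency of each stage on a path.

    Returns (total_seconds, stages_with_no_measurement)."""
    missing = [stage for stage in path if stage not in stage_latency_s]
    total = sum(stage_latency_s[stage] for stage in path if stage in stage_latency_s)
    return total, missing


# Hypothetical stage names and values, for example collected from
# service timers and Kafka consumer lag converted to seconds.
latest = {"ingest-api": 0.5, "raw-topic": 2.0, "enricher": 0.25}
total, gaps = path_latency_s(
    ["ingest-api", "raw-topic", "enricher", "alerts-topic"], latest
)
```

Here `gaps` names the stage that no observer covers, which is exactly the kind of hole this approach tends to leave in end-to-end numbers.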

Scoping

We can scope these strategies so that the latency measured reflects only impacted parts of our pipeline. This is important both for observability purposes and to be able to programmatically use this information in the most surgical way. At New Relic we identify and isolate paths by product source pipeline and by the cells of our cellular architecture.

This is also a place where we can use distributed tracing to get more fine-grained latency information, with deeper visibility into the full path the data is traveling and which portions may be slowing it down.
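
To illustrate scoping, the Python sketch below keys measured lag by product source pipeline and cell, so that extra buffering is applied only where lag was actually observed. The `ScopedLag` class, the scope names, and the "base wait plus measured lag" rule are assumptions made for the example.

```python
class ScopedLag:
    """Hypothetical sketch: latest measured lag per (pipeline, cell) scope."""

    def __init__(self):
        self.lag_s = {}  # (pipeline, cell) -> latest measured lag in seconds

    def report(self, pipeline, cell, lag_s):
        self.lag_s[(pipeline, cell)] = lag_s

    def buffer_for(self, pipeline, cell, base_wait_s):
        """Wait time for one scope: the base wait plus that scope's lag only."""
        return base_wait_s + self.lag_s.get((pipeline, cell), 0.0)


scopes = ScopedLag()
scopes.report("metrics", "cell-1", 45.0)  # only this scope is lagging
lagging_wait = scopes.buffer_for("metrics", "cell-1", base_wait_s=10.0)
healthy_wait = scopes.buffer_for("metrics", "cell-2", base_wait_s=10.0)
```

With this arrangement, a slow cell delays processing only for data that flows through it, and unaffected scopes keep their normal wait.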

Challenges

Each strategy also has particular challenges.

  • Traffic flow based: Blockages in the pipeline are not observed until the blockage has cleared and the late data arrives at the end of the pipeline. This may be mitigated by using extremely fine-grained path tracing, but in practice this is tricky if it is normal for paths to change during incidents. An additional challenge is that latency on a small portion of the path (for example, a single Kafka partition) may affect too few data points, which are then spread too thinly across downstream processors to be readily detected everywhere the data could flow.
  • Time wave based: There is no information at all for an uninstrumented path. This means we must send synthetic data through every possible input starting point and ensure that every possible application and queue partition path has time wave data points flowing through it. This can be mitigated by specially routing this data in our pipelines to ensure even placement across all queues, and by increasing the amount of synthetic data we send.
  • External observers: We need to build a solution that ensures all pieces of our pipeline are covered, that it's understood which parts impact which downstream product pipelines, and how to add the pieces together. Generally this is not a great way to get end-to-end times without gaps, but it does let teams own alertable places where latency can crop up that are closely tied to service health.

Depending on the use case or the user, enough latency can be as bad as, or worse than, moving on without all the data. At New Relic we use a combination of the above strategies to monitor and adapt to latency in our pipelines.

Conclusion

Understanding pipeline latency is critical for determining customer impact, identifying underperforming parts of our pipeline, and automatically and dynamically adapting to that latency where applicable. This is often necessary for maintaining the functional correctness of parts of our pipelines as well as providing observability into our performance objectives.

While it is important to design systems that reduce the impact of latency, it's also essential to have strategies for unexpected latency issues. At New Relic we've developed strategies to handle latency when it can't be avoided.

Adopting a combination of techniques, as well as scoping these strategies, allows us to dynamically adjust the buffering and processing of data as needed. Although each strategy presents its own challenges, a well-thought-out and coordinated approach can yield significant improvements to pipeline performance and resilience.