
Apache Airflow has become an essential tool for orchestrating complex workflows in modern data engineering pipelines. However, ensuring its reliability and performance can be challenging without proper monitoring in place. Monitoring Airflow helps you gain deeper insight into your workflows, diagnose issues faster, and ultimately optimize the performance and reliability of your data pipelines.


First, we’ll start with some brief fundamentals of what a workflow is, and how Apache Airflow comes into play. Then, we’ll take a deeper look at Airflow’s features and architecture, why monitoring Airflow is important, and how using OpenTelemetry (OTel), an open source observability framework, can be beneficial for this purpose. Finally, you’ll learn how to implement OpenTelemetry to monitor Airflow. If you’re already familiar with Airflow, you can skip ahead to Implementing OpenTelemetry for Airflow.

What is a workflow?

Just about anything can be a workflow, but to help illustrate the idea, let’s turn the process of baking a cake into a workflow. There are several tasks that need to happen in a specific order before you can bake the cake:

  1. Gather all the ingredients
  2. Preheat the oven
  3. Mix the dry ingredients
  4. Mix the wet ingredients
  5. Combine the wet and dry ingredients

The final task is to bake the cake, but you need to complete the previous tasks before you can start it. Once the cake is baked, the workflow is complete. If you only bake a cake every once in a while, you could manage this workflow manually. But what if you need to bake a few hundred cakes per day? Now you'd want to schedule and automate your tasks to make sure they're completed in the right order and at the right time.

This is where a tool such as Apache Airflow comes into play.

What is Apache Airflow?

Apache Airflow is an open-source scheduling tool and platform that enables you to create, schedule, and monitor workflows through Python code. By helping you keep track of your tasks and their dependencies, it simplifies the process of managing complex workflows and automation pipelines. Because of this, Airflow is used for a wide range of tasks across industries:

  • ETL (extract, transform, load) processes. For example, an online retail company could use Airflow to extract sales data from different stores, transform it into a standardized format, and load it into a central database for reporting and analysis. 
  • Machine learning pipelines. For example, a tech company could use Airflow to manage its machine learning tasks, such as data preprocessing, model training, evaluation, and deployment. 
  • DevOps automation. For example, a DevOps team could use Airflow to automate tasks such as code deployments, testing, and monitoring, streamlining the software development lifecycle and keeping its processes consistent.
  • Cloud infrastructure management. For example, an organization could use Airflow to manage tasks such as provisioning resources, monitoring, scaling, and scheduling backups.

Workflows in Airflow

Tasks are the basic unit of execution in Airflow and can be thought of as individual operations, typically in a data pipeline. Each task describes an action to perform, such as fetching data or triggering another system. The following are common types of tasks you might see:

  • Operators: These are predefined task templates.
  • Sensors: These are special operators that wait for an external event to occur.
  • TaskFlow-decorated @task functions: These are custom Python functions packaged up as tasks.
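The sketch below shows the poke-and-wait pattern that sensors follow, in plain Python. It checks a condition, sleeps, and repeats until the condition holds or a timeout expires. The helper wait_for is hypothetical. Its poke_interval and timeout parameters echo the names Airflow sensors use, but this is not Airflow's sensor API.

```python
import time
from collections.abc import Callable


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float = 0.5,
    timeout: float = 5.0,
) -> bool:
    """Poll `condition` until it returns True or `timeout` seconds have passed.

    Hypothetical helper that mimics the poke-and-wait pattern of a sensor.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True  # the external event has happened
        if time.monotonic() + poke_interval > deadline:
            return False  # the next poke would land past the deadline, so give up
        time.sleep(poke_interval)
```

For example, `wait_for(lambda: os.path.exists("/data/sales.csv"))` would poll for a hypothetical file. Returning False on timeout keeps the sketch simple. An Airflow sensor typically fails the task when its timeout is reached.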

These tasks might have dependencies, meaning one or more conditions must be met, or tasks completed, before the next task runs. Remember our cake baking workflow? You’d have to gather the ingredients before you could mix them, and you’d have to mix the dry and wet ingredients before you could bake the cake. Note that tasks that don’t depend on each other can run in parallel (for instance, you can gather the ingredients while the oven preheats). In Airflow, this workflow would be represented as a DAG, or directed acyclic graph.
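The following sketch models the cake workflow as a dependency graph and groups its tasks into batches that could run in parallel. It uses Python's standard-library graphlib module (Python 3.9+) rather than Airflow itself, and the task names are illustrative. In an Airflow DAG file, you would declare the same dependencies between tasks, for example with the >> operator.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
cake_workflow = {
    "gather_ingredients": set(),
    "preheat_oven": set(),
    "mix_dry": {"gather_ingredients"},
    "mix_wet": {"gather_ingredients"},
    "combine": {"mix_dry", "mix_wet"},
    "bake": {"combine", "preheat_oven"},
}


def parallel_batches(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into batches; tasks in the same batch don't depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a loop
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for deterministic output
        batches.append(ready)
        sorter.done(*ready)
    return batches


for i, batch in enumerate(parallel_batches(cake_workflow), start=1):
    print(i, batch)
```

Running it prints four batches: gathering and preheating together, then both mixing steps, then combining, then baking.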

To help solidify or refresh your understanding of DAGs, let’s look at the following DAG of a traditional ETL workflow, which has three tasks and two dependencies:

Diagram of an extract, transform, load (ETL) process showing its tasks and their dependencies.

The latter two tasks depend on the task immediately preceding them: the transforming task depends on the extracting task, and the loading task depends on the transforming task. Because the graph is directed, has a clear start and end point, and has no cyclic dependencies (loops) between its tasks (hence “acyclic”), this workflow is a DAG.

No matter how simple or complex a workflow is, as long as it meets these criteria, it can be represented as a DAG. On the other hand, if your workflow includes loops (cyclic dependencies), as in the following workflow, it is not a DAG. If there’s no way to break out of the loop, the workflow might run forever:

Diagram showing an extract, transform, load (ETL) process that contains a dependency loop.
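A cycle check can also be sketched with graphlib, because TopologicalSorter raises CycleError when a graph contains a loop. The looped graph below is a hypothetical stand-in for the diagram, in which extract waits on load. is_dag is an illustrative helper. Airflow performs its own cycle check when it loads a DAG.

```python
from graphlib import CycleError, TopologicalSorter


def is_dag(graph: dict[str, set[str]]) -> bool:
    """Return True if the dependency graph has no cycles, i.e. it is a DAG."""
    try:
        # static_order() must order every task, so it raises CycleError on a loop.
        tuple(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


etl = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
# Hypothetical loop: extract also waits on load, so no task can ever start.
etl_with_loop = {"extract": {"load"}, "transform": {"extract"}, "load": {"transform"}}

print(is_dag(etl))            # True
print(is_dag(etl_with_loop))  # False
```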

Airflow can orchestrate and run almost anything, regardless of what the task does, including running commands directly with the Bash or Python operators.

Architecture and deployment of Airflow

At minimum, an Airflow installation consists of the following components:

  • Scheduler: This component triggers scheduled workflows and submits tasks to the executor to run. The executor is not a separate component, but a configuration property of the scheduler that runs within the scheduler process. 
  • Web server: This component provides the user interface (UI) where you can inspect, trigger, and debug your DAGs and tasks. 
  • DAG directory: This component contains the DAG files the scheduler reads so it knows which tasks to run and when to run them. 
  • Metadata database: This component stores information about DAG runs, such as the state of workflows and tasks, and whether or not the DAG ran successfully. 
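The next sketch is a toy, single-process illustration of how work is divided between the scheduler and the executor. A loop finds tasks whose dependencies are complete and submits them to a thread pool that stands in for the executor. run_workflow and its arguments are hypothetical. A real Airflow scheduler also records task state in the metadata database, which this sketch skips.

```python
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter


def run_workflow(
    graph: dict[str, set[str]],
    task_fns: dict[str, Callable[[], object]],
    max_workers: int = 4,
) -> list[str]:
    """Toy scheduler: submit every ready task to a pool, then unlock its dependents."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    completed: list[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while sorter.is_active():
            # "Scheduler" step: find tasks whose upstream tasks have all finished.
            ready = sorter.get_ready()
            # "Executor" step: run the ready tasks concurrently in the pool.
            futures = {name: executor.submit(task_fns[name]) for name in ready}
            for name, future in futures.items():
                future.result()  # re-raises the exception if the task failed
                completed.append(name)
                sorter.done(name)
    return completed


etl = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
tasks = {name: (lambda n=name: print(f"running {n}")) for name in etl}
print(run_workflow(etl, tasks))
```

The sketch waits for a whole batch before fetching the next one, which keeps it short. A real scheduler reacts to each task as it finishes.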

The following Airflow components are optional, but you can use them to improve the extensibility, scalability, and performance of your Airflow deployment:

  • Worker: This component executes the tasks given to it by the scheduler. In a basic installation, this component might be part of the scheduler instead of a separate component.
  • Triggerer: This component executes deferred tasks in an asyncio event loop. 
  • DAG processor: This component parses DAG files, then serializes them into the metadata database. This component is part of the scheduler by default, but for scaling and security, can be run separately. 
  • Plugins directory: Plugins enable you to extend the functionality of Airflow. 

These components are all Python applications that can be deployed using a number of deployment mechanisms. You can run Airflow on a single machine with a simple setup consisting of only a scheduler and web server, as shown in the following diagram:

Diagram of a simple Apache Airflow setup, consisting of only a scheduler and web server

You can also run Airflow in a distributed environment, with components running on different machines, and scale it by running multiple instances of the preceding components.

Monitoring Airflow with OpenTelemetry

Now that you have a basic understanding of Airflow, let’s look at why monitoring Airflow is important, why you might want to use OpenTelemetry to do so, and finally, how to implement OpenTelemetry for Airflow.

Why monitor Airflow?

Monitoring and alerting on your Airflow deployments lets you troubleshoot and manage your workflows proactively. When an issue occurs, insight into the problem and its root cause helps you get things up and running again quickly, which in turn improves the reliability and performance of your Airflow workflows.

What to monitor

You might also want to monitor metrics specific to your use case, but in general, the following list highlights key aspects to track:

  • Health checks for your Airflow components (for example, your scheduler and workers), including their uptime. 
  • DAGs: how long DAG files take to parse, how long DAG runs take to complete on average, how often DAG runs fall behind their schedules, and how long DAGs take to complete dependency checks.
  • Whether pool utilization is increasing. 
  • The statuses of job executions, executor tasks, operator-by-operator executions, and task instances.
  • Whether critical tasks and sensors are taking longer. 
  • Whether your custom metrics are being captured.
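As a rough illustration of the "taking longer" check, the sketch below compares each task's latest duration with the median of its past durations and flags any task above a multiplier. The data layout, the slow_tasks helper, the 1.5x threshold, and the sample durations are all made up for illustration. In practice, you would express this as an alert condition on duration metrics in your observability backend.

```python
from statistics import median


def slow_tasks(
    history: dict[str, list[float]],
    latest: dict[str, float],
    factor: float = 1.5,
) -> list[str]:
    """Return IDs of tasks whose latest duration exceeds `factor` x their median past duration.

    Durations are in seconds. Tasks with no history are skipped.
    """
    flagged = []
    for task_id, duration in latest.items():
        past = history.get(task_id)
        if past and duration > factor * median(past):
            flagged.append(task_id)
    return sorted(flagged)


# Made-up sample durations, in seconds.
history = {"extract": [30.0, 32.0, 31.0], "wait_for_file": [60.0, 58.0, 65.0]}
latest = {"extract": 33.0, "wait_for_file": 140.0}
print(slow_tasks(history, latest))  # ['wait_for_file']
```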

Why OpenTelemetry?

There are a number of managed solutions available for monitoring your Airflow workflows, but if you want a high degree of flexibility in where you ship, store, and analyze your data, OpenTelemetry is a good fit. OpenTelemetry is an open source observability framework used to generate telemetry data about your applications and infrastructure. It is vendor-agnostic, meaning that once you instrument your apps with it, you can export your data to any compatible backend, such as New Relic.

Implementing OpenTelemetry for Airflow

The OpenTelemetry Collector is required for authenticating with New Relic, because Airflow currently can’t send OpenTelemetry data with authentication headers. In practice, this means you route Airflow’s metrics to a Collector, which then forwards the data to New Relic’s OpenTelemetry (OTLP) endpoint, authenticating with your license key.

First, depending on how you’re deploying your Airflow, you’ll need to install the Airflow package with the otel extra. 

If you’re installing Airflow from PyPI, follow the Airflow installation steps, then use the following pip command:

pip install "apache-airflow[otel]"

If you’re installing Airflow using Docker, first set up the Airflow Docker image. Then extend the prebuilt image with a Dockerfile that installs the otel extra. If needed, replace latest with a specific version tag, set $AIRFLOW_VERSION to a specific version, or both. Note that OpenTelemetry support was added in Airflow v2.7.0.

FROM apache/airflow:latest
RUN pip install --no-cache-dir "apache-airflow[otel]==$AIRFLOW_VERSION"
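Because OpenTelemetry support starts at Airflow 2.7.0, it can help to check the version before enabling it. The hypothetical helper below compares version numbers as integer tuples, because comparing them as strings ranks "2.10.0" below "2.7.0". You could pass it airflow.__version__ or the output of the airflow version command.

```python
import re


def supports_otel_metrics(airflow_version: str) -> bool:
    """Return True if an Airflow version string is 2.7.0 or later.

    Compares numeric (major, minor, patch) tuples instead of raw strings.
    """
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", airflow_version)
    if match is None:
        raise ValueError(f"unrecognized version: {airflow_version!r}")
    major, minor, patch = (int(part or 0) for part in match.groups())
    return (major, minor, patch) >= (2, 7, 0)


print(supports_otel_metrics("2.10.3"))  # True
print(supports_otel_metrics("2.6.3"))   # False
```

The sketch ignores pre-release suffixes, so "2.7.0rc1" is treated as 2.7.0.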

Next, install and set up the OpenTelemetry Collector, then run it with the following configuration file, making sure to replace:

  • <INSERT_NEW_RELIC_OTLP_ENDPOINT> with the New Relic OTLP endpoint for your account’s region (for example, https://otlp.nr-data.net for US accounts).
  • <INSERT_NEW_RELIC_LICENSE_KEY> with your license key.

receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

exporters:
  otlphttp:
    endpoint: <INSERT_NEW_RELIC_OTLP_ENDPOINT>
    headers:
      api-key: <INSERT_NEW_RELIC_LICENSE_KEY>

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlphttp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlphttp]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlphttp]

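If you’re using Docker, make sure that port 4318 on the Collector is reachable from the running Airflow instance. Otherwise, you might need to put both containers on the same Docker network. Port 4318 is the default OTLP/HTTP port, and Airflow sends its OpenTelemetry metrics to the Collector over OTLP/HTTP. Recent Collector releases bind the OTLP receivers to localhost by default. If Airflow runs in a different container, you might also need to set the http receiver’s endpoint to 0.0.0.0:4318.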
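One quick way to test that reachability from inside the Airflow container is a plain TCP connection attempt, sketched below with Python's standard socket module. collector_reachable is a hypothetical helper. A successful connection only shows that the port is open, not that the Collector's OTLP receiver is healthy.

```python
import socket


def collector_reachable(host: str, port: int = 4318, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    An open port does not prove the Collector's OTLP receiver is healthy.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or hostname not resolvable
        return False
```

For example, `collector_reachable("otel-collector")` checks a Collector under the hypothetical hostname otel-collector, such as a Docker Compose service name.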

Similarly, if Airflow and your Collector run in separate Docker containers, update the otel_host setting from localhost to the Collector’s container name or address.

Finally, set the following required Airflow configuration options using one of these methods:

  • In the airflow.cfg file
[metrics]
otel_on = True
otel_host = localhost
otel_port = 4318
otel_ssl_active = False
  • As environment variables
export AIRFLOW__METRICS__OTEL_ON=True
export AIRFLOW__METRICS__OTEL_HOST=localhost
export AIRFLOW__METRICS__OTEL_PORT=4318
export AIRFLOW__METRICS__OTEL_SSL_ACTIVE=False
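Both options carry the same settings. Airflow maps each airflow.cfg option to an environment variable named AIRFLOW__{SECTION}__{KEY}, and environment variables take precedence over the file. The sketch below uses Python's configparser to derive the export lines from the [metrics] snippet. cfg_to_env is a hypothetical helper, not part of Airflow.

```python
import configparser


def cfg_to_env(cfg_text: str) -> dict[str, str]:
    """Convert airflow.cfg-style settings to AIRFLOW__{SECTION}__{KEY} variables."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    env = {}
    for section in parser.sections():
        for key, value in parser.items(section):
            env[f"AIRFLOW__{section.upper()}__{key.upper()}"] = value
    return env


METRICS_CFG = """
[metrics]
otel_on = True
otel_host = localhost
otel_port = 4318
otel_ssl_active = False
"""

for name, value in cfg_to_env(METRICS_CFG).items():
    print(f"export {name}={value}")
```

Running it prints the same four export lines listed above.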

If your metric names exceed the OpenTelemetry limit of 63 bytes, you can set the stat_name_handler option in the [metrics] section of your airflow.cfg file to point to a function that validates the stat name, renames it if needed, and returns the new name. For example:

def my_custom_stat_name_handler(stat_name: str) -> str:
    # Lowercase the stat name and keep only its first 32 characters.
    return stat_name.lower()[:32]
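The example above trims names to 32 characters, well under the limit. To trim to exactly 63 bytes instead, a handler needs to count UTF-8 bytes rather than characters, because non-ASCII characters take more than one byte. The sketch below is one way to do that. The function name is hypothetical.

```python
OTEL_NAME_MAX_BYTES = 63  # the OpenTelemetry name limit cited above


def byte_limited_stat_name_handler(stat_name: str) -> str:
    """Lowercase a stat name and trim it to at most 63 bytes when UTF-8 encoded."""
    encoded = stat_name.lower().encode("utf-8")[:OTEL_NAME_MAX_BYTES]
    # errors="ignore" drops a multi-byte character that the byte slice cut in half.
    return encoded.decode("utf-8", errors="ignore")
```

To use a handler like this, set stat_name_handler in [metrics] to its dotted import path. For example, use my_stat_handlers.byte_limited_stat_name_handler, where my_stat_handlers is a hypothetical module that Airflow can import.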

Once your data is in New Relic, you can visualize tasks, operators, and DAG executions as metrics, and set alerts on critical metrics:

Screenshot of Airflow data in New Relic