Apache Airflow は、最新のデータエンジニアリングパイプラインで複雑なワークフローを調整する上で不可欠なツールとなっています。ただし、適切な監視が実施されていない場合、信頼性とパフォーマンスを確保することが困難になることがあります。しかし、Airflow を監視するとより深いインサイトの獲得、問題の迅速な診断、そしてデータパイプラインのパフォーマンスと信頼性の最適化など、いくつかの重要なメリットを得ることができます。


本ブログでは、まず、ワークフローとは何か、Apache Airflow がどのように機能するか について、の概要を説明します。次に、Airflow のアーキテクチャーと機能、Airflow の監視が重要な理由、オープンソースのオブザーバビリティフレームワークである OpenTelemetry(OTel)の使用がここでの目的にどのように役立つか を詳しく見ていきます。最後に、OpenTelemetry を実装して Airflow を監視する方法を説明します。Airflow についてすでによくご存知の場合は、Airflow向けOpenTelemetryの実装 に進んでください。

ワークフローとは?

大概のことは何でもワークフローにすることができますが、この考え方を説明するために、ケーキを焼くプロセスをワークフローに変えてみましょう。ケーキを焼く前に、特定の順序で実行すべきタスクがいくつかあります。

  1. すべての材料を揃える
  2. オーブンを予熱する
  3. 乾いた材料を混ぜる
  4. 湿った材料を混ぜる
  5. 湿った材料と乾いた材料を混ぜ合わせる

最終タスクはケーキを焼くことですが、このタスクを開始する前に、前のタスクを完了する必要があります。ケーキを焼き終えたら、このワークフローは完了です。たまにケーキを1つ焼くだけであれば、このワークフローを手動で管理できます。しかし、1日に数百個のケーキを焼く必要がある場合はどうでしょうか?タスクを適切な順序と適切なタイミングで完了できるように、タスクのスケジュール設定と自動化を検討してみてはいかがでしょうか。

ここで、Apache Airflow などのツールが役立ちます。

Apache Airflow とは?

Apache Airflow は、Python コードを使用してワークフローを作成、スケジュール、監視できるオープンソースのスケジューリングツールおよびプラットフォームです。タスクとその依存関係を追跡できるようにすることで、複雑なワークフローと自動化パイプラインの管理プロセスを簡素化します。このため、Airflow はさまざまな業界の幅広いタスクに使用されています。

  • ETL ( Extract 抽出、Transform 変換、Load 読み込み ) プロセス:たとえば、オンライン小売企業は Airflow を使用してさまざまな店舗から販売データを抽出し、それを標準化された形式に変換して、レポート作成と分析のためにデータベースにロードすることができます。
  • 機械学習パイプライン:たとえば、技術系企業では Airflow を使用して、データの前処理、モデルのトレーニング、評価、デプロイメントなどの機械学習タスクを管理できます。
  • DevOps 自動化:たとえば、DevOps チームは、Airflow を統合してコードのデプロイメント、テスト、監視などのタスクを自動化し、ソフトウェア開発ライフサイクルを合理化して、プロセスの一貫性を確保できます。
  • クラウドインフラストラクチャ管理:たとえば、組織は Airflow を使用して、リソースのプロビジョニング、監視、スケーリング、バックアップのスケジュール設定などのタスクを管理できます。

Airflow のワークフロー

タスクは Airflow で最も単純な単位であり、通常はデータパイプライン内の操作と考えることができます。タスクは、データの取得や他のシステムのトリガーなど、実行されるアクションを記述します。よく見られるタスクの一般的な種類は次のとおりです。

  • オペレーター:これらは事前定義されたタスクです
  • センサー:これらのタスクは外部イベントが発生するのを待ちます
  • @taskで装飾された TaskFlow これらは、タスクとしてまとめられたカスタム Python 関数です

これらのタスクには依存関係がある場合があります。つまり、次のタスクを実行する前に、1つ以上の条件を満たすか完了している必要があります。ケーキを焼くワークフローを覚えていますか?混ぜる前に材料を揃えなければならず、ケーキを焼く前に乾いた材料と湿った材料を混ぜ合わせる必要があります。相互に依存せずにタスクを並行して実行できることに注意してください (たとえば、オーブンを予熱しながらすべての材料を揃えることができます)。このワークフローは、Airflow では DAG (有向非巡回グラフ) として表されます。

DAG についての理解を深めるために、3つのタスクと 2つの依存関係を持つ従来の ETL ワークフローの DAG を見てみましょう。

後者の 2つのタスクは、直前のタスクに依存関係があります。つまり、変換タスクは抽出タスクに依存し、ロードタスクは変換タスクに依存します。グラフは有向 (方向がある) で、開始点と終了点が明確であり、タスク間に循環的な依存関係(またはループ)はありません(つまり「非循環」)、従ってこのワークフローは DAG と見なされます

ワークフローがどれほど単純または複雑であっても、前述の太字の基準を満たしている限り、DAG として表すことができます。一方、次のワークフローのように、ワークフローに明確なループ(循環依存関係)が含まれている場合、それは DAG ではありません。ループから抜け出す方法がない場合、ワークフローは永久に実行される可能性があります。

Airflow は シェル や Python を使った直接的なコマンド実行も含め、実行しているものに関係なく、何でもオーケストレーションして実行します。

Airflow のアーキテクチャーとデプロイメント

少なくとも、Airflow のインストレーションは次のコンポーネントで構成されます。

  • Scheduler  (スケジューラー) : このコンポーネントは、スケジュールされたワークフローをトリガーし、実行するタスクを Executer に送信します。Executer は個別のコンポーネントではなく、スケジューラープロセス内で実行されるスケジューラーの設定プロパティです。
  • Webserver  (Web サーバー) : このコンポーネントは、DAG とタスクを表示し、トリガー、デバッグできるユーザーインタフェース(UI)を提供します。
  • DAG ディレクトリ : このコンポーネントには、スケジューラーが読み取る DAG ファイルが含まれています。実行するタスクとそれがいつ実行されるかを確認できます。
  • メタデータデータベース : このコンポーネントには、ワークフローやタスクの状態、DAG が正常に実行されたかどうかなど、DAG の実行に関する情報が保存されます。

以下に示す Airflow コンポーネントはオプションですが、Airflow の拡張性、スケーラビリティ、パフォーマンスを向上させるために使用することができます。

  • Worker (ワーカー) : このコンポーネントは、スケジューラーによって指定されたタスクを実行します。ベーックインストールでは、このコンポーネントは個別のコンポーネントではなく、スケジューラーの一部になる場合があります。
  • Trigger (トリガー) : このコンポーネントは、非同期イベントループで遅延タスクを実行します。
  • DAG プロセッサ:このコンポーネントは DAG ファイルを解析し、メタデータデータベースにシリアル化します。このコンポーネントはデフォルトではスケジューラーの一部ですが、スケーリングとセキュリティのために個別に実行することもできます。
  • プラグインディレクトリ:プラグインを使用すると、Airflow の機能を拡張できます。

これらのコンポーネントはすべて Python アプリケーションで、さまざまなデプロイメントメカニズムを使用してデプロイできます。次の図に示すように、スケジューラーと Web サーバーのみで構成されるシンプルなセットアップで、単一のマシンで  Airflow を実行できます。

また、コンポーネントを異なるマシンで実行させるような分散環境でも Airflowを実行することができます。また複数の実行インスタンスを追加してスケールすることもできます。

OpenTelemetry による Airflow のモニタリング

Airflow の基本を理解したところで、Airflow のモニタリングが重要な理由、モニターに OpenTelemetry を使用する理由、最後に Airflow に OpenTelemetry を軽装する方法について記載します。

Airflow をモニタリングする理由

ワークフローをプロアクティブにトラブルシューティングおよび管理できることが Airflow 実行をモニターし、アラート化することの利点です。問題が発生した場合、問題とその根本原因についてのインサイトを得ることで、すぐに復旧して再稼働させることができ、Airflow ワークフローの信頼性とパフォーマンスを向上させることができます。

モニター対象

ユースケースに固有のモニター対象メトリクスがある場合もありますが、一般的に追跡すべき重要な項目は次の通りです。

  • Airflow コンポーネント (スケジューラーやワーカーなど) の稼働時間を含む健全性チェック
  • DAG:DAG の解析にかかった時間、DAGが 終了するまでにかかる平均時間、DAG がスケジュールを遅らせている頻度、DAG が依存関係チェックを完了するのにかかる時間
  • プールの使用率が上昇しているかどうか
  • ジョブ実行ステータス、Executer タスクステータス、オペレーターごとの実行ステータス、タスクインスタンスのステータス
  • 重要なタスクやセンサーの処理に時間がかかっているかどうか
  • カスタムメトリックがキャプチャされているかどうか

なぜ OpenTelemetry なのでしょうか?

Airflow ワークフローをモニターするためのマネージドソリューションは多数ありますが、データの送信、保存、分析場所に関して高い柔軟性を求める場合は、OpenTelemetry が最適です。OpenTelemetry は、アプリケーションとインフラストラクチャに関する情報の生成に使用されるオープンソースのオブザーバビリティフレームワークです。ベンダーに依存しないため、これを使用してアプリを計装すると、New Relic などの任意のバックエンドにデータをエクスポートできるようになります。

Airflow 向け OpenTelemetry の実装

現在、Airflow では認証ヘッダー付きの OpenTelemetry データ送信がサポートされていないため、New Relic の認証には OpenTelemetry コレクター などを利用する必要があります。つまり、メトリクスの送信先をコレクターにルーティングするように設定します。そしてコレクターは New Relic のライセンスキー を使用して、データを OpenTelemetry データ受信用の New Relic エンドポイント に転送します。

まず、Airflow をデプロイする方法に応じて、otel extra を使用して Airflow パッケージをインストールします。
PyPI から Airflow をインストールする場合は、Airflow の インストールドキュメント を参考に、次の pip コマンドを使用します。

pip install "apache-airflow[otel]"

Docker を使用して Airflow をインストールする場合は Airflow Docker イメージを設定した後、Dockerfile を使用して otel extra をインストールして、ビルド済みイメージを拡張します(必要に応じて、$AIRFLOW_VERSION を特定のバージョンに設定することで  latest から変更が可能です。*Airflow v2.7.0* で、OpenTelemetry のサポートが追加されています)。

FROM apache/airflow:latest
RUN pip install --no-cache-dir "apache-airflow[otel]==$AIRFLOW_VERSION"

次に、OTEL コレクターのインストール手順 を使用して OpenTelemetry コレクターを設定し、下記のコンフィグサンプルを使用してコレクターを実行します。

receivers:
 otlp:
   protocols:
     grpc:
     http:


processors:
 batch:


exporters:
 otlphttp:
   endpoint: <INSERT_NEW_RELIC_OTLP_ENDPOINT>
   headers:
     api-key: <INSERT_NEW_RELIC_LICENSE_KEY>


service:
 pipelines:
   traces:
     receivers: [otlp]
     processors: [batch]
     exporters: [otlphttp]
   metrics:
     receivers: [otlp]
     processors: [batch]
     exporters: [otlphttp]
   logs:
     receivers: [otlp]
     processors: [batch]
     exporters: [otlphttp]

また、下記内容は必ず置き換えてください。

  • <INSERT_NEW_RELIC_OTLP_ENDPOINT> を適切なNew Relic OTLPエンドポイントに置き換え
  • <INSERT_NEW_RELIC_LICENSE_KEY> をライセンスキーに置き換え

Docker を使用している場合は、実行中のインスタンスからコレクターのポート 4318 にアクセスできることを確認してください。そうでない場合は、Dockerネットワーク を使用する必要がある場合があります。Airflow は OpenTelemetry over HTTP  (ポート 4318) を使用してメトリクスを送信します。

もう 1つ注意すべき点として、コレクターと並行して Docker コンテナで Airflow を実行している場合は、otel_host 設定を localhost からコレクターのコンテナアドレスに更新する必要があります。

最後に、airflow.cfg ファイルか環境変数で、Airflow の設定を行います。

  • airflow.cfg ファイルで設定する
[metrics]
otel_on = True
otel_host = localhost
otel_port = 4318
otel_ssl_active = False
  • 環境変数で設定する場合
export AIRFLOW__METRICS__OTEL_ON=True
export AIRFLOW__METRICS__OTEL_HOST=localhost
export AIRFLOW__METRICS__OTEL_PORT=4318
export AIRFLOW__METRICS__OTEL_SSL_ACTIVE=False

メトリクス名が  OpenTelemetry の上限制限である 63 バイトを超えてしまうような場合は、名前の変更することが可能です。

airflow.cfg ファイルの [metrics] セクションに  state_name_handler オプション を設定して、新しい名前を返す関数を指すように設定することができます。以下にその例を挙げます。

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

データが New Relic に取り込まれると、タスク、演算子、DAG 実行をメトリクスとして視覚化し、重要なメトリクスにアラートを設定できます。