はじめに

本ブログ記事では、Kafka monitoring integration を用いて、コンシューマラグをモニタリングする手順について解説します。本記事では、簡単のため Kafka パッケージの標準コマンドのみで擬似的にトラフィックを発生させ、New Relic 上でラグを観測してみます。まずは単純な構成でお試しいただき、検証環境や本番環境で導入する Kafka monitoring integration 設定の勘所を掴んでいただくことを目的としています。

事前準備

Kafka のダウンロード

事前準備として、「Kafka のダウンロード」「インフラストラクチャエージェントのインストール」の 2 点にご対応ください。Kafka のダウンロードに際しては、以下のリンクからご利用のシステムに対応したバージョンをダウンロードしてください。

Downloads | Apache Kafka

インフラストラクチャエージェントの設定

Kafka monitoring integration はインフラストラクチャエージェントの Integration として実装されています。そのため、Kafka monitoring integration を導入するためには、まずインフラストラクチャエージェントをインストールしてください。

続いて、Kafka monitoring integration で、コンシューマラグを計算する際に必要となるオフセットを取得するように設定します。インフラストラクチャエージェントの設定ファイルを格納するディレクトリ内に Kafka monitoring integration の設定ファイルを配置します。例えば、Linux OS 上で標準のインストール手順を実行した場合には、/etc/newrelic-infra/integrations.d/kafka-config.yml に以下のような内容のファイルを配置します。

integrations:
  - name: nri-kafka
    env:
      CONSUMER_OFFSET: true
      INACTIVE_CONSUMER_GROUP_OFFSET: true
      CONSUMER_GROUP_OFFSET_BY_TOPIC: true
      CLUSTER_NAME: testcluster
      AUTODISCOVER_STRATEGY: bootstrap
      BOOTSTRAP_BROKER_HOST: localhost
      BOOTSTRAP_BROKER_KAFKA_PORT: 9092
      BOOTSTRAP_BROKER_JMX_PORT: 9999
      BOOTSTRAP_BROKER_KAFKA_PROTOCOL: PLAINTEXT
      CONSUMER_GROUP_REGEX: '.*'
    interval: 15s

上記の設定項目の中で、最も重要なのは CONSUMER_OFFSET の項目です。CONSUMER_OFFSET を true に設定することにより、KafkaOffsetSample イベントタイプでコンシューマラグが記録されるようになります。CONSUMER_OFFSET が true に設定された場合、KafkaBrokerSample/KafkaTopicSample は取得されない点にご注意ください。nri-kafka は JMX と Kafka API により各メトリクスを取得します。コンシューマラグ自体は、JMX ではなく Kafka API により取得されます。上記の設定例では、BOOTSTRAP_BROKER_HOST/BOOTSTRAP_BROKER_KAFKA_PORT により、ローカルホストの 9092 番ポートのブローカーに接続しています。これらの設定により、KafkaOffsetSample イベントタイプのデータが New Relic に連携されます。

上記はあくまで設定の例となりますので、ご利用の環境に合わせて設定内容を調整してください。

Kafka サーバ起動手順

以下の手順では、カレントディレクトリは Kafka パッケージのトップディレクトリであることを前提とします。本項目では、Kafka のサーバを起動します。

まずは、以下のようなコマンドでクラスター ID を生成します。

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

続いて、作成されたクラスター ID に基づいてサーバプロパティを作成します。

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

作成されたサーバプロパティを利用してサーバを起動します。

JMX_PORT=9999 bin/kafka-server-start.sh config/kraft/server.properties

サーバを起動させたまま、以下のコマンドでテスト用のトピック(lag-test-topic)を作成します。

bin/kafka-topics.sh --create --topic lag-test-topic --bootstrap-server localhost:9092

サーバに関する準備項目は以上です。

Consumer 起動手順

以下のコマンドで、コンシューマを起動します。

bin/kafka-console-consumer.sh \
 --topic lag-test-topic \
 --from-beginning \
 --bootstrap-server localhost:9092 \
 --consumer-property enable.auto.commit=true \
 --consumer-property auto.commit.interval.ms=1000

consumer-property でラグ計算の元となるオフセットを自動コミットするように設定します。コンシューマグループ名を明示的に指定し、NRQL で設定されたコンシューマグループ毎の傾向を把握するためには、--group オプションでコンシューマグループを指定することも可能です。

Producer 起動手順

続いて、以下のコマンドで、コンシューマの処理能力を超えることが見込まれる大量のメッセージをトピックに投入し、ラグを発生させてみます。

for i in {1..10000}; do echo "message-$i"; done | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic lag-test-topic

何度か同様の処理を実行することで、New Reilc 上で時系列グラフを表示した際にコンシューマラグの推移がわかりやすくなります。

NRQL でのコンシューマラグの確認

New Relic の Web UI 上で以下のような NRQL クエリを実行し、コンシューマラグを確認してみましょう。

FROM KafkaOffsetSample
SELECT max(consumerGroup.maxLag)
WHERE topic = 'lag-test-topic'
TIMESERIES

以下のようにグラフが表示されたら、Kafka monitoring integration は適切に設定されていると考えられます。

NRQL generated chart of maxlag attribute in KafkaOffsetSample

実際の運用の際には、以下のように consumerGroup 毎にラグを NRQL で表示すると、滞留が発生しているグループがわかりやすくなります。

FROM KafkaOffsetSample
SELECT max(consumerGroup.maxLag)
WHERE topic = 'lag-test-topic'
FACET consumerGroup
TIMESERIES

まとめ

本ブログ記事では、Kafka の標準コマンドのみでコンシューマラグを New Relic に連携する手順を見てきました。ラグの可視化が完了したら、アラートなど次のステップについてもご検討ください。