New Relicは、1日に数ペタバイトのオブザーバビリティデータを取り込んで処理する大規模なストリーミングプラットフォームを運営しています。このプラットフォームの中核となるのは、強力なオープンソースの分散イベントストリーミングテクノロジーであるApache Kafkaです。New Relicは10年以上にわたってApache Kafkaを使用しています。その間、チームは効率性と信頼性を最大限に高めながら、低レイテンシでデータを処理する方法を学んできました。

Apache Kafkaを効率的に実行するための鍵の1つは、データを受信するストリーム処理アプリケーションの部分であるConsumerを適切に調整することです。Kafka Consumerを調整する方法を理解するには、まず適切なインストゥルメンテーションがあることを確認する必要があります。

この記事では、Kafka Consumerをインストゥルメント化する方法を示し、それらのメトリクスを解釈していくつかの一般的な問題を検出して修正する方法について説明します。

インストゥルメンテーション

Kafka Consumerを調整する前に、現在のパフォーマンスとリソースの使用状況を診断するための高品質のインストゥルメンテーションが必要です。この記事は、New Relicで主に使用されている公式の Apache Kafka Javaクライアント によって提供されKafka Consumerのチューニングに焦点を当てていますが、分析の手法は他の言語で記述されたKafkaクライアントにも適用されます。Apache Kafka Javaクライアントは、多くの重要な Kafka Consumerメトリクス を追跡し、New Relic Javaエージェントと組み合わせることで それらのKafka Consumerメトリクスを自動的に収集できます。さらに、これらのメトリクスはKafkaクライアントを使用するアプリケーションのAPM画面内に キュレーションされたNew Relic Kafka UIとして自動的に認識され表示されるようになりました。

New Relic Kafka UIのスクリーンショット

librdkafkaをベースにした他の言語の Apache Kafkaクライアント の場合、内部メトリクスを収集する方法についてはその言語のドキュメントに従う必要があります。librdkafka metricsも、Javaクライアントによって生成されたものとは厳密には一致しません。このガイドの残りの部分では、Javaクライアントのメトリクスのみを参照し、チューニングの決定については上記のNew Relic Kafka UIを参照します。

Kafka Consumerフェッチのバッチ処理について

Kafka Consumerのフェッチレートは、主にフェッチ時にレコードがどのようにグループ化されるか、およびフェッチする頻度によって決まるため、そのメカニズムについて少し説明しましょう。Kafkaのより簡単な入門については、Confluentドキュメントを参照してください。

Kafka ConsumerはKafkaブローカーにフェッチリクエストを送信し、1つ以上のトピックと各トピック内の1つ以上のパーティションからのレコードを要求します。各フェッチ応答には、リクエスト内のトピックとパーティションごとに 複数のレコードバッチ を含めることができます。Kafka Consumerは複数のフェッチを並行して送信できますが、サブスクライブまたは割り当てられているデータを持つ各ブローカーには1つのフェッチのみを送信します。以下は、フェッチ応答の構造を示す図であり、次に説明する設定の一部が含まれています。

Kafka Consumerフェッチレスポンスの構造

フェッチ応答のサイズとレイテンシを制御する設定が4つあります。

上記のドキュメントにリンクされている説明をぜひ一読してみてください。以下にそれらの説明を簡略化して要約します。

fetch.max.bytes設定は、この値より大きい個々のレコードバッチがないと仮定して、フェッチ応答の最大データ量を制御します。max.partition.fetch.bytesはフェッチ内のパーティションごとに返されるデータの最大量を制御します。ここでも、この値より大きい個々のレコードバッチがないことを前提としています。ここでの重要なポイントは、これらの設定値は絶対な最大値ではないということです。単一のフェッチ応答には、応答内の複数のパーティションから返されたデータが含まれる可能性があり、また最大値よりも大きいサイズの単一のレコードバッチが返されることがあるからです(確実にデータを取得できるようにするため)。

fetch.min.bytes設定は、ブローカーによって返される前にフェッチ応答に含まれる必要がある最低データ量を制御します。そして最後に、fetch.max.wait.msはブローカーがfetch.min.bytes閾値に達するまでフェッチリクエストを待機して応答するまでの時間を決定します。

意図しないスケーリング、スループット、コストへの影響を回避するには、これらの設定を調整することが重要です。例えば:

  • スループットがKafka Consumerによって制限されている場合、アプリケーションはCPUリソースを十分に活用できない可能性があります。
  • スループットがKafka Consumerによって制限されている場合、アプリケーションは必要以上に速くスケールアウトする可能性があります。
  • アプリケーションがデータを頻繁にポーリングすると、Kafka Brokerに過度の負荷がかかり、Kafka BrokerのCPUが浪費される可能性があります。

Kafka Consumerのフェッチバッチ処理の監視と調整

フェッチサイズとレイテンシを制御する設定がわかったので、Kafka Consumerのバッチ処理パフォーマンスを示すメトリクスを見てみましょう。New Relic Kafka UIの使用上で述べたように、Consumerタブをクリックして少し下にスクロールすると、Fetching というセクションに以下の5つのメトリクスが表示されます。

New Relic Kafka UIでのメトリクスの取得

これらのメトリクスからすぐに検出できる2つの一般的なシナリオについて説明します。

シナリオ1. テーブルにデータが滞留してしまう:パーティションフェッチの最大サイズに達する

平均フェッチサイズfetch-size-avg)メトリクスが一貫して1 MiB * number of partitions per fetch request付近にある場合(フェッチリクエストあたりのパーティションの計算方法については後ほど説明します)、コンシューマーがmax.partition.fetch.bytes制限に達している可能性があります。max.partition.fetch.bytesのデフォルト値は 1MiBです。つまり、ブローカーは要求したパーティションに対してより多くのデータがある場合でも、パーティションごとに最大1MiBの応答を送信します。この結果、Kafka Consumerはブローカーごとに1つの実行中のフェッチ(またはバッファリングされたフェッチ)しか許可しないため、発行されるリクエストが大幅に増加し、スループットがブローカーへの往復時間によって制限される可能性があります。
このシナリオを確認する別の方法は、平均フェッチリクエストレートfetch-rate)を確認することです。(注:fetch-rateの値は、ブローカーごとではなく、コンシューマーがフェッチするすべてのブローカー全体にわたります)。適切なフェッチレートは、ブローカーあたり 1 秒あたり1~5件のリクエスト程度です。これは、200~1,000ミリ秒のフェッチレイテンシに相当するためです。平均フェッチサイズが常に制限に達しており、平均フェッチリクエストレートブローカーあたりで1秒あたり5リクエストを超えている場合は、小さなフェッチリクエストが多数作成されている可能性があります。これにより、アプリケーションのスループットが制限されるだけでなく、大量のリクエストレートによりKafka Brokerの容量が急速に消費され、多大なコストに直接つながる可能性があります。

max.partition.fetch.sizeを増やす

ここでの明らかな解決策は、max.partition.fetch.sizeの値を増やすことです。しかし、どの程度の値を設定すればよいのでしょうか?注目すべき点の1つは、fetch.max.bytes のデフォルト値が50MiBであることです。これは、フェッチ応答全体にとって安全なサイズと見なされるものについての適切な視点です。これを別の方法で考えると、デフォルト設定で50MiBフェッチを完全に利用するには、フェッチごとに50個のパーティションを消費する必要があり、これは非常に大きな量です。

フェッチリクエストあたりのパーティション数がわかっている場合は、max.partition.fetch.sizeをフェッチあたり50MiBに近づく値まで増やすことができます。

max.partition.fetch.size = 50MiB / partitions per fetch

フェッチあたりのパーティション数は一定ではない可能性が高いことに注意してください。グループメンバーシップが変更されたり、クラスタの数が変更されたりすると、この値は変動する可能性がありますが、これは問題ありません。クラスタ内のブローカーの数がわかっている場合は、assigned-partitions のメトリクスを使用してコンシューマーあたりの割り当てられたパーティションの平均数を求め、フェッチあたりのパーティション数を見積もることができます。

average partitions per fetch = min(ceil(assigned-partitions / broker count), assigned-partitions)

これは、パーティションがクラスタ全体に均等に分散されていることを前提としているため推定値であり、検証する価値があります。フェッチあたりの平均パーティション数がわかったので、それを使用して必要な max.partition.fetch.sizeを計算できます。

max.partition.fetch.size = fetch.max.bytes / average partitions per fetch

max.partition.fetch.sizeを上記の値に増やすと、リクエストレートが低下し、コンシューマーがアプリケーションのボトルネックになるのを防ぐことができます。これにより、重要なKafka Brokerインフラストラクチャのコストとリソース使用量が節約され、アプリケーションのパフォーマンスが向上します。リクエストレートがまだ高い場合は、fetch.max.bytesを増やすことも検討してください。あらゆる設定変更と同様にその影響を適切にテストし、負荷テストを実施してください。

メモリチェック

max.partition.fetch.sizeまたはfetch.max.bytesを増やすと、特にビジネスロジックに負荷がかかっている場合、コンシューマーがメモリに保持するデータの量が大幅に増加する可能性があります。これはスループットを最大化するには良いことですが、メモリ不足を避けるために、Java仮想マシン(JVM)に十分なメモリを割り当てるようにする必要があります。取得したレコードにアプリケーションが必要とするメモリの量を見積もるには、次のメトリクスを使用できます。

fetched records max memory bytes = max.partition.fetch.size * assigned-partitions / compression-rate

残念ながら、Kafka Consumersは Kafkaプロデューサーのように 圧縮率メトリクスを収集しないため、Kafka Consumerのincoming-byte-rateとbytes-consumed-rateを使用して平均圧縮率を見積もります(アップストリームプロデューサーからのメトリクスがある場合は、それを使用して圧縮率を取得できます)。

compression-rate = incoming-byte-rate / bytes-consumed-rate

incoming-byte-rateはグローバルメトリクスであり、bytes-consumed-rateはトピックごとのメトリクスであることに注意してください。複数のトピックから消費する場合は、各トピックのbytes-consumed-rateを合計して、全体の平均圧縮率を計算する必要があります。

メモリ不足が発生しないように、最も負荷の高い状況でコンシューマーのメモリ使用量をテストおよび分析してください。

シナリオ2. データがない(少ない)のにデータ取得しようとしている:最小バイトフェッチサイズが低い

スループットの高いトピックがある前の例では、要求に対して十分なデータが取得できずにデータが滞留するシナリオについて説明しました。しかし、スループットの低いトピックがある場合はどうなるでしょうか?スループットの低いトピックでは、さらに多くのKafkaリソースを浪費する可能性があることがわかりました。トピックの平均フェッチサイズが非常に低く(< 100 KB)、平均フェッチレイテンシも非常に低い(< 100 ms)場合、コンシューマーはバッチ処理を過剰に行っている傾向と判断できます。もう一度、平均リクエストレートを確認してください。ブローカーあたり1秒あたり1 ~5のリクエストよりも大幅に高い場合は、Kafkaブローカーに大きな負荷がかかっています。

なぜこのようなことが起こるのでしょうか?これはfetch.min.bytes のデフォルト値が1バイトだからです。この設定は、ミリ秒のレイテンシが主な懸念事項である場合には適していますが、コストを大幅に節約するために多少のレイテンシを犠牲にする場合には適していません。デフォルト値の1バイトは、低スループットのトピックに到着する最初のレコードバッチが、応答で返される唯一のレコードバッチになる可能性が高いことを意味します。

リクエストレートを下げてバッチのサイズを増やすには、fetch.min.bytesを調整します。その適切な値の求め方を解説します。

まず最初に行うことは、Consume時のレイテンシ許容度を決定することです。デフォルトでは、fetch.max.wait.msは500ミリ秒に設定されていますが、コンシューマーによって発生するレイテンシを250ミリ秒に制限したい場合は、fetch.max.wait.msを250ミリ秒に変更します。

次に、単一のアプリケーションインスタンスのブローカーあたりの現在のスループットを計算する必要があります。エージェント設定で ノードレベルのメトリクスが有効 になっている場合は、consumer-node-metricsスコープの incoming-byte-rateを使用して、ブローカーごとに1秒あたりに消費されたバイト数をホスト単位でグルーピングして表示できます。以下は、New Relicクエリ言語(NRQL)タイムスライスクエリの例です。

SELECT average(newrelic.timeslice.value) FROM Metric
WHERE appName = 'my-stream-processor'
AND clientId = 'my-stream-processor.event-consumer'
AND host = 'my-stream-processor-5597b54c6d-n5gm9'
WITH METRIC_FORMAT 'MessageBroker/Kafka/Internal/consumer-node-metrics/node/{nodeId}/client/{clientId}/incoming-byte-rate'
FACET nodeId

nodeIdでファセットされた受信バイトレートクエリの例

上記のスクリーンショットでは、この1つのアプリケーションインスタンスに対して、各ブローカーから最大1KiB/秒を消費しています。スループットとレイテンシ許容度がわかったので、fetch.min.bytesに適切な値を選択できます。

fetch.min.bytes = average throughput per second * max latency seconds

私が説明したシナリオでは、fetch.min.bytesを256以上に設定することになります。

256B = 1KiB/s * .2s

多くのシナリオでは、fetch.min.bytesに非常に高い値を選択し、fetch.max.wait.msのみを使用してレイテンシ許容値を制御する方が簡単な場合があります。ただし、上記の計算は、fetch.min.bytesをどの程度高く設定する必要があるかを把握することができるので便利です。

こうすることで、低スループットのトピックのコンシューマー向けにfetch.min.bytesを増やし、 fetch.max.wait.msの値を調整することで、レイテンシへの影響を最小限に抑えながら、Kafka Brokersに対するリクエストレートの負荷を大幅に削減できます。

結論

この記事では、最終的にパフォーマンスを最大化し、コストを最小限に抑えるために、Kafka Consumerをチューニングするために高品質のインストゥルメンテーションが必要な理由を説明しました。max.partition.fetch.bytesfetch.min.bytesの低いデフォルト値は、Kafka Consumer設定に存在する可能性のある非効率性の2つの例にすぎません。New Relic Kafka UIの探索を続け、Kafka ConsumerとKafka Producersをより深く分析してみてください。 そして、この記事で紹介したような傾向が見えた場合にKafka Consumerメトリクスを使用してアラートを設定することは、クラスタまたは計算リソースを浪費しているクライアントを見つけるのに役立つ、もう1つの強力なツールであることを忘れないでください。