分散処理環境について考えた事(レガシーな機械学習ジョブとの共存)
はじめまして!ウイングアーク1st の ehal と申します。日頃、お客さんに生成・識別 AI を透過的に活用してもらう方法について考えております。
AI モデルをのチューニングをされた事はありますか?最近の AI ベンダー各社から提供されているサービスでは、使用するモデルのドメイン適応したいデータをアップロードして、良しなにしてもらう方法が有りますので、それを活用して使用したい方面に特化したモデルを作り上げて便利に使用されている方もいらっしゃると思います。その際に使用するデータは、目的によって様々ですが、対象モデルの学習規模に見合ったデータ量が必要になるので大抵の場合でビックデータが対象になります。
今回はそのモデル学習に使用するビッグデータを効率的に処理するために必要な「分散処理」基盤について触れたいと思います。私が従来型の処理システムで経験してきた事を振り返り、現在主流となっている Apache Spark の活用について考えてみますのでお付き合いください。
大体14年ほど前に私が機械学習を仕事として取り組み始めた頃は、次の様なシステムを組んで、ビックデータを対象にした分析を行ってました。
PBS(クラスタノード間のジョブ管理)
MPI(複数ノード利用の計算処理アプリケーション実行)
NFS(全ノード間でのデータ共有)
計算処理を行うクラスタノード群を全て PBS の管理下に置いておき、巨大なデータ群を対象にする計算処理をシステムに依頼して分散実行するのが基本の流れです。
MPI を利用したアプリケーションを実行する際には MPI アプリケーションが実行するスレッド数を PBS で確保した上で PBS ジョブとして実行します(勝手に MPI で使用するノードを確保して使用すると、PBS が後に実行するジョブに割り当てる選択ノードと衝突する)。
また全ノードから見て処理対象のデータ・ファイル群が同一のファイルパスで参照できるようにしておきます(どのノードでジョブ実行しても同じ処理で同じ結果にするため)。
これは、いわゆる古典的なPCクラスタを利用するシステム構成です。現在でもこれに近いシステムで運用されているケースも多いのではないでしょうか。当時のハードウェアでこのシステムを構成した場合 1,000 を超える PBS ジョブ同時実行は十分に可能でした。
しかし、いきなり大量のデータを相手に大規模な計算ジョブを実行してしまうと、却って結果を得るまで時間が掛かってしまう問題が有りました。ビッグデータを格納する NFS で参照するストレージデバイスの性能問題です。
大体256スレッドが一斉に1ストレージにアクセスが連続して集中して、NFS サーバ側のワーカースレッドの半分ほどが I/O 待ちになった辺りから極端に性能が劣化していき、その結果 PBS ジョブがデータを取り出すまで長時間待たされてしまいます。
それを改善するために当時、色々とシステム側の改良や運用の仕方を変えてみましたが、ストレージサーバを用途に応じて複数に分けて、1ストレージにアクセスが集中しないように使用する前に調整して対処してました。ですから事前準備としてデータ配置に気を使ったり、配置のし直しが必要だったりして結構面倒です。
データアクセスが集中することで処理全体が進まなくなる点は間違いなく根本的に改善すべき点で、今でしたら高性能なエンタープライズ向け NVMe ストレージを利用した性能向上で容易に対処できますが、それでも同時実行するジョブ数を大幅に上げたいとか、PB オーダーのデータを対象に処理したい場合に高価なデバイスをたくさん並べるのは難しく、システム構成のコストを抑えるために安価なデバイスで何とかならないかと考えたくなります。これを実現するのに Apache Spark で行われる分散処理の機能を使って PBS ジョブの制御ができないでしょうか?
Spark は HDFS 上のデータ配置に基づいてジョブ実行を行うことができます。つまりクラスタノード毎のローカルストレージを束ねて作った HDFS 上に分析対象のビックデータを配置して Spark ジョブを実行すると、クラスタノード毎に分散配置されたデータはそのクラスタノードで処理され易くなる訳で、対象のビックデータ全体へのアクセスが最適化される事に繋がります。これまでの PBS による分散処理ジョブ制御では、ジョブが実行されるクラスタノードと処理対象データの配置関係が考慮されていませんでしたが、この Spark のデータパーティショニングに基づき分散処理を行うように変更できれば、上で触れたストレージアクセス集中の問題への解となると思います。ただこの方法は、今までの分散処理を PBS を介したジョブ起動から Spark アプリケーションとしての起動への変更を伴う事になるので、その変更で影響が有りそうな箇所についてチェックが必要でしょう(例えばジョブへのパラメータの渡し方など)。この方法の実現性については、今後検討を進めようと思ってます。
あと Spark は v1 の登場時から最新版の間で大きく進化していきました。個人的に特に素晴らしいと思うのは次の2点です。
Python 対応(pyspark)と Numpy, Scipy 等の主要モジュールのサポート
Kubernetes Native サポート
まず Python 対応については、Spark 導入への敷居が大きく下がったと感じています。更には慣れ親しんできた Numpy と Scipy を使った処理ルーチンに大きく手を入れずに分散処理化できるのは非常に大きな手助けとなります。今まで MPI で対応していた部分は、こちらに置き換えで良いかもしれません。
次に Kubernetes サポートは、その名から分かる通り Kubernetes クラスタ下で指定したコンテナを展開した上で Spark の分散処理を実行する機能です。これが出来て嬉しいのは、分散処理を実行する全クラスタノードのユーザランドの構成を完全に一致させて固定できる事です。普段はオンプレ環境のクラスタだけで回している処理をクラウドから一時的にインスタンスを調達して同時実行数を上げて処理するといった、クラスタ構成の動的変更に柔軟に対応するのを実現するのに大きな助けとなります。
今回は以上となります。ありがとうございました