MapReduce: Simplified Data Processing on Large Clusters メモ

Hadoop MapReduceが、2004年12月に行われた6th Symposium of Operating System and Implementation(San Francisco , Dec.6-8)において発表された「MapReduce: Simplified Data Processing on Large Clusters (Dean, J. and Ghemawat, S. 2004)」に触発されて開発されたことは有名である。

今でも、この論文を入手することができるので、MapReduceを考察する上では読んでおかなくてはならない(だろう)。

読後、最初の感想としては、この論文にきわめて忠実に、Hadoop MapReduceが開発されているということ、が言えると思う。
また、印象に残った点としては、

  • クラスタ・サイズは1000以上のオーダーを想定していること。
  • Googleの通常の思想そのままを反映して、PCは家庭用+アルファ程度の廉価なサーバーで構成することを想定していること。
  • あらゆるリソースの中で、バンドウィドゥスをもっとも貴重と考えていることを。
  • Googleでの通常運用にまで踏み込んだ論考になっていること。

である。

遅いSATAディスクが紛れ込んだ場合のインパクトまで踏み込んで、徹底した冗長化、Fault Tolerantを追求する姿勢は求道的な雰囲気まで感じる。

さて、この論文で述べられている、MapReduceについて、できるだけ原論文に忠実に記載したいと思う。

MapReduceの適用範囲について

論文の冒頭(Abstract)は

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper.
...「MapReduce: Simplified Data Processing on Large Clusters」より引用

MapReduceは、大規模なデータセットを処理、生成するためのプログラミングモデルと、それに関連する実装である。利用者は、キー/バリューペアを処理をして、中間的なキー/バリューペアを生成するmap機能を仕様化し、reduce機能がすべての中間的なすべてのバリューを、中間的なキーに基づいてマージする。多くの現実世界のタスクは、その論文で示すように、このモデルで表現可能である。(訳:小高)

と力強い(一般的に、力強くない論文なないので差し引かなくてはならないが)。

MapReduceの開発は、2003/11にファーストバージョンがリリースされ、発表時点で数百のMapReduceプログラムと、1000近くのジョブが毎日稼働している、と書いている。

2.3節 More Examples において

  • 分散 Grep
  • URLのアクセスカウント(Webログ解析の一つ)
  • リバース・Webリンクグラフ(バックリンクのカウント)
  • ホスト単位のTerm Vector(Term Vectorとは、ドキュメントやドキュメントセットに含まれるワードとその頻度の組み合わせ、とのこと)

の算出方法について簡単に説明されている。

また、6章において、Googleでサービス化された利用例として、

  • 大規模機械学習
  • Google NewsやFroogle(現Google Product Search)
  • ポピュラーな検索語から報告書を作成するデータ抽出(日本語がへんだが、Google Zeitgeistのようなデータの2次利用によるサービス)
  • Webページからの新しい体験(Geolocationの抽出など)
  • 大規模なグラフ計算(ここでいうグラフはFacebookでいうような、物事や人の関連を意味する)

があげられている。

データを徹底的に使い尽くす、まさに「ビッグデータ」というサービスがあげられている。

MapReduceの仕組みについて

まず、抽象的なモデルとして、mapとreduceが以下の記号を使って表記されている。
矢印の左は入力、右は出力である。

map (K1 , V1) → list(K2, V2)
reduce (K2,list(V2)) → list(V2)

ここで、list()という抽象化されているのは、Javaのインターフェースみたいなものではなくて、文字列を連結したものといった意味でとらえるべきかと思う(論文中にもreduceの出力は、典型的には0か1つのnアウトプットである、と書かれている)


以下の図は、論文のFigure.1であって、これがMapReduceの具体的な仕様を表している。

3.1節のExecution Overviewで述べられているが、概略は以下のようになっている。
図中、Master、workerと書かれているのが、MapReduceを構成するクラスターである。

  1. 入力データ(GFS上)は、16MBから64MBのブロックに分割される(説明ではM個のsplit)。処理が開始されると、プログラムのコピーがクラスターにばらまかれる
  2. 1つのコピー(クラスタ上のプログラム)はmasterという特別な存在で、M個mapタスクとR個のreduceタスクを、アイドル状態にあるworkerからアサインする。
  3. mapタスクをアサインされたworkerは、分割された入力ブロックからデータを読み込み、キー/バリューを解析して、利用者が作ったMap機能をとおして、中間的なキー/バリューをメモリーにバッファリングする。
  4. 周期的に、バッファリングされた中間的なキー/バリューペアはローカルディスクに書き込まれる。このとき、Partitioning関数でReduce関数用の書き込みパーティションを計算する(keyのハッシュ値をRで割った余り)。書き込み位置は、masterに通知され、reduceワーカーに伝達される。
  5. reduceワーカーが位置を通知されると、remote procedure callを使ってmapワーカーのローカルディスクからデータを取得する。すべての中間的なキー/バリューペアを読み込むと、キーでソートされ、グループ化される。ソートは、1つのreduceタスクに複数のキーが割り当てられるために、必要である。
  6. reduceワーカーは、ソートされた中間データをよみ、同一キーに対して、利用者が定義したreduce機能を実行する。実行結果は最終ファイルに書き出される(GFS上)。
  7. すべてのmapとreduceタスクが終了すると、masterがユーザープログラムを呼び出して、戻り値を返す。

GFSとの関係について

この論文は、MapReduceに関する論文なので、GFSについて明示的な説明が少ない。
論文中、Global File Systemと記載されているのは、GFSのことであって、MapReduceの入出力データが(最低3回レプリケートされる)GFS上で管理されることによって、安全に処理される。

これに対して、map機能(関数)が吐き出す中間データは、mapワーカーの(メモリーにバッファリングされた後に)ローカルディスクに書き出されると、明確に記載がある。これは、workerが処理の終了をmasterに通知することで、masterがアイドリング状態にあるwokerを常時しっていること。定期的にpingをうってworkerの死活監視を行い、死んでしまったワーカーはクラスタから切り離して、変わりのワーカーに処理をさせるという仕組みが組み込まれていることによる。

Overviewの3に書かれているように、Map機能は、GFS上の入力ファイルをブロックサイズに区切った単位で処理を開始する。
GFSと(ここで述べられている)MapReduceは一体のものということに注意する必要があろう。