Hadoop MapReduceで大きな相関行列を計算する(Calculate Large Correlation Coefficient Matrix with Hadoop MapReduce)

前回までのログで、観測値行列から相関行列を求めるための一通りの仕組みができた。

この開発をスタートするときに、以下の目標を立てた。

実行時間の目標:以下のクラスターを用い、5000変量で、各変量につき5000サンプルあるとして1時間以内での計算を行う。
インフラ Amazon Elastic MapReduce
リージョン US Standard
インスタンスタイプ m1.small
マスタ・インスタンスグループ 1インスタンス
コア・インスタンスグループ 8インスタンス
タスク・インスタンスグループ 10インスタンス

観測値データ

[0,10]の一様乱数から発生させた小数点以下1桁までのデータ(2500万個=5000*5000)を利用した。このデータはPCで生成した(Javaプログラム)。
=> 有効な桁数が少ない(=データサイズが小さい)ので、今後の課題として、その評価も必要になると思われる。

実行方法

Amazon Elastic MapReduceのマスタノードへ、sftpを使い、観測値データとプログラム(jarファイル)、ジョブを流すためのshellを送った。

SSHでマスタノードへログインし

hadoop fs -mkdir /usr/hadoop/input
hadoop fs -mkdir /usr/hadoop/output
hadoop fs -copyFromLocal indexedmatrix5000.txt /usr/hadoop/input

でフォルダーをHDFS上に作成し、sftpで送ったデータをHDFSにコピーする。シェルは、この作業の後に流すようになっている。

以下にshellプログラムを示す。

#!/usr/bin/env bash

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.Means /usr/hadoop/input/ /usr/hadoop/output/Means

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.StandardDeviations /usr/hadoop/input/ /usr/hadoop/output/Means/ /usr/hadoop/output/SD

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.InvertSD /usr/hadoop/output/SD/ /usr/hadoop/output/InvSD

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.MatrixSubMeans /usr/hadoop/input/ /usr/hadoop/output/Means/ /usr/hadoop/output/MatrixSubMeans

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.TransformMatrix /usr/hadoop/output/MatrixSubMeans/ /usr/hadoop/output/SubMeansMatrix1 no

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.TransformMatrix /usr/hadoop/output/MatrixSubMeans/ /usr/hadoop/output/SubMeansMatrix2 no

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.MatrixMult /usr/hadoop/output/SubMeansMatrix1/ /usr/hadoop/output/SubMeansMatrix2/ /usr/hadoop/output/Cov 5000 5000 200 200

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.TransformMatrix /usr/hadoop/output/Cov/ /usr/hadoop/output/SubMatrix1 no

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.CorrelationCoefficient /usr/hadoop/output/SubMatrix1/ /usr/hadoop/output/InvSD/ /usr/hadoop/output/CC_tmp 5000 200

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.TransformMatrix /usr/hadoop/output/CC_tmp/ /usr/hadoop/output/SubMatrix2 yes

hadoop jar hadoopsample.jar com.tetsuyaodaka.hadoop.math.matrix.CorrelationCoefficient /usr/hadoop/output/SubMatrix2/ /usr/hadoop/output/InvSD/ /usr/hadoop/output/CC 5000 200

実行結果

観測行列から相関行列を求めるまでには、9つのMapReduceジョブを経る。
以下が上記のクラスタで実行した結果である。52分強で計算が終了し、5000行5000列の相関行列を得る事ができた。



結果の確認

実行結果から、相関係数が0.5よりも大きいものを取り出すMapReduceプログラムを作り、計算結果のチェックを行った。
変量間は、無相関な乱数としたので、ほぼ0に近い値となり、下のように対角要素のみが抽出され、(丸め誤差が生じているが)結果はほぼ1となった。
プログラムが正常に動作していることを示していると考えていいだろう。

実際に解析を行う場合には、変数感に何らかの相関を発見したいので、対角要素は抽出されないようにするのがよい。

1103 1103	1.03
1115 1115	1.01
1125 1125	1.03
1137 1137	1.05
1147 1147	1.07
1159 1159	1.03
1169 1169	1.04
1179 1179	1.04
1210 1210	0.99
1220 1220	1.04
1230 1230	1.03
1232 1232	1.04
1242 1242	1.01
1252 1252	1.06
1264 1264	1.04
1274 1274	1.05
1286 1286	1.04
1296 1296	1.06
1391 1391	1.04
1609 1609	1.05
1619 1619	1.04

追記

5000行5000列ができたのだから、10000行10000列はどうなのだろうと、同じクラスタで実行してみたが、行列積の計算部で、Heapサイズが足らずに落ちてしまう結果となった。
そこで、スケールをm1.mediumに変更して、上と同様の19台のクラスタで実行してみたのが、以下の結果である。m1.mediumは、m1.smallの倍以上のメモリが利用できるので、Heapで落ちることはなかった。
演算回数は、行(列)の3乗のオーダーとなるが、スケールアップしたことで、5000行5000列と同程度、もしくは少ない時間で計算終了した。



また、以下にm1.middleで5000行5000列を実行したときの結果を記載しておく、実行時間が半減しているが、行列積の計算時間の短縮が著しい。


課題

以下は、今後の課題とする。

  • 有効桁数を増やす。
  • 相関行列は対称行列であるから、全ての要素を計算する必要はない。(ただし、直感的には、分解することで速くなるようには思われない)
  • ジョブ中の各ステップで発生する中間ファイルは可読である必要はないので、圧縮を検討すべき。(入出力もそうではあるが。。。)
  • 利用するHeapサイズを小さく納めるためには、行列積を求めるアルゴリズムで列での分割を行う必要があるかもしれない。ただし、細かく部分行列に分解することでオーバーヘッドが極端に大きくなる可能性がある。これは、以前のログのケース1と同様の現象である。