Amazon Elastic MapReduceで、Apache Mahoutの分散次元縮約(Parallel ALS)を動かす

さて、夏休みもそろそろ終わり。総力(自分のですが)を結集して、一仕事してみたいと思います。

今回のログでは、MahoutのサンプルにあるParallel ALS(という次元縮約の分散計算アルゴリズム)を使ったジョブを動かしてみる。

Mahoutに、Parallel ALS(Alternating Least Square)が実装されているということは、協調フィルタリング(リコメンデーションエンジン)のエリアにとどまらず、注目に値することと思う。

注記:
Mahout0.7では、Amazon EMR(Hadoop1.0.3)で動いたのだが、0.8になってジョブが変更されており、RMSEまで計算したところでジョブが落ちてしまう。
RMSEが計算されているということは、予測値(リコメンデーション)まで計算できているということだが、ここでは、0.7の環境で実行する。


$MAHOUT_HOME/examples/binディレクトリにある、factorize-movielens-1M.shは、Movie Lens DataSets(http://www.grouplens.org/node/73)のデータ(1Mデータ)をもとにリコメンデーションを生成するサンプルで、Hadoop MapReduceを使ってParallelALSFactorization.jobが動く。

Movie Lens はミネソタ大学が行った、調査プロジェクト(http://www.grouplens.org/)で、実際の調査に基づいたデータセットを特定のライセンスで提供している(README、Webサイトにライセンス条項がある)。
1Mデータは約6000のユーザーが約4000の映画に対して5段階で評価を与えたデータ(Preferenceデータ)となっている。
正確には、ダウンロードしたデータのREADMEにあるように、

  • UserIDs range between 1 and 6040
  • MovieIDs range between 1 and 3952
  • Ratings are made on a 5-star scale (whole-star ratings only)
  • Timestamp is represented in seconds since the epoch as returned by time(2)
  • Each user has at least 20 ratings

となっていて、最低20以上の評価をした人のデータのみ抽出されている。(評価が極端に少ないと、評価者同士の類似度が計算できなくなる可能性がたかい。たとえば、2人のユーザーが共通に評価した映画が1つの場合、Pearsonの相関係数、Spearmanの順序相関係数、ジャカードインデックスといった代表的な類似度が算出できない)

Parallel ALS(Alternativing Least Square)は次元縮約のアルゴリズムの1つである。ここで、次元縮約とは、代数的に大きな次元の空間を、小さな次元の空間(への写像)で代替近似することをさす。この原論文は、HPラボの研究者による「Large-scale Parallel Collaborative Filtering for the Netflix Prize」で、分散実行環境で次元縮約を行うアルゴリズムが提案されている。

原論文では、MATLABの分散処理機能が用いられているが、Mahoutではその部分をHadoop MapReduceに書き換えてあると考えられる(まだ、実際にソースコードを調べていません)。

一般に、次元縮尺では、レーティングを表した行列を特異値分解(Singular Value Decomposition;SVD)し、適当な次元(情報の損失が大きくならない程度で、かつ、数桁の次元縮尺となる次元)に縮約するというのがストレートなソリューションなのであるが、ALSは特異値分解を用いずに、ユーザーとアイテムのそれぞれについてFeatureという新たな次元を追加することで、これを行う。

さて、それでは、Amazon EMR上でこのサンプルを動かしてみる。
EMRのマスタノードへログインし、環境変数をMahout用に変更する。

export HADOOP_CONF_DIR=/home/hadoop/conf
export MAHOUT_HOME=/home/hadoop/mahout-distribution-0.7
export CLASS_PATH=$HADOOP_CONF_DIR:$MAHOUT_HOME/conf
export PATH=$MAHOUT_HOME/bin:$PATH

Movie Lens DataSetsから1Mのzipをダウンロードし、$MAHOUT_HOME/examples/binで解凍する。
すると、$MAHOUT_HOME/examples/bin/ml-1m/ratings.datができる。中身を見てみると、以下のようになっている。左から、ユーザー(1-6040)、映画(1-3952のいずれか)、評価(1-5の整数)、タイムスタンプである。

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
….
….


次に、factorize-movielens-1M.shがHDFSを使うようになっていないため、Hadoop環境で動かすために、以下のdiffに示す変更を行う。Movie Lens DataSetsからダウンロードしたデータを一旦ローカルで加工したあと(オリジナルコードのまま)、HDFSにコピーするようにする。

43c43
< WORK_DIR=$MAHOUT_HOME/examples/bin/tmp/mahout-work-${USER}
---
> WORK_DIR=/tmp/mahout-work-${USER}
50,53d49
< hadoop fs -mkdir /usr/hadoop/tmp/movielens
< hadoop fs -copyFromLocal ${WORK_DIR}/movielens/ratings.csv /usr/hadoop/tmp/movielens
< WORK_DIR=/usr/hadoop/tmp
< 
73,75c69
< hadoop fs -copyToLocal /usr/hadoop/tmp/als/rmse/rmse.txt .
< cat rmse.txt
< #cat ${WORK_DIR}/als/rmse/rmse.txt
---
> cat ${WORK_DIR}/als/rmse/rmse.txt
79,81c73
< hadoop fs -copyToLocal /usr/hadoop/tmp/recommendations/part-m-00000 .
< shuf part-m-00000 |head
< #shuf ${WORK_DIR}/recommendations/part-m-00000 |head
---
> shuf ${WORK_DIR}/recommendations/part-m-00000 |head

実行用に、ローカルファイルシステムに以下のディレクトリを作成する。

mkdir $MAHOUT_HOME/examples/bin/tmp/mahout-work-hadoop

また、一応、HDFSに、このサンプルのためのディレクトリを作成しておく。

hadoop fs -mkdir /usr/hadoop/tmp

この準備ができたら、$MAHOUT_HOME/examples/binで

./factorize-movielens-1M.sh ml-1m/ratings.dat

とすれば、Hadoop MapReduceが流れ出す。(Explicit Feedbackの文言が見えるが、これについては、改めてログに書きたい)

最後に、以下の結果が返ってくる。実行時間は、34.75 Mins (2085 Sec)であった。

RMSE is:
0.8544593358567881

90%を学習に、10%を評価に使用したときの最小二乗誤差Root Mean Square Error が0.85程度という意味。


サンプル・リコメンデーションは各人に対して5番目まで計算され、以下の形式となる。

ユーザーID	[ 一番目にリコメンドする映画のID:推定評価値,  …..... ]

この結果をシャッフルして、先頭の10行を表示したものが画面に出力される。

Sample recommendations:
5410	[3307:5.0,1207:5.0,3469:5.0,615:5.0,904:4.93058,527:4.9215302]
4987	[3338:4.4963303,2700:4.4519396,50:4.385181,2858:4.3640075,2762:4.3376927,593:4.336168]
41	[2562:5.0,2084:5.0,2056:5.0,3012:5.0,1035:5.0,670:5.0]
4311	[572:4.233099,110:3.7144964,3753:3.7023435,356:3.6804912,858:3.6353533,3916:3.6033573]
5877	[3338:5.0,1200:5.0,260:4.9684415,589:4.867796,1210:4.8585286,1198:4.8541017]
697	[912:4.802189,913:4.748428,1207:4.6984935,969:4.6699486,3338:4.594149,1304:4.546064]
3436	[3338:4.8201466,318:4.7190824,50:4.7103577,541:4.6235585,3836:4.6017118,1036:4.593134]
1806	[2197:5.0,572:5.0,1148:5.0,745:4.9966965,1198:4.942266,598:4.9272003]
3606	[572:5.0,527:4.837927,318:4.742655,1271:4.7325063,1246:4.716095,2197:4.683401]
4563	[2329:5.0,3338:5.0,50:4.9770794,678:4.972953,2324:4.9693327,745:4.816033]


さて、6000ユーザーの4000映画に対するプリファレンスは、言葉を換えれば、4000次元の空間に6000のプロットをして、その類似度を計算したことになる。
これを、最初に述べたAmazon EMSの最小構成で、35分弱の時間で計算できたのは、驚異的に感じる。

これには、ParallelALSFactorization.jobによる次元縮約の処理の貢献が大きいに違いない。
m1.smallインスタンス(メモリー1.7GB;Hadoopのマスタノードは3GB以上のメモリが推奨されている)で、このサイズのデータを次元縮約をせずに1台の機械で実行したらハングアップする。

先も述べたが、次元縮約とは、この例の場合、約4000の映画の次元と約6000のユーザーの次元を、フィーチャー(Feature)という概念を挟んで、できるだけ少ない次元で代替しようとすることであり、shellを見ると、最大10回の繰り返し計算をを行い、20次元まで落とすように設定されている。
つまり、この処理により、4000次元(映画の数)、6000次元(ユーザー)を、20次元まで一気に次元を下げてしまう。これにより、計算が高速になるのは言うまでもないことだろう。SVDの並列処理化が難しいということを考えれば、このアルゴリズムがMahout+Hadoop Mapreduceで実装されている意味はとても大きい事だと思う(誉め過ぎかもしれない。「実際にはスケールしない」なんてことがなければよいが。。。)
原論文では、(当然のことであるが)リコメンドの精度も改善されるとうたわれている。


さて、RMSE(Root Mean Square Error;最小二乗誤差の平方根)の0.85を「どう評価」したらいいだろうか?
その定義より、1リコメンドにつき、評価予測値と実際の値の間に、平均0.85の誤差があるということである。
当然、0に越した事はないだが、200分の1に次元を落としているのであるから、それはありえない。
上の論文にあるシミュレーション結果(Fig1,2)が0.9以上にあること、論文のAbstructに

Our ALS-WR applied to the Net-flix dataset with 1000 hidden features obtained a RMSE score of 0.8985,which is one of the best results based on a pure method
….. Large-scale Parallel Collaborative Filtering for the Netflix Prize より引用

とあることから、悪い数字ではないのだと思われる。

RMSEは、大きく外れた値出ない限り、異なった手法やパラメータで算出した結果について、それらの相対的な優劣を決定するために用いられるのが正しい使われ方だとは思う。

Parallel ALSの理論面と実装面については、次回に、もう少し突っ込んで解説する。