Amazon EMRでMahoutのレコメンデーションのワクワク感を体験する:30分でできる分散レコメンデーション

追記:2013/9/17
このログの続編の投稿が完了しましたので、記事の末にリンクを追加しました。これで、このログの方法を応用した分散レコメンデーションエンジンの構築もばっちり(のはず)です。


先のログでは、Parallel ALS(Alternating Least Squares)を用いたレコメンデーションの理論面のフォローアップと、Apache Mahoutでの実装を少し詳しくみた。

このアルゴリズムは極めてシンプルで、かつ、Apache MahoutではHadoop上でスケーラブルに実装されている。

Apache Mahoutは発展途上のプロジェクトなので、スケーラブルに実装されているアルゴリズムとそうでないアルゴリズムがあって、レコメンデーションについていえば、0.8では、アイテムベースのレコメンデーション、Slope Oneレコメンデーション、Parallel ALSを使ったレコメンデーションがスケーラブルに実装されている。

なにはともあれ、ユーザーとアイテム、そのプリファレンス値(レーティング:rating)から、レコメンデーションができるのは「楽しい」につきる。
「自分にも有名サイトのようなリコメンドができる」と考えるだけで、ワクワクする。


今回のログでは、Amazon EMRを使ったレコメンデーションを行う環境構築と、1700万件のプリファレンス値を持つデータでのレコメンデーション作成までを一気に行う。
環境構築やデータのセットアップまでは、30分もあれば十分できてしまう(すでに、Hadoopの実行環境を持っていれば、10分もあればできてしまうだろう)。1700万件のデータから13万ユーザーのレコメンデーションを作成するには、1時間半位かかるが、サンプルをみていただければ、手持ちのデータにも簡単に適用できることが分かると思う。協調フィルタリング(Collaborative Filtering)の最大の長所である。

以下に流れを示す。

   

サンプルデータについて

まず始めに、サンプルデータについて簡単に紹介しておく。
チェコスロバキアに「Libimseti.czという出会い系サイト(http://www.libimseti.cz/)」がある。このサイトでは、ユーザーが他者のプロファイルを評価できるようになっている。この評価データを、研究者が匿名化とクレンジングした後に公開してくれている(http://www.occamslab.com/petricek/data/)。評価は10段階で値が大きいほど「好意的」であることを意味する。
このデータは、「Mahout イン アクション」でも利用されており、同書では、ユーザーベース・レコメンド、アイテムベース・レコメンド、Slope Oneレコメンドが、(MapReduceを使わずに)実行されている。データについては、以下に簡単にまとめておく。

  • ユーザーID:135,359人
  • プロファイルID(=アイテムID);220,970件(全てがレイティングされているわけではない)
  • ユーザー×プロファイル = 29,910,278,230 (290億)
  • 評価の数;17,359,346件(rating.datの行数)。組み合わせの0.00058が評価されている。
  • 1−10までのレーティングで、10がベスト。
  • データクレンジング
    • レーティングが20未満のユーザーはオミット
    • 同じレートをつけているユーザーはオミット

Mahoutイン・アクション

Mahoutイン・アクション

準備:Amazon Elastic Mapreduceを使う

Amazon Elastic MapReduceは、Amazon Web Servicesの1つで、従量課金性で手軽にHadoop環境を利用することができる。

注記:詳しい使い方については、以前のログ(「Amazon Elastic MapReduceを使う(その1)」、「Amazon Elastic MapReduceを使う(その4:マスタノードにSSHで接続して、普通にhadoopをつかってみる)」)を参照ください。特に、Security Group、アクセスキーの設定に注意してください。

今回のサンプルデータの場合、m1.smallではメモリーサイズが足りないため、m1.mediumをオンデマンドで6台使う(マスタ・インスタンスグループが1台、コア・インスタンスグループが5台)。

注記:実行には料金がかかります。課金は、実行時点で行われ、1時間以内に終了しても1時間分として課金されます。
EC2のインスタンス料金;http://aws.amazon.com/jp/ec2/pricing/#on-demand
これに加えて、EMRの料金が別途発生します。
EMRの料金;http://aws.amazon.com/jp/elasticmapreduce/pricing/

さぁ、それでは、elastic mapreduce rubyコマンドラインツールでジョブフローを起動しよう。

以下のコマンドで、上記6台で構成されたHadoopの実行環境を立ち上げる。

elastic-mapreduce --create --alive --name Mahout-LibimSeTi --instance-group master --instance-type m1.medium --instance-count 1 --instance-group core --instance-type m1.medium --instance-count 5

注記:コマンドラインツールについては、以前のログ(「Amazon Elastic MapReduceを使う(その2:コマンドラインツール)」)を参照してください。

このとき、EMRの状態をAmazon Management Consoleでみたものが、以下の図。


この裏側で動いている、EC2インスタンス6台の状態を見るには、EC2のManagement Consoleから見る。


このとき、ついでに、これらの中からマスタノードを見つけ、ホスト名を覚えておく。1つ1つをクリックして、Security GroupにElasticmapreduce-masterとされるものがマスタノードである(コアは、Elasticmapreduce-slaveと表示される)。


マスタノードの環境変数の設定

マスタノードにSSHでログインし、Mahoutを実行するための環境を整える。

MacBook-Pro:~ tetsuya$ ssh -i ~/.ssh/[your_key_name].pem hadoop@[hostname of master node]
Linux (none) 3.2.30-49.59.amzn1.x86_64 #1 SMP Wed Oct 3 19:54:33 UTC 2012 x86_64
--------------------------------------------------------------------------------

Welcome to Amazon Elastic MapReduce running Hadoop and Debian/Squeeze.
 
Hadoop is installed in /home/hadoop. Log files are in /mnt/var/log/hadoop. Check
/mnt/var/log/hadoop/steps for diagnosing step failures.

The Hadoop UI can be accessed via the following commands: 

  JobTracker    lynx http://localhost:9100/
  NameNode      lynx http://localhost:9101/
 
--------------------------------------------------------------------------------
hadoop@ip-[private address]:~$

EMRのデフォルトの設定では、環境変数が足りないので、以下のように環境変数を追加する。

export HADOOP_CONF_DIR=/home/hadoop/conf
export MAHOUT_HOME=/home/hadoop/mahout-distribution-0.7
export CLASS_PATH=$HADOOP_CONF_DIR:$MAHOUT_HOME/conf
export PATH=$MAHOUT_HOME/bin:$PATH

必要なファイルのダウンロードと解凍

分散レコメンデーションに必要なファイルをダウンロードし、解凍する。

Apache Mahoutのダウンロード

ここでは、Mahout0.7を使う。SSHでログインした状態で、/home/hadoopに以下のコマンドでzipアーカイブ取得して、解凍する。解凍すると/home/hadoop/mahout-distribution-0.7というディレクトリができる。

wget http://ftp.kddilabs.jp/infosystems/apache/mahout/0.7/mahout-distribution-0.7.zip
unzip mahout-distribution-0.7.zip
libimsetiサンプルデータのダウンロード

/home/hadoopにて、以下のコマンドでダウンロード&解凍する。解凍するとlibimsetiというディレクトリができ、その中のratings.datがプリファレンスデータとなっており、これを用いる。

wget http://www.occamslab.com/petricek/data/libimseti-complete.zip
unzip libimseti-complete.zip

プリファレンスデータは、以下の形式になっており、そのまま利用することができる。

1,133,8
1,720,6
1,971,10
1,1095,7
1,1616,10
1,1978,7
1,2145,8
…..
Parallel ALSによるレコメンデーションを作成するシェルスクリプトのダウンロード

/home/hadoopにて、以下のコマンドでシェルスクリプトをダウンロード&解凍する。

wget http://www.tetsuyaodaka.com/als/libimseti/factorize-libimseti.sh.zip
unzip factorize-libimseti.sh.zip

シェルスクリプトは先日のログ「Amazon Elastic MapReduceで、Apache Mahoutの分散次元縮約(Parallel ALS)を動かす」をmodifyしたもので、データの90%を使ってモデリングを行い、残りの10%をテストに用いる。テスト結果はRMSEで評価し、それを画面に表示する。
また、各ユーザーについて6つのレコメンデーションを作成し、HDFS上に保存する。この一部(10件)をランダムに画面に表示する。

シェルの中身を以下に示す。

if [ "$1" = "--help" ] || [ "$1" = "--?" ]; then
  echo "This script runs the Alternating Least Squares Recommender on the Groupl
ens data set (size 1M)."
  echo "Syntax: $0 /path/to/ratings.dat\n"
  exit
fi

if [ $# -ne 1 ]
then
  echo -e "\nYou have to download the libimseti dataset from http://www.occamslab.com/petricek/data/  before"
  echo -e "you can run this example. After that extract it and supply the path to the ratings.dat file.\n"
  echo -e "Syntax: $0 /path/to/ratings.dat\n"
  exit -1
fi

MAHOUT="../../bin/mahout"
hadoop fs -mkdir /usr/hadoop/tmp/libimseti
hadoop fs -copyFromLocal $1 /usr/hadoop/tmp/libimseti
WORK_DIR=/usr/hadoop/tmp

# create a 90% percent training set and a 10% probe set
$MAHOUT splitDataset --input ${WORK_DIR}/libimseti/ratings.dat --output ${WORK_DIR}/dataset  --trainingPercentage 0.9 --probePercentage 0.1 --tempDir ${WORK_DIR}/dataset/tmp

# run distributed ALS-WR to factorize the rating matrix defined by the training 
set
$MAHOUT parallelALS --input ${WORK_DIR}/dataset/trainingSet/ --output ${WORK_DIR}/als/out  --tempDir ${WORK_DIR}/als/tmp --numFeatures 20 --numIterations 10 --lambda 0.200

# compute predictions against the probe set, measure the error
$MAHOUT evaluateFactorization --input ${WORK_DIR}/dataset/probeSet/ --output ${WORK_DIR}/als/rmse/  --userFeatures ${WORK_DIR}/als/out/U/ --itemFeatures ${WORK_DIR}/als/out/M/ --tempDir ${WORK_DIR}/als/tmp 

# compute recommendations
$MAHOUT recommendfactorized --input ${WORK_DIR}/als/out/userRatings/ --output ${WORK_DIR}/recommendations/ --userFeatures ${WORK_DIR}/als/out/U/ --itemFeatures ${WORK_DIR}/als/out/M/  --numRecommendations 6 --maxRating 10

# print the error
echo -e "\nRMSE is:\n"
hadoop fs -copyToLocal /usr/hadoop/tmp/als/rmse/rmse.txt .
cat rmse.txt
#cat ${WORK_DIR}/als/rmse/rmse.txt
echo -e "\n"

echo -e "\nSample recommendations:\n"

hadoop fs -copyToLocal /usr/hadoop/tmp/recommendations/part-m-00000 .
shuf part-m-00000 |head
#shuf ${WORK_DIR}/recommendations/part-m-00000 |head
echo -e "\n\n"

echo "removing work directory"
rm -rf ${WORK_DIR}


細かいことは置いておき、ここで以下のコマンドを入れると、レコメンデーション・ジョブが動き出す。

./factorize-libimseti.sh libimseti/ratings.dat

追記(2013/8/28):Parallel ALSの実行パラメータについては、「30分でできる分散レコメンデーション:パラメータを決定する」に記載しました。

動き出したことは、SSHターミナルでも見ることができるが、「HTTP://マスタノードのホスト名:9100」にブラウザーでアクセスすると、ジョブトラッカーで進行状況を見ることができる。


全部で28のジョブが1時間半ほど終了し、SSHのターミナルに以下のような画面がでて終了する。

RMSE is:

1.8382212693021418


Sample recommendations:

128184	[156050:10.0,183803:10.0,131360:10.0,108921:10.0,199165:10.0,82471:10.0]
68544	[74889:10.0,114069:10.0,9402:10.0,128107:10.0,188117:10.0,164617:10.0]
26552	[201721:10.0,209587:9.721462,169507:9.602367,5809:9.541938,138955:9.534509,84192:9.514705]
48912	[37479:10.0,10988:10.0,218717:10.0,13501:10.0,155058:10.0,65735:10.0]
4272	[201721:10.0,176379:9.965527,55388:9.926402,201647:9.889579,115037:9.837585,188072:9.823316]
60576	[201721:9.950418,138856:8.8009,109459:8.618746,200913:8.537033,209587:8.506007,138955:8.479612]
29776	[54103:10.0,37479:10.0,197072:10.0,122365:10.0,53522:10.0,115020:10.0]
736	[201721:10.0,13501:10.0,21350:10.0,160292:10.0,10988:10.0,85224:10.0]
33344	[143524:10.0,175333:10.0,162823:10.0,58915:10.0,180288:10.0,107318:10.0]
33224	[201721:10.0,192409:10.0,39215:10.0,1699:10.0,85224:10.0,168520:10.0]


全てのレコメンデーションは、HDFS

/usr/hadoop/tmp/recommendations/

にできるので、EC2のファイルシステムに落とす場合には、hadoop fsコマンドのcopyToLocalオプションで落としてください。

Amazon Elastic Mapreduceの終了

このまま放っておくと「永久に課金されてしまう」ので、忘れずにEMRをTerminateする。
Terminateは、EMRのAmazon Management Consoleで、ジョブフローを選んでTerminateするか、右クリックして「terminate」で終了させる。

応用と補足 (追記:2013/9/14)

続編を公開したので追記します。これで、応用までバッチリです。

続編その1:「30分でできる分散レコメンデーション:パラメータを決定する
続編その2:「30分でできる分散レコメンデーション:パラメータを変更して、応用できるようにする