Hadoop MapReduceで分散と標準偏差をスケーラブルに計算する
先のログで、標本平均を求めたので、今回は(多変量の場合について)分散と標準偏差を計算するプログラムを作成し、Amazon Elastic MapReduce(EMR)で実行してみる。
開発環境は、Mac OSX Mountain Lion。Hadoopのバージョンは1.1.2である。(Amazon EMRのバージョンは1.03)
分散・標準偏差を計算するには、観測値と標本平均を用いなくてはならない。
標本平均は、先のログで得た結果を使うこととする。
プログラムでは、Mapperを2つ用意して、平均値のファイルと観測値データをそれぞれに読み込んで加工し、reduceで分散と標準偏差を算出する。
Mapperを2つもつというのは特殊な感じがするが、MultipleInputsを使えば、実行するMapperごとに読み込むファイルを指定できる。
この計算を行うに際して、「Yahoo!でペロッパーネットワーク:Hadoopを使い倒す」、「パターンでわかるMapReduce(三木大知著;2012/8)」にある「GROUP BYパターン」を参考にさせていただいた。
サンプルとしては先のログで作成した
X1からX10 | [0,10]の一様乱数より生成 |
Y | X1 から X10までを加算して、N(0,1)の正規乱数を加算して生成 |
を用いた。サンプルサイズ=1000。
観測値のデータは、先のログと同じ、
5.519 4.710 8.263 3.375 5.236 6.545 0.254 5.115 3.434 4.190 55.750 8.431 6.157 7.668 8.167 4.806 6.630 6.729 6.864 4.305 6.616 76.665 …..
という1000×11の行列とし、算術平均は先のログの結果を用いる。これは、
1 5.021 2 4.907 3 5.271 4 5.056 5 4.918 6 4.975 7 4.962 8 5.007 9 5.046 10 5.068 11 60.265
となっている。ここで、各行は変量を意味するインデックス(Yの場合は11)と、平均値である。
これらをそれぞれ、sample.txt、means.txtと呼ぶ。
以下の処理の流れを示す。
sample.txtとmeans.txtは異なったフォルダーに格納し、出力先フォルダーとともに、プログラムの実行時に引数として指定する。
変量Xjの分散は、後者の式で平方和を計算し、これをサンプルサイズ(=n)で除算した(よって、不偏分散ではない)。また、その平方根を標準偏差とした。
以下が、MapReduceで分散と標準偏差を算出するプログラムとなる。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Squareクラス * * (すでに計算された)平均と観測値をつかって、分散と標準偏差を算出する。 * */ public class Square { /* * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。 * */ public static class MapAll extends Mapper<LongWritable, Text, LongWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); StringTokenizer tk = new StringTokenizer(line); int i = 1; while(tk.hasMoreTokens()){ context.write(new LongWritable(i), new Text(tk.nextToken())); i++; } } } /* * 変量ごとの算術平均の計算結果を読んで、変量のインデックスをキーとして、Textで書き出す。 * この際、平均値の後ろにmeanとつけて、reduceで読んだときのマークとする。 * */ public static class MapMean extends Mapper<LongWritable, Text, LongWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] strArr = line.split("\t"); key = new LongWritable(Integer.parseInt(strArr[0])); value = new Text(strArr[1]+" mean"); context.write(key, value); } } public static class Reduce extends Reducer<LongWritable, Text, LongWritable, Text>{ @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ double sum = 0; double num = 0; double mean = 0; for(Text value: values){ if(this.isMean(value.toString())){ mean = this.getMean(value.toString()); } else { sum += Double.parseDouble(value.toString()) * Double.parseDouble(value.toString()); num++; } } double ss = (sum - num*mean*mean)/num; double s = Math.sqrt(ss); String strSs = new Double(ss).toString(); String strS = new Double(s).toString(); context.write(key, new Text(strSs + " " + strS)); } private boolean isMean(String line){ if(line.indexOf("mean")==-1) return false; return true; } private double getMean(String line){ String[] strArr = line.split(" "); System.out.println(line); return Double.parseDouble(strArr[0]); } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(new Configuration(), "CalculateSD"); job.setJarByClass(Square.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // Mapperごとに読み込むファイルを変える。 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapMean.class); FileOutputFormat.setOutputPath(job, new Path(args[2])); boolean success = job.waitForCompletion(true); System.out.println(success); } }
実行に際しては、sample.txtとmeans.txtは異なったフォルダーと、出力先フォルダーを、実行パラメータとして設定する。
以下はサンプルである。means.txtを入れた入力フォルダーは、算術平均を算出した際の出力フォルダーとなっている。
Program Arguments :
/Users/tetsuya/Documents/MapReduce/input/square/data /Users/tetsuya/Documents/MapReduce/input/square/means /Users/tetsuya/Documents/MapReduce/output/square
ヒープサイズなどが気になったり、ログの書き出し先を指定したい場合などは、以下の例ようにVM argumentに設定する。
VM Arguments: -Xmx1000m -Dhadoop.log.dir=/Users/tetsuya/hadoop/logs -Dhadoop.log.file=hadoop.log -DHadoop.home.dir=/Users/tetsuya/hadoop -Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console -Dhadoop.policy.file=hadoop-policy.xml
以下が計算結果となった。(今の場合、プログラマブルであることの検証を行っているため、有効桁数は気にしていない)
1 8.2474 2.8718 2 8.1280 2.8509 3 8.6093 2.9341 4 8.0777 2.8421 5 8.3414 2.8881 6 7.9989 2.8282 7 7.8160 2.7957 8 8.2520 2.8726 9 8.2059 2.8646 10 8.7075 2.9508 11 85.229 9.2319
1から10までについては、サンプルが[0,10]の一様分布から発生させた乱数になっている。
理論的な標準偏差は、10/2√3 = 2.8867である。
検算として、Rで計算した結果を載せておく。
> sqrt(t(vx1-mean(vx1))%*%(vx1-mean(vx1))/length(vx1)) [,1] [1,] 2.871829 > sqrt(t(vx2-mean(vx2))%*%(vx2-mean(vx2))/length(vx2)) [,1] [1,] 2.850966 > sqrt(t(vy-mean(vy))%*%(vy-mean(vy))/length(vy)) [,1] [1,] 9.231984
Amazon Elastic MapReduce(Amazon EMR)で分散と標準偏差を算出する。
以下のようにして、Amazon EMRで実行することにする。
Amazon EMR コンフィギュレーション
前回のログと同じ、マスタインスタンス1、コアインスタンス2の構成(同一のジョブフロー)を使う。
観測値、平均値はすでにAmazon S3上に存在するので、入力元としてはそのフォルダーを指定し、出力フォルダーは適当に決めて、ステップを追加する。
elastic-mapreduce --jar s3n://[your bucket name]/[your folder name which jar file is placed]/hadoopsample.jar --main-class Square --args s3n://[your bucket name]/[your folder name which observation datas are placed]/,s3n://[your bucket name]/[your folder name which means datas are placed]/,s3n://[your bucket name]/[your folder name for output] --step-name SD --jobflow j-1XXXXXXXXXXXXXXXXX
JobトラッカーからJob Detailをみて、以下ように表示があれば正常に終了している。
上の図を見ると、以下のように実行時間が表示されているのがわかる。算術平均を求めた際とほぼ同じである(計算規模が小さいので、殆どがオーバーヘッドかもしれない)。
Status: Succeeded Started at: Mon Jul 08 01:03:26 UTC 2013 Finished at: Mon Jul 08 01:04:22 UTC 2013 Finished in: 58sec
正常終了後、出力フォルダーに指定したS3のフォルダーをみると、以下のようになる。
以下はpart-r-00000である。
3 8.609369646154311 2.9341727362502557 6 7.998965034248384 2.8282441610031452 9 8.205945989440082 2.8646022393065467