Hadoop MapReduceで分散と標準偏差をスケーラブルに計算する

先のログで、標本平均を求めたので、今回は(多変量の場合について)分散と標準偏差を計算するプログラムを作成し、Amazon Elastic MapReduce(EMR)で実行してみる。

開発環境は、Mac OSX Mountain Lion。Hadoopのバージョンは1.1.2である。(Amazon EMRのバージョンは1.03)

分散・標準偏差を計算するには、観測値と標本平均を用いなくてはならない。
標本平均は、先のログで得た結果を使うこととする。
プログラムでは、Mapperを2つ用意して、平均値のファイルと観測値データをそれぞれに読み込んで加工し、reduceで分散と標準偏差を算出する。
Mapperを2つもつというのは特殊な感じがするが、MultipleInputsを使えば、実行するMapperごとに読み込むファイルを指定できる。
この計算を行うに際して、「Yahoo!でペロッパーネットワーク:Hadoopを使い倒す」、「パターンでわかるMapReduce(三木大知著;2012/8)」にある「GROUP BYパターン」を参考にさせていただいた。

サンプルとしては先のログで作成した

X1からX10 [0,10]の一様乱数より生成
Y X1 から X10までを加算して、N(0,1)の正規乱数を加算して生成

を用いた。サンプルサイズ=1000。


観測値のデータは、先のログと同じ、

5.519 4.710 8.263 3.375 5.236 6.545 0.254 5.115 3.434 4.190 55.750
8.431 6.157 7.668 8.167 4.806 6.630 6.729 6.864 4.305 6.616 76.665
…..

という1000×11の行列とし、算術平均は先のログの結果を用いる。これは、

1	5.021
2	4.907
3	5.271
4	5.056
5	4.918
6	4.975
7	4.962
8	5.007
9	5.046
10	5.068
11	60.265

となっている。ここで、各行は変量を意味するインデックス(Yの場合は11)と、平均値である。
これらをそれぞれ、sample.txt、means.txtと呼ぶ。

以下の処理の流れを示す。

sample.txtとmeans.txtは異なったフォルダーに格納し、出力先フォルダーとともに、プログラムの実行時に引数として指定する。
変量Xjの分散は、後者の式で平方和を計算し、これをサンプルサイズ(=n)で除算した(よって、不偏分散ではない)。また、その平方根標準偏差とした。

以下が、MapReduceで分散と標準偏差を算出するプログラムとなる。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  Squareクラス
 *
 * (すでに計算された)平均と観測値をつかって、分散と標準偏差を算出する。
 * 
 */
public class Square {

	/*
	 * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。
	 *
	 */
    public static class MapAll extends Mapper<LongWritable, Text, LongWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            int i = 1;
            while(tk.hasMoreTokens()){
                context.write(new LongWritable(i), new Text(tk.nextToken()));
                i++;
            }
        }
    }

    /*
	 * 変量ごとの算術平均の計算結果を読んで、変量のインデックスをキーとして、Textで書き出す。
	 * この際、平均値の後ろにmeanとつけて、reduceで読んだときのマークとする。
	 *
	 */
    public static class MapMean extends Mapper<LongWritable, Text, LongWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            String[] strArr = line.split("\t");
            key = new LongWritable(Integer.parseInt(strArr[0]));
            value = new Text(strArr[1]+" mean");
    		context.write(key, value);
        }
    }

    public static class Reduce extends Reducer<LongWritable, Text, LongWritable, Text>{
    	@Override
    	protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
        	double sum = 0;
        	double num = 0;
        	double mean = 0;
            for(Text value: values){
            	if(this.isMean(value.toString())){
            		mean = this.getMean(value.toString());
            	} else {
            		sum += Double.parseDouble(value.toString()) * Double.parseDouble(value.toString());
            		num++;
            	}
            }

            double ss = (sum - num*mean*mean)/num;
            double s  = Math.sqrt(ss);
            String strSs = new Double(ss).toString();
            String strS  = new Double(s).toString();
            context.write(key, new Text(strSs + " " + strS));
        }
    	
    	private boolean isMean(String line){
    		if(line.indexOf("mean")==-1) return false;
    		return true;
    	}
    	
    	private double getMean(String line){
            String[] strArr = line.split(" ");
            System.out.println(line);
    		return Double.parseDouble(strArr[0]);
    	}
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job(new Configuration(), "CalculateSD");
        job.setJarByClass(Square.class);

        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapMean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        boolean success = job.waitForCompletion(true);
        System.out.println(success);
    }
}

実行に際しては、sample.txtとmeans.txtは異なったフォルダーと、出力先フォルダーを、実行パラメータとして設定する。

以下はサンプルである。means.txtを入れた入力フォルダーは、算術平均を算出した際の出力フォルダーとなっている。
Program Arguments :

/Users/tetsuya/Documents/MapReduce/input/square/data  /Users/tetsuya/Documents/MapReduce/input/square/means  /Users/tetsuya/Documents/MapReduce/output/square

ヒープサイズなどが気になったり、ログの書き出し先を指定したい場合などは、以下の例ようにVM argumentに設定する。

VM Arguments: -Xmx1000m -Dhadoop.log.dir=/Users/tetsuya/hadoop/logs -Dhadoop.log.file=hadoop.log -DHadoop.home.dir=/Users/tetsuya/hadoop -Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console -Dhadoop.policy.file=hadoop-policy.xml


以下が計算結果となった。(今の場合、プログラマブルであることの検証を行っているため、有効桁数は気にしていない)

1	8.2474 	2.8718
2	8.1280 	2.8509
3	8.6093	 	2.9341
4	8.0777 	2.8421
5	8.3414 	2.8881
6	7.9989	 	2.8282
7	7.8160 	2.7957
8	8.2520 	2.8726
9	8.2059 	2.8646
10	8.7075 	2.9508
11	85.229	 	9.2319

1から10までについては、サンプルが[0,10]の一様分布から発生させた乱数になっている。
理論的な標準偏差は、10/2√3 = 2.8867である。

検算として、Rで計算した結果を載せておく。

> sqrt(t(vx1-mean(vx1))%*%(vx1-mean(vx1))/length(vx1))
         [,1]
[1,] 2.871829
> sqrt(t(vx2-mean(vx2))%*%(vx2-mean(vx2))/length(vx2))
         [,1]
[1,] 2.850966
> sqrt(t(vy-mean(vy))%*%(vy-mean(vy))/length(vy))
         [,1]
[1,] 9.231984

Amazon Elastic MapReduceAmazon EMR)で分散と標準偏差を算出する。

以下のようにして、Amazon EMRで実行することにする。

  • 入出力データはAmazon S3に置く。
  • プログラムはjarファイルにエクスポートし、これもAmazon S3におく。
Amazon EMR コンフィギュレーション

前回のログと同じ、マスタインスタンス1、コアインスタンス2の構成(同一のジョブフロー)を使う。

観測値、平均値はすでにAmazon S3上に存在するので、入力元としてはそのフォルダーを指定し、出力フォルダーは適当に決めて、ステップを追加する。

elastic-mapreduce --jar s3n://[your bucket name]/[your folder name which jar file is placed]/hadoopsample.jar --main-class Square --args s3n://[your bucket name]/[your folder name which observation datas are placed]/,s3n://[your bucket name]/[your folder name which means datas are placed]/,s3n://[your bucket name]/[your folder name for output] --step-name SD --jobflow  j-1XXXXXXXXXXXXXXXXX


JobトラッカーからJob Detailをみて、以下ように表示があれば正常に終了している。

上の図を見ると、以下のように実行時間が表示されているのがわかる。算術平均を求めた際とほぼ同じである(計算規模が小さいので、殆どがオーバーヘッドかもしれない)。

Status: Succeeded
Started at: Mon Jul 08 01:03:26 UTC 2013
Finished at: Mon Jul 08 01:04:22 UTC 2013
Finished in: 58sec


正常終了後、出力フォルダーに指定したS3のフォルダーをみると、以下のようになる。


以下はpart-r-00000である。

3	8.609369646154311 2.9341727362502557
6	7.998965034248384 2.8282441610031452
9	8.205945989440082 2.8646022393065467