Hadoop MapReduceで相関行列を計算する:ステップ2:標準偏差の算出 (step2: calculating standard deviations)

今回は、前回に引き続き、第2ステップとして、変量ごとの標準偏差を計算するプログラムを実装する。以前のログでも、多変量の場合の標準偏差の計算を実装したが、以下のようにデータ形式を変更したので、過去に作成したプログラムを修正することとする。

行インデックス 列インデックス  観測値(行列要素)

開発は(これまで同様)Mac Book Pro(Mountain Lion)で行い、Hadoop1.1.2を用いる。

観測値行列と、先のログで計算した標本平均を入力とし、それぞれに異なったMapperで読み込む。
そして、観測行列の列(変量)ごとに

列インデックス  (不偏)標準偏差

を出力する。



実行時の引数は、観測値データの入っているフォルダー(もしくは、ファイル)、出力フォルダーを指定する。

以下がそのプログラムとなる。GitHubでも公開しています(StandardDeviation.java

package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.IOException;
import java.math.BigDecimal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  StandardDeviationsクラス
 *
 * (すでに計算された)平均と観測値をつかって、(不偏)標準偏差を算出する。
 * 
 */
public class StandardDeviations {

	/*
	 * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。
	 *
	 */
    public static class MapAll extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
           	String strArr[] = value.toString().split("\t");
        	String keyArr[] = strArr[0].split(" ");
        	int var= Integer.parseInt(keyArr[0]);	// number of column

            context.write(new IntWritable(var), value);
        }
    }

    /*
	 * 変量ごとの算術平均の計算結果を読んで、変量のインデックスをキーとして、Textで書き出す。
	 * この際、平均値の後ろにmeanとつけて、reduceで読んだときのマークとする。
	 *
	 */
    public static class MapMean extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            String[] strArr = line.split("\t");
            value = new Text(strArr[1]+" mean");
    		context.write(new IntWritable(Integer.parseInt(strArr[0])), value);
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, DoubleWritable>{
    	@Override
    	protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
        	double sum = 0;
        	double num = 0;
        	double mean = 0;
            for(Text value: values){
            	if(this.isMean(value.toString())){
            		mean = this.getMean(value.toString());
            	} else {
                   	String strArr[] = value.toString().split("\t");
            		sum += Double.parseDouble(strArr[1]) * Double.parseDouble(strArr[1]);
            		num++;
            	}
            }

            double ss = (sum - num*mean*mean)/(num-1);
            double s  = Math.sqrt(ss);
            BigDecimal bd = new BigDecimal(s);
			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 
            context.write(key, new DoubleWritable(r.doubleValue()));
        }
    	
    	private boolean isMean(String line){
    		if(line.indexOf("mean")==-1) return false;
    		return true;
    	}
    	
    	private double getMean(String line){
            String[] strArr = line.split(" ");
    		return Double.parseDouble(strArr[0]);
    	}
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job(new Configuration(), "StandardDeviations");
        job.setJarByClass(StandardDeviations.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapMean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        boolean success = job.waitForCompletion(true);
        System.out.println(success);
    }
}
テスト

以下の10行10列のデータを観測値データとする。

1 1	3.6
1 2	2.2
1 3	9.7
1 4	7.5
1 5	9.1
1 6	7.4
1 7	1.5
1 8	3.7
1 9	6.7
1 10	7.9
2 1	5.9
2 2	3.8
2 3	7.1
2 4	7.1
2 5	0.7
2 6	2.6
2 7	1.6
2 8	3.7
2 9	1.4
2 10	5.7
3 1	2.6
3 2	8.5
3 3	4.2
3 4	7.3
3 5	4.3
3 6	7.5
3 7	3.3
3 8	6.9
3 9	8.1
3 10	2.7
4 1	7.1
4 2	4.4
4 3	8.1
4 4	0.8
4 5	2.5
4 6	5.3
4 7	7.2
4 8	2.6
4 9	4.7
4 10	4.2
5 1	3.7
5 2	6.9
5 3	6.6
5 4	9.9
5 5	9.3
5 6	4.9
5 7	4.3
5 8	5.3
5 9	2.1
5 10	5.8
6 1	4.5
6 2	5.2
6 3	9.2
6 4	7.1
6 5	5.5
6 6	2.8
6 7	3.5
6 8	6.4
6 9	5.8
6 10	8.2
7 1	0.9
7 2	2.3
7 3	1.9
7 4	1.2
7 5	6.4
7 6	1.8
7 7	5.2
7 8	6.8
7 9	6.1
7 10	0.5
8 1	7.1
8 2	7.7
8 3	3.0
8 4	4.1
8 5	0.2
8 6	9.3
8 7	9.6
8 8	1.5
8 9	2.3
8 10	1.5
9 1	2.0
9 2	0.8
9 3	5.1
9 4	7.9
9 5	9.6
9 6	4.2
9 7	4.4
9 8	9.0
9 9	9.9
9 10	0.9
10 1	8.2
10 2	3.6
10 3	8.7
10 4	2.9
10 5	4.8
10 6	5.6
10 7	5.7
10 8	1.1
10 9	6.8
10 10	7.0

このデータは以下の行列に対応する。

      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9] [,10]
 [1,]  3.6  5.9  2.6  7.1  3.7  4.5  0.9  7.1  2.0   8.2
 [2,]  2.2  3.8  8.5  4.4  6.9  5.2  2.3  7.7  0.8   3.6
 [3,]  9.7  7.1  4.2  8.1  6.6  9.2  1.9  3.0  5.1   8.7
 [4,]  7.5  7.1  7.3  0.8  9.9  7.1  1.2  4.1  7.9   2.9
 [5,]  9.1  0.7  4.3  2.5  9.3  5.5  6.4  0.2  9.6   4.8
 [6,]  7.4  2.6  7.5  5.3  4.9  2.8  1.8  9.3  4.2   5.6
 [7,]  1.5  1.6  3.3  7.2  4.3  3.5  5.2  9.6  4.4   5.7
 [8,]  3.7  3.7  6.9  2.6  5.3  6.4  6.8  1.5  9.0   1.1
 [9,]  6.7  1.4  8.1  4.7  2.1  5.8  6.1  2.3  9.9   6.8
[10,]  7.9  5.7  2.7  4.2  5.8  8.2  0.5  1.5  0.9   7.0

先のログに示したように、各変量(列)の平均値は以下である。

1	5.93
2	3.96
3	5.54
4	4.69
5	5.88
6	5.82
7	3.31
8	4.63
9	5.38
10	5.44


以下がプログラムから得られる標準偏差である。

1	2.93
2	2.39
3	2.34
4	2.33
5	2.41
6	1.99
7	2.51
8	3.49
9	3.54
10	2.4

Amazon Elastic MapReduce(EMR)で同じ結果が得られることは確認しているが、Amazonでの実行結果は、一連のプログラムが完了した際に紹介したいと思う。