Hadoop MapReduceで相関行列を計算する:ステップ5:観測値の平均からの偏差を求める (step5: subtract means from samples in matrix form)

MapReduceで大きな(5000変量〜)相関行列を求めるプログラム作りも、ようやく折り返し地点。
今日は、行列


を算出するステップのコードを作成する。。
注記;標準偏差の算出に不偏分散を用いたので、スケールを合わせるため、サンプルサイズ(n)−1の平方根で割ることとする。



以下がプログラムとなる。GitHubでも公開しています(MatrixSubMeans.java)。

package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  MatrixSubMeansクラス
 *
 * (すでに計算された)観測値の平均からの偏差を求める
 * 
 */
public class MatrixSubMeans {

	/*
	 * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。
	 *
	 */
    public static class MapAll extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
           	String strArr[] = value.toString().split("\t");
        	String keyArr[] = strArr[0].split(" ");
        	int var= Integer.parseInt(keyArr[0]);	// number of column
            context.write(new IntWritable(var), value);
        }
    }

    /*
	 * 変量ごとの算術平均の計算結果を読んで、変量のインデックスをキーとして、Textで書き出す。
	 * この際、平均値の後ろにmeanとつけて、reduceで読んだときのマークとする。
	 *
	 */
    public static class MapMean extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            String[] strArr = line.split("\t");
            int oKey = Integer.parseInt(strArr[0]); // number of column
            value = new Text(strArr[1]+" mean");
    		context.write(new IntWritable(oKey), value);
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, DoubleWritable>{
    	@Override
    	protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
        	double mean = 0;
        	List<String> list = new ArrayList<String>();
            for(Text value: values){
            	String line = value.toString();
            	if(line.indexOf("mean")!=-1){
                    String[] strArr = line.split(" ");
                    mean = Double.parseDouble(strArr[0]);
            	} else {
            		list.add(line);
            	}
            }
            
            
            for(int i=0;i<list.size();i++){
                String l=list.get(i);
               	String strArr[] = l.split("\t");
            	double var= Double.parseDouble(strArr[1]);	// number of column
            	var -= mean;
            	var /= Math.sqrt(list.size()-1);
                BigDecimal bd = new BigDecimal(var);
    			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 
                context.write(new Text(strArr[0]), new DoubleWritable(r.doubleValue()));
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job(new Configuration(), "SubtructMeans");
        job.setJarByClass(MatrixSubMeans.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapMean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        boolean success = job.waitForCompletion(true);
        System.out.println(success);
    }
}


実行時には、観測値行列の入っているフォルダー、平均値データの入っているフォルダー、出力先フォルダーを引数として指定する。


ステップ1の観測値と、ステップ2で算出した平均値を用いた場合、以下の結果が出力される。

1 1	-0.78
1 2	-1.24
1 3	1.26
1 4	0.52
1 5	1.06
1 6	0.49
1 7	-1.48
1 8	-0.74
1 9	0.26
1 10	0.66
2 1	0.65
2 2	-0.05
2 3	1.05
2 4	1.05
2 5	-1.09
2 6	-0.45
2 7	-0.79
2 8	-0.09
2 9	-0.85
2 10	0.58
3 1	-0.98
3 2	0.99
3 3	-0.45
3 4	0.59
3 5	-0.41
3 6	0.65
3 7	-0.75
3 8	0.45
3 9	0.85
3 10	-0.95
4 1	0.8
4 2	-0.1
4 3	1.14
4 4	-1.3
4 5	-0.73
4 6	0.2
4 7	0.84
4 8	-0.7
4 9	0
4 10	-0.16
5 1	-0.73
5 2	0.34
5 3	0.24
5 4	1.34
5 5	1.14
5 6	-0.33
5 7	-0.53
5 8	-0.19
5 9	-1.26
5 10	-0.03
6 1	-0.44
6 2	-0.21
6 3	1.13
6 4	0.43
6 5	-0.11
6 6	-1.01
6 7	-0.77
6 8	0.19
6 9	-0.01
6 10	0.79
7 1	-0.8
7 2	-0.34
7 3	-0.47
7 4	-0.7
7 5	1.03
7 6	-0.5
7 7	0.63
7 8	1.16
7 9	0.93
7 10	-0.94
8 1	0.82
8 2	1.02
8 3	-0.54
8 4	-0.18
8 5	-1.48
8 6	1.56
8 7	1.66
8 8	-1.04
8 9	-0.78
8 10	-1.04
9 1	-1.13
9 2	-1.53
9 3	-0.09
9 4	0.84
9 5	1.41
9 6	-0.39
9 7	-0.33
9 8	1.21
9 9	1.51
9 10	-1.49
10 1	0.92
10 2	-0.61
10 3	1.09
10 4	-0.85
10 5	-0.21
10 6	0.05
10 7	0.09
10 8	-1.45
10 9	0.45
10 10	0.52