Hadoop MapReduceで相関行列を計算する:ステップ7・8:相関行列を計算する (step7&8: calculate correlation coefficient matrix)

前回のログで、共分散行列

まで計算ができた。

今回はいよいよ相関行列を計算する。相関行列の計算式は以下である。

ただし、

である。この対角行列はステップ3・4で計算した。

ステップ7・8にあたる「共分散行列の前後に、対角行列をかける」については、先のログでも示した以下の性質を用いる。

(性質2)対角行列を行列の前方から掛けることは、i番目の対角要素を(行列の)第i行に掛けることと同義である。

(性質3)対角行列を行列の後方から掛けることは、i番目の対角要素を(行列の)第i列に掛けることと同義である。

(性質1については、先のログを参照)

計算方針

転置行列、対角行列の一般的な性質より、以下の式をもとに計算する。
ます、以下の式の括弧内を計算する。

ここで、以下のようにおくと、

最初の式は以下となる。

転置行列、対角行列の性質より、以下の式が導かれるので、上記性質1だけ用いればよい。


そこで、プログラムでは、共分散行列データを読み込んで、各i番目の行、第i番目の標準偏差の逆数をかけることにする。

また、このプログラムも高速化のため、以下の形式から行列形式に変換を行い、行をブロックに分解(ブロックサイズを実行時に引数で渡す)することにする。

行インデックス 列インデックス 観測値

このために、実行前に、以前作成したTransformMatrix.javaを使用する。このプログラムは、行列形式にする際に「転置するかどうか」を指定できるため、上式の最後にあるAの転置行列は、この機能で作成する。
また、算出式から明らかなように、プログラムは入力データを変えて2ステップで実行する。

入力データ
1回目 共分散行列の保管フォルダー
2回目 1回目の出力先フォルダー



以下にJavaプログラムを示す。GitHubでも公開しています(CorrelationCoefficient.java)。
実行には、入力データと、標準偏差の逆数のベクトルの保存フォルダー、行列サイズ、ブロックサイズ(行数)を渡す。

package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  CorrelationCoefficientクラス
 *
 *  相関行列を計算する
 * 
 */
public class CorrelationCoefficient {

	/*
	 * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。
	 *
	 */
    public static class MapAll extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	int i= Integer.parseInt(strArr[0]);

            int m = 0;
            // retrieve from configuration
            int IB 	= Integer.parseInt(context.getConfiguration().get("IB"));
            if(i%IB == 0){
            	m = i/IB; 
            }else{
            	m = i/IB + 1;             	
            }
            context.write(new IntWritable(m), value);
        }
    }

    /*
	 *  対角行列の要素を読み込んで、マークした上でreduceに渡す
	 */
    public static class MapDiag extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            String[] strArr = line.split("\t");
            int i = Integer.parseInt(strArr[0]); // number of column
            value = new Text(line + " diag");

            int m = 0;
            // retrieve from configuration
            int IB 	= Integer.parseInt(context.getConfiguration().get("IB"));
            if(i%IB == 0){
            	m = i/IB; 
            }else{
            	m = i/IB + 1;             	
            }
            context.write(new IntWritable(m), value);
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, DoubleWritable>{
		@Override
    	protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
        	Map<Integer,Double> diagMap = new HashMap<Integer,Double>();
        	double diag = 0;
        	int i = 0;
        	List<String> lines = new ArrayList<String>();
        	
        	for(Text value: values){
            	String line = value.toString();
            	if(line.indexOf("diag")!=-1){
                    String[] keyArr = line.split("\t");
                    String[] strArr = keyArr[1].split(" ");
                    diag = Double.parseDouble(strArr[0]);
                    diagMap.put(Integer.parseInt(keyArr[0]), diag);
            	} else {
            		lines.add(line);
            	}
            }
        	
        	for(i=0;i<lines.size();i++){
               	String keyArr[] = lines.get(i).split("\t");
        		int numRow = Integer.parseInt(keyArr[0]);
        		diag = diagMap.get(numRow);
               	String strArr[] = keyArr[1].split(" ");
            	for(int j=0;j<strArr.length;j++){
                	double var= Double.parseDouble(strArr[j]);	// number of column
                	var *= diag;
                    BigDecimal bd = new BigDecimal(var);
        			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 
                    context.write(new Text(numRow + " " + (j+1)), new DoubleWritable(r.doubleValue()));
            	}
        	}
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    	Configuration conf = new Configuration();
    	conf.set("I", args[3]); // Num of Row (=Columns)
    	conf.set("IB", args[4]); // RowBlock Size of Matrix
        
    	Job job = new Job(conf, "CalculateCC");

    	job.setJarByClass(CorrelationCoefficient.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapDiag.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        boolean success = job.waitForCompletion(true);
        System.out.println(success);
    }
}


ステップ4、7のを入力として、以下の結果を得る。対角要素を見ると、有効桁数が少ないため丸め誤差が発生しているのが分かるが、本質的なものではない。

1 1 	1
1 2 	0.21
1 3 	-0.07
1 4 	-0.18
1 5 	0.39
1 6 	0.54
1 7 	-0.17
1 8 	-0.6
1 9 	0.32
1 10 	0.31
2 1 	0.21
2 2 	1.01
2 3 	-0.2
2 4 	0.07
2 5 	0.27
2 6 	0.63
2 7 	-0.77
2 8 	-0.06
2 9 	-0.38
2 10 	0.19
3 1 	-0.07
3 2 	-0.2
3 3 	1.01
3 4 	-0.47
3 5 	0.06
3 6 	-0.16
3 7 	0.21
3 8 	0.07
3 9 	0.34
3 10 	-0.58
4 1 	-0.18
4 2 	0.07
4 3 	-0.47
4 4 	1
4 5 	-0.58
4 6 	-0.14
4 7 	-0.21
4 8 	0.48
4 9 	-0.47
4 10 	0.78
5 1 	0.39
5 2 	0.27
5 3 	0.06
5 4 	-0.58
5 5 	0.98
5 6 	0.34
5 7 	-0.16
5 8 	-0.3
5 9 	0.14
5 10 	-0.41
6 1 	0.54
6 2 	0.63
6 3 	-0.16
6 4 	-0.14
6 5 	0.34
6 6 	0.99
6 7 	-0.21
6 8 	-0.72
6 9 	0.08
6 10 	0.13
7 1 	-0.17
7 2 	-0.77
7 3 	0.21
7 4 	-0.21
7 5 	-0.16
7 6 	-0.2
7 7 	1
7 8 	-0.33
7 9 	0.73
7 10 	-0.4
8 1 	-0.6
8 2 	-0.06
8 3 	0.07
8 4 	0.48
8 5 	-0.3
8 6 	-0.72
8 7 	-0.33
8 8 	1.03
8 9 	-0.55
8 10 	0.1
9 1 	0.32
9 2 	-0.38
9 3 	0.34
9 4 	-0.47
9 5 	0.14
9 6 	0.09
9 7 	0.73
9 8 	-0.55
9 9 	0.98
9 10 	-0.37
10 1 	0.31
10 2 	0.19
10 3 	-0.58
10 4 	0.78
10 5 	-0.41
10 6 	0.13
10 7 	-0.4
10 8 	0.1
10 9 	-0.37
10 10 	1.02

これを、行列の形にするには、ステップ6で用いたTransformMatrix.javaを使えばよい。
以下が行列のフォーマットの相関行列である。

1	1.0 0.21 -0.07 -0.18 0.39 0.54 -0.17 -0.6 0.32 0.31
2	0.21 1.01 -0.2 0.07 0.27 0.63 -0.77 -0.06 -0.38 0.19
3	-0.07 -0.2 1.01 -0.47 0.06 -0.16 0.21 0.07 0.34 -0.58
4	-0.18 0.07 -0.47 1.0 -0.58 -0.14 -0.21 0.48 -0.47 0.78
5	0.39 0.27 0.06 -0.58 0.98 0.34 -0.16 -0.3 0.14 -0.41
6	0.54 0.63 -0.16 -0.14 0.34 0.99 -0.2 -0.72 0.09 0.13
7	-0.17 -0.77 0.21 -0.21 -0.16 -0.21 1.0 -0.33 0.73 -0.4
8	-0.6 -0.06 0.07 0.48 -0.3 -0.72 -0.33 1.03 -0.55 0.1
9	0.32 -0.38 0.34 -0.47 0.14 0.08 0.73 -0.55 0.98 -0.37
10	0.31 0.19 -0.58 0.78 -0.41 0.13 -0.4 0.1 -0.37 1.02