Hadoop MapReduceで相関行列を計算する:ステップ7・8:相関行列を計算する (step7&8: calculate correlation coefficient matrix)
前回のログで、共分散行列
まで計算ができた。
今回はいよいよ相関行列を計算する。相関行列の計算式は以下である。
ただし、
である。この対角行列はステップ3・4で計算した。
ステップ7・8にあたる「共分散行列の前後に、対角行列をかける」については、先のログでも示した以下の性質を用いる。
計算方針
転置行列、対角行列の一般的な性質より、以下の式をもとに計算する。
ます、以下の式の括弧内を計算する。
ここで、以下のようにおくと、
最初の式は以下となる。
転置行列、対角行列の性質より、以下の式が導かれるので、上記性質1だけ用いればよい。
そこで、プログラムでは、共分散行列データを読み込んで、各i番目の行、第i番目の標準偏差の逆数をかけることにする。
また、このプログラムも高速化のため、以下の形式から行列形式に変換を行い、行をブロックに分解(ブロックサイズを実行時に引数で渡す)することにする。
行インデックス 列インデックス 観測値
このために、実行前に、以前作成したTransformMatrix.javaを使用する。このプログラムは、行列形式にする際に「転置するかどうか」を指定できるため、上式の最後にあるAの転置行列は、この機能で作成する。
また、算出式から明らかなように、プログラムは入力データを変えて2ステップで実行する。
入力データ | |
---|---|
1回目 | 共分散行列の保管フォルダー |
2回目 | 1回目の出力先フォルダー |
以下にJavaプログラムを示す。GitHubでも公開しています(CorrelationCoefficient.java)。
実行には、入力データと、標準偏差の逆数のベクトルの保存フォルダー、行列サイズ、ブロックサイズ(行数)を渡す。
package com.tetsuyaodaka.hadoop.math.matrix; import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * CorrelationCoefficientクラス * * 相関行列を計算する * */ public class CorrelationCoefficient { /* * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。 * */ public static class MapAll extends Mapper<LongWritable, Text, IntWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String strArr[] = value.toString().split("\t"); int i= Integer.parseInt(strArr[0]); int m = 0; // retrieve from configuration int IB = Integer.parseInt(context.getConfiguration().get("IB")); if(i%IB == 0){ m = i/IB; }else{ m = i/IB + 1; } context.write(new IntWritable(m), value); } } /* * 対角行列の要素を読み込んで、マークした上でreduceに渡す */ public static class MapDiag extends Mapper<LongWritable, Text, IntWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] strArr = line.split("\t"); int i = Integer.parseInt(strArr[0]); // number of column value = new Text(line + " diag"); int m = 0; // retrieve from configuration int IB = Integer.parseInt(context.getConfiguration().get("IB")); if(i%IB == 0){ m = i/IB; }else{ m = i/IB + 1; } context.write(new IntWritable(m), value); } } public static class Reduce extends Reducer<IntWritable, Text, Text, DoubleWritable>{ @Override protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ Map<Integer,Double> diagMap = new HashMap<Integer,Double>(); double diag = 0; int i = 0; List<String> lines = new ArrayList<String>(); for(Text value: values){ String line = value.toString(); if(line.indexOf("diag")!=-1){ String[] keyArr = line.split("\t"); String[] strArr = keyArr[1].split(" "); diag = Double.parseDouble(strArr[0]); diagMap.put(Integer.parseInt(keyArr[0]), diag); } else { lines.add(line); } } for(i=0;i<lines.size();i++){ String keyArr[] = lines.get(i).split("\t"); int numRow = Integer.parseInt(keyArr[0]); diag = diagMap.get(numRow); String strArr[] = keyArr[1].split(" "); for(int j=0;j<strArr.length;j++){ double var= Double.parseDouble(strArr[j]); // number of column var *= diag; BigDecimal bd = new BigDecimal(var); BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); context.write(new Text(numRow + " " + (j+1)), new DoubleWritable(r.doubleValue())); } } } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("I", args[3]); // Num of Row (=Columns) conf.set("IB", args[4]); // RowBlock Size of Matrix Job job = new Job(conf, "CalculateCC"); job.setJarByClass(CorrelationCoefficient.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); // Mapperごとに読み込むファイルを変える。 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapDiag.class); FileOutputFormat.setOutputPath(job, new Path(args[2])); boolean success = job.waitForCompletion(true); System.out.println(success); } }
ステップ4、7のを入力として、以下の結果を得る。対角要素を見ると、有効桁数が少ないため丸め誤差が発生しているのが分かるが、本質的なものではない。
1 1 1 1 2 0.21 1 3 -0.07 1 4 -0.18 1 5 0.39 1 6 0.54 1 7 -0.17 1 8 -0.6 1 9 0.32 1 10 0.31 2 1 0.21 2 2 1.01 2 3 -0.2 2 4 0.07 2 5 0.27 2 6 0.63 2 7 -0.77 2 8 -0.06 2 9 -0.38 2 10 0.19 3 1 -0.07 3 2 -0.2 3 3 1.01 3 4 -0.47 3 5 0.06 3 6 -0.16 3 7 0.21 3 8 0.07 3 9 0.34 3 10 -0.58 4 1 -0.18 4 2 0.07 4 3 -0.47 4 4 1 4 5 -0.58 4 6 -0.14 4 7 -0.21 4 8 0.48 4 9 -0.47 4 10 0.78 5 1 0.39 5 2 0.27 5 3 0.06 5 4 -0.58 5 5 0.98 5 6 0.34 5 7 -0.16 5 8 -0.3 5 9 0.14 5 10 -0.41 6 1 0.54 6 2 0.63 6 3 -0.16 6 4 -0.14 6 5 0.34 6 6 0.99 6 7 -0.21 6 8 -0.72 6 9 0.08 6 10 0.13 7 1 -0.17 7 2 -0.77 7 3 0.21 7 4 -0.21 7 5 -0.16 7 6 -0.2 7 7 1 7 8 -0.33 7 9 0.73 7 10 -0.4 8 1 -0.6 8 2 -0.06 8 3 0.07 8 4 0.48 8 5 -0.3 8 6 -0.72 8 7 -0.33 8 8 1.03 8 9 -0.55 8 10 0.1 9 1 0.32 9 2 -0.38 9 3 0.34 9 4 -0.47 9 5 0.14 9 6 0.09 9 7 0.73 9 8 -0.55 9 9 0.98 9 10 -0.37 10 1 0.31 10 2 0.19 10 3 -0.58 10 4 0.78 10 5 -0.41 10 6 0.13 10 7 -0.4 10 8 0.1 10 9 -0.37 10 10 1.02
これを、行列の形にするには、ステップ6で用いたTransformMatrix.javaを使えばよい。
以下が行列のフォーマットの相関行列である。
1 1.0 0.21 -0.07 -0.18 0.39 0.54 -0.17 -0.6 0.32 0.31 2 0.21 1.01 -0.2 0.07 0.27 0.63 -0.77 -0.06 -0.38 0.19 3 -0.07 -0.2 1.01 -0.47 0.06 -0.16 0.21 0.07 0.34 -0.58 4 -0.18 0.07 -0.47 1.0 -0.58 -0.14 -0.21 0.48 -0.47 0.78 5 0.39 0.27 0.06 -0.58 0.98 0.34 -0.16 -0.3 0.14 -0.41 6 0.54 0.63 -0.16 -0.14 0.34 0.99 -0.2 -0.72 0.09 0.13 7 -0.17 -0.77 0.21 -0.21 -0.16 -0.21 1.0 -0.33 0.73 -0.4 8 -0.6 -0.06 0.07 0.48 -0.3 -0.72 -0.33 1.03 -0.55 0.1 9 0.32 -0.38 0.34 -0.47 0.14 0.08 0.73 -0.55 0.98 -0.37 10 0.31 0.19 -0.58 0.78 -0.41 0.13 -0.4 0.1 -0.37 1.02