Hadoop MapReduceで相関行列を計算する:ステップ2:標準偏差の算出 (step2: calculating standard deviations)
今回は、前回に引き続き、第2ステップとして、変量ごとの標準偏差を計算するプログラムを実装する。以前のログでも、多変量の場合の標準偏差の計算を実装したが、以下のようにデータ形式を変更したので、過去に作成したプログラムを修正することとする。
行インデックス 列インデックス 観測値(行列要素)
開発は(これまで同様)Mac Book Pro(Mountain Lion)で行い、Hadoop1.1.2を用いる。
観測値行列と、先のログで計算した標本平均を入力とし、それぞれに異なったMapperで読み込む。
そして、観測行列の列(変量)ごとに
列インデックス (不偏)標準偏差
を出力する。
実行時の引数は、観測値データの入っているフォルダー(もしくは、ファイル)、出力フォルダーを指定する。
以下がそのプログラムとなる。GitHubでも公開しています(StandardDeviation.java)
package com.tetsuyaodaka.hadoop.math.matrix; import java.io.IOException; import java.math.BigDecimal; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * StandardDeviationsクラス * * (すでに計算された)平均と観測値をつかって、(不偏)標準偏差を算出する。 * */ public class StandardDeviations { /* * 全データを読み込んで、変量のインデックスをキーとして、Textで書き出す。 * */ public static class MapAll extends Mapper<LongWritable, Text, IntWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String strArr[] = value.toString().split("\t"); String keyArr[] = strArr[0].split(" "); int var= Integer.parseInt(keyArr[0]); // number of column context.write(new IntWritable(var), value); } } /* * 変量ごとの算術平均の計算結果を読んで、変量のインデックスをキーとして、Textで書き出す。 * この際、平均値の後ろにmeanとつけて、reduceで読んだときのマークとする。 * */ public static class MapMean extends Mapper<LongWritable, Text, IntWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] strArr = line.split("\t"); value = new Text(strArr[1]+" mean"); context.write(new IntWritable(Integer.parseInt(strArr[0])), value); } } public static class Reduce extends Reducer<IntWritable, Text, IntWritable, DoubleWritable>{ @Override protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ double sum = 0; double num = 0; double mean = 0; for(Text value: values){ if(this.isMean(value.toString())){ mean = this.getMean(value.toString()); } else { String strArr[] = value.toString().split("\t"); sum += Double.parseDouble(strArr[1]) * Double.parseDouble(strArr[1]); num++; } } double ss = (sum - num*mean*mean)/(num-1); double s = Math.sqrt(ss); BigDecimal bd = new BigDecimal(s); BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); context.write(key, new DoubleWritable(r.doubleValue())); } private boolean isMean(String line){ if(line.indexOf("mean")==-1) return false; return true; } private double getMean(String line){ String[] strArr = line.split(" "); return Double.parseDouble(strArr[0]); } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(new Configuration(), "StandardDeviations"); job.setJarByClass(StandardDeviations.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(DoubleWritable.class); // Mapperごとに読み込むファイルを変える。 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapMean.class); FileOutputFormat.setOutputPath(job, new Path(args[2])); boolean success = job.waitForCompletion(true); System.out.println(success); } }
テスト
以下の10行10列のデータを観測値データとする。
1 1 3.6 1 2 2.2 1 3 9.7 1 4 7.5 1 5 9.1 1 6 7.4 1 7 1.5 1 8 3.7 1 9 6.7 1 10 7.9 2 1 5.9 2 2 3.8 2 3 7.1 2 4 7.1 2 5 0.7 2 6 2.6 2 7 1.6 2 8 3.7 2 9 1.4 2 10 5.7 3 1 2.6 3 2 8.5 3 3 4.2 3 4 7.3 3 5 4.3 3 6 7.5 3 7 3.3 3 8 6.9 3 9 8.1 3 10 2.7 4 1 7.1 4 2 4.4 4 3 8.1 4 4 0.8 4 5 2.5 4 6 5.3 4 7 7.2 4 8 2.6 4 9 4.7 4 10 4.2 5 1 3.7 5 2 6.9 5 3 6.6 5 4 9.9 5 5 9.3 5 6 4.9 5 7 4.3 5 8 5.3 5 9 2.1 5 10 5.8 6 1 4.5 6 2 5.2 6 3 9.2 6 4 7.1 6 5 5.5 6 6 2.8 6 7 3.5 6 8 6.4 6 9 5.8 6 10 8.2 7 1 0.9 7 2 2.3 7 3 1.9 7 4 1.2 7 5 6.4 7 6 1.8 7 7 5.2 7 8 6.8 7 9 6.1 7 10 0.5 8 1 7.1 8 2 7.7 8 3 3.0 8 4 4.1 8 5 0.2 8 6 9.3 8 7 9.6 8 8 1.5 8 9 2.3 8 10 1.5 9 1 2.0 9 2 0.8 9 3 5.1 9 4 7.9 9 5 9.6 9 6 4.2 9 7 4.4 9 8 9.0 9 9 9.9 9 10 0.9 10 1 8.2 10 2 3.6 10 3 8.7 10 4 2.9 10 5 4.8 10 6 5.6 10 7 5.7 10 8 1.1 10 9 6.8 10 10 7.0
このデータは以下の行列に対応する。
[,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9] [,10] [1,] 3.6 5.9 2.6 7.1 3.7 4.5 0.9 7.1 2.0 8.2 [2,] 2.2 3.8 8.5 4.4 6.9 5.2 2.3 7.7 0.8 3.6 [3,] 9.7 7.1 4.2 8.1 6.6 9.2 1.9 3.0 5.1 8.7 [4,] 7.5 7.1 7.3 0.8 9.9 7.1 1.2 4.1 7.9 2.9 [5,] 9.1 0.7 4.3 2.5 9.3 5.5 6.4 0.2 9.6 4.8 [6,] 7.4 2.6 7.5 5.3 4.9 2.8 1.8 9.3 4.2 5.6 [7,] 1.5 1.6 3.3 7.2 4.3 3.5 5.2 9.6 4.4 5.7 [8,] 3.7 3.7 6.9 2.6 5.3 6.4 6.8 1.5 9.0 1.1 [9,] 6.7 1.4 8.1 4.7 2.1 5.8 6.1 2.3 9.9 6.8 [10,] 7.9 5.7 2.7 4.2 5.8 8.2 0.5 1.5 0.9 7.0
先のログに示したように、各変量(列)の平均値は以下である。
1 5.93 2 3.96 3 5.54 4 4.69 5 5.88 6 5.82 7 3.31 8 4.63 9 5.38 10 5.44
以下がプログラムから得られる標準偏差である。
1 2.93 2 2.39 3 2.34 4 2.33 5 2.41 6 1.99 7 2.51 8 3.49 9 3.54 10 2.4
Amazon Elastic MapReduce(EMR)で同じ結果が得られることは確認しているが、Amazonでの実行結果は、一連のプログラムが完了した際に紹介したいと思う。