Hadoop MapReduceで行列積を計算する(ケース2)(Dense Matrix Multiplication with Hadoop MapReduce: Case2)
前回のログでは、Case1として行列積の演算プログラムを示した。
しかしながら、5000行5000列の行列同士の演算に6時間以上の時間がかかってしまい、これでは「ビッグデータ」の探索的な分析では使えないだろう。
これまで、再三引用している「エコノミスト誌(6/4号)」の分析では、変量の数は300であり、サンプルサイズは51であった(これについては、以前のログで述べた)から、オーダーとしては、Case1のプログラムでも間に合う可能性がある。
しかしながら、同誌に掲載されている「Yahoo! JAPAN 景気指数」では60万語(60万変量)と、CIとの相関を調べている。
Case2では、Case1のプログラムを改良することにより、実行速度の向上をはかる。
Case1のスケーラビリティー評価のところでも述べたが、Case1の実行時間は「単純な算術演算の回数の増加」では説明できない。
MapReduceにまつわる様々な要因(I/O速度、データ転送速度など)が、これらの「算術演算では説明できない」部分の要因になっている。
とくに、Case1では、Mapで読み込んだデータをそのままReduceに渡しており、シリアライズと圧縮(Amazon EMRでは、デフォルトのSnappy形式に転送データを圧縮する)、転送におけるオーバーヘッドが高いのは明らかである。
したがって、Case2では、2ステップのMapReduceを用いて行列積の計算における転送のオーバーヘッド削減を狙う。
結果は、先のログで示したように、以下のように(Case1の7%にまで)実行時間の削減がされた(単位:秒)。
ステップ1;行列の入力形式の変換
以下の形式を「行列に」変換する。
行インデックス 列インデックス 計算結果
以下の形式の行列があったとする。
1 1 3.6 1 2 2.2 1 3 9.7 1 4 7.5 1 5 9.1 1 6 7.4 1 7 1.5 1 8 3.7 1 9 6.7 1 10 7.9 2 1 5.9 2 2 3.8 2 3 7.1 2 4 7.1 2 5 0.7 2 6 2.6 2 7 1.6 2 8 3.7 2 9 1.4 2 10 5.7 3 1 2.6 3 2 8.5 3 3 4.2 3 4 7.3 3 5 4.3 3 6 7.5 3 7 3.3 3 8 6.9 3 9 8.1 3 10 2.7 4 1 7.1 4 2 4.4 4 3 8.1 4 4 0.8 4 5 2.5 4 6 5.3 4 7 7.2 4 8 2.6 4 9 4.7 4 10 4.2 5 1 3.7 5 2 6.9 5 3 6.6 5 4 9.9 5 5 9.3 5 6 4.9 5 7 4.3 5 8 5.3 5 9 2.1 5 10 5.8 6 1 4.5 6 2 5.2 6 3 9.2 6 4 7.1 6 5 5.5 6 6 2.8 6 7 3.5 6 8 6.4 6 9 5.8 6 10 8.2 7 1 0.9 7 2 2.3 7 3 1.9 7 4 1.2 7 5 6.4 7 6 1.8 7 7 5.2 7 8 6.8 7 9 6.1 7 10 0.5 8 1 7.1 8 2 7.7 8 3 3.0 8 4 4.1 8 5 0.2 8 6 9.3 8 7 9.6 8 8 1.5 8 9 2.3 8 10 1.5 9 1 2.0 9 2 0.8 9 3 5.1 9 4 7.9 9 5 9.6 9 6 4.2 9 7 4.4 9 8 9.0 9 9 9.9 9 10 0.9 10 1 8.2 10 2 3.6 10 3 8.7 10 4 2.9 10 5 4.8 10 6 5.6 10 7 5.7 10 8 1.1 10 9 6.8 10 10 7.0
これを以下のような、行インデックスのついた行列へ変換する。このような「変換」はMapReduceの「得意な処理」であり、また、変換によりデータサイズが約1/3となる。
1 3.6 2.2 9.7 7.5 9.1 7.4 1.5 3.7 6.7 7.9 2 5.9 3.8 7.1 7.1 0.7 2.6 1.6 3.7 1.4 5.7 3 2.6 8.5 4.2 7.3 4.3 7.5 3.3 6.9 8.1 2.7 4 7.1 4.4 8.1 0.8 2.5 5.3 7.2 2.6 4.7 4.2 5 3.7 6.9 6.6 9.9 9.3 4.9 4.3 5.3 2.1 5.8 6 4.5 5.2 9.2 7.1 5.5 2.8 3.5 6.4 5.8 8.2 7 0.9 2.3 1.9 1.2 6.4 1.8 5.2 6.8 6.1 0.5 8 7.1 7.7 3.0 4.1 0.2 9.3 9.6 1.5 2.3 1.5 9 2.0 0.8 5.1 7.9 9.6 4.2 4.4 9.0 9.9 0.9 10 8.2 3.6 8.7 2.9 4.8 5.6 5.7 1.1 6.8 7.0
MapReduceで行列積を計算するプログラム
アイデアとしては、先のNorstadのロジックをベースにして、部分行列に分解して行列積を計算する。
具体的には、行列積 C=ABを計算するにあたり、行列Aの行と、行列Bの列をそれぞれM個とN個のブロックに分解し(下図)、部分行列単位で計算を行う。(一番シンプルでストレートフォワードな方法かと思う)
Map処理を2つ用意し、それぞれ分担して「変換した行列A」と「変換した行列B」を読み込む。読み込んだ行列Aについては行インデックス、Bについては列インデックスよりブロックの番号(mとn)を算出する。このブロック番号の組み合わせをキーとする。また、バリューは、
行列の識別フラグ(0/1),行インデックス(もしくは、列インデックス),行(もしくは、列)
とする。
Case1では、行列要素をReduceへ転送していたが、Case2では行(行列Aの場合)、もしくは列(行列Bの場合)をそのままReduceへ渡す。
そして、reduceで部分行列C(m,n)を計算する。
この場合、M*N個のキーが生成される。
【以下は注意事項】
- Bは転置(B’)として作成しておかなくてはならない。これは、データの読み込みを行うMapperが、「どの行から読むか分からない」ため、転置せずにおくと、列が途中で分断されてしまう。また、Mapperのこの性質より、データの各行の先頭には、インデックス(1から始まる連番)を付与する。
- カスタマイズしたキーを用意したので、キークラスにhasHash()メソッドを用意しておく。PartitionerはデフォルトのHashPartitionerを利用するが、ソースコードを見ると、キーのhashCode()メソッドを使ってPartitioningを行っているため。これを実装しないと、並列演算した際に正しくキー単位でreduceにデータが渡されない。
以下にjavaプログラムをしめす。
開発機はMac Book Pro(Mountain Lion)。Hadoopのバージョンは1.1.2である。
まず最初は、入力形式を変換するプログラムである。引数にとして、入力データ、出力先フォルダー、転置するかどうか(転置する場合は、小文字でyesと指定する)をとる。
GitHubでも公開しました(TransformMatrix.java)。
package com.tetsuyaodaka.hadoop.math.matrix; import java.io.IOException; import java.math.BigDecimal; import java.util.Date; import java.util.HashMap; import java.util.Set; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Matrix Multiplication on Hadoop Map Reduce(Step1) * * author : tetsuya.odaka@gmail.com * tested on Hadoop1.1.2 * * Split the Large Scale Matrix to SubMatrices. * Split size (Number Of Rows or Columns) can be specified by arguments. * * This should be decided according to your resources. * Partitioner and Conditioner are not implemented here. * Can calculate real numbers (format double) and be expected. * * This program is distributed under ASF2.0 LICENSE. * */ public class TransformMatrix { /* * Map Class * * read MatrixA and decompose its elements to blocks * */ public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String strArr[] = value.toString().split("\t"); String keyArr[] = strArr[0].split(" "); // retrieve from configuration boolean tr = Boolean.parseBoolean(context.getConfiguration().get("transpose")); // row block size if(tr){ int n= Integer.parseInt(keyArr[1]); // number of column context.write(new IntWritable(n), value); }else{ int n= Integer.parseInt(keyArr[0]); // number of row context.write(new IntWritable(n), value); } } } /* * Reduce Class * */ public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{ @Override protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ HashMap<Integer, Double> aMap = new HashMap<Integer, Double>(); // retrieve from configuration boolean tr = Boolean.parseBoolean(context.getConfiguration().get("transpose")); // row block size for(Text value: values){ String strVal = value.toString(); String strArr[] = strVal.split("\t"); String keyArr[] = strArr[0].split(" "); Double val = Double.parseDouble(strArr[1]); BigDecimal bd = new BigDecimal(val); BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); if(tr) { aMap.put(Integer.parseInt(keyArr[0]),r.doubleValue()); }else{ aMap.put(Integer.parseInt(keyArr[1]),r.doubleValue()); } } Set<Integer> setA = aMap.keySet(); Set<Integer> sortedSetA = new TreeSet<Integer>(setA); StringBuffer sb = new StringBuffer(); for(int indexA : sortedSetA){ if(indexA > 1) sb.append(" "); sb.append(aMap.get(indexA)); } context.write(key, new Text(sb.toString())); } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Date startProc = new Date(System.currentTimeMillis()); System.out.println("process started at " + startProc); Configuration conf = new Configuration(); if(args[2].equals("yes")){ conf.set("transpose", "true"); // transpose }else{ conf.set("transpose", "false"); // } Job job = new Job(conf, "MatrixMultiplication"); job.setJarByClass(TransformMatrix.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); // Mapperごとに読み込むファイルを変える。 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, Map.class); // matrixA FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path boolean success = job.waitForCompletion(true); Date endProc = new Date(System.currentTimeMillis()); System.out.println("process ended at " + endProc); System.out.println(success); } }
次に行列積そのものを計算するプログラムを示す。引数に、変換後の行列Aのあるフォルダー、変換後の行列B'(転置)のあるフォルダー、出力先フォルダー、行列Aの行数、行列Bの列数、行ブロックの行数、列ブロックの列数を指定する。
(GitHubでも公開しました)
package com.tetsuyaodaka.hadoop.math.matrix; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Matrix Multiplication on Hadoop Map Reduce * * author : tetsuya.odaka@gmail.com * tested on Hadoop1.2 * * Split the Large Scale Matrix to SubMatrices. * Split size (Number Of Rows or Columns) can be specified by arguments. * * This should be decided according to your resources. * Partitioner and Conditioner are not implemented here. * Can calculate real numbers (format double) and be expected. * * This program is distributed under ASF2.0 LICENSE. * */ public class MatrixMult { /* * IndexPair Class * * reduce用のキーを、MatrixAの行ブロック番号、MatrixBの列ブロック番号、要素番号にする。 * customized key for reduce function consists of row BlockNum of MatrixA, MatrixB, and number of elements. * */ public static class IndexPair implements WritableComparable<MatrixMult.IndexPair> { public int index1; public int index2; public IndexPair() { } public IndexPair(int index1, int index2) { this.index1 = index1; this.index2 = index2; } public void write (DataOutput out) throws IOException { out.writeInt(index1); out.writeInt(index2); } public void readFields (DataInput in) throws IOException { index1 = in.readInt(); index2 = in.readInt(); } public int compareTo(MatrixMult.IndexPair o) { if (this.index1 < o.index1) { return -1; } else if (this.index1 > o.index1) { return +1; } if (this.index2 < o.index2) { return -1; } else if (this.index2 > o.index2) { return +1; } return 0; } /* * hasHash() is used by HashPartitionar. */ public int hashCode(){ int ib = this.index1; int jb = this.index2; int num = ib * Integer.MAX_VALUE + jb; int hash = new Integer(num).hashCode(); return Math.abs(hash); } } /* * MapA Class * * Matrix Aのデータを読み込んで、行をブロックに分解する。 * read MatrixA and decompose it to blocks * */ public static class MapA extends Mapper<LongWritable, Text, MatrixMult.IndexPair, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String strArr[] = value.toString().split("\t"); int i= Integer.parseInt(strArr[0]); String v= strArr[1]; int m = 0; // retrieve from configuration int IB = Integer.parseInt(context.getConfiguration().get("IB")); int N = Integer.parseInt(context.getConfiguration().get("N")); if(i%IB == 0){ m = i/IB; }else{ m = i/IB + 1; } for(int j=1;j<(N+1);j++){ context.write(new MatrixMult.IndexPair(m,j), new Text("0"+","+i+","+v)); } } } /* * MapB Class * * Matrix B'のデータを読み込んで、行をブロックに分解する。 * read MatrixB and decompose it to blocks * */ public static class MapB extends Mapper<LongWritable, Text, MatrixMult.IndexPair, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String strArr[] = value.toString().split("\t"); int k= Integer.parseInt(strArr[0]); String v= strArr[1]; int n = 0; // retrieve from configuration int KB = Integer.parseInt(context.getConfiguration().get("KB")); int M = Integer.parseInt(context.getConfiguration().get("M")); if(k%KB == 0){ n = k/KB; }else{ n = k/KB + 1; } for(int j=1;j<(M+1);j++){ context.write(new MatrixMult.IndexPair(j,n), new Text("1"+","+k+","+v)); } } } /* * Reduce Class * */ public static class Reduce extends Reducer<MatrixMult.IndexPair, Text, Text, DoubleWritable>{ @Override protected void reduce(MatrixMult.IndexPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ List<RowContents> aList = new ArrayList<RowContents>(); List<RowContents> bList = new ArrayList<RowContents>(); Map<String,List<RowContents>> cMap = new HashMap<String,List<RowContents>>(); cMap.put("A",aList); cMap.put("B",bList); for(Text value: values){ String strVal = value.toString(); String mtx; String sRow; String[] strArray = strVal.split(","); if(Integer.parseInt(strArray[0])==0) { mtx = "A"; }else{ mtx = "B"; } sRow = strArray[1]+","+strArray[2]; cMap.get(mtx).add(new RowContents(sRow)); } for(RowContents ra : cMap.get("A")){ for(RowContents rb : cMap.get("B")){ int indexA = ra.index; int indexB = rb.index; double sum = 0; for(int i=0;i<ra.lstRow.size();i++){ sum += ra.lstRow.get(i)*rb.lstRow.get(i); } BigDecimal bd = new BigDecimal(sum); BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); context.write(new Text(indexA + " " + indexB+ " "), new DoubleWritable(r.doubleValue())); } } } public class RowContents { public String strRow; public int index; // means row index public List<Double> lstRow; // list of elements of row. public RowContents() { } public RowContents(String strRow) { this.strRow = strRow; this.lstRow = new ArrayList<Double>(); this.calculate(); } public void calculate(){ String[] strArr = this.strRow.split(","); this.index = Integer.parseInt(strArr[0]); String[] aArr = strArr[1].split(" "); for(int i=0; i<aArr.length; i++){ this.lstRow.add(Double.parseDouble(aArr[i])); } return; } } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Date startProc = new Date(System.currentTimeMillis()); System.out.println("process started at " + startProc); Configuration conf = new Configuration(); int I = Integer.parseInt(args[3]); // Num of Row of MatrixA int K = Integer.parseInt(args[4]); // Num of Row of MatrixB' int IB = Integer.parseInt(args[5]); // RowBlock Size of MatrixA int KB = Integer.parseInt(args[6]); // RowBlock Size of MatrixB' int M =0; if(I%IB == 0){ M = I/IB; }else{ M = I/IB+1; } int N =0; if(K%KB == 0){ N = K/KB; }else{ N = K/KB+1; } conf.set("I", args[3]); // Num of Row of MatrixA conf.set("K", args[4]); // Num of Row of MatrixB' conf.set("IB", args[5]); // RowBlock Size of MatrixA conf.set("KB", args[6]); // RowBlock Size of MatrixB' conf.set("M", new Integer(M).toString()); conf.set("N", new Integer(N).toString()); Job job = new Job(conf, "MatrixMultiplication"); job.setJarByClass(MatrixMult.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(MatrixMult.IndexPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // Mapperごとに読み込むファイルを変える。 MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapA.class); // matrixA MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapB.class); // matrixB FileOutputFormat.setOutputPath(job, new Path(args[2])); // output path System.out.println("num of MatrixA RowBlock(M) is "+M); System.out.println("num of MatrixB RowBlock(N) is "+N); boolean success = job.waitForCompletion(true); Date endProc = new Date(System.currentTimeMillis()); System.out.println("process ended at " + endProc); System.out.println(success); } }
Amazon Elastic MapReduceを使ったスケーラビリテリーの調査
Amazon ElasticMapReduceの以下のクラスタで実行時間の比較を行った。
リージョン | US Standard |
---|---|
インスタンスタイプ | m1.small |
マスタ・インスタンスグループ | 1インスタンス |
コア・インスタンスグループ | 8インスタンス |
タスク・インスタンスグループ | 10インスタンス |
- 5000行×5000列同士の積、10000行×10000列同士の積の計算を行う。
- 10000行×10000列の行列と10000行のベクトルの積、20000行×20000列の行列と20000行のベクトルの積の計算を行う。
m1.smallのメモリは1.7GBとされている。
5000行5000列のデータが340MB。10000行10000列でその4倍である。
1000行×1000列同士の積、5000行×5000列同士の積の計算を行う。
行列Aの変換(sec) | 行列Bの変換(sec) | 行列の乗算(sec) | 実行時間(sec) | |
---|---|---|---|---|
5000行×5000列同士 | 189 | 189 | 1174 | 1552(25min,52sec) |
10000行×10000列同士 | 398 | 396 | 6921 | 7715(128min,35sec) |
実行時間は、6.7倍となった。
5000行5000列では、(5000+5000)*5000*5000=2.5*10^10(250億回)。これに対して、10000行10000列では、(10000+10000)*10000*10000=2*10^12(2000億回)となり、8倍となる。(正方行列積の演算では、行数をnとすると、演算回数はO(n^3)となる)
Case1とは、逆に実行時間比率が、算術演算回数の比率を下回る結果となった。
10000行×10000列の行列と10000行のベクトルの積、20000行×20000列の行列と20000行のベクトルの積の計算を行う。
回帰分析における、目的変数と説明変数間の相関を想定したテストである。
行列の変換(sec) | ベクトルの変換(sec) | 乗算(sec) | 実行時間(sec) | |
---|---|---|---|---|
10000 | 398 | 56 | 82 | 536(8min,56sec) |
20000 | 1156 | 56 | 164 | 1376(22min,56sec) |