Hadoop MapReduceで行列積を計算する(ケース2)(Dense Matrix Multiplication with Hadoop MapReduce: Case2)

前回のログでは、Case1として行列積の演算プログラムを示した。
しかしながら、5000行5000列の行列同士の演算に6時間以上の時間がかかってしまい、これでは「ビッグデータ」の探索的な分析では使えないだろう。

これまで、再三引用している「エコノミスト誌(6/4号)」の分析では、変量の数は300であり、サンプルサイズは51であった(これについては、以前のログで述べた)から、オーダーとしては、Case1のプログラムでも間に合う可能性がある。

しかしながら、同誌に掲載されている「Yahoo! JAPAN 景気指数」では60万語(60万変量)と、CIとの相関を調べている。

Case2では、Case1のプログラムを改良することにより、実行速度の向上をはかる。

Case1のスケーラビリティー評価のところでも述べたが、Case1の実行時間は「単純な算術演算の回数の増加」では説明できない。
MapReduceにまつわる様々な要因(I/O速度、データ転送速度など)が、これらの「算術演算では説明できない」部分の要因になっている。
とくに、Case1では、Mapで読み込んだデータをそのままReduceに渡しており、シリアライズと圧縮(Amazon EMRでは、デフォルトのSnappy形式に転送データを圧縮する)、転送におけるオーバーヘッドが高いのは明らかである。

したがって、Case2では、2ステップのMapReduceを用いて行列積の計算における転送のオーバーヘッド削減を狙う。

結果は、先のログで示したように、以下のように(Case1の7%にまで)実行時間の削減がされた(単位:秒)。

ステップ1;行列の入力形式の変換

以下の形式を「行列に」変換する。

行インデックス 列インデックス 計算結果

以下の形式の行列があったとする。

1 1	3.6
1 2	2.2
1 3	9.7
1 4	7.5
1 5	9.1
1 6	7.4
1 7	1.5
1 8	3.7
1 9	6.7
1 10	7.9
2 1	5.9
2 2	3.8
2 3	7.1
2 4	7.1
2 5	0.7
2 6	2.6
2 7	1.6
2 8	3.7
2 9	1.4
2 10	5.7
3 1	2.6
3 2	8.5
3 3	4.2
3 4	7.3
3 5	4.3
3 6	7.5
3 7	3.3
3 8	6.9
3 9	8.1
3 10	2.7
4 1	7.1
4 2	4.4
4 3	8.1
4 4	0.8
4 5	2.5
4 6	5.3
4 7	7.2
4 8	2.6
4 9	4.7
4 10	4.2
5 1	3.7
5 2	6.9
5 3	6.6
5 4	9.9
5 5	9.3
5 6	4.9
5 7	4.3
5 8	5.3
5 9	2.1
5 10	5.8
6 1	4.5
6 2	5.2
6 3	9.2
6 4	7.1
6 5	5.5
6 6	2.8
6 7	3.5
6 8	6.4
6 9	5.8
6 10	8.2
7 1	0.9
7 2	2.3
7 3	1.9
7 4	1.2
7 5	6.4
7 6	1.8
7 7	5.2
7 8	6.8
7 9	6.1
7 10	0.5
8 1	7.1
8 2	7.7
8 3	3.0
8 4	4.1
8 5	0.2
8 6	9.3
8 7	9.6
8 8	1.5
8 9	2.3
8 10	1.5
9 1	2.0
9 2	0.8
9 3	5.1
9 4	7.9
9 5	9.6
9 6	4.2
9 7	4.4
9 8	9.0
9 9	9.9
9 10	0.9
10 1	8.2
10 2	3.6
10 3	8.7
10 4	2.9
10 5	4.8
10 6	5.6
10 7	5.7
10 8	1.1
10 9	6.8
10 10	7.0

これを以下のような、行インデックスのついた行列へ変換する。このような「変換」はMapReduceの「得意な処理」であり、また、変換によりデータサイズが約1/3となる。

1  3.6	2.2	9.7	7.5	9.1	7.4	1.5	3.7	6.7	7.9
2  5.9	3.8	7.1	7.1	0.7	2.6	1.6	3.7	1.4	5.7
3  2.6	8.5	4.2	7.3	4.3	7.5	3.3	6.9	8.1	2.7
4  7.1	4.4	8.1	0.8	2.5	5.3	7.2	2.6	4.7	4.2
5  3.7	6.9	6.6	9.9	9.3	4.9	4.3	5.3	2.1	5.8
6  4.5	5.2	9.2	7.1	5.5	2.8	3.5	6.4	5.8	8.2
7  0.9	2.3	1.9	1.2	6.4	1.8	5.2	6.8	6.1	0.5
8  7.1	7.7	3.0	4.1	0.2	9.3	9.6	1.5	2.3	1.5
9  2.0	0.8	5.1	7.9	9.6	4.2	4.4	9.0	9.9	0.9
10 8.2	3.6	8.7	2.9	4.8	5.6	5.7	1.1	6.8	7.0

MapReduceで行列積を計算するプログラム

イデアとしては、先のNorstadのロジックをベースにして、部分行列に分解して行列積を計算する。

具体的には、行列積 C=ABを計算するにあたり、行列Aの行と、行列Bの列をそれぞれM個とN個のブロックに分解し(下図)、部分行列単位で計算を行う。(一番シンプルでストレートフォワードな方法かと思う)

Map処理を2つ用意し、それぞれ分担して「変換した行列A」と「変換した行列B」を読み込む。読み込んだ行列Aについては行インデックス、Bについては列インデックスよりブロックの番号(mとn)を算出する。このブロック番号の組み合わせをキーとする。また、バリューは、
行列の識別フラグ(0/1),行インデックス(もしくは、列インデックス),行(もしくは、列)
とする。
Case1では、行列要素をReduceへ転送していたが、Case2では行(行列Aの場合)、もしくは列(行列Bの場合)をそのままReduceへ渡す。
そして、reduceで部分行列C(m,n)を計算する。

この場合、M*N個のキーが生成される。

【以下は注意事項】

  • Bは転置(B’)として作成しておかなくてはならない。これは、データの読み込みを行うMapperが、「どの行から読むか分からない」ため、転置せずにおくと、列が途中で分断されてしまう。また、Mapperのこの性質より、データの各行の先頭には、インデックス(1から始まる連番)を付与する。
  • カスタマイズしたキーを用意したので、キークラスにhasHash()メソッドを用意しておく。PartitionerはデフォルトのHashPartitionerを利用するが、ソースコードを見ると、キーのhashCode()メソッドを使ってPartitioningを行っているため。これを実装しないと、並列演算した際に正しくキー単位でreduceにデータが渡されない。

以下にjavaプログラムをしめす。
開発機はMac Book Pro(Mountain Lion)。Hadoopのバージョンは1.1.2である。

まず最初は、入力形式を変換するプログラムである。引数にとして、入力データ、出力先フォルダー、転置するかどうか(転置する場合は、小文字でyesと指定する)をとる。
GitHubでも公開しました(TransformMatrix.java)。

package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  Matrix Multiplication on Hadoop Map Reduce(Step1)
 *
 *   author : tetsuya.odaka@gmail.com
 *   tested on Hadoop1.1.2 
 *   
 *   Split the Large Scale Matrix to SubMatrices.
 *   Split size (Number Of Rows or Columns) can be specified by arguments.
 *   
 *   This should be decided according to your resources.
 *   Partitioner and Conditioner are not implemented here.
 *   Can calculate real numbers (format double) and be expected.
 *   
 *   This program is distributed under ASF2.0 LICENSE.
 * 
 */
public class TransformMatrix {
	
	/*
	 *  Map Class
	 *
	 *  read MatrixA and decompose its elements to blocks
	 *
	 */
    public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	String keyArr[] = strArr[0].split(" ");

            // retrieve from configuration
            boolean tr 	= Boolean.parseBoolean(context.getConfiguration().get("transpose"));	// row block size

            if(tr){
            	int n= Integer.parseInt(keyArr[1]);	// number of column
            	context.write(new IntWritable(n), value);
            }else{
            	int n= Integer.parseInt(keyArr[0]);	// number of row
            	context.write(new IntWritable(n), value);
            }
        }
    }
    
	/*
	 * Reduce Class
	 * 
	 */
    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{

		@Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{

    		HashMap<Integer, Double> aMap = new HashMap<Integer, Double>();

    		// retrieve from configuration
            boolean tr 	= Boolean.parseBoolean(context.getConfiguration().get("transpose"));	// row block size
            
        	for(Text value: values){
            	String strVal = value.toString();
            	String strArr[] = strVal.split("\t");
            	String keyArr[] = strArr[0].split(" ");
            	
            	Double val = Double.parseDouble(strArr[1]);
                BigDecimal bd = new BigDecimal(val);
    			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 

    			if(tr) {
            		aMap.put(Integer.parseInt(keyArr[0]),r.doubleValue());

            	}else{
            		aMap.put(Integer.parseInt(keyArr[1]),r.doubleValue());
        		}
        	}

        	Set<Integer> setA = aMap.keySet();
        	Set<Integer> sortedSetA = new TreeSet<Integer>(setA);
        	StringBuffer sb = new StringBuffer();
        	for(int indexA : sortedSetA){
        		if(indexA > 1) sb.append(" ");
        		sb.append(aMap.get(indexA));
        	}
            context.write(key, new Text(sb.toString()));
       }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    	Date startProc = new Date(System.currentTimeMillis());
    	System.out.println("process started at " + startProc);
    	
    	Configuration conf = new Configuration();
        if(args[2].equals("yes")){
        	conf.set("transpose", "true"); // transpose
        }else{
        	conf.set("transpose", "false"); // 
        }

        Job job = new Job(conf, "MatrixMultiplication");
        job.setJarByClass(TransformMatrix.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, Map.class); // matrixA
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        boolean success = job.waitForCompletion(true);

    	Date endProc = new Date(System.currentTimeMillis());
    	System.out.println("process ended at " + endProc);

    	System.out.println(success);
    }
}

次に行列積そのものを計算するプログラムを示す。引数に、変換後の行列Aのあるフォルダー、変換後の行列B'(転置)のあるフォルダー、出力先フォルダー、行列Aの行数、行列Bの列数、行ブロックの行数、列ブロックの列数を指定する。
GitHubでも公開しました

package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  Matrix Multiplication on Hadoop Map Reduce
 *
 *   author : tetsuya.odaka@gmail.com
 *   tested on Hadoop1.2 
 *   
 *   Split the Large Scale Matrix to SubMatrices.
 *   Split size (Number Of Rows or Columns) can be specified by arguments.
 *   
 *   This should be decided according to your resources.
 *   Partitioner and Conditioner are not implemented here.
 *   Can calculate real numbers (format double) and be expected.
 *   
 *   This program is distributed under ASF2.0 LICENSE.
 * 
 */
public class MatrixMult {

	/*
	 *  IndexPair Class
	 *
	 * reduce用のキーを、MatrixAの行ブロック番号、MatrixBの列ブロック番号、要素番号にする。
	 * customized key for reduce function consists of row BlockNum of MatrixA, MatrixB, and number of elements.
	 *
	 */
	public static class IndexPair implements WritableComparable<MatrixMult.IndexPair> {
		public int index1;
		public int index2;
		
		public IndexPair() {
		}
		
		public IndexPair(int index1, int index2) {
			this.index1 = index1;
			this.index2 = index2;
		}

		public void write (DataOutput out)
			throws IOException
		{
			out.writeInt(index1);
			out.writeInt(index2);
		}
		
		public void readFields (DataInput in)
			throws IOException
		{
			index1 = in.readInt();
			index2 = in.readInt();
	
		}

		public int compareTo(MatrixMult.IndexPair o) {
			if (this.index1 < o.index1) {
				return -1;
			} else if (this.index1 > o.index1) {
				return +1;
			}
			if (this.index2 < o.index2) {
				return -1;
			} else if (this.index2 > o.index2) {
				return +1;
			}
			return 0;
		}

		/*
		 * hasHash() is used by HashPartitionar.
		 */
		public int hashCode(){
			int ib = this.index1;
			int jb = this.index2;
			int num = ib * Integer.MAX_VALUE + jb;
			int hash = new Integer(num).hashCode();
			return Math.abs(hash);
		}

	}
	
	/*
	 *  MapA Class
	 *
	 * Matrix Aのデータを読み込んで、行をブロックに分解する。
	 *  read MatrixA and decompose it to blocks
	 *
	 */
    public static class MapA extends Mapper<LongWritable, Text, MatrixMult.IndexPair, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	int i= Integer.parseInt(strArr[0]);
        	String v= strArr[1];

            int m = 0;

            // retrieve from configuration
            int IB 	= Integer.parseInt(context.getConfiguration().get("IB"));
            int N 	= Integer.parseInt(context.getConfiguration().get("N"));

            if(i%IB == 0){
            	m = i/IB; 
            }else{
            	m = i/IB + 1;             	
            }
            for(int j=1;j<(N+1);j++){
            	context.write(new MatrixMult.IndexPair(m,j), new Text("0"+","+i+","+v));
            }
        }
    }

	/*
	 * MapB Class
	 * 
	 * Matrix B'のデータを読み込んで、行をブロックに分解する。
	 *  read MatrixB and decompose it to blocks
	 *
	 */
    public static class MapB extends Mapper<LongWritable, Text, MatrixMult.IndexPair, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	int k= Integer.parseInt(strArr[0]);
        	String v= strArr[1];
            int n = 0;

            // retrieve from configuration
            int KB 	= Integer.parseInt(context.getConfiguration().get("KB"));
            int M 	= Integer.parseInt(context.getConfiguration().get("M"));
  
            if(k%KB == 0){
            	n = k/KB; 
            }else{
            	n = k/KB + 1;             	
            }
            for(int j=1;j<(M+1);j++){
            	context.write(new MatrixMult.IndexPair(j,n), new Text("1"+","+k+","+v));
            }
        }
    }

	/*
	 * Reduce Class
	 * 
	 */
    public static class Reduce extends Reducer<MatrixMult.IndexPair, Text, Text, DoubleWritable>{
		@Override
        protected void reduce(MatrixMult.IndexPair key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{

    		List<RowContents> aList = new ArrayList<RowContents>();
        	List<RowContents> bList = new ArrayList<RowContents>();
        	Map<String,List<RowContents>> cMap = new HashMap<String,List<RowContents>>();
        	cMap.put("A",aList);
        	cMap.put("B",bList);

    		for(Text value: values){
            	String strVal = value.toString();

            	String mtx;
            	String sRow;
            	String[] strArray = strVal.split(",");
        		if(Integer.parseInt(strArray[0])==0) {
        			mtx = "A";
        		}else{
        			mtx = "B";
        		}
            	
    			sRow = strArray[1]+","+strArray[2];

            	cMap.get(mtx).add(new RowContents(sRow));
            }
            	
    		for(RowContents ra : cMap.get("A")){
        		for(RowContents rb : cMap.get("B")){
        			int indexA = ra.index;
        			int indexB = rb.index;
        			double sum = 0;
        			for(int i=0;i<ra.lstRow.size();i++){
        				sum += ra.lstRow.get(i)*rb.lstRow.get(i);
        			}
                    BigDecimal bd = new BigDecimal(sum);
        			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 
                    context.write(new Text(indexA + " " + indexB+ " "), new DoubleWritable(r.doubleValue()));
        		}
    		}
    			
        }
    	
    	public class RowContents {
    		public String 	strRow;
    		public int 		index;			// means row index
    		public List<Double> 	lstRow;	// list of elements of row.

    		public RowContents() {
    		}

    		public RowContents(String strRow) {
    			this.strRow = strRow;
    			this.lstRow = new ArrayList<Double>();
    			this.calculate();
    		}
    		
    		public void calculate(){
    			String[] strArr = this.strRow.split(",");
    			this.index = Integer.parseInt(strArr[0]);
    			String[] aArr 	= strArr[1].split(" ");
                for(int i=0; i<aArr.length; i++){
                     	this.lstRow.add(Double.parseDouble(aArr[i]));
                }
                return;
    		}
    		
    	}
    	
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    	Date startProc = new Date(System.currentTimeMillis());
    	System.out.println("process started at " + startProc);
    	
    	Configuration conf = new Configuration();
        int I = Integer.parseInt(args[3]); // Num of Row of MatrixA
		int K = Integer.parseInt(args[4]); // Num of Row of MatrixB'

		int IB = Integer.parseInt(args[5]); // RowBlock Size of MatrixA
		int KB = Integer.parseInt(args[6]); // RowBlock Size of MatrixB'
		
		int M =0;
		if(I%IB == 0){
			M = I/IB;
		}else{
			M = I/IB+1;
		}

		int N =0;
		if(K%KB == 0){
			N = K/KB;
		}else{
			N = K/KB+1;
		}
		
    	conf.set("I", args[3]); // Num of Row of MatrixA
    	conf.set("K", args[4]); // Num of Row of MatrixB'
    	conf.set("IB", args[5]); // RowBlock Size of MatrixA
    	conf.set("KB", args[6]); // RowBlock Size of MatrixB'
    	conf.set("M", new Integer(M).toString());
    	conf.set("N", new Integer(N).toString());
    	
    	Job job = new Job(conf, "MatrixMultiplication");
        job.setJarByClass(MatrixMult.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(MatrixMult.IndexPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapA.class); // matrixA
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapB.class); // matrixB
        FileOutputFormat.setOutputPath(job, new Path(args[2])); // output path
		
		System.out.println("num of MatrixA RowBlock(M) is "+M);
		System.out.println("num of MatrixB RowBlock(N) is "+N);

		boolean success = job.waitForCompletion(true);

    	Date endProc = new Date(System.currentTimeMillis());
    	System.out.println("process ended at " + endProc);

    	System.out.println(success);
    }
}

Amazon Elastic MapReduceを使ったスケーラビリテリーの調査

Amazon ElasticMapReduceの以下のクラスタで実行時間の比較を行った。

リージョン US Standard
インスタンスタイプ m1.small
マスタ・インスタンスグループ 1インスタンス
コア・インスタンスグループ 8インスタンス
タスク・インスタンスグループ 10インスタンス
  • 5000行×5000列同士の積、10000行×10000列同士の積の計算を行う。
  • 10000行×10000列の行列と10000行のベクトルの積、20000行×20000列の行列と20000行のベクトルの積の計算を行う。

m1.smallのメモリは1.7GBとされている。
5000行5000列のデータが340MB。10000行10000列でその4倍である。

1000行×1000列同士の積、5000行×5000列同士の積の計算を行う。
行列Aの変換(sec) 行列Bの変換(sec) 行列の乗算(sec) 実行時間(sec)
5000行×5000列同士 189 189 1174 1552(25min,52sec)
10000行×10000列同士 398 396 6921 7715(128min,35sec)

実行時間は、6.7倍となった。

5000行5000列では、(5000+5000)*5000*5000=2.5*10^10(250億回)。これに対して、10000行10000列では、(10000+10000)*10000*10000=2*10^12(2000億回)となり、8倍となる。(正方行列積の演算では、行数をnとすると、演算回数はO(n^3)となる)

Case1とは、逆に実行時間比率が、算術演算回数の比率を下回る結果となった。

10000行×10000列の行列と10000行のベクトルの積、20000行×20000列の行列と20000行のベクトルの積の計算を行う。

回帰分析における、目的変数と説明変数間の相関を想定したテストである。

行列の変換(sec) ベクトルの変換(sec) 乗算(sec) 実行時間(sec)
10000 398 56 82 536(8min,56sec)
20000 1156 56 164 1376(22min,56sec)