Hadoop MapReduceで行列積を計算する(ケース1)(Dense Matrix Multiplication with Hadoop MapReduce: Case1)

前回のログで、MapReduceパラダイムにおいて、行列積の演算がかならずしもMapReduceパラダイムに適合したものでないことと、「行列積を求めるという行為」についてのインターネット上での評価を紹介した。

今回から2回のログに分けて、2パターンの行列積計算プログラム(ケース1、ケース2)を紹介する。

まず、結果から述べてしまう。以下が5000行5000列の行列同士をHadoop MapReduceで計算した際の実行時間の比較である。ケース1と2で「やりたいことは同じ」でも、結果にこれだけの差が出てしまう(単位:秒)。

実行は、Amazon ElasticMapReduceの以下のクラスタで実行した。

リージョン US Standard
インスタンスタイプ m1.small
マスタ・インスタンスグループ 1インスタンス
コア・インスタンスグループ 8インスタンス
タスク・インスタンスグループ 10インスタンス

入力形式

行列積は、繰り返して演算されることが多いため、入力と出力のフォーマットを合わせておくと便利である。そこで、以下の形式を入力形式として採用することとした。

行インデックス 列インデックス 計算結果

とすることにした。

ただし、行インデックスと列インデックスの間は半角スペース、列インデックスと計算結果の間はtabで区切る。(この行為は、所謂、標準化に他ならないが、MapReduceパラダイムでは「標準化という行為が誤りの原因」になりうる。これは、のちのち、ケース1と2を比較することで明らかになる。)

以下のような行列があった場合、

3.6	2.2	9.7	7.5	9.1	7.4	1.5	3.7	6.7	7.9
5.9	3.8	7.1	7.1	0.7	2.6	1.6	3.7	1.4	5.7
2.6	8.5	4.2	7.3	4.3	7.5	3.3	6.9	8.1	2.7
7.1	4.4	8.1	0.8	2.5	5.3	7.2	2.6	4.7	4.2
3.7	6.9	6.6	9.9	9.3	4.9	4.3	5.3	2.1	5.8
4.5	5.2	9.2	7.1	5.5	2.8	3.5	6.4	5.8	8.2
0.9	2.3	1.9	1.2	6.4	1.8	5.2	6.8	6.1	0.5
7.1	7.7	3.0	4.1	0.2	9.3	9.6	1.5	2.3	1.5
2.0	0.8	5.1	7.9	9.6	4.2	4.4	9.0	9.9	0.9
8.2	3.6	8.7	2.9	4.8	5.6	5.7	1.1	6.8	7.0

この行列の(プログラムへの)入力形式は以下のようになる。

1 1	3.6
1 2	2.2
1 3	9.7
1 4	7.5
1 5	9.1
1 6	7.4
1 7	1.5
1 8	3.7
1 9	6.7
1 10	7.9
2 1	5.9
2 2	3.8
2 3	7.1
2 4	7.1
2 5	0.7
2 6	2.6
2 7	1.6
2 8	3.7
2 9	1.4
2 10	5.7
3 1	2.6
3 2	8.5
3 3	4.2
3 4	7.3
3 5	4.3
3 6	7.5
3 7	3.3
3 8	6.9
3 9	8.1
3 10	2.7
4 1	7.1
4 2	4.4
4 3	8.1
4 4	0.8
4 5	2.5
4 6	5.3
4 7	7.2
4 8	2.6
4 9	4.7
4 10	4.2
5 1	3.7
5 2	6.9
5 3	6.6
5 4	9.9
5 5	9.3
5 6	4.9
5 7	4.3
5 8	5.3
5 9	2.1
5 10	5.8
6 1	4.5
6 2	5.2
6 3	9.2
6 4	7.1
6 5	5.5
6 6	2.8
6 7	3.5
6 8	6.4
6 9	5.8
6 10	8.2
7 1	0.9
7 2	2.3
7 3	1.9
7 4	1.2
7 5	6.4
7 6	1.8
7 7	5.2
7 8	6.8
7 9	6.1
7 10	0.5
8 1	7.1
8 2	7.7
8 3	3.0
8 4	4.1
8 5	0.2
8 6	9.3
8 7	9.6
8 8	1.5
8 9	2.3
8 10	1.5
9 1	2.0
9 2	0.8
9 3	5.1
9 4	7.9
9 5	9.6
9 6	4.2
9 7	4.4
9 8	9.0
9 9	9.9
9 10	0.9
10 1	8.2
10 2	3.6
10 3	8.7
10 4	2.9
10 5	4.8
10 6	5.6
10 7	5.7
10 8	1.1
10 9	6.8
10 10	7.0

MapReduceで行列積を計算するプログラムを作成する。

イデアとしては、先のNorstadのロジックをベースにして、部分行列に分解して行列積を計算する。

具体的には、行列積 C=ABを計算するにあたり、行列Aの行と、行列Bの列をそれぞれM個とN個のブロックに分解し(下図)、部分行列単位で計算を行う。(一番シンプルでストレートフォワードな方法かと思う)

Map処理を2つ用意し、それぞれ分担して行列AとBを読み込む。この際、読み込み形式は上記のものとする。読み込んだ行列Aについては行インデックス、Bについては列インデックスよりブロックの番号(mとn)を算出する。このブロック番号の組み合わせをキーとする。また、バリューは、
行列の識別フラグ(0/1),行インデックス,列インデックス,行列要素
とする。
そし、reduceで部分行列C(m,n)を計算する。

この場合、M*N個のキーが生成される。


【以下は注意事項】

  • カスタマイズしたキーを用意したので、キークラスにhasHash()メソッドを用意しておく。PartitionerはデフォルトのHashPartitionerを利用するが、ソースコードを見ると、キーのhashCode()メソッドを使ってPartitioningを行っているため。これを実装しないと、並列演算した際に正しくキー単位でreduceにデータが渡されない。


以下にjavaプログラムをしめす。引数に、変換後の行列Aのあるフォルダー、変換後の行列B'(転置)のあるフォルダー、出力先フォルダー、行列Aの行数、行列Bの列数、行ブロックの行数、列ブロックの列数を指定する。(GitHubでも公開しました

開発機はMac Book Pro(Mountain Lion)。Hadoopのバージョンは1.1.2である。

package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *  Matrix Multiplication on Hadoop Map Reduce (Case1)
 *
 *   author : tetsuya.odaka@gmail.com
 *   tested on Hadoop1.1.2 
 *   
 *   Split the Large Scale Matrix to SubMatrices.
 *   Split size (Number Of Rows or Columns) can be specified by arguments.
 *   
 *   This should be decided according to your resources.
 *   Partitioner and Conditioner are not implemented here.
 *   Can calculate real numbers (format double) and be expected.
 *   
 *   This program is distributed under ASF2.0 LICENSE.
 * 
 */
public class MatrixMultiplication {

	/*
	 *  IndexPair Class
	 *
	 * customized key for reduce function consists of row BlockNum of MatrixA, MatrixB
	 *
	 */
	public static class IndexPair implements WritableComparable<MatrixMultiplication.IndexPair> {
		public int index1;
		public int index2;
		
		public IndexPair() {
		}
		
		public IndexPair(int index1, int index2) {
			this.index1 = index1;
			this.index2 = index2;
		}

		public void write (DataOutput out)
			throws IOException
		{
			out.writeInt(index1);
			out.writeInt(index2);
		}
		
		public void readFields (DataInput in)
			throws IOException
		{
			index1 = in.readInt();
			index2 = in.readInt();
	
		}

		public int compareTo(MatrixMultiplication.IndexPair o) {
			if (this.index1 < o.index1) {
				return -1;
			} else if (this.index1 > o.index1) {
				return +1;
			}
			if (this.index2 < o.index2) {
				return -1;
			} else if (this.index2 > o.index2) {
				return +1;
			}
			return 0;
		}
		
		/*
		 * hasHash() is used by HashPartitioner.
		 */
		public int hashCode(){
			int ib = this.index1;
			int jb = this.index2;
			int num = ib * Integer.MAX_VALUE + jb;
			int hash = new Integer(num).hashCode();
			return Math.abs(hash);
		}
	}
	
	/*
	 *  MapA Class
	 *
	 *  read MatrixA and decompose its elements to blocks
	 *
	 */
    public static class MapA extends Mapper<LongWritable, Text, MatrixMultiplication.IndexPair, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	String keyArr[] = strArr[0].split(" ");
        	int nor= Integer.parseInt(keyArr[0]);	// number of row
        	String noc= keyArr[1];	// number of column
        	String v= strArr[1];	// value

            int m = 0;

            // retrieve from configuration
            int IB 	= Integer.parseInt(context.getConfiguration().get("IB"));	// row block size
            int N 	= Integer.parseInt(context.getConfiguration().get("N"));	// number of block of MatrixB

            if(nor%IB == 0){
            	m = nor/IB; 
            }else{
            	m = nor/IB + 1;             	
            }
            for(int j=1;j<(N+1);j++){
            	context.write(new MatrixMultiplication.IndexPair(m,j), new Text("0"+","+nor+","+noc+","+v));
            }
        }
    }

	/*
	 * MapB Class
	 * 
	 *  read MatrixB and decompose it to blocks
	 *
	 */
    public static class MapB extends Mapper<LongWritable, Text, MatrixMultiplication.IndexPair, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	String keyArr[] = strArr[0].split(" ");
        	int noc= Integer.parseInt(keyArr[1]);	// number of row
        	String nor= keyArr[0];	// number of column
        	String v= strArr[1];	// value

        	int n = 0;

            // retrieve from configuration
            int KB 	= Integer.parseInt(context.getConfiguration().get("KB"));
            int M 	= Integer.parseInt(context.getConfiguration().get("M"));
  
            if(noc%KB == 0){
            	n = noc/KB; 
            }else{
            	n = noc/KB + 1;             	
            }
            for(int j=1;j<(M+1);j++){
            	context.write(new MatrixMultiplication.IndexPair(j,n),new Text("1"+","+nor+","+noc+","+v));
            }
        }
    }
    
	/*
	 * Reduce Class
	 * 
	 */
    public static class Reduce extends Reducer<MatrixMultiplication.IndexPair, Text, Text, DoubleWritable>{

		@Override
        protected void reduce(MatrixMultiplication.IndexPair key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{

    		Map<String,String> aMap = new HashMap<String,String>();
    		Map<String,String> bMap = new HashMap<String,String>();
    		
			int rCount=0;
        	for(Text value: values){
            	String strVal = value.toString();
            	String[] strArray = strVal.split(",");

            	if(Integer.parseInt(strArray[0])==0) {
            		aMap.put(strArray[1]+" "+strArray[2],strArray[3]);
            		rCount++;

            	}else{
            		bMap.put(strArray[2]+" "+strArray[1],strArray[3]);
        		}
        	}
        	
            int is = Integer.parseInt(context.getConfiguration().get("IB"));
            int js = Integer.parseInt(context.getConfiguration().get("KB"));

            int im = Integer.parseInt(context.getConfiguration().get("I"));
            int jm = Integer.parseInt(context.getConfiguration().get("K"));

            
            int startI = (key.index1-1)*is+1;
            int endI = (key.index1)*is;
            if(endI>im) endI=im;
            int startJ = (key.index2-1)*js+1;
            int endJ = (key.index2)*js;
            if(endJ>jm) endJ=jm;

            rCount=rCount/(endI-startI+1);
            
            for(int i=startI;i<endI+1;i++){
                for(int j=startJ;j<endJ+1;j++){
                	double sum=0;
                	for(int k=1;k<rCount+1;k++){
                        sum+=Double.parseDouble(aMap.get(i+" "+k))*Double.parseDouble(bMap.get(j+" "+k));
                	}
                    BigDecimal bd = new BigDecimal(sum);
        			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 
                	context.write(new Text(i + " " + j), new DoubleWritable(r.doubleValue()));
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    	Date startProc = new Date(System.currentTimeMillis());
    	System.out.println("process started at " + startProc);
    	
    	Configuration conf = new Configuration();
        int I = Integer.parseInt(args[3]); // Num of Row of MatrixA
		int K = Integer.parseInt(args[4]); // Num of Row of MatrixB'

		int IB = Integer.parseInt(args[5]); // RowBlock Size of MatrixA
		int KB = Integer.parseInt(args[6]); // RowBlock Size of MatrixB'
		
		int M =0;
		if(I%IB == 0){
			M = I/IB;
		}else{
			M = I/IB+1;
		}

		int N =0;
		if(K%KB == 0){
			N = K/KB;
		}else{
			N = K/KB+1;
		}
		
    	conf.set("I", args[3]); // Num of Row of MatrixA
    	conf.set("K", args[4]); // Num of Row of MatrixB'
    	conf.set("IB", args[5]); // RowBlock Size of MatrixA
    	conf.set("KB", args[6]); // RowBlock Size of MatrixB'
    	conf.set("M", new Integer(M).toString());
    	conf.set("N", new Integer(N).toString());
    	
    	Job job = new Job(conf, "MatrixMultiplication");
        job.setJarByClass(MatrixMultiplication.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(MatrixMultiplication.IndexPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapA.class); // matrixA
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapB.class); // matrixB
        FileOutputFormat.setOutputPath(job, new Path(args[2])); // output path
        
		System.out.println("num of MatrixA RowBlock(M) is "+M);
		System.out.println("num of MatrixB ColBlock(N) is "+N);

		boolean success = job.waitForCompletion(true);

    	Date endProc = new Date(System.currentTimeMillis());
    	System.out.println("process ended at " + endProc);

    	System.out.println(success);
    }
}
Amazon Elastic MapReduceでのテスト

先のデータを行列A、以下のデータの形式変換したデータを行列Bとし、Amazon ElasticMapReduceでテストをおこなった。

3.3	1.6	8.5	3.0	9.6	3.1	0.6	5.8	2.1	5.4
5.6	8.9	9.5	10.0	7.4	7.9	3.4	2.0	9.3	4.4
4.4	2.6	2.9	2.0	4.5	3.4	1.8	8.8	6.5	0.7
5.1	0.5	4.7	4.8	4.1	1.6	0.7	1.2	2.8	9.2
5.4	1.5	9.8	9.3	1.9	3.6	2.1	6.5	4.7	5.2
1.9	4.0	4.0	4.0	0.5	0.4	2.1	8.3	0.3	0.3
3.0	1.0	1.2	8.3	7.3	8.3	6.7	0.8	4.1	1.3
3.2	3.2	2.1	4.8	1.2	1.0	8.6	4.6	4.1	5.8
6.3	5.8	1.9	7.9	7.3	1.8	6.3	2.1	0.8	2.7
6.2	0.1	9.5	8.0	9.9	9.7	3.8	2.6	0.5	7.3

以下が結果である(実際には、reducer数分だけ分断されたデータをマージした)

1 1	275.86
1 2	150.55
1 3	331.01
1 4	348.77
1 5	288.74
1 6	214.08
1 7	181.1
1 8	293.04
1 9	187.69
1 10	253.62
2 1	177.72
2 2	98.85
2 3	223.97
2 4	208.59
2 5	231.22
2 6	162.16
2 7	114.16
2 8	175.01
2 9	143.53
2 10	192.22
3 1	249.11
3 2	203.46
3 3	280.97
3 4	352.33
3 5	266.84
3 6	194.71
3 7	210.65
3 8	226.41
3 9	204.36
3 10	232.04
4 1	196.93
4 2	140.13
4 3	238.03
4 4	272.76
4 5	279.42
4 6	208.27
4 7	166.91
4 8	220.97
4 9	170.1
4 10	153.11
5 1	268.96
5 2	157.01
5 3	348.79
5 4	371.03
5 5	287.49
5 6	240.73
5 7	183.97
5 8	253.65
5 9	231.68
5 10	280.21
6 1	274.04
6 2	158.84
6 3	319.36
6 4	351.52
6 5	320.79
6 6	244.02
6 7	205.53
6 8	250.71
6 9	213.51
6 10	265.57
7 1	147.2
7 2	106.64
7 3	147.43
7 4	229.97
7 5	147.79
7 6	118.89
7 7	163.49
7 8	134.07
7 9	123.94
7 10	127.49
8 1	176.8
8 2	155.13
8 3	233.92
8 4	280.09
8 5	264
8 6	203.91
8 7	156.07
8 8	189.7
8 9	169.33
8 10	154.21
9 1	243.58
9 2	149.44
9 3	238.94
9 4	333.33
9 5	224.9
9 6	150.81
9 7	220.28
9 8	232.87
9 9	176.6
9 10	232.97
10 1	243.71
10 2	148.19
10 3	300.77
10 4	321.27
10 5	330.19
10 6	236.15
10 7	173.78
10 8	254.58
10 9	176.43
10 10	202.78

以下にRによる計算(検算)を示しておく。

        [,1]   [,2]   [,3]   [,4]   [,5]   [,6]   [,7]   [,8]   [,9]
 [1,] 275.86 150.55 331.01 348.77 288.74 214.08 181.10 293.04 187.69
 [2,] 177.72  98.85 223.97 208.59 231.22 162.16 114.16 175.01 143.53
 [3,] 249.11 203.46 280.97 352.33 266.84 194.71 210.65 226.41 204.36
 [4,] 196.93 140.13 238.03 272.76 279.42 208.27 166.91 220.97 170.10
 [5,] 268.96 157.01 348.79 371.03 287.49 240.73 183.97 253.65 231.68
 [6,] 274.04 158.84 319.36 351.52 320.79 244.02 205.53 250.71 213.51
 [7,] 147.20 106.64 147.43 229.97 147.79 118.89 163.49 134.07 123.94
 [8,] 176.80 155.13 233.92 280.09 264.00 203.91 156.07 189.70 169.33
 [9,] 243.58 149.44 238.94 333.33 224.90 150.81 220.28 232.87 176.60
[10,] 243.71 148.19 300.77 321.27 330.19 236.15 173.78 254.58 176.43
       [,10]
 [1,] 253.62
 [2,] 192.22
 [3,] 232.04
 [4,] 153.11
 [5,] 280.21
 [6,] 265.57
 [7,] 127.49
 [8,] 154.21
 [9,] 232.97
[10,] 202.78

Amazon Elastic MapReduceを使ったスケーラビリテリーの調査

  • 上記のクラスタで、1000行×1000列同士の積、5000行×5000列同士の積の計算を行う。
1000行×1000列同士の積、5000行×5000列同士の積の計算を行う。
実行時間(sec)
1000行×1000列同士 540 (9mins, 50sec)
5000行×5000列同士 22639(6hrs, 17mins, 19sec)

実行時間は、42倍となった。

1000行1000列の行列積の演算の場合、算術演算の回数は、ほぼ(1000+1000)*1000*1000=2*10^9(20億)回。これに対して、5000行5000列では、(5000+5000)*5000*5000=2.5*10^10(250億回)となり、12.5倍。

実行時間は、演算回数の増加よりはるかに大きくなっており、hadoopの特徴が現れているように思われる(シリアライズやソート、コピーによるオーバーヘッドなど)。

次回のログでは、これを改善するプログラムを紹介する。