Hadoop MapReduceで行列積を計算する(ケース2)(Dense Matrix Multiplication with Hadoop MapReduce: Case2)



しかしながら、同誌に掲載されている「Yahoo! JAPAN 景気指数」では60万語(60万変量)と、CIとの相関を調べている。


とくに、Case1では、Mapで読み込んだデータをそのままReduceに渡しており、シリアライズと圧縮(Amazon EMRでは、デフォルトのSnappy形式に転送データを圧縮する)、転送におけるオーバーヘッドが高いのは明らかである。





行インデックス 列インデックス 計算結果


1 1	3.6
1 2	2.2
1 3	9.7
1 4	7.5
1 5	9.1
1 6	7.4
1 7	1.5
1 8	3.7
1 9	6.7
1 10	7.9
2 1	5.9
2 2	3.8
2 3	7.1
2 4	7.1
2 5	0.7
2 6	2.6
2 7	1.6
2 8	3.7
2 9	1.4
2 10	5.7
3 1	2.6
3 2	8.5
3 3	4.2
3 4	7.3
3 5	4.3
3 6	7.5
3 7	3.3
3 8	6.9
3 9	8.1
3 10	2.7
4 1	7.1
4 2	4.4
4 3	8.1
4 4	0.8
4 5	2.5
4 6	5.3
4 7	7.2
4 8	2.6
4 9	4.7
4 10	4.2
5 1	3.7
5 2	6.9
5 3	6.6
5 4	9.9
5 5	9.3
5 6	4.9
5 7	4.3
5 8	5.3
5 9	2.1
5 10	5.8
6 1	4.5
6 2	5.2
6 3	9.2
6 4	7.1
6 5	5.5
6 6	2.8
6 7	3.5
6 8	6.4
6 9	5.8
6 10	8.2
7 1	0.9
7 2	2.3
7 3	1.9
7 4	1.2
7 5	6.4
7 6	1.8
7 7	5.2
7 8	6.8
7 9	6.1
7 10	0.5
8 1	7.1
8 2	7.7
8 3	3.0
8 4	4.1
8 5	0.2
8 6	9.3
8 7	9.6
8 8	1.5
8 9	2.3
8 10	1.5
9 1	2.0
9 2	0.8
9 3	5.1
9 4	7.9
9 5	9.6
9 6	4.2
9 7	4.4
9 8	9.0
9 9	9.9
9 10	0.9
10 1	8.2
10 2	3.6
10 3	8.7
10 4	2.9
10 5	4.8
10 6	5.6
10 7	5.7
10 8	1.1
10 9	6.8
10 10	7.0


具体的には、行列積 C=ABを計算するにあたり、行列Aの行と、行列Bの列をそれぞれM個とN個のブロックに分解し(下図)、部分行列単位で計算を行う。(一番シンプルでストレートフォワードな方法かと思う)




  • Bは転置(B’)として作成しておかなくてはならない。これは、データの読み込みを行うMapperが、「どの行から読むか分からない」ため、転置せずにおくと、列が途中で分断されてしまう。また、Mapperのこの性質より、データの各行の先頭には、インデックス(1から始まる連番)を付与する。
  • カスタマイズしたキーを用意したので、キークラスにhasHash()メソッドを用意しておく。PartitionerはデフォルトのHashPartitionerを利用するが、ソースコードを見ると、キーのhashCode()メソッドを使ってPartitioningを行っているため。これを実装しないと、並列演算した際に正しくキー単位でreduceにデータが渡されない。

開発機はMac Book Pro(Mountain Lion)。Hadoopのバージョンは1.1.2である。


package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 *  Matrix Multiplication on Hadoop Map Reduce(Step1)
 *   author : tetsuya.odaka@gmail.com
 *   tested on Hadoop1.1.2 
 *   Split the Large Scale Matrix to SubMatrices.
 *   Split size (Number Of Rows or Columns) can be specified by arguments.
 *   This should be decided according to your resources.
 *   Partitioner and Conditioner are not implemented here.
 *   Can calculate real numbers (format double) and be expected.
 *   This program is distributed under ASF2.0 LICENSE.
public class TransformMatrix {
	 *  Map Class
	 *  read MatrixA and decompose its elements to blocks
    public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	String keyArr[] = strArr[0].split(" ");

            // retrieve from configuration
            boolean tr 	= Boolean.parseBoolean(context.getConfiguration().get("transpose"));	// row block size

            	int n= Integer.parseInt(keyArr[1]);	// number of column
            	context.write(new IntWritable(n), value);
            	int n= Integer.parseInt(keyArr[0]);	// number of row
            	context.write(new IntWritable(n), value);
	 * Reduce Class
    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{

        protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{

    		HashMap<Integer, Double> aMap = new HashMap<Integer, Double>();

    		// retrieve from configuration
            boolean tr 	= Boolean.parseBoolean(context.getConfiguration().get("transpose"));	// row block size
        	for(Text value: values){
            	String strVal = value.toString();
            	String strArr[] = strVal.split("\t");
            	String keyArr[] = strArr[0].split(" ");
            	Double val = Double.parseDouble(strArr[1]);
                BigDecimal bd = new BigDecimal(val);
    			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 

    			if(tr) {


        	Set<Integer> setA = aMap.keySet();
        	Set<Integer> sortedSetA = new TreeSet<Integer>(setA);
        	StringBuffer sb = new StringBuffer();
        	for(int indexA : sortedSetA){
        		if(indexA > 1) sb.append(" ");
            context.write(key, new Text(sb.toString()));

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    	Date startProc = new Date(System.currentTimeMillis());
    	System.out.println("process started at " + startProc);
    	Configuration conf = new Configuration();
        	conf.set("transpose", "true"); // transpose
        	conf.set("transpose", "false"); // 

        Job job = new Job(conf, "MatrixMultiplication");



        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, Map.class); // matrixA
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        boolean success = job.waitForCompletion(true);

    	Date endProc = new Date(System.currentTimeMillis());
    	System.out.println("process ended at " + endProc);



package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 *  Matrix Multiplication on Hadoop Map Reduce
 *   author : tetsuya.odaka@gmail.com
 *   tested on Hadoop1.2 
 *   Split the Large Scale Matrix to SubMatrices.
 *   Split size (Number Of Rows or Columns) can be specified by arguments.
 *   This should be decided according to your resources.
 *   Partitioner and Conditioner are not implemented here.
 *   Can calculate real numbers (format double) and be expected.
 *   This program is distributed under ASF2.0 LICENSE.
public class MatrixMult {

	 *  IndexPair Class
	 * reduce用のキーを、MatrixAの行ブロック番号、MatrixBの列ブロック番号、要素番号にする。
	 * customized key for reduce function consists of row BlockNum of MatrixA, MatrixB, and number of elements.
	public static class IndexPair implements WritableComparable<MatrixMult.IndexPair> {
		public int index1;
		public int index2;
		public IndexPair() {
		public IndexPair(int index1, int index2) {
			this.index1 = index1;
			this.index2 = index2;

		public void write (DataOutput out)
			throws IOException
		public void readFields (DataInput in)
			throws IOException
			index1 = in.readInt();
			index2 = in.readInt();

		public int compareTo(MatrixMult.IndexPair o) {
			if (this.index1 < o.index1) {
				return -1;
			} else if (this.index1 > o.index1) {
				return +1;
			if (this.index2 < o.index2) {
				return -1;
			} else if (this.index2 > o.index2) {
				return +1;
			return 0;

		 * hasHash() is used by HashPartitionar.
		public int hashCode(){
			int ib = this.index1;
			int jb = this.index2;
			int num = ib * Integer.MAX_VALUE + jb;
			int hash = new Integer(num).hashCode();
			return Math.abs(hash);

	 *  MapA Class
	 * Matrix Aのデータを読み込んで、行をブロックに分解する。
	 *  read MatrixA and decompose it to blocks
    public static class MapA extends Mapper<LongWritable, Text, MatrixMult.IndexPair, Text>{
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	int i= Integer.parseInt(strArr[0]);
        	String v= strArr[1];

            int m = 0;

            // retrieve from configuration
            int IB 	= Integer.parseInt(context.getConfiguration().get("IB"));
            int N 	= Integer.parseInt(context.getConfiguration().get("N"));

            if(i%IB == 0){
            	m = i/IB; 
            	m = i/IB + 1;             	
            for(int j=1;j<(N+1);j++){
            	context.write(new MatrixMult.IndexPair(m,j), new Text("0"+","+i+","+v));

	 * MapB Class
	 * Matrix B'のデータを読み込んで、行をブロックに分解する。
	 *  read MatrixB and decompose it to blocks
    public static class MapB extends Mapper<LongWritable, Text, MatrixMult.IndexPair, Text>{
        protected void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String strArr[] = value.toString().split("\t");
        	int k= Integer.parseInt(strArr[0]);
        	String v= strArr[1];
            int n = 0;

            // retrieve from configuration
            int KB 	= Integer.parseInt(context.getConfiguration().get("KB"));
            int M 	= Integer.parseInt(context.getConfiguration().get("M"));
            if(k%KB == 0){
            	n = k/KB; 
            	n = k/KB + 1;             	
            for(int j=1;j<(M+1);j++){
            	context.write(new MatrixMult.IndexPair(j,n), new Text("1"+","+k+","+v));

	 * Reduce Class
    public static class Reduce extends Reducer<MatrixMult.IndexPair, Text, Text, DoubleWritable>{
        protected void reduce(MatrixMult.IndexPair key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{

    		List<RowContents> aList = new ArrayList<RowContents>();
        	List<RowContents> bList = new ArrayList<RowContents>();
        	Map<String,List<RowContents>> cMap = new HashMap<String,List<RowContents>>();

    		for(Text value: values){
            	String strVal = value.toString();

            	String mtx;
            	String sRow;
            	String[] strArray = strVal.split(",");
        		if(Integer.parseInt(strArray[0])==0) {
        			mtx = "A";
        			mtx = "B";
    			sRow = strArray[1]+","+strArray[2];

            	cMap.get(mtx).add(new RowContents(sRow));
    		for(RowContents ra : cMap.get("A")){
        		for(RowContents rb : cMap.get("B")){
        			int indexA = ra.index;
        			int indexB = rb.index;
        			double sum = 0;
        			for(int i=0;i<ra.lstRow.size();i++){
        				sum += ra.lstRow.get(i)*rb.lstRow.get(i);
                    BigDecimal bd = new BigDecimal(sum);
        			BigDecimal r = bd.setScale(2, BigDecimal.ROUND_HALF_UP); 
                    context.write(new Text(indexA + " " + indexB+ " "), new DoubleWritable(r.doubleValue()));
    	public class RowContents {
    		public String 	strRow;
    		public int 		index;			// means row index
    		public List<Double> 	lstRow;	// list of elements of row.

    		public RowContents() {

    		public RowContents(String strRow) {
    			this.strRow = strRow;
    			this.lstRow = new ArrayList<Double>();
    		public void calculate(){
    			String[] strArr = this.strRow.split(",");
    			this.index = Integer.parseInt(strArr[0]);
    			String[] aArr 	= strArr[1].split(" ");
                for(int i=0; i<aArr.length; i++){

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    	Date startProc = new Date(System.currentTimeMillis());
    	System.out.println("process started at " + startProc);
    	Configuration conf = new Configuration();
        int I = Integer.parseInt(args[3]); // Num of Row of MatrixA
		int K = Integer.parseInt(args[4]); // Num of Row of MatrixB'

		int IB = Integer.parseInt(args[5]); // RowBlock Size of MatrixA
		int KB = Integer.parseInt(args[6]); // RowBlock Size of MatrixB'
		int M =0;
		if(I%IB == 0){
			M = I/IB;
			M = I/IB+1;

		int N =0;
		if(K%KB == 0){
			N = K/KB;
			N = K/KB+1;
    	conf.set("I", args[3]); // Num of Row of MatrixA
    	conf.set("K", args[4]); // Num of Row of MatrixB'
    	conf.set("IB", args[5]); // RowBlock Size of MatrixA
    	conf.set("KB", args[6]); // RowBlock Size of MatrixB'
    	conf.set("M", new Integer(M).toString());
    	conf.set("N", new Integer(N).toString());
    	Job job = new Job(conf, "MatrixMultiplication");



        // Mapperごとに読み込むファイルを変える。
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapA.class); // matrixA
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapB.class); // matrixB
        FileOutputFormat.setOutputPath(job, new Path(args[2])); // output path
		System.out.println("num of MatrixA RowBlock(M) is "+M);
		System.out.println("num of MatrixB RowBlock(N) is "+N);

		boolean success = job.waitForCompletion(true);

    	Date endProc = new Date(System.currentTimeMillis());
    	System.out.println("process ended at " + endProc);


Amazon Elastic MapReduceを使ったスケーラビリテリーの調査

Amazon ElasticMapReduceの以下のクラスタで実行時間の比較を行った。

リージョン US Standard
インスタンスタイプ m1.small
マスタ・インスタンスグループ 1インスタンス
コア・インスタンスグループ 8インスタンス
タスク・インスタンスグループ 10インスタンス
  • 5000行×5000列同士の積、10000行×10000列同士の積の計算を行う。
  • 10000行×10000列の行列と10000行のベクトルの積、20000行×20000列の行列と20000行のベクトルの積の計算を行う。


行列Aの変換(sec) 行列Bの変換(sec) 行列の乗算(sec) 実行時間(sec)
5000行×5000列同士 189 189 1174 1552(25min,52sec)
10000行×10000列同士 398 396 6921 7715(128min,35sec)






行列の変換(sec) ベクトルの変換(sec) 乗算(sec) 実行時間(sec)
10000 398 56 82 536(8min,56sec)
20000 1156 56 164 1376(22min,56sec)