Amazon Elastic MapReduce : Hadoop2.4環境で100万変量(10GB)の算術平均を計算する。

前々回の記事では、1変量の算術平均を計算した。
今回は、以下の形式(多変量データ)の「各列」、つまり「各変量」の平均値を計算する。前々回の記事で述べたように、「超多変量」=「超高次元」がビッグデータの本質の1つである。

せっかくなので、自分の手元のMacBook ProCore i5)のRでは処理が厳しいくらいの大きさのデータを、Amazon Elastic MapReduceで処理し、処理時間とスケール・アウトさせたときの様子を見みようと思う。
後述するが、変量(特徴)の数は1,000,000、サンプルサイズは1,000とおく。多変量解析のテキストでは、サンプル・サイズ>変量の数、となっているのが普通だが、ビッグデータの場合にはこれが逆転するケースが発生する。(例えば、1webページ中の語彙、1人の遺伝情報など)

データ

以下に、データ構造とデータファイル(MapReduceに読ませるテキストファイル)の関係を示す。

データの形式は以下とする。

  • 先頭の数値は「列番号」を表す。
  • 数値の区切りは半角スペースにする。
  • データはテキストファイルにいれる。
  • 1つの変量のデータは必ず1つのファイルに入れる。

各テキストファイルについてMapperを動かして、変量間の処理が交錯しないようにする。そうすれば、処理をスケールアウトさせることができる。

データの作成には、下のjavaプログラムを使用した。

  • 列(変量)の数は1,000,000(m)。
  • 各列について、数値を1,000個(n)用意する。
  • 数値は、10Bytes-Lengthの[0,10]の一様乱数。
  • 10,000変量分を1つのテキストファイルに格納する。

以下にまとめると、

テキストファイルの数 100
各テキストファイルのサイズ 約100MB
1テキストファイル中の変量数 10,000
総変量数 1,000,000
サンプルサイズ 1,000
総データ数 1,000,000,000
総ファイルサイズ 10GB

となる。これ位のサイズになると、自分のMacBook Pro(Core i5,2.5GHz,Mem:8GB)に載せたRでは厳しい。

package com.tetsuyaodaka;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;

/**
 * Create Sample Date for Hadoop Calculation.
 * 
 * Create using uniform distribution [0.10] 
 * 
 * args: 0: directory path of the output file (filename must be included)
 * 
 * @author tetsuya.odaka@gmail.com
 *
 */
public class CreateThreadTestData {

	public static void main(String[] args) {
		System.out.println("start");
		
		// 変量数
		int numV = 1000000;
		// サンプルサイズ
		int numS=1000;
		// 1ファイル中のレコード数
		int numR=10000;
		
		// ファイル数
		int numF=1;
		if(numV%numR == 0){
			numF = numV/numR;
		}else{
			numF = numV/numR+1;
		}
		
		long seq = 0;
		for(int j=1;j<numF+1;j++){
			String fileName = "data"+j+".txt";
			String filePath = args[0]+"/"+fileName;
		
			try {
				PrintWriter outFile 
				= new PrintWriter(
						new BufferedWriter(
								new OutputStreamWriter(
										new FileOutputStream(filePath),"UTF-8")));
				
				// レコード数
				for(int k=0;k<numR;k++){
					// Seq. No.
					outFile.print(++seq+" ");
					// サンプルサイズ
					for(int i=0;i<numS;i++){
						double rnd = Math.random()*10;
						BigDecimal bd = new BigDecimal(rnd);
						BigDecimal r = bd.setScale(8, BigDecimal.ROUND_HALF_UP); 
						if(i == (numR-1)){
							outFile.print(r);
						}else{
							outFile.print(r+" ");
						}
					}
					outFile.print(System.getProperty("line.separator"));
				}
				outFile.close();
				
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}
		}
		System.out.println("end");
		
	}

}

サンプルサイズが1,000と小さいのは、先のブログに述べたように、ビッグデータの本質的な難しさはサンプルサイズの大きさではなく、次元の高さによるからである。

各テキストファイルの大きさは、Amazon EMRの標準ブロックサイズ(134MB)に入る程度になっている。

変量ごとの平均を算出するMapReduceプログラム

以下にMapReduceプログラムを示す。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
    */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hadoop2.4.0のMapReduceで算術平均を計算する。(多変量の場合)
 * 
 * author : tetsuya.odaka@gmail.com
 * 
 */
@SuppressWarnings("unused")
public class MultiMeanCalc {

	/*
	 * Map
	 * 列番号 数値 数値と並んでいるText Fileを読んで、(列番号,値)のキー・バリューを書き出す。
	 * 
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{

		private String mapTaskId;
		private String inputFile;
        
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }
        
        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
        		String str = value.toString();
        		int index = str.indexOf(" ");
        		
        		String keyStr = str.substring(0, index);
        		String valArr[] = str.substring(index+1).split(" ");
        		
        		IntWritable outKey = new IntWritable(Integer.parseInt(keyStr));
        		
        		for(String valStr : valArr){
        			double dval = Double.parseDouble(valStr);
                    context.write(outKey, new DoubleWritable(dval));
        		}
        }
    }

	/*
	 * Reduce
	 * (列番号,値)を集約後、算術平均を計算する
	 */
    public static class Reduce extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable>{
		private String 	reduceTaskId;
        
        public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) 
        		throws IOException, InterruptedException{
        	long 	noValues = 0;
            double 	sum = 0;
            for(DoubleWritable value: values){
            	++noValues;
                sum += value.get();
            }

            if(noValues > 0) sum = sum/noValues;
            
            context.write(key, new DoubleWritable(sum));
        }
    }
    

	public static void main(String[] args) throws Exception {
    	
        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MeanCalc.class);
        
        // Mapperクラス, Reducerクラスの定義
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // OutputKeyとOutputValueのクラス定義
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        
        // 入出力フォルダーの指定
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);
        
        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // MapReduceジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }

        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}

実行:11台構成

まず、Amazon Elastic MapReduceクラスタを以下のサイズとした。

ノード サイズ 台数
マスタ m1.middle 1
コア m1.middle 5
タスク m1.middle 5

ログをS3に採るようにクラスタを起動し、実行はRuby Client。

elastic-mapreduce --jar s3n://[your bucket]/[your folder]/hadoopsample.jar --main-class com.tetsuyaodaka.MultiMeanCalc --args s3n://[your bucket]/[your input folder],s3n://[your bucket]/[your input folder] --step-name MiltiMeanCalculation --jobflow j-XXXXXXXXX

タイプスタンプはS3バケットのstdoutに吐き出される。

Program Start: 2014/06/12 9:18:28
Output folder is s3n://XXXXXXXXXXX/EMR/output/20140612/10344
Program END: 2014/06/12 9:38:32

実行時間は、20min.04sec.(1,204sec.)と決して速くはない。

以下は、マネージメントコンソールからタスク(Task_attempt)の一覧を表示したところ。
起動したタスクは、Mapが200、Reduceタスクが39。エラーの発生はなかった。

結果を保存したS3バケットスクリーンショットを以下に示す。39のreduceから均等に結果が吐き出されている。

出力は以下のように、キー(変量のSeq)+タブ+平均値+改行となっている。

39	4.94539519773
78	4.911648371779998
117	5.014055572980005
156	4.970061421849994
195	4.937069895429997
234	5.206232056259996
273	4.994578578919996
312	4.897684452960004
351	5.095053427910006
390	4.970409691689994
429	4.889084266230005
468	4.95468362101
507	5.107228051750001
546	4.844947219119992
...

スケールアウトさせる。

では、クラスタサイズを大きくしたときに、計算がスケールするか確認する。

クラスタ0(上の構成)
マスタノード数 1
コアノード数 5
タスクノード数 5
総ノード数 11
Mapタスク数 200
Reduceタスク数 39
実行時間 20min.04sec.(1,204sec.)
Map起動までのレイテンシー 38sec
Map処理時間 14min.23sec
Reduce起動までのレイテンシー 2min.9sec

(注記)

  • Map起動までのレイテンシー:プログラム開始から最初のMapタスクが起動されるまでの時間
  • Reduce起動までのレイテンシー:プログラム開始から最初のReduceタスクが起動されるまでの時間
  • Map処理時間:最初のMapタスクが起動してから、最後のタスクが終了するまでの時間。
クラスタ
マスタノード数 1
コアノード数 10
タスクノード数 10
総ノード数 21
Mapタスク数 200
Reduceタスク数 39
実行時間 10min.02sec.(602sec.)
Map起動までのレイテンシー 35sec
Map処理時間 7min.59sec
Reduce起動までのレイテンシー 1min.56sec
クラスタ
マスタノード数 1
コアノード数 20
タスクノード数 20
総ノード数 41
Mapタスク数 200
Reduceタスク数 39
実行時間 6min.23sec.(382sec.)
Map起動までのレイテンシー 40sec
Map処理時間 4min.5sec
Reduce起動までのレイテンシー 2min.5sec
散布図

横軸に総ノード数、縦軸に実行時間をとったグラフを示す。

クラスタ0(11台構成)からクラスタ1(21台構成)へは、面白いように実行時間が半分となった。この結果を外挿すれば、クラスタ2(43台構成)では300sec.程度となるが、382sec.と1分ほど大きい数字となっている。

クラスタサイズを大きくしたことによる、Map起動までのレイテンシーを見ると、初期処理のレイテンシー増加(S3からHDFSへの配置)は見られず、Reduceタスク起動までのレイテンシーも変わらない(Reduceは非常に速く起動していることが分かる)。
クラスター1から2では、Mapの処理時間はスケールしているから、Reduceがボトルネックになっている可能性がある。(Reduceの数を増やせば、スケールするかな)