Amazon Elastic MapReduce : MapとReduceのスレッドセーフ実験。

前回の記事で、Hadoop2にしたら随分たくさんMapとReduceが動いたので、スレッドじゃないよね、と思ってしまった。特にMapperは起動の仕方でプログラムがデタラメになってしまうので、ちょっと調べてみた。

Hadoop2.4のAPIをみると、Mapperをつかってる限りでは、Runnableを引き継いでくることはないようで、Javadocのdescriptionの下の方に、

Applications may write a custom MapRunnable to exert greater control on map processing e.g. multi-threaded Mappers etc.

と書いてあって、MapRunnableを使うとスレッドで処理できるよ、となっている。
ちなみに、MapRunnableのJavadocをみると、runメソッドがあって、これをoverrideすることで、Mapの動きを変えられるようになっているようだ。Yahooデベロッパー・ネットワークの「Hadoopを使いこなす」にも同様の記事が乗っている。こちらのインターフェイルもrunnableなわけでなく、runnableにできるよ、ということと理解した。MapperもReducerもstaticだし。

とはいうものの、やってみたら違ってたではいけないので、簡単な実験をしてみた。

実験には、

  • キー:1から100までの数値
  • バリュー:小数点を入れて10桁の数値×10,000。区切り文字は半角スペース。
  • Mapへの入力方式:各キーにつき1つのテキストファイルを作成して、Mapに入力する。

というデータをつかった。
たとえば、キーが6のときは以下のような格好になって、data6.txtに保管する。

6 2.98677076 1.22031748 7.13013497 4.67325002 9.55793889 9.26554861 ......

バイト数は、(10+1)*10,000+2=110,002なので、おおよそ110Kになる。

備忘のため、以下にこれを生成したjavaアプリケーションを以下に示しておく。

package com.tetsuyaodaka;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;

/**
 * Create Sample Date for Hadoop Calculation.
 * 
 * Create using uniform distribution [0.10] 
 * 
 * args: 0: directory path of the output file (filename must be included)
 * 
 * @author tetsuya.odaka@gmail.com
 *
 */
public class CreateThreadTestData {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		System.out.println("start");
		
		int numF = 100;
		
		for(int j=1;j<numF+1;j++){
			String fileName = "data"+j+".txt";
			String filePath = args[0]+"/"+fileName;
		
			try {
				PrintWriter outFile 
				= new PrintWriter(
						new BufferedWriter(
								new OutputStreamWriter(
										new FileOutputStream(filePath),"UTF-8")));
		
				outFile.print(j+" ");

				int sizeC=10000;
				for(int i=0;i<sizeC;i++){
					double rnd = Math.random()*10;
					BigDecimal bd = new BigDecimal(rnd);
					BigDecimal r = bd.setScale(8, BigDecimal.ROUND_HALF_UP); 
					if(i==(sizeC-1)){
						outFile.print(r);
					}else{
						outFile.print(r+" ");
					}
				}
				outFile.print(System.getProperty("line.separator"));
				outFile.close();
			} catch (UnsupportedEncodingException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		System.out.println("end");
		
	}

}


以下が実験につかったMapReduceプログラム。
やっていることは単純で、上のテキスト・ファイル(110K)を読んで、キーを分離してReduceに渡し、そのまま出力する。
ブロックサイズ(Amazon EMRだと134217728=134M)、m1.middleのメモリーサイズ(Mem: 3,843,032k total)と比較してデータが小さいので、一気にいけるはず。
処理されるデータを確認する(データが切れたりしないことを確認する)ために、Map.map()、Reduce.reduce()、main()内で内容を出力する。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hadoop2.4.0のMapReduceの挙動を確認する。
 * 
 * author: tetsuya.odaka@gmail.com
 * 
 */
@SuppressWarnings("unused")
public class MultiThread {

	/*
	 * Map
	 * 列番号 数値 数値と並んでいるText Fileを読んで、(列番号,値 値)のキー・バリューを書き出す。
	 * map()の中でTaskIdと、ファイルの内容等を書き出す。
	 * 
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,Text>{

        private String mapTaskId;
        private String inputFile;
        
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }

        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String str = value.toString();
    		int index = str.indexOf(" ");
        	System.out.println("mapTaskId:"+mapTaskId);
       		System.out.println("inputFile:"+inputFile);
       		System.out.println("index:"+index);
        	System.out.println("str:"+str);
        		
        	// 1つ目の数字以降がデータ
        	context.write(new IntWritable(Integer.parseInt(str.substring(0, index))), new Text(str.substring(index+1)));
        }
    }

	/*
	 * Reduce
	 * キーとバリューをそのまま出力する。
	 * reduce()の中でTaskIdやkey,value等を書き出す。
	 */
    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
    	private String 	reduceTaskId;
        private int 	noKeys = 0;
        
        public void setup(Context context) {
        	reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID;
          }
        
        public void reduce(IntWritable key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{
        	++noKeys;
        	int 	noValues = 0;
        	for(Text value: values){
            	++noValues;
            	String str=value.toString();
        		System.out.println("reduceTaskId:"+reduceTaskId);
        		System.out.println("noKeys:"+noKeys);
        		System.out.println("key-reduce:"+key);
        		System.out.println("value-reduce:"+str);
            	context.write(new IntWritable(noValues), value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
    	
        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MultiThread.class);
        
        // Mapperクラス, Reducerクラスの定義
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // MapperのOutputKeyとOutputValueのクラス定義
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        // 入出力フォルダーの指定
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);
        
        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // MapReduceジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }

        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}

結果

main関数内に書いたprintは、クラスタ作成時に指定した、s3バケット内(stdout)にでる。

これを見ると、処理にかかった時間は、21min10sec。

Program Start: 2014/06/12 09:31:04
Output folder is s3n://XXXXXXXXX/EMR/output/20140612/35199
Program END: 2014/06/12 09:52:24


次にMapReduceの状況をみるには、マネージメント・コンソールでステップを選択して、タスクの一覧を表示する。

ここで、View attemptをクリックしてstdoutを開けば、Map.map()とReduce.reduce()内にかいたprintの結果をみることができる。

mapタスクの数 100
reduceタスクの数 7

となっていて、各mapタスクで1つのテキストファイルを読み込んでいる(10000個の数字も問題なく読んでいる)ことが確認できた。
Hadoop1のとき、Mapのスロットが開かない!!と感じることがあったので、Hadoop2ではこの辺りが改善されている(みたい)。
象本には、小さいファイルを使う事で負荷が分散できる(データロードにかかる時間の、全体の処理時間に対する割合が小さくなる)と書いてある。逆に、小さすぎても良くない、とも書いてあるけど、Mapがこれだけ上がってくると、ロードの効率化が図れると思った。

reduceタスクは、(曖昧な記憶によれば)キーのハッシュ値から除算をしてReducerへの割当を決定している。
Reduceタスク6(r_000006)に割り振られたキーは、6-97の14つ(Reduceが7つなので、7の倍数になっている)。
以下のような配分で100個のキーが綺麗に分かれた。このうち、r_000005では2回のattemptが実行されたが(早く終わったのかな)、2回目のattemptはなにもせずにkillされた。

タスクID 処理したキーの数
r_000006 14
r_000005 14
r_000004 14
r_000003 14
r_000002 15
r_000001 15
r_000000 14