Amazon Elastic MapReduce : Hadoop2.4環境でDistributed Cacheを使う。

ようやく以前やりたかったことができた。
というのは、「S3バケットにおいたテキストファイルをDistributed Cacheとして使う」こと。

twasinkさんがGitHubに公開してくれているコードのおかげ。Thank you.

Distributed Cacheを使いたいケースはままあって、例えば、1つの変量を基準として処理を分散したい場合がそれに当てはまる。目的変数に対して説明変数を対比させるなど。

Distributed Cacheについては、Amazon Elastic MapReduceにも説明がある(こちら)し、象本にも当然書いてある(こちらは純粋なDistributed Cacheに説明)のだが、S3をプログラム内で設定して使う、というサンプルを見つけられないでいた。

以下に、サンプルプログラムをメモしておく。
サンプルを探している間に、ToolRunnerをつかってJobを実行するサンプルがあったので、(StackOverFlow:Hadoop JobConf class is deprecated , need updated example、inquireさんThank you)同様に書いてみた。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package com.tetsuyaodaka;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;

/**
 * Hadoop2.4.0のMapReduceで、S3にあるファイルをDistribution Cacheとして使う。
 * 
 * 参考:
 * S3バケットにあるテキストファイルをDistribution Cacheとして使う。
 * https://gist.github.com/twasink/8813628
 * 
 * ToolRunnerの実行:
 * http://stackoverflow.com/questions/8603788/hadoop-jobconf-class-is-deprecated-need-updated-example
 * 
 */
@SuppressWarnings("unused")
public class DistCache extends Configured implements Tool{

	/*
	 * Map
	 * Text Fileを読んで、スペースで区切って、(1,値)で書き出す。
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{
    	private final static IntWritable one = new IntWritable(1);
		private int noRecords;

        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            while(tk.hasMoreTokens()){
            	++noRecords;
                double dval = Double.parseDouble(tk.nextToken());
                context.write(one, new DoubleWritable(dval));
            }
        }
        
        @Override
        /*
         * Mapのsetup()メソッドでDistribution Cacheを読み込む。
         * 
         * https://gist.github.com/twasink/8813628がオリジナル。
         * 
         * 読み込んだら、クラスメンバー変数に入れればよい。
         * 
         */
        protected void setup(Context context) throws IOException, InterruptedException {
            if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
                URI cacheFileUri = context.getCacheFiles()[0];
                
                if (cacheFileUri != null) {
                	// Cacheファイルが見つかったら、Stringにしてstdoutに出力
                	System.out.println("Contents Of Cache File: " + FileUtils.readFileToString(new File("./cacheFile")));
                } else {
                    System.out.println("NO CACHE FILE");
                }
            } else {
                System.out.println("NO CACHE FILES AT ALL");
            }
        }
	}

	@Override
	public int run(String[] args) throws Exception {

        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DistCache.class);

        // Mapperクラスの定義
        job.setMapperClass(Map.class);

        // OutputKeyとOutputValueのクラス定義
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        
        // 入出力フォルダーの指定
        // インプラントフォルダーは引数からとる。
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);

        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //S3バケットにあるテキストファイルをDistribution Cacheとして設定する。
        String s3File = "s3n://banyantreeus/EMR/distibution-cache/data1.txt";
        job.addCacheFile(new URI(s3File + "#cacheFile"));
		
        // 実行
	    job.submit();
        
		return 0;
	}

    public static void main(String[] args) throws Exception {
    	
        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // ToolRunnerのrunメソッドをOverrideして実行させる。
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        ToolRunner.run(new DistCache(), otherArgs);
        
        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}


プログラムでは、処理開始時にS3バケットにあるテキストデータをDistributed Cacheに設定し、Mapのsetup()でそれを取得してプリントする。Mapの処理に特に意味はない。この出力結果は、マネージメント・コンソールのStep、Job、TaskAttemptと手繰って、stdoutで確認できる。

助かった。皆さん、本当にありがとう。