設計製造ソリューション(DMS)+3D&バーチャルリアリティ展(IVR)に行ってきた。

東京ビッグサイトで行われている展示会に行ってきた。見たのは、設計製造ソリューション展(DMS)と3D&バーチャルリアリティ展(IVR)の2つ。

製造業育ちなので、設計製造ソリューションの方は必須という意識があるのだが、お隣のブースの3D&バーチャルリアリティもとても面白かった。
3Dのエリアは、なんといっても3Dプリンターにものすごく人が集まっていて、熱気がスゴい。自分もいろいろな種類のプリンターが見られて勉強になった。

こんな感じで、Personalとか書いて売っているものもある。個人で所有できます、というステージに入っている。グルーガンのような樹脂を射出するハンディタイプのものなら19800円のものがあった(でも、これはプリンターではありません)。ホビー系としてはとても面白そう。

ここも3Dプリンターのブース。人の数がスゴい。

個人的にとてもスゴいと思ったのが、積木製作さん&朝日テレビメディアプレックスさんのVRのブース。(話を聞くのに夢中になってしまって、写真を取り忘れてしまった)
iMacがおいてあって、3D用のゴーグルをかけると、ゲームのような仮想現実(VR)の世界に入ってしまう(ゴーグルをすると、完全に入ってしまう)。ここまでは、他のブース(例えば、トレーニング目的のシミュレータとかもそう)と変わらないのだが、仮想現実の画面がChromeレンダリングされていてビックリしてしまった。
iMac+Google Chromeに、PSのゲームみたいな3D画像がぐるぐる動いている。この画像がとてもリアルに出来ていて、ガラス面への映り込みまでキッチリCGになってる。ヒラヒラ飛ぶアバターに、iMacのカメラでとった自分のビデオが(リアルタイムに)はめ込まれて、他のアバターとチャットできる。
これが、Chrome上で動いてる!!
開発をされたのは、朝日テレビメディアプレックスさんのまだ20代と思われる人。あんまりスゴいのでお話を聞いてみると、WebGLやWebSocket etc.といったhtml5で動いているとのこと。なんでここまでできるのだろうと思っていると、学生時代からエンジンの開発にも参加していると聞いて、スゴく元気&国際的な若い日本人に会えてうれしくなってしまった。明日から、ロンドンのブラウザー開発者のカンファレンスに行かれると聞いた。素晴らしい!!

あとは、Citrixさんの3D CADを仮想デスクトップに置く、というのも興味深かった。3D CAD(ソリッドモデル)は(これも)レンダリングに時間がかかるので、昔(10年位前)はワークステーションでやっていたし、ハイパフォーマンスなマシンでなくちゃ、という感覚があった。
NVIDAと共同開発したvGPU(仮想グラフィッフ用チップ)をXenServerのハイパーバーザーに載せて、その上に仮想サーバーか、仮想デスクトップを載せる。前者の構成は、Meta Frameとほぼ同じものになる。
いずれにせよ。高価な3D CADのマシンはいりません、という構成ができてしまう。
Xenはもともとオープンソースだったが、Xen Sourceという会社ができて、すぐにCitrixに買収されてしまった。これが5,6年前のことだから、こういうソリューションは当時、すでに頭にあったのだなぁ、と思ってしまった。

CAD関連は展示会のメイン。Citrixさんの仮想、朝日メデイアプレックスさんのVR。どっちも3DモデリングへのITの適用。きっと、3D CADはブラウザーベースになってしまう、と思った。
そうしたら、すぐに3Dプリンターで(技術)試作はできてしまう。ブラウザーベースなら、どこにいてもできるし、3DプリンターがPersonalになっている以上、世界中のどこでもできてしまう。
素晴らしい!!

その他、AR(拡張現実)のブースにも立ち寄ったが、これもかなり面白い。
文書の中に動的にビデオを埋め込んでしまうデモを見せてもらった。VRとARの(個人的な理解による)違いは、前者が完全にCGの世界に入ってしまうのに対して、後者が現実を写す画像ストリームをパターン認識して、そこになんらかを埋め込んでしまうところにある(と思っている)。文書の中に動的にビデオを持ち込めれば、本の形態が変わってしまう(前にテレビでみたことがあるから、もうなってる?)し、広告のあり方も変わってしまう。スゴいものができてるなぁ。

最後に、母校の東京工業大学が出展していたので立ち寄った。

展示していたのはスパコン(当然ですが、持ってこられないので、写真とか能力の説明とかの展示です)。ロマンがありますね。
このブログではしばらく分散コンピューティングをやってきたが、スパコンでなくちゃできない計算がある。たとえば、天気予報。的中率が昔よりよくなっているのは、コンピュータの能力があがったからに他ならない。
立体地図も解像度を上げれば上げただけシミュレーション精度はあがる。そもそも流体力学の方程式は解析的に解けないので、コンピュータで近似計算するしかない。
4m幅の道路は10mのメッシュでは潰れてしまうが、1mのメッシュなら描き出すことができる。それをできるだけ広い範囲に拡大できれば尚良い。風雨の話にしろ、地震の影響にせよ、カオスな話(地震は「べき乗則」と最近本で読んだが)なので、遠くで発生した出来事が大きく拡大して波及する(バタフライ効果)。
昔、汎用機を使ってた人なら「水冷式」だったのを覚えているのだけど、今は伝導率の低い「油」で冷やしているのだそうだ。
当然、ファンなんかついてないし、外部記憶装置はSSDだから回転物はないとのこと。
夏に見学会があると聞いたので、これは見に行かなくてはならない。

Amazon Elastic MapReduce : Hadoop2.4環境でDistributed Cacheを使う。

ようやく以前やりたかったことができた。
というのは、「S3バケットにおいたテキストファイルをDistributed Cacheとして使う」こと。

twasinkさんがGitHubに公開してくれているコードのおかげ。Thank you.

Distributed Cacheを使いたいケースはままあって、例えば、1つの変量を基準として処理を分散したい場合がそれに当てはまる。目的変数に対して説明変数を対比させるなど。

Distributed Cacheについては、Amazon Elastic MapReduceにも説明がある(こちら)し、象本にも当然書いてある(こちらは純粋なDistributed Cacheに説明)のだが、S3をプログラム内で設定して使う、というサンプルを見つけられないでいた。

以下に、サンプルプログラムをメモしておく。
サンプルを探している間に、ToolRunnerをつかってJobを実行するサンプルがあったので、(StackOverFlow:Hadoop JobConf class is deprecated , need updated example、inquireさんThank you)同様に書いてみた。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package com.tetsuyaodaka;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;

/**
 * Hadoop2.4.0のMapReduceで、S3にあるファイルをDistribution Cacheとして使う。
 * 
 * 参考:
 * S3バケットにあるテキストファイルをDistribution Cacheとして使う。
 * https://gist.github.com/twasink/8813628
 * 
 * ToolRunnerの実行:
 * http://stackoverflow.com/questions/8603788/hadoop-jobconf-class-is-deprecated-need-updated-example
 * 
 */
@SuppressWarnings("unused")
public class DistCache extends Configured implements Tool{

	/*
	 * Map
	 * Text Fileを読んで、スペースで区切って、(1,値)で書き出す。
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{
    	private final static IntWritable one = new IntWritable(1);
		private int noRecords;

        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            while(tk.hasMoreTokens()){
            	++noRecords;
                double dval = Double.parseDouble(tk.nextToken());
                context.write(one, new DoubleWritable(dval));
            }
        }
        
        @Override
        /*
         * Mapのsetup()メソッドでDistribution Cacheを読み込む。
         * 
         * https://gist.github.com/twasink/8813628がオリジナル。
         * 
         * 読み込んだら、クラスメンバー変数に入れればよい。
         * 
         */
        protected void setup(Context context) throws IOException, InterruptedException {
            if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
                URI cacheFileUri = context.getCacheFiles()[0];
                
                if (cacheFileUri != null) {
                	// Cacheファイルが見つかったら、Stringにしてstdoutに出力
                	System.out.println("Contents Of Cache File: " + FileUtils.readFileToString(new File("./cacheFile")));
                } else {
                    System.out.println("NO CACHE FILE");
                }
            } else {
                System.out.println("NO CACHE FILES AT ALL");
            }
        }
	}

	@Override
	public int run(String[] args) throws Exception {

        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DistCache.class);

        // Mapperクラスの定義
        job.setMapperClass(Map.class);

        // OutputKeyとOutputValueのクラス定義
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        
        // 入出力フォルダーの指定
        // インプラントフォルダーは引数からとる。
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);

        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //S3バケットにあるテキストファイルをDistribution Cacheとして設定する。
        String s3File = "s3n://banyantreeus/EMR/distibution-cache/data1.txt";
        job.addCacheFile(new URI(s3File + "#cacheFile"));
		
        // 実行
	    job.submit();
        
		return 0;
	}

    public static void main(String[] args) throws Exception {
    	
        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // ToolRunnerのrunメソッドをOverrideして実行させる。
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        ToolRunner.run(new DistCache(), otherArgs);
        
        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}


プログラムでは、処理開始時にS3バケットにあるテキストデータをDistributed Cacheに設定し、Mapのsetup()でそれを取得してプリントする。Mapの処理に特に意味はない。この出力結果は、マネージメント・コンソールのStep、Job、TaskAttemptと手繰って、stdoutで確認できる。

助かった。皆さん、本当にありがとう。

Amazon Elastic MapReduce : Hadoop2.4環境で100万変量(10GB)の算術平均を計算する。

前々回の記事では、1変量の算術平均を計算した。
今回は、以下の形式(多変量データ)の「各列」、つまり「各変量」の平均値を計算する。前々回の記事で述べたように、「超多変量」=「超高次元」がビッグデータの本質の1つである。

せっかくなので、自分の手元のMacBook ProCore i5)のRでは処理が厳しいくらいの大きさのデータを、Amazon Elastic MapReduceで処理し、処理時間とスケール・アウトさせたときの様子を見みようと思う。
後述するが、変量(特徴)の数は1,000,000、サンプルサイズは1,000とおく。多変量解析のテキストでは、サンプル・サイズ>変量の数、となっているのが普通だが、ビッグデータの場合にはこれが逆転するケースが発生する。(例えば、1webページ中の語彙、1人の遺伝情報など)

データ

以下に、データ構造とデータファイル(MapReduceに読ませるテキストファイル)の関係を示す。

データの形式は以下とする。

  • 先頭の数値は「列番号」を表す。
  • 数値の区切りは半角スペースにする。
  • データはテキストファイルにいれる。
  • 1つの変量のデータは必ず1つのファイルに入れる。

各テキストファイルについてMapperを動かして、変量間の処理が交錯しないようにする。そうすれば、処理をスケールアウトさせることができる。

データの作成には、下のjavaプログラムを使用した。

  • 列(変量)の数は1,000,000(m)。
  • 各列について、数値を1,000個(n)用意する。
  • 数値は、10Bytes-Lengthの[0,10]の一様乱数。
  • 10,000変量分を1つのテキストファイルに格納する。

以下にまとめると、

テキストファイルの数 100
各テキストファイルのサイズ 約100MB
1テキストファイル中の変量数 10,000
総変量数 1,000,000
サンプルサイズ 1,000
総データ数 1,000,000,000
総ファイルサイズ 10GB

となる。これ位のサイズになると、自分のMacBook Pro(Core i5,2.5GHz,Mem:8GB)に載せたRでは厳しい。

package com.tetsuyaodaka;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;

/**
 * Create Sample Date for Hadoop Calculation.
 * 
 * Create using uniform distribution [0.10] 
 * 
 * args: 0: directory path of the output file (filename must be included)
 * 
 * @author tetsuya.odaka@gmail.com
 *
 */
public class CreateThreadTestData {

	public static void main(String[] args) {
		System.out.println("start");
		
		// 変量数
		int numV = 1000000;
		// サンプルサイズ
		int numS=1000;
		// 1ファイル中のレコード数
		int numR=10000;
		
		// ファイル数
		int numF=1;
		if(numV%numR == 0){
			numF = numV/numR;
		}else{
			numF = numV/numR+1;
		}
		
		long seq = 0;
		for(int j=1;j<numF+1;j++){
			String fileName = "data"+j+".txt";
			String filePath = args[0]+"/"+fileName;
		
			try {
				PrintWriter outFile 
				= new PrintWriter(
						new BufferedWriter(
								new OutputStreamWriter(
										new FileOutputStream(filePath),"UTF-8")));
				
				// レコード数
				for(int k=0;k<numR;k++){
					// Seq. No.
					outFile.print(++seq+" ");
					// サンプルサイズ
					for(int i=0;i<numS;i++){
						double rnd = Math.random()*10;
						BigDecimal bd = new BigDecimal(rnd);
						BigDecimal r = bd.setScale(8, BigDecimal.ROUND_HALF_UP); 
						if(i == (numR-1)){
							outFile.print(r);
						}else{
							outFile.print(r+" ");
						}
					}
					outFile.print(System.getProperty("line.separator"));
				}
				outFile.close();
				
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}
		}
		System.out.println("end");
		
	}

}

サンプルサイズが1,000と小さいのは、先のブログに述べたように、ビッグデータの本質的な難しさはサンプルサイズの大きさではなく、次元の高さによるからである。

各テキストファイルの大きさは、Amazon EMRの標準ブロックサイズ(134MB)に入る程度になっている。

変量ごとの平均を算出するMapReduceプログラム

以下にMapReduceプログラムを示す。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
    */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hadoop2.4.0のMapReduceで算術平均を計算する。(多変量の場合)
 * 
 * author : tetsuya.odaka@gmail.com
 * 
 */
@SuppressWarnings("unused")
public class MultiMeanCalc {

	/*
	 * Map
	 * 列番号 数値 数値と並んでいるText Fileを読んで、(列番号,値)のキー・バリューを書き出す。
	 * 
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{

		private String mapTaskId;
		private String inputFile;
        
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }
        
        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
        		String str = value.toString();
        		int index = str.indexOf(" ");
        		
        		String keyStr = str.substring(0, index);
        		String valArr[] = str.substring(index+1).split(" ");
        		
        		IntWritable outKey = new IntWritable(Integer.parseInt(keyStr));
        		
        		for(String valStr : valArr){
        			double dval = Double.parseDouble(valStr);
                    context.write(outKey, new DoubleWritable(dval));
        		}
        }
    }

	/*
	 * Reduce
	 * (列番号,値)を集約後、算術平均を計算する
	 */
    public static class Reduce extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable>{
		private String 	reduceTaskId;
        
        public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) 
        		throws IOException, InterruptedException{
        	long 	noValues = 0;
            double 	sum = 0;
            for(DoubleWritable value: values){
            	++noValues;
                sum += value.get();
            }

            if(noValues > 0) sum = sum/noValues;
            
            context.write(key, new DoubleWritable(sum));
        }
    }
    

	public static void main(String[] args) throws Exception {
    	
        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MeanCalc.class);
        
        // Mapperクラス, Reducerクラスの定義
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // OutputKeyとOutputValueのクラス定義
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        
        // 入出力フォルダーの指定
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);
        
        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // MapReduceジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }

        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}

実行:11台構成

まず、Amazon Elastic MapReduceクラスタを以下のサイズとした。

ノード サイズ 台数
マスタ m1.middle 1
コア m1.middle 5
タスク m1.middle 5

ログをS3に採るようにクラスタを起動し、実行はRuby Client。

elastic-mapreduce --jar s3n://[your bucket]/[your folder]/hadoopsample.jar --main-class com.tetsuyaodaka.MultiMeanCalc --args s3n://[your bucket]/[your input folder],s3n://[your bucket]/[your input folder] --step-name MiltiMeanCalculation --jobflow j-XXXXXXXXX

タイプスタンプはS3バケットのstdoutに吐き出される。

Program Start: 2014/06/12 9:18:28
Output folder is s3n://XXXXXXXXXXX/EMR/output/20140612/10344
Program END: 2014/06/12 9:38:32

実行時間は、20min.04sec.(1,204sec.)と決して速くはない。

以下は、マネージメントコンソールからタスク(Task_attempt)の一覧を表示したところ。
起動したタスクは、Mapが200、Reduceタスクが39。エラーの発生はなかった。

結果を保存したS3バケットスクリーンショットを以下に示す。39のreduceから均等に結果が吐き出されている。

出力は以下のように、キー(変量のSeq)+タブ+平均値+改行となっている。

39	4.94539519773
78	4.911648371779998
117	5.014055572980005
156	4.970061421849994
195	4.937069895429997
234	5.206232056259996
273	4.994578578919996
312	4.897684452960004
351	5.095053427910006
390	4.970409691689994
429	4.889084266230005
468	4.95468362101
507	5.107228051750001
546	4.844947219119992
...

スケールアウトさせる。

では、クラスタサイズを大きくしたときに、計算がスケールするか確認する。

クラスタ0(上の構成)
マスタノード数 1
コアノード数 5
タスクノード数 5
総ノード数 11
Mapタスク数 200
Reduceタスク数 39
実行時間 20min.04sec.(1,204sec.)
Map起動までのレイテンシー 38sec
Map処理時間 14min.23sec
Reduce起動までのレイテンシー 2min.9sec

(注記)

  • Map起動までのレイテンシー:プログラム開始から最初のMapタスクが起動されるまでの時間
  • Reduce起動までのレイテンシー:プログラム開始から最初のReduceタスクが起動されるまでの時間
  • Map処理時間:最初のMapタスクが起動してから、最後のタスクが終了するまでの時間。
クラスタ
マスタノード数 1
コアノード数 10
タスクノード数 10
総ノード数 21
Mapタスク数 200
Reduceタスク数 39
実行時間 10min.02sec.(602sec.)
Map起動までのレイテンシー 35sec
Map処理時間 7min.59sec
Reduce起動までのレイテンシー 1min.56sec
クラスタ
マスタノード数 1
コアノード数 20
タスクノード数 20
総ノード数 41
Mapタスク数 200
Reduceタスク数 39
実行時間 6min.23sec.(382sec.)
Map起動までのレイテンシー 40sec
Map処理時間 4min.5sec
Reduce起動までのレイテンシー 2min.5sec
散布図

横軸に総ノード数、縦軸に実行時間をとったグラフを示す。

クラスタ0(11台構成)からクラスタ1(21台構成)へは、面白いように実行時間が半分となった。この結果を外挿すれば、クラスタ2(43台構成)では300sec.程度となるが、382sec.と1分ほど大きい数字となっている。

クラスタサイズを大きくしたことによる、Map起動までのレイテンシーを見ると、初期処理のレイテンシー増加(S3からHDFSへの配置)は見られず、Reduceタスク起動までのレイテンシーも変わらない(Reduceは非常に速く起動していることが分かる)。
クラスター1から2では、Mapの処理時間はスケールしているから、Reduceがボトルネックになっている可能性がある。(Reduceの数を増やせば、スケールするかな)

Amazon Elastic MapReduce : MapとReduceのスレッドセーフ実験。

前回の記事で、Hadoop2にしたら随分たくさんMapとReduceが動いたので、スレッドじゃないよね、と思ってしまった。特にMapperは起動の仕方でプログラムがデタラメになってしまうので、ちょっと調べてみた。

Hadoop2.4のAPIをみると、Mapperをつかってる限りでは、Runnableを引き継いでくることはないようで、Javadocのdescriptionの下の方に、

Applications may write a custom MapRunnable to exert greater control on map processing e.g. multi-threaded Mappers etc.

と書いてあって、MapRunnableを使うとスレッドで処理できるよ、となっている。
ちなみに、MapRunnableのJavadocをみると、runメソッドがあって、これをoverrideすることで、Mapの動きを変えられるようになっているようだ。Yahooデベロッパー・ネットワークの「Hadoopを使いこなす」にも同様の記事が乗っている。こちらのインターフェイルもrunnableなわけでなく、runnableにできるよ、ということと理解した。MapperもReducerもstaticだし。

とはいうものの、やってみたら違ってたではいけないので、簡単な実験をしてみた。

実験には、

  • キー:1から100までの数値
  • バリュー:小数点を入れて10桁の数値×10,000。区切り文字は半角スペース。
  • Mapへの入力方式:各キーにつき1つのテキストファイルを作成して、Mapに入力する。

というデータをつかった。
たとえば、キーが6のときは以下のような格好になって、data6.txtに保管する。

6 2.98677076 1.22031748 7.13013497 4.67325002 9.55793889 9.26554861 ......

バイト数は、(10+1)*10,000+2=110,002なので、おおよそ110Kになる。

備忘のため、以下にこれを生成したjavaアプリケーションを以下に示しておく。

package com.tetsuyaodaka;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;

/**
 * Create Sample Date for Hadoop Calculation.
 * 
 * Create using uniform distribution [0.10] 
 * 
 * args: 0: directory path of the output file (filename must be included)
 * 
 * @author tetsuya.odaka@gmail.com
 *
 */
public class CreateThreadTestData {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		System.out.println("start");
		
		int numF = 100;
		
		for(int j=1;j<numF+1;j++){
			String fileName = "data"+j+".txt";
			String filePath = args[0]+"/"+fileName;
		
			try {
				PrintWriter outFile 
				= new PrintWriter(
						new BufferedWriter(
								new OutputStreamWriter(
										new FileOutputStream(filePath),"UTF-8")));
		
				outFile.print(j+" ");

				int sizeC=10000;
				for(int i=0;i<sizeC;i++){
					double rnd = Math.random()*10;
					BigDecimal bd = new BigDecimal(rnd);
					BigDecimal r = bd.setScale(8, BigDecimal.ROUND_HALF_UP); 
					if(i==(sizeC-1)){
						outFile.print(r);
					}else{
						outFile.print(r+" ");
					}
				}
				outFile.print(System.getProperty("line.separator"));
				outFile.close();
			} catch (UnsupportedEncodingException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		System.out.println("end");
		
	}

}


以下が実験につかったMapReduceプログラム。
やっていることは単純で、上のテキスト・ファイル(110K)を読んで、キーを分離してReduceに渡し、そのまま出力する。
ブロックサイズ(Amazon EMRだと134217728=134M)、m1.middleのメモリーサイズ(Mem: 3,843,032k total)と比較してデータが小さいので、一気にいけるはず。
処理されるデータを確認する(データが切れたりしないことを確認する)ために、Map.map()、Reduce.reduce()、main()内で内容を出力する。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hadoop2.4.0のMapReduceの挙動を確認する。
 * 
 * author: tetsuya.odaka@gmail.com
 * 
 */
@SuppressWarnings("unused")
public class MultiThread {

	/*
	 * Map
	 * 列番号 数値 数値と並んでいるText Fileを読んで、(列番号,値 値)のキー・バリューを書き出す。
	 * map()の中でTaskIdと、ファイルの内容等を書き出す。
	 * 
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,Text>{

        private String mapTaskId;
        private String inputFile;
        
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }

        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{

        	String str = value.toString();
    		int index = str.indexOf(" ");
        	System.out.println("mapTaskId:"+mapTaskId);
       		System.out.println("inputFile:"+inputFile);
       		System.out.println("index:"+index);
        	System.out.println("str:"+str);
        		
        	// 1つ目の数字以降がデータ
        	context.write(new IntWritable(Integer.parseInt(str.substring(0, index))), new Text(str.substring(index+1)));
        }
    }

	/*
	 * Reduce
	 * キーとバリューをそのまま出力する。
	 * reduce()の中でTaskIdやkey,value等を書き出す。
	 */
    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
    	private String 	reduceTaskId;
        private int 	noKeys = 0;
        
        public void setup(Context context) {
        	reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID;
          }
        
        public void reduce(IntWritable key, Iterable<Text> values, Context context) 
        		throws IOException, InterruptedException{
        	++noKeys;
        	int 	noValues = 0;
        	for(Text value: values){
            	++noValues;
            	String str=value.toString();
        		System.out.println("reduceTaskId:"+reduceTaskId);
        		System.out.println("noKeys:"+noKeys);
        		System.out.println("key-reduce:"+key);
        		System.out.println("value-reduce:"+str);
            	context.write(new IntWritable(noValues), value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
    	
        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MultiThread.class);
        
        // Mapperクラス, Reducerクラスの定義
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // MapperのOutputKeyとOutputValueのクラス定義
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        // 入出力フォルダーの指定
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);
        
        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // MapReduceジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }

        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}

結果

main関数内に書いたprintは、クラスタ作成時に指定した、s3バケット内(stdout)にでる。

これを見ると、処理にかかった時間は、21min10sec。

Program Start: 2014/06/12 09:31:04
Output folder is s3n://XXXXXXXXX/EMR/output/20140612/35199
Program END: 2014/06/12 09:52:24


次にMapReduceの状況をみるには、マネージメント・コンソールでステップを選択して、タスクの一覧を表示する。

ここで、View attemptをクリックしてstdoutを開けば、Map.map()とReduce.reduce()内にかいたprintの結果をみることができる。

mapタスクの数 100
reduceタスクの数 7

となっていて、各mapタスクで1つのテキストファイルを読み込んでいる(10000個の数字も問題なく読んでいる)ことが確認できた。
Hadoop1のとき、Mapのスロットが開かない!!と感じることがあったので、Hadoop2ではこの辺りが改善されている(みたい)。
象本には、小さいファイルを使う事で負荷が分散できる(データロードにかかる時間の、全体の処理時間に対する割合が小さくなる)と書いてある。逆に、小さすぎても良くない、とも書いてあるけど、Mapがこれだけ上がってくると、ロードの効率化が図れると思った。

reduceタスクは、(曖昧な記憶によれば)キーのハッシュ値から除算をしてReducerへの割当を決定している。
Reduceタスク6(r_000006)に割り振られたキーは、6-97の14つ(Reduceが7つなので、7の倍数になっている)。
以下のような配分で100個のキーが綺麗に分かれた。このうち、r_000005では2回のattemptが実行されたが(早く終わったのかな)、2回目のattemptはなにもせずにkillされた。

タスクID 処理したキーの数
r_000006 14
r_000005 14
r_000004 14
r_000003 14
r_000002 15
r_000001 15
r_000000 14

Amazon Elastic MapReduce : Hadoop2.4環境で標本平均を計算する(Ruby Client)。

ワードカウントに続き、今回も簡単なHadoopMapreduceのサンプル・プログラムを作ってみる。

昨年の記事を参考にして、Amazon Elastic MapReduceのHadoop2.4環境で算術平均を算出する。

データの形式:ビッグデータを視野に入れて。

ビッグデータを視野に入れつつ、データの形式は下表のように仮定しておく。ここで、行はサンプル、列は変量を意味することとする。

今年5月の応用統計学会にて、東大工学部の山西教授による「潜在空間からのディープナレッジの発見」というセッションを聞いた。
その中で、ビッグデータの本質を

とおっしゃっていた。書籍などでは言われる「5V」より、山中先生のこの定義の方が明確だと思う。例えば、Volumeといった場合の本質的な問題は「サンプルサイズではなく、次元の大きさ」というのが、統計学者、機械学習学者(計算工学者)らの共通認識と思う。データの性質を損なわずに、サンプルサイズを下げることを「サンプリング」と呼んで、コンピュータがなかった時代には統計学の重要な領域だった(今でも大切です)のだから、サンプルサイズは「大きな問題」ではない(山中先生も同様のことをおっしゃっていた)。

これに対して、次元が高次になると、

  • 機械学習系でも、超高次元の「特徴」(統計学では「変量」という)空間上では、識別(超)平面が決定できない、といった問題が発生してしまう。「次元の呪い」。
  • 統計学では、(機械学習との大きな違いとして)「仮定するデータが密である」という点が、計算コストの面で大きな問題になる。

後者について補足すると、「疎なデータは、密なデータの1つのケース(多くは、簡単なケース)」として捉えられていることに起因するだろう。解析的な問題は、一般的な問題に取り組む方が効用が大きい。
しかし、相関係数のような(中学生でも計算できる)統計量が「次元が増えた途端に計算できないものになる(計算コストが幾何級数的に増加する。具体的には、データがロードできない、メモリーがリークするといったことになる)」。こういった問題をHadoop MapReduceで解決できるのだろうか、ということが、昨年、このブログにアップした「行列積の計算シリーズ」、「相関係数の計算シリーズ」の動機だった。

前置きが長くなったが、上表のような多変量データを想定した場合、ビッグデータに固有な問題は「行の大きさではなく列の大きさ」である。


まずは、算術平均をMapReduceで計算するプログラムをHadoop2.4で作成してみる。

1変量の場合のプログラム

1変量の場合は、ビッグデータの定義に当てはならないのだが、MapReduceという分散処理フレームワークを評価する上では、面白いサンプルだと思う。
以下に1変量の場合に算術平均を計算するプログラムを示す。同様のコードは、書籍やWebによく掲載されている。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * Hadoop2.4.0のMapReduceで算術平均を計算する。
 * 
 * author: tetsuya.odaka@gmail.com
 * 
 */
@SuppressWarnings("unused")
public class MeanCalc {

	/*
	 * Map
	 * Text Fileを読んで、スペースで区切って、(1,値)で書き出す。
	 * 
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{
    	private final static IntWritable one = new IntWritable(1);

        private String mapTaskId;
        private String inputFile;
        private int noRecords = 0;

        /*
         * Mapperの初期化
         * 
         */
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }

        /*
         * Mapperの本体
         * 
         */
        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            while(tk.hasMoreTokens()){
            	++noRecords;
                double dval = Double.parseDouble(tk.nextToken());
                context.write(one, new DoubleWritable(dval));
            }
        }
    }

	/*
	 * Reduce
	 * (1,値)を集約後、算術平均を計算する
	 */
    public static class Reduce extends Reducer<IntWritable, DoubleWritable, LongWritable, DoubleWritable>{
    	private String 	reduceTaskId;
        private int 	noKeys = 0;
        
        /*
         * Reducerの初期化
         * 
         */
        public void setup(Context context) {
        	reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID;
          }
        
        /*
         * Reducerの本体
         * 
         */
        public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) 
        		throws IOException, InterruptedException{
        	long 	noValues = 0;
            double 	sum = 0;
            for(DoubleWritable value: values){
            	++noValues;
                sum += value.get();
            }

            if(noValues != 0) sum = sum/noValues;
            
            context.write(new LongWritable(noValues), new DoubleWritable(sum));
            ++noKeys;
        }
    }

    public static void main(String[] args) throws Exception {

        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MeanCalc.class);
        
        // Mapperクラス, Reducerクラスの定義
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // MapperのOutputKeyとOutputValueのクラス定義
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // 入出力フォルダーの指定
        // インプラントフォルダーは引数からとる。
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);

        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // MapReduceジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }
        
        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));

    }
}

このプログラムの面白い点は、MapからReduceへのデータの引き継ぎに「1」という「たった1つのキー」を使用していること。これは「1つのReducerへデータが殺到する」ことを意味している(なので、このままではあんまり筋がいいプログラムとは言えないだろう)。

さて、「これを回避できるのか」と考えると、

  • Combinerが使えないか?
  • 分散して部分和を求めて、それを統合する。(MapReduceを繋ぐ)

とかが頭をよぎる。前者だと、Combinerで平均をとって、Reducerで平均の平均をとる、といったロジックになるだろう(処理されるかどうか保証されないことを考慮に入れて)。とすると、両者は同じロジックにいきつく。

上に述べたように(全部のデータを使わずに)サンプリングをして大枠のイメージをつかむということも考えられるだろう。サンプルが同一の分布から発生していると仮定すれば、(実務上は)全部を使うこともない。こんな風に、このサンプルだけ見ても、とても面白い題材になっている。

備忘のため、使い方を以下に書いておく。基本はワードカウントのときと同じで、Javaアプリケーションとして起動する。
入力データは「スペース区切りの数値」とし、入力用フォルダーに入れておく。

以下は、実行時のコンフィグ定義。

プログラムへの引数として、入力用テキストファイルの保管フォルダー、出力用テキストの保管フォルダーを指定する。MapReduceの出力先として既存のフォルダーを指定するとエラーになるので、指定したフォルダーしたにランダムにフォルダーを追加する仕様とした。

以下は、引数(Proguramu Arguments)。

/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/mean/input /Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/mean/output

以下は、Dパラ(VM Arguments)で、ワードカウントのときと同じに設定する。

-Xmx1024m -Djava.security.krb5.realm=OX.AC.UK -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk -Dlog4j.configuration="file:/Users/tetsuya/hadoop/etc/hadoop/log4j.properties" -DHadoop.home.dir=/Users/tetsuya/hadoop -Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console -Dhadoop.policy.file=/Users/tetsuya/hadoop/etc/hadoop/hadoop-policy.xml

1変量のプログラムAmazon Elastic MapReduceで処理する。

データ

Rを使って、[0,10]の一様乱数を100,000個作った。
区切り(スペース)と改行(LF)を含んで、1データ当たり9Bytesなので、100,000個でおおよそ900Kとなる。このサイズであれば、まだまだブロック・サイズに余裕がある。
hadoopの定義を見ると(先日の記事に掲載)。hdfs-site.xmlのdfs.block.sizeが134,217,728Bytesに設定されている。なので、このサイズであれば、150ファイル近く(134,217/900=149.13)が1ブロックに収まる勘定。

2.352948 3.432219 4.308323 6.655889 9.904773
2.259036 7.563224 1.308103 8.79912 1.004995
3.355776 4.82223 9.765647 1.299829 7.307348
.....

これを、s3バケットにアップロードする。

実行と結果

プログラムを前回同様にjarアーカイブとし書き出し、s3バケットにアップロードする。

クラスタは以下の構成とした。

ノード サイズ 台数
マスタ m1.middle 1
コア m1.middle 1
タスク m1.middle 1

実行は、Ruby Clientで行う。

elastic-mapreduce --jar s3n://[your bucket]/[your folder]/hadoopsample.jar --main-class com.tetsuyaodaka.MeanCalc --args s3n://[your bucket]/[your input folder],s3n://[your bucket]/[your input folder] --step-name MeanCalculation --jobflow j-XXXXXXXXX

実行時間は、3分弱と(この程度の計算なので)決して速くはない(というか、遅い)。

以下は、マネージメントコンソールからタスク(Task_attempt)の一覧を表示したところ。Mapが8,、Reduceが7あり、Hadoop1のときよりもタスクが多い印象。ブロックサイズから見ると、投機的に実行された(attemptとあるのが気になる)にせよ、多い気がする。

以下が出力フォルダー。Reduceが7つ動いたためその数分の出力ファイルがあるが、中身があるのは1つのみで、「実質的に処理を行ったReduceは1つ」と分かる(上に述べた通り)。


以下がログのフォルダー。

stdoutに(javaプログラムでprint指定した)以下の出力がでる。

Program Start: 2014/06/11 14:21:53
Output folder is s3n://banyantreeus/EMR/output/20140611/88231
Program END: 2014/06/11 14:24:40


多変量の場合に進もうと思ったが、長くなったので一旦切ります。

2014/6/19:つづきの記事を書きました。
Amazon Elastic MapReduce : Hadoop2.4環境で100万変量(10GB)の算術平均を計算する。

Amazon Elastic MapReduce : Hadoop2.4環境でワードカウントのプログラムを実行する(Ruby Client & SSH)。

前回の記事では、前々回に作成したワードカウントのサンプルをAWSマネージメントコンソールから実行した。
GUIから実行するのは簡単なのだが、回数が多くなってくると面倒くさい。なので、Elactic MapReduce Client Rubyでの実行方法と、SSHでログインして(Hadoopを生で使う)実行方法を以下にメモしておく。

Elastic MapReduce Ruby Clientで、ワードカウントを実行する。

Elastic MapReduce Ruby Clientの使い方については、
先日の記事にかいた。

  • プログラム(jarファイル:hadoopsample.jar)はS3に保管。
  • 入力となるテキストファイルもS3に保管。
  • 出力もS3に書き出し

という、前回の記事と同じ条件で実行する。

この場合、クラスタが起動後、以下のコマンド1つでステップの追加と実行ができる。

elastic-mapreduce --jar s3n://[your bucket]/[your folder]/hadoopsample.jar --main-class com.tetsuyaodaka.WordCount --args s3n://[your bucket]/[your input folder],s3n://[your bucket]/[your input folder] --step-name WordCount --jobflow j-XXXXXXXXX

j-XXXXXXXXXは、Job Flow ID。
引数は、(ターミナルからの実行と違い)カンマ区切りとなす。
mainクラスも指定でき、(ターミナルから指定した場合と違って)実行中のGUIにMain Classが正確に表示される。

SSHでマスタノードにログインして、ワードカウントを実行する。

クラスタ起動後に、sftpでマスタノードにプログラムと入力テキストを送り、SSHでログインして実行する。
手続きとしては、

  1. sftpでhadoopsample.jarとinput.txt(入力テキスト)を送る(以前の記事)。
  2. SSHでログインする。
  3. HDFS上に入力用(出力用)ディレクトリを作成する。
  4. HDFSの入力用フォルダーにinput.txtをコピーする。
  5. hadoopsample.jar内のWordCountを実行する。
  6. 結果をHDFSからOS上に取り出す。

という流れになる。

HDFS上に入出力用のディレクトリを作成する。

sftpで、マスタノードに、プログラム(hadoopsample.jar)と入力ファイル(input.txt)を送ったら、SSHでログインする。
以下のコマンドで、HDFS上に入力用(出力用)フォルダーを作成する。hadoop fsコマンドではなくて、hdfsコマンド。

[hadoop@ip-10-91-156-75 ~]$ hdfs dfs -mkdir -p /input
[hadoop@ip-10-91-156-75 ~]$ hdfs dfs -mkdir -p /output

Hadoop2.4のAMIを使った場合、Web GUIHDFSブラウジングできる。

HDFSの入力用フォルダーにinput.txtをコピーする。

以下のコマンドで、HDFS上の/input/に入力テキストをコピーする。

[hadoop@ip-10-91-156-75 ~]$ hdfs dfs -put input.txt /input/

以下がinputフォルダーをブラウジングしたところ。

hadoopsample.jar内のWordCountを実行する。

ここまで準備ができたら、以下のコマンドでワードカウントを実行する。
(/output/の下に2014/0610フォルダーを作って、出力を書き込む)

hadoop jar hadoopsample.jar com.tetsuyaodaka.WordCount /input /output/2014/0610

以下のログが画面に流れる(必要に応じてリダイレクトする。)。

14/06/10 05:59:17 INFO client.RMProxy: Connecting to ResourceManager at /10.91.156.75:9022
14/06/10 05:59:18 INFO client.RMProxy: Connecting to ResourceManager at /10.91.156.75:9022
14/06/10 05:59:19 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/10 05:59:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
14/06/10 05:59:19 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 77cfa96225d62546008ca339b7c2076a3da91578]
14/06/10 05:59:19 INFO mapred.FileInputFormat: Total input paths to process : 1
14/06/10 05:59:20 INFO mapreduce.JobSubmitter: number of splits:2
14/06/10 05:59:20 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1402369340436_0003
14/06/10 05:59:20 INFO impl.YarnClientImpl: Submitted application application_1402369340436_0003
14/06/10 05:59:20 INFO mapreduce.Job: The url to track the job: http://10.91.156.75:9046/proxy/application_1402369340436_0003/
14/06/10 05:59:20 INFO mapreduce.Job: Running job: job_1402369340436_0003
14/06/10 05:59:34 INFO mapreduce.Job: Job job_1402369340436_0003 running in uber mode : false
14/06/10 05:59:34 INFO mapreduce.Job:  map 0% reduce 0%
14/06/10 05:59:49 INFO mapreduce.Job:  map 50% reduce 0%
14/06/10 06:00:00 INFO mapreduce.Job:  map 100% reduce 0%
14/06/10 06:00:17 INFO mapreduce.Job:  map 100% reduce 100%
14/06/10 06:00:18 INFO mapreduce.Job: Job job_1402369340436_0003 completed successfully
14/06/10 06:00:19 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1459
		FILE: Number of bytes written=296895
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=2142
		HDFS: Number of bytes written=1329
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=66351
		Total time spent by all reduces in occupied slots (ms)=65636
		Total time spent by all map tasks (ms)=22117
		Total time spent by all reduce tasks (ms)=16409
		Total vcore-seconds taken by all map tasks=22117
		Total vcore-seconds taken by all reduce tasks=32818
		Total megabyte-seconds taken by all map tasks=16985856
		Total megabyte-seconds taken by all reduce tasks=16802816
	Map-Reduce Framework
		Map input records=15
		Map output records=214
		Map output bytes=2155
		Map output materialized bytes=1644
		Input split bytes=184
		Combine input records=214
		Combine output records=165
		Reduce input groups=150
		Reduce shuffle bytes=1644
		Reduce input records=165
		Reduce output records=150
		Spilled Records=330
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=427
		CPU time spent (ms)=3310
		Physical memory (bytes) snapshot=783855616
		Virtual memory (bytes) snapshot=3907485696
		Total committed heap usage (bytes)=598089728
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1958
	File Output Format Counters 
		Bytes Written=1329
結果をHDFSからOS上に取り出す。

以下は、/output下をブラウズしたところ。

HDFSからOS上にファイルを取り出すには、以下のコマンドを実行すればいい。

hdfs dfs -get /output/2014/

上の画面を実行後、2014/06へ移動してlsを実行したところ。

[hadoop@ip-10-91-156-75 0610]$ ls
_SUCCESS  part-00000

Amazon Elastic MapReduce : Hadoop2.4でワードカウントのプログラムを実行する。

前回の記事で作成したワードカウントをAmzon Elastic MapReduceで実行してみる。

まず、プログラムと入出力ファイルの置き場所、プログラムの実行方法を決めておく。
ここまでの検証で、プログラム(jarファイル)の置き場所は、

  • S3
  • マスタノード

の2パターン、入出力ファイルの置き場所は、

の2パターン。
実行方法は「色々あるのかな」と思うが、

  • マネージメントコンソールから、Custom Jarのステップを登録する。
  • Ruby Clientからステップを登録する。
  • マスタノードにログインして実行する。

といったところを押さえておけばいいだろう。

以下では、

  • AWSマネージメント・コンソールでクラスタを起動する(やり方は以前の記事
  • プログラムはS3に置く。
  • 入出力もS3に置く。
  • AWSマネージメント・コンソールからCustom Jarステップとしてステップを登録して実行する。

の手順を示す。


開発は引き続き、MacBook Pro(OSX 10.9.3)+Java6+Eclipseで行う。(NetBeansがいいよ、という声も聞こえるのだけど、とりいそぎ慣れたツールを選択)

Jarファイルの作成

Eclipseプロジェクトを選択後、右クリックしてエクスポート>Java>jar fileを選ぶ。

必要なプログラムだけを選択して、jarファイルを書き出す。
このとき、manifestを作ってmainクラスを指定しておけば、EMRに登録した際に、間違いなく目的のクラスが実行されるのだが、実行するするプログラムを変えるたびに、いちいちmanifestを書き換えるのは面倒くさい。ステップを登録する際にmainクラスを指定できるので、manifestはデフォルトのままにする。

jarファイルをS3に配備する。

S3の適当なバケット/フォルダーに書き出したjar(この例だと、hadoopsample.jar)を置く。

サンプルプログラムなので、フォルダーの冗長性を下げる(価格が少しお安いため)。

入力ファイルをS3に配備する。

S3の適当なバケット/フォルダーに先ほどの例で使用したテキストファイルを配備する。ここでも低冗長化ストレージを選択する。

クラスタにステップを登録し、プログラムを実行する。

マネージメントコンソールでクラスタの詳細を表示して、Add Stepsよりステップを登録する。
AWSの公式マニュアルはこちら(日本語のページは古いGUIでの説明になっているが、英語ページは最新)。

step type Custom JAR
name Word Count(任意のステップ名)
JAR S3 Location s3n://[your bucket]/[your folder]/hadoopsample.jar
Arguments [FQCN of main class] s3n://[your bucket]/[your input folder]/ s3n://[your bucket]/[your output folder]/

とする。

上に書いたように、Argumentsの最初でmainクラスを指定できる。そのあとは、arg[0]から順に引数が続くが、公式マニュアルに書いてあるように、区切りはスペースにする。


以上で登録が終わると、ステップが実行される。

結果の確認

S3のフォルダーを参照して結果を確認する。

実行ログ:S3フォルダー内

実行ログの書き出し先は、クラスタ起動時に指定する。なぜか、stderrに出力されている。なにか悪かったのかなぁ。

実行ログ:マネージメントコンソール

以下では、Map、Reduceジョブごとのログの見方を示しておく。

Stepsのところで、view all jobsをクリックすると、ジョブの一覧が表示される。

これを辿って行くと、Map、Reduceジョブごとのログが参照できる。今回のジョブでは、Mapジョブが2つ、Reduceジョブが1つ実行された。

つづきは、
Amazon Elastic MapReduce : Hadoop2.4環境でワードカウントのプログラムを実行する(Ruby Client & SSH)。