Amazon Elastic MapReduce : Hadoop2.4環境で100万変量(10GB)の算術平均を計算する。
前々回の記事では、1変量の算術平均を計算した。
今回は、以下の形式(多変量データ)の「各列」、つまり「各変量」の平均値を計算する。前々回の記事で述べたように、「超多変量」=「超高次元」がビッグデータの本質の1つである。
せっかくなので、自分の手元のMacBook Pro(Core i5)のRでは処理が厳しいくらいの大きさのデータを、Amazon Elastic MapReduceで処理し、処理時間とスケール・アウトさせたときの様子を見みようと思う。
後述するが、変量(特徴)の数は1,000,000、サンプルサイズは1,000とおく。多変量解析のテキストでは、サンプル・サイズ>変量の数、となっているのが普通だが、ビッグデータの場合にはこれが逆転するケースが発生する。(例えば、1webページ中の語彙、1人の遺伝情報など)
データ
以下に、データ構造とデータファイル(MapReduceに読ませるテキストファイル)の関係を示す。
データの形式は以下とする。
- 先頭の数値は「列番号」を表す。
- 数値の区切りは半角スペースにする。
- データはテキストファイルにいれる。
- 1つの変量のデータは必ず1つのファイルに入れる。
各テキストファイルについてMapperを動かして、変量間の処理が交錯しないようにする。そうすれば、処理をスケールアウトさせることができる。
データの作成には、下のjavaプログラムを使用した。
- 列(変量)の数は1,000,000(m)。
- 各列について、数値を1,000個(n)用意する。
- 数値は、10Bytes-Lengthの[0,10]の一様乱数。
- 10,000変量分を1つのテキストファイルに格納する。
以下にまとめると、
テキストファイルの数 | 100 |
---|---|
各テキストファイルのサイズ | 約100MB |
1テキストファイル中の変量数 | 10,000 |
総変量数 | 1,000,000 |
サンプルサイズ | 1,000 |
総データ数 | 1,000,000,000 |
総ファイルサイズ | 10GB |
となる。これ位のサイズになると、自分のMacBook Pro(Core i5,2.5GHz,Mem:8GB)に載せたRでは厳しい。
package com.tetsuyaodaka; import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; /** * Create Sample Date for Hadoop Calculation. * * Create using uniform distribution [0.10] * * args: 0: directory path of the output file (filename must be included) * * @author tetsuya.odaka@gmail.com * */ public class CreateThreadTestData { public static void main(String[] args) { System.out.println("start"); // 変量数 int numV = 1000000; // サンプルサイズ int numS=1000; // 1ファイル中のレコード数 int numR=10000; // ファイル数 int numF=1; if(numV%numR == 0){ numF = numV/numR; }else{ numF = numV/numR+1; } long seq = 0; for(int j=1;j<numF+1;j++){ String fileName = "data"+j+".txt"; String filePath = args[0]+"/"+fileName; try { PrintWriter outFile = new PrintWriter( new BufferedWriter( new OutputStreamWriter( new FileOutputStream(filePath),"UTF-8"))); // レコード数 for(int k=0;k<numR;k++){ // Seq. No. outFile.print(++seq+" "); // サンプルサイズ for(int i=0;i<numS;i++){ double rnd = Math.random()*10; BigDecimal bd = new BigDecimal(rnd); BigDecimal r = bd.setScale(8, BigDecimal.ROUND_HALF_UP); if(i == (numR-1)){ outFile.print(r); }else{ outFile.print(r+" "); } } outFile.print(System.getProperty("line.separator")); } outFile.close(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } } System.out.println("end"); } }
サンプルサイズが1,000と小さいのは、先のブログに述べたように、ビッグデータの本質的な難しさはサンプルサイズの大きさではなく、次元の高さによるからである。
各テキストファイルの大きさは、Amazon EMRの標準ブロックサイズ(134MB)に入る程度になっている。
変量ごとの平均を算出するMapReduceプログラム
以下にMapReduceプログラムを示す。
/* Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package com.tetsuyaodaka; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * Hadoop2.4.0のMapReduceで算術平均を計算する。(多変量の場合) * * author : tetsuya.odaka@gmail.com * */ @SuppressWarnings("unused") public class MultiMeanCalc { /* * Map * 列番号 数値 数値と並んでいるText Fileを読んで、(列番号,値)のキー・バリューを書き出す。 * * (注意) Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、 * 必ず、LongWritableにする。 */ public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{ private String mapTaskId; private String inputFile; public void setup(Context context) { mapTaskId = MRJobConfig.TASK_ATTEMPT_ID; inputFile = MRJobConfig.MAP_INPUT_FILE; } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String str = value.toString(); int index = str.indexOf(" "); String keyStr = str.substring(0, index); String valArr[] = str.substring(index+1).split(" "); IntWritable outKey = new IntWritable(Integer.parseInt(keyStr)); for(String valStr : valArr){ double dval = Double.parseDouble(valStr); context.write(outKey, new DoubleWritable(dval)); } } } /* * Reduce * (列番号,値)を集約後、算術平均を計算する */ public static class Reduce extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable>{ private String reduceTaskId; public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException{ long noValues = 0; double sum = 0; for(DoubleWritable value: values){ ++noValues; sum += value.get(); } if(noValues > 0) sum = sum/noValues; context.write(key, new DoubleWritable(sum)); } } public static void main(String[] args) throws Exception { // プログラムの開始時刻をstdoutに書く。 Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); System.out.println("Program Start: "+sdf.format(date)); // Jobインスタンスの生成 Job job = Job.getInstance(new Configuration()); job.setJarByClass(MeanCalc.class); // Mapperクラス, Reducerクラスの定義 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); // OutputKeyとOutputValueのクラス定義 job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(DoubleWritable.class); // 入出力フォルダーの指定 FileInputFormat.addInputPath(job, new Path(args[0])); // アウトプットフォルダーは動的に作成する。 Long dTmp = Math.round(Math.random()*100000); String subDir = args[1]+"/"+dTmp.toString(); FileOutputFormat.setOutputPath(job, new Path(subDir)); System.out.println("Output folder is "+subDir); // 入出力ファイルの形式 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // MapReduceジョブの実行 boolean success = job.waitForCompletion(true); if(success){ System.out.println("MapReduce Job endded successfully."); }else{ System.out.println("Error! MapReduce Job abnormally endded."); } // プログラムの終了時刻をstdoutに書く。 date = new Date(); System.out.println("Program END: "+sdf.format(date)); } }
実行:11台構成
まず、Amazon Elastic MapReduceのクラスタを以下のサイズとした。
ノード | サイズ | 台数 |
---|---|---|
マスタ | m1.middle | 1 |
コア | m1.middle | 5 |
タスク | m1.middle | 5 |
ログをS3に採るようにクラスタを起動し、実行はRuby Client。
elastic-mapreduce --jar s3n://[your bucket]/[your folder]/hadoopsample.jar --main-class com.tetsuyaodaka.MultiMeanCalc --args s3n://[your bucket]/[your input folder],s3n://[your bucket]/[your input folder] --step-name MiltiMeanCalculation --jobflow j-XXXXXXXXX
タイプスタンプはS3バケットのstdoutに吐き出される。
Program Start: 2014/06/12 9:18:28 Output folder is s3n://XXXXXXXXXXX/EMR/output/20140612/10344 Program END: 2014/06/12 9:38:32
実行時間は、20min.04sec.(1,204sec.)と決して速くはない。
以下は、マネージメントコンソールからタスク(Task_attempt)の一覧を表示したところ。
起動したタスクは、Mapが200、Reduceタスクが39。エラーの発生はなかった。
結果を保存したS3バケットのスクリーンショットを以下に示す。39のreduceから均等に結果が吐き出されている。
出力は以下のように、キー(変量のSeq)+タブ+平均値+改行となっている。
39 4.94539519773 78 4.911648371779998 117 5.014055572980005 156 4.970061421849994 195 4.937069895429997 234 5.206232056259996 273 4.994578578919996 312 4.897684452960004 351 5.095053427910006 390 4.970409691689994 429 4.889084266230005 468 4.95468362101 507 5.107228051750001 546 4.844947219119992 ...
スケールアウトさせる。
では、クラスタサイズを大きくしたときに、計算がスケールするか確認する。
クラスタ0(上の構成)
マスタノード数 | 1 |
---|---|
コアノード数 | 5 |
タスクノード数 | 5 |
総ノード数 | 11 |
Mapタスク数 | 200 |
Reduceタスク数 | 39 |
実行時間 | 20min.04sec.(1,204sec.) |
Map起動までのレイテンシー | 38sec |
Map処理時間 | 14min.23sec |
Reduce起動までのレイテンシー | 2min.9sec |
(注記)
クラスタ1
マスタノード数 | 1 |
---|---|
コアノード数 | 10 |
タスクノード数 | 10 |
総ノード数 | 21 |
Mapタスク数 | 200 |
Reduceタスク数 | 39 |
実行時間 | 10min.02sec.(602sec.) |
Map起動までのレイテンシー | 35sec |
Map処理時間 | 7min.59sec |
Reduce起動までのレイテンシー | 1min.56sec |
クラスタ2
マスタノード数 | 1 |
---|---|
コアノード数 | 20 |
タスクノード数 | 20 |
総ノード数 | 41 |
Mapタスク数 | 200 |
Reduceタスク数 | 39 |
実行時間 | 6min.23sec.(382sec.) |
Map起動までのレイテンシー | 40sec |
Map処理時間 | 4min.5sec |
Reduce起動までのレイテンシー | 2min.5sec |
散布図
横軸に総ノード数、縦軸に実行時間をとったグラフを示す。
クラスタ0(11台構成)からクラスタ1(21台構成)へは、面白いように実行時間が半分となった。この結果を外挿すれば、クラスタ2(43台構成)では300sec.程度となるが、382sec.と1分ほど大きい数字となっている。
クラスタサイズを大きくしたことによる、Map起動までのレイテンシーを見ると、初期処理のレイテンシー増加(S3からHDFSへの配置)は見られず、Reduceタスク起動までのレイテンシーも変わらない(Reduceは非常に速く起動していることが分かる)。
クラスター1から2では、Mapの処理時間はスケールしているから、Reduceがボトルネックになっている可能性がある。(Reduceの数を増やせば、スケールするかな)