Amazon Elastic MapReduce : Hadoop2.4環境で標本平均を計算する(Ruby Client)。

ワードカウントに続き、今回も簡単なHadoopMapreduceのサンプル・プログラムを作ってみる。

昨年の記事を参考にして、Amazon Elastic MapReduceのHadoop2.4環境で算術平均を算出する。

データの形式:ビッグデータを視野に入れて。

ビッグデータを視野に入れつつ、データの形式は下表のように仮定しておく。ここで、行はサンプル、列は変量を意味することとする。

今年5月の応用統計学会にて、東大工学部の山西教授による「潜在空間からのディープナレッジの発見」というセッションを聞いた。
その中で、ビッグデータの本質を

とおっしゃっていた。書籍などでは言われる「5V」より、山中先生のこの定義の方が明確だと思う。例えば、Volumeといった場合の本質的な問題は「サンプルサイズではなく、次元の大きさ」というのが、統計学者、機械学習学者(計算工学者)らの共通認識と思う。データの性質を損なわずに、サンプルサイズを下げることを「サンプリング」と呼んで、コンピュータがなかった時代には統計学の重要な領域だった(今でも大切です)のだから、サンプルサイズは「大きな問題」ではない(山中先生も同様のことをおっしゃっていた)。

これに対して、次元が高次になると、

  • 機械学習系でも、超高次元の「特徴」(統計学では「変量」という)空間上では、識別(超)平面が決定できない、といった問題が発生してしまう。「次元の呪い」。
  • 統計学では、(機械学習との大きな違いとして)「仮定するデータが密である」という点が、計算コストの面で大きな問題になる。

後者について補足すると、「疎なデータは、密なデータの1つのケース(多くは、簡単なケース)」として捉えられていることに起因するだろう。解析的な問題は、一般的な問題に取り組む方が効用が大きい。
しかし、相関係数のような(中学生でも計算できる)統計量が「次元が増えた途端に計算できないものになる(計算コストが幾何級数的に増加する。具体的には、データがロードできない、メモリーがリークするといったことになる)」。こういった問題をHadoop MapReduceで解決できるのだろうか、ということが、昨年、このブログにアップした「行列積の計算シリーズ」、「相関係数の計算シリーズ」の動機だった。

前置きが長くなったが、上表のような多変量データを想定した場合、ビッグデータに固有な問題は「行の大きさではなく列の大きさ」である。


まずは、算術平均をMapReduceで計算するプログラムをHadoop2.4で作成してみる。

1変量の場合のプログラム

1変量の場合は、ビッグデータの定義に当てはならないのだが、MapReduceという分散処理フレームワークを評価する上では、面白いサンプルだと思う。
以下に1変量の場合に算術平均を計算するプログラムを示す。同様のコードは、書籍やWebによく掲載されている。

/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * Hadoop2.4.0のMapReduceで算術平均を計算する。
 * 
 * author: tetsuya.odaka@gmail.com
 * 
 */
@SuppressWarnings("unused")
public class MeanCalc {

	/*
	 * Map
	 * Text Fileを読んで、スペースで区切って、(1,値)で書き出す。
	 * 
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,IntWritable,DoubleWritable>{
    	private final static IntWritable one = new IntWritable(1);

        private String mapTaskId;
        private String inputFile;
        private int noRecords = 0;

        /*
         * Mapperの初期化
         * 
         */
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }

        /*
         * Mapperの本体
         * 
         */
        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            while(tk.hasMoreTokens()){
            	++noRecords;
                double dval = Double.parseDouble(tk.nextToken());
                context.write(one, new DoubleWritable(dval));
            }
        }
    }

	/*
	 * Reduce
	 * (1,値)を集約後、算術平均を計算する
	 */
    public static class Reduce extends Reducer<IntWritable, DoubleWritable, LongWritable, DoubleWritable>{
    	private String 	reduceTaskId;
        private int 	noKeys = 0;
        
        /*
         * Reducerの初期化
         * 
         */
        public void setup(Context context) {
        	reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID;
          }
        
        /*
         * Reducerの本体
         * 
         */
        public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) 
        		throws IOException, InterruptedException{
        	long 	noValues = 0;
            double 	sum = 0;
            for(DoubleWritable value: values){
            	++noValues;
                sum += value.get();
            }

            if(noValues != 0) sum = sum/noValues;
            
            context.write(new LongWritable(noValues), new DoubleWritable(sum));
            ++noKeys;
        }
    }

    public static void main(String[] args) throws Exception {

        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));
        
        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MeanCalc.class);
        
        // Mapperクラス, Reducerクラスの定義
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // MapperのOutputKeyとOutputValueのクラス定義
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // 入出力フォルダーの指定
        // インプラントフォルダーは引数からとる。
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);

        // 入出力ファイルの形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // MapReduceジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }
        
        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));

    }
}

このプログラムの面白い点は、MapからReduceへのデータの引き継ぎに「1」という「たった1つのキー」を使用していること。これは「1つのReducerへデータが殺到する」ことを意味している(なので、このままではあんまり筋がいいプログラムとは言えないだろう)。

さて、「これを回避できるのか」と考えると、

  • Combinerが使えないか?
  • 分散して部分和を求めて、それを統合する。(MapReduceを繋ぐ)

とかが頭をよぎる。前者だと、Combinerで平均をとって、Reducerで平均の平均をとる、といったロジックになるだろう(処理されるかどうか保証されないことを考慮に入れて)。とすると、両者は同じロジックにいきつく。

上に述べたように(全部のデータを使わずに)サンプリングをして大枠のイメージをつかむということも考えられるだろう。サンプルが同一の分布から発生していると仮定すれば、(実務上は)全部を使うこともない。こんな風に、このサンプルだけ見ても、とても面白い題材になっている。

備忘のため、使い方を以下に書いておく。基本はワードカウントのときと同じで、Javaアプリケーションとして起動する。
入力データは「スペース区切りの数値」とし、入力用フォルダーに入れておく。

以下は、実行時のコンフィグ定義。

プログラムへの引数として、入力用テキストファイルの保管フォルダー、出力用テキストの保管フォルダーを指定する。MapReduceの出力先として既存のフォルダーを指定するとエラーになるので、指定したフォルダーしたにランダムにフォルダーを追加する仕様とした。

以下は、引数(Proguramu Arguments)。

/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/mean/input /Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/mean/output

以下は、Dパラ(VM Arguments)で、ワードカウントのときと同じに設定する。

-Xmx1024m -Djava.security.krb5.realm=OX.AC.UK -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk -Dlog4j.configuration="file:/Users/tetsuya/hadoop/etc/hadoop/log4j.properties" -DHadoop.home.dir=/Users/tetsuya/hadoop -Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console -Dhadoop.policy.file=/Users/tetsuya/hadoop/etc/hadoop/hadoop-policy.xml

1変量のプログラムAmazon Elastic MapReduceで処理する。

データ

Rを使って、[0,10]の一様乱数を100,000個作った。
区切り(スペース)と改行(LF)を含んで、1データ当たり9Bytesなので、100,000個でおおよそ900Kとなる。このサイズであれば、まだまだブロック・サイズに余裕がある。
hadoopの定義を見ると(先日の記事に掲載)。hdfs-site.xmlのdfs.block.sizeが134,217,728Bytesに設定されている。なので、このサイズであれば、150ファイル近く(134,217/900=149.13)が1ブロックに収まる勘定。

2.352948 3.432219 4.308323 6.655889 9.904773
2.259036 7.563224 1.308103 8.79912 1.004995
3.355776 4.82223 9.765647 1.299829 7.307348
.....

これを、s3バケットにアップロードする。

実行と結果

プログラムを前回同様にjarアーカイブとし書き出し、s3バケットにアップロードする。

クラスタは以下の構成とした。

ノード サイズ 台数
マスタ m1.middle 1
コア m1.middle 1
タスク m1.middle 1

実行は、Ruby Clientで行う。

elastic-mapreduce --jar s3n://[your bucket]/[your folder]/hadoopsample.jar --main-class com.tetsuyaodaka.MeanCalc --args s3n://[your bucket]/[your input folder],s3n://[your bucket]/[your input folder] --step-name MeanCalculation --jobflow j-XXXXXXXXX

実行時間は、3分弱と(この程度の計算なので)決して速くはない(というか、遅い)。

以下は、マネージメントコンソールからタスク(Task_attempt)の一覧を表示したところ。Mapが8,、Reduceが7あり、Hadoop1のときよりもタスクが多い印象。ブロックサイズから見ると、投機的に実行された(attemptとあるのが気になる)にせよ、多い気がする。

以下が出力フォルダー。Reduceが7つ動いたためその数分の出力ファイルがあるが、中身があるのは1つのみで、「実質的に処理を行ったReduceは1つ」と分かる(上に述べた通り)。


以下がログのフォルダー。

stdoutに(javaプログラムでprint指定した)以下の出力がでる。

Program Start: 2014/06/11 14:21:53
Output folder is s3n://banyantreeus/EMR/output/20140611/88231
Program END: 2014/06/11 14:24:40


多変量の場合に進もうと思ったが、長くなったので一旦切ります。

2014/6/19:つづきの記事を書きました。
Amazon Elastic MapReduce : Hadoop2.4環境で100万変量(10GB)の算術平均を計算する。