Hadoop MapReduce: 平均、分散、標準偏差計算のスケーラビリティー評価(Amazon EMR)

今回のログでは、先の2回のログ(標本平均分散・標準偏差)で作成したプログラムのスケーラビリティーを、Amazon Elastic MapReduceで評価してみたい。

変量をm、サンプルサイズをnとすると、普通の統計の世界では、m << nと考える(仮定する)のが一般的である。例えば、国勢調査などを考えると、家族構成や所得などが変量にあたり、世帯数がサンプルとなる。
これに対して、MapReduceは(理屈的には)mとnのサイズに上記のような仮定を必要としていない。nもmはSplitされる仕掛けとなっている。

ビッグデータを論じる際に、センサーデータがよく引き合いにだされる。どう分析するかはその目的で決定される訳であるが、地震研究(余地)のような場合には、日本各地に設置されたセンサー1つ1つについて、その振動の異常を検出するのが主たる目的である。言葉をかえれば、センサーの総数がmとなり、地震予知に関係する特定の変量の測定値(の数)がnとなる。(ただし、気象庁のページを見ると、地震観測点の数は4200であって、オーダーとしては小さいものとなっている)
ビッグデータを使ったマーケティングでも、基本的な考え方は「数値をつかった1 to 1マーケティング」であり、この場合にセンサーにあたるのは顧客である。
エコノミスト誌」(6/4号;使える統計学)のTwitter分析では、つぶやかれた語句の数がmとなり、日ごとあるいは、週ごとの出現頻度が観測値、観測期間がnに値する。

このように、ビッグデータ論の性質で面白いのは、m >> nの場合を論じていることであると思う。

したがって、スケーラビリティを評価する場合にあたっても、mがnより極めて大きい場合を想定して評価してみる。

評価するためクラスタは以下の2つを用意した。

クラスタ

クラスタ1では、マスターインスタンスグループ(Name Node 兼 JobTracker)1台、コアインスタンスグループ(Data Node 兼 TaskTracker)2台、タスクインスタンスグループ(TaskTracker)3台の構成とした。
プログラムを実行するのは、コアとタスクの5台となる。
インスタンスは、U.S.Standard(N.Virginia)のm1.smallを利用した。EC2のインスタンスタイプはとても分かりづらいがページを参照する限り、市販のPCと同等(もしくは、それ以下)と考えられる。


クラスタ

クラスタ2では、マスターインスタンスグループ(Name Node 兼 JobTracker)1台、コアインスタンスグループ(Data Node 兼 TaskTracker)4台、タスクインスタンスグループ(TaskTracker)6台の構成とした。
プログラムを実行するのは、コアとタスクの10台となる。
インスタンスは、U.S.Standard(N.Virginia)のm1.smallを利用した。

サンプルデータの作成

サンプルは以下のプログラムで作成した。
mとnを指定し、そのサイズの行列を書き出す。要素は、[0,10]の一様乱数を小数点以下1桁で四捨五入したものである。

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;

/**
 * Create Sample Date for Hadoop Calculation.
 * 
 * Columns are independent each another.
 * Create using uniform distribution [0.10] 
 * 
 * args: 0: path of the output file (filename must be included)
 *       1: number of rows (n) 
 *       2: number of columns (m)
 * 
 * @author tetsuya.odaka@gmail.com
 *
 */
public class CreateData {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		System.out.println("start");
		try {
			PrintWriter outFile 
				= new PrintWriter(
						new BufferedWriter(
								new OutputStreamWriter(
										new FileOutputStream(args[0]),"UTF-8")));
		// num of rows(n)
		int sizeR = Integer.parseInt(args[1]);
		// num of cols(m)
		int sizeC = Integer.parseInt(args[2]);
		
		for(int i=0;i<sizeR;i++){
			for(int j=0;j<sizeC;j++){
				double rnd = Math.random()*10;
				BigDecimal bd = new BigDecimal(rnd);
				BigDecimal r = bd.setScale(1, BigDecimal.ROUND_HALF_UP); 
				if(j==(sizeC-1)){
					outFile.print(r);
				}else{
					outFile.print(r+" ");
				}
			}
			outFile.print(System.getProperty("line.separator"));
		}
			
			outFile.close();
		} catch (UnsupportedEncodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("end");
		
	}

}

以下は、上のプログラムで生成した、m=9, n=20のデータである。

3.5 9.9 1.9 4.3 9.9 4.1 5.0 3.9 4.4
0.3 8.6 6.5 4.5 4.7 3.9 0.1 4.2 9.3
0.5 4.5 3.0 8.8 3.6 7.1 6.0 9.8 4.5
3.6 6.5 0.2 4.0 9.0 6.0 8.0 1.9 5.9
2.2 6.4 9.2 5.0 1.6 2.6 6.0 6.0 1.8
3.3 5.9 0.4 7.3 8.1 7.6 1.4 5.7 6.3
2.6 4.0 0.8 8.8 3.9 4.9 9.1 9.8 1.7
4.2 3.1 4.4 7.8 5.9 3.4 3.9 9.3 8.0
0.6 8.1 8.5 0.9 4.4 4.3 7.1 9.8 8.8
4.5 5.4 8.1 3.3 7.6 7.4 9.0 8.7 1.3
1.8 8.2 0.3 6.4 1.3 8.3 1.4 8.2 3.7
3.7 9.3 9.0 4.8 6.7 7.1 2.8 7.1 3.0
9.1 3.2 8.8 4.6 9.8 9.6 3.0 5.4 6.1
4.6 2.0 9.5 6.9 9.1 8.8 0.9 5.8 4.4
8.6 4.4 1.5 7.4 0.5 3.6 6.2 0.7 4.1
3.7 1.9 7.6 8.6 8.3 4.4 4.3 6.0 1.7
9.2 9.1 8.8 4.6 3.0 8.1 8.5 5.5 5.0
1.6 3.0 2.5 4.6 5.5 1.8 1.3 3.2 3.2
8.8 9.1 5.8 1.9 1.6 0.7 3.1 6.2 6.9
5.7 2.1 0.3 3.3 1.9 4.4 4.5 10.0 8.4

標本平均の算出プログラムのスケーラビリテリーを評価する。

標本平均の演算回数は、m、nのそれぞれに関して線形である。

m=100万、n=100として、クラスター1と2を比較する。

クラスタ数を倍にすることで、実行時間は53%に短縮された。

実行時間(sec) num of map task num of reduce task
クラスタ 625 6 8
クラスタ 333 6 17

ほぼ線形になっているといえるだろう。


クラスタ2で、m=100万、n=100の場合とm=100万、n=1000の場合を比較する。
実行時間(sec) num of map task num of reduce task
m=100万、n=100 333 6 17
m=100万、n=1000 1955 60 17

演算回数は10倍になるが、実行時間は6倍程度(5.87倍)に収まっている。

分散、標準偏差算出プログラムのスケーラビリテリーを評価する。

分散、標準偏差の演算回数は、m、nのそれぞれに関して線形である。

m=100万、n=100として、クラスター1と2を比較する。

クラスタ数を倍にすることで、実行時間は62%に短縮した。

実行時間(sec) num of map task num of reduce task
クラスタ 727 14 8
クラスタ 452 14 17

クラスタ2で、m=100万、n=100の場合とm=100万、n=1000の場合を比較する。

ここでも、演算回数は10倍になるが、実行時間は5倍(4.91倍)程度に収まっている。

実行時間(sec) num of map task num of reduce task
m=100万、n=100 452 14 17
m=100万、n=1000 2222 77 17

まとめ

同一のデータを使用した場合、クラスター数を倍にすることで、計算時間はほぼ半分に短縮された(線形性)
ただ、同じクラスタ構成で、データサイズを10倍にした場合、計算時間が5-6倍程度に落ち着いている。可能性としては、

  • データサイズをあげた場合、同一クラスタでもMapper数が増加していること。(スロットをフルに活用している)
  • 実行速度が、演算回数ではなく、他の要因(Amazon S3からのデータ伝送など)に支配されている。

が考えられる。

これらの性質については、継続して考察して行きたい。