Amazon Elastic MapReduce : bootstrap-actionのwgetがうまくいかない。

今回は、ちょっとした備忘。
Amazon Elastic MapReduceの環境を初期設定するために、bootstrap actionsというのが定義できる。クラスタ起動時にHadoopが開始される前、各インスタンスに対して実行される。(詳しくはAWS Documentation(英語)を参照)
ここでテンプレートをつかって、Hadoopの設定を変更したり、S3に保存したShellを動かしてクラスタインスタンスにソフトをインストしたりできる。後者は、Custom Bootstrap Actionと呼ばれている。

ここで、initial.shというshellをS3に保存し、以下のことをしようとしたのだが、どうもうまくいかない。

  1. 日付を日本時間に合わせる。
  2. S3cmdのコンフィギュレーションファイル(.s3fg)を作成する。
  3. S3cmdをsourceforgeCDNからダウンロード&解凍する。

(注)S3syncはgems2.2で、AMI上でnokogiriのインストがうまくいかないので、深入りしなかった。

上の3ステップで、うまく行かないのは最後のところ。以下にinitial.shを示しておく。
注意するのは、exitコードをつけておかないといけない位と思う。

#!/bin/sh
set -e

#時刻設定
sudo rm /etc/localtime 
sudo ln -s /usr/share/zoneinfo/Asia/Tokyo /etc/localtime

#s3cmd
#一度、最新バージョンのs3cmdを導入後、 --configureをして作成して、それを元に作成する(もっと長い)。
echo "[default]
access_key = [your access key]
secret_key = [your secret key]" > ~/.s3cfg

#s3cmd
wget -S -T 30 -t 5 http://superb-dca3.dl.sourceforge.net/project/s3tools/s3cmd/1.5.0-beta1/s3cmd-1.5.0-beta1.tar.gz
tar zxfv s3cmd-1.5.0-beta1.tar.gz
export PATH=$PATH:/home/hadoop/s3cmd-1.5.0-beta1

exit 0

ここでうまくいかないのは、wgetインスタンスをUSにしているので、s3cmdのダウンロード先もUSに変更したのだがうまくいかない(タイムアウトも30sec、5回に設定)。

AWS Documentにもサンプルがあるんだけどなぁ。
ログでもみようかな。

EclipseでHadoop2.4の開発環境を作る&ワードカウントのプログラムを作成する。

前回に引き続き、使用するHadoopのバージョンは2.4。1年前の記事をもとに、Eclipseで開発環境を作る。

開発を行うクライアントは、MacBook Pro (OSX 10.9.3)。クライアントのJavaのバージョンは以下。

MacBook-Pro:~ tetsuya$ java -version
java version "1.6.0_65"
Java(TM) SE Runtime Environment (build 1.6.0_65-b14-462-11M4609)
Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-462, mixed mode)

ダウンロード

hadoopのダウンロードページから、hadoop-2.4.0.tar.gzをダウンロードする。

クライアントの設定

ダウンロードしたhadoop-2.4.0.tar.gzをホームに解凍し、hadoopというシンボリックリンクを張る。

lrwxr-xr-x    1 tetsuya  staff        13  6  7 11:50 hadoop -> hadoop-2.4.0/
drwxr-xr-x@  12 tetsuya  staff       408  3 31 18:15 hadoop-2.4.0

ワードカウントのプログラムを作成する。

プロジェクトの作成と外部JARの登録

新しくJavaプロジェクトを作成する。
プロパティーJavaのビルド・パスに、「Add External JARS」で~/hadoop/share/hadoop下に含まれるjarファイルを登録する。

追加するjarファイルが存在するディレクトリは以下。

  • common
  • common/lib
  • tools/lib(common/libとjarがだぶってる)
  • hdfs
  • mapreduce
  • yarn
クラスの作成:ワードカウントのプログラムの作成。

ワードカウントを行うJavaプログラムを作成する。

やって見た感じでは、MapReduceの処理そのものはそのまま動くし、殆ど変わらない。
以下に具体的にEclipseでの開発手順とプログラム、実行方法と結果を示す。


新規にパッケージを作成して、クラス(FQCN=com.tetsuyaodaka.WordCount)を追加する。


下に示すプログラムは、至る所で見られる代物で、次のロジックになっている。

  • Mapperでは、スペース区切りの文書を単語(ワード)に分け、(ワード,1)というキーバリューに分ける。
  • Reducerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。

上のロジックは、以下と書いても同じなので、ReduceクラスをCombinerとして使う。

  • Mapperでは、スペース区切りの文書を単語(ワード)に分け、(ワード,1)というキーバリューに分ける。
  • Combinerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。
  • Reducerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。

CombinerはMapperの後に(Mapperと同一のマシンで)処理されるが、処理が実行される保証はない。

/*
/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

 */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;

/**
 * Hadoop2.4.0のMapReduceでワードカウントする。
 * 
 * author: tetsuya.odaka@gmail.com
 * 
 */

@SuppressWarnings("unused")
public class WordCount{

	/*
	 * Map
	 * Text Fileを読んで、スペースで区切って、(ワード,1)で書き出す。
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{
    	private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

		private String mapTaskId;
		private String inputFile;
		private int noRecords = 0;
        
		/*
		 * Mapperの初期化
		 */
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }

        /*
         * Mapper本体
         * 
         * 入力ファイルから1行ずつ読み込んで処理を行う。
         */
        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            while(tk.hasMoreTokens()){
            	++noRecords;
                word.set(tk.nextToken());
                context.write(word, one);
            }
        }
    }

	/*
	 * Reduce
	 * (ワード,1)を集約して、ワードの出現回数を出力する。
	 * 
	 * 注記:
	 * Combinerでも使う。
	 */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    	private String reduceTaskId;
		private int noKeys = 0;
        
		/*
		 * Reducerの初期化
		 * 
		 */
        public void setup(Context context) {
        	reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID;
          }
        
        /*
         * Reducer本体
         * 
         * 同一キーの値(value)がItarableに入って、処理が開始。
         * Comberで、Wordに対してカウントをとっても大丈夫なので、Combinerとしても使う。
         */
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        		throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable value: values){
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
            ++noKeys;
        }
    }

    public static void main(String[] args) throws Exception {

        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));

        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCount.class);
        
        // Mapper, Combiner, Reducerの定義
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // OutPutKey, OutPutValueの定義
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // 入力・出力ファイルのフォルダー定義:S3バケットからとる。
        // インプラントフォルダーは引数からとる。
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);

        // 入力・出力ファイルの形式の定義
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // ジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }

        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}


1.0.3の時に使っていたorg.apache.hadoop.mapreduce.Jobのコンストラクタが全てdeprecatedになっている(APIドキュメント)になっているので、staticなFactoryを使うように修正した。

実行時には、

  1. 入力テキストを保持するフォルダー
  2. 出力を保持するフォルダー

を指定する。

/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/input /Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/output

入力を保持するフォルダーには、スペース区切りのテキストを突っ込んでおけば良い。出力フォルダーを作成しておくと、「File Already Exists」というIOExceptionがでる。なので、この配下に、乱数を使ってフォルダーを作成するようにする。(このIOExceptionは、「上書きでせっかく作ったデータが消えるのを防止するため」と象本に書いてある)

以下が、プロジェクトのフォルダー構成。

クラスの実行:ワードカウントのプログラムの実行。

サンプルを実行してみる。サンプルには、USA Today紙の記事(3-alarm fire near Rockefeller Center)を使ってみた。

この記事をコピペして、入力フォルダーにinput.txtとして保存する。これで、Javaアプリケーションの実行をすれば動くのだが、log4jのappenderがないよと言って来たり、Macのセキュリティーエラーが出たりするので(こちらの記事を参考にさせていただいた)、Dパラ(VM Arguments)を以下のように設定しておく。

-Xmx1024m 
-Djava.security.krb5.realm=OX.AC.UK 
-Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk 
-Dlog4j.configuration="file:/Users/tetsuya/hadoop/etc/hadoop/log4j.properties" 
-DHadoop.home.dir=/Users/tetsuya/hadoop 
-Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console 
-Dhadoop.policy.file=/Users/tetsuya/hadoop/etc/hadoop/hadoop-policy.xml


以下がコンソール・ログ。Nativeのローダーで警告が出たが、動くことは確認できた。

14/06/07 15:33:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/07 15:33:50 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/06/07 15:33:50 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/06/07 15:33:50 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
14/06/07 15:33:50 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/07 15:33:50 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
14/06/07 15:33:50 INFO mapred.FileInputFormat: Total input paths to process : 1
14/06/07 15:33:50 INFO mapreduce.JobSubmitter: number of splits:1
14/06/07 15:33:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local601616696_0001
14/06/07 15:33:50 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/staging/tetsuya601616696/.staging/job_local601616696_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/06/07 15:33:50 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/staging/tetsuya601616696/.staging/job_local601616696_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/06/07 15:33:51 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/local/localRunner/tetsuya/job_local601616696_0001/job_local601616696_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/06/07 15:33:51 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/local/localRunner/tetsuya/job_local601616696_0001/job_local601616696_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/06/07 15:33:51 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/06/07 15:33:51 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/06/07 15:33:51 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/06/07 15:33:51 INFO mapreduce.Job: Running job: job_local601616696_0001
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Waiting for map tasks
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Starting task: attempt_local601616696_0001_m_000000_0
14/06/07 15:33:51 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
14/06/07 15:33:51 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
14/06/07 15:33:51 INFO mapred.MapTask: Processing split: file:/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/input/input.txt:0+1305
14/06/07 15:33:51 INFO mapred.MapTask: numReduceTasks: 1
14/06/07 15:33:51 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/06/07 15:33:51 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/06/07 15:33:51 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/06/07 15:33:51 INFO mapred.MapTask: soft limit at 83886080
14/06/07 15:33:51 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
14/06/07 15:33:51 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 
14/06/07 15:33:51 INFO mapred.MapTask: Starting flush of map output
14/06/07 15:33:51 INFO mapred.MapTask: Spilling map output
14/06/07 15:33:51 INFO mapred.MapTask: bufstart = 0; bufend = 2155; bufvoid = 104857600
14/06/07 15:33:51 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213544(104854176); length = 853/6553600
14/06/07 15:33:51 INFO mapred.MapTask: Finished spill 0
14/06/07 15:33:51 INFO mapred.Task: Task:attempt_local601616696_0001_m_000000_0 is done. And is in the process of committing
14/06/07 15:33:51 INFO mapred.LocalJobRunner: attempt_local601616696_0001_m_000000_0 processed 100
14/06/07 15:33:51 INFO mapred.Task: Task 'attempt_local601616696_0001_m_000000_0' done.
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local601616696_0001_m_000000_0
14/06/07 15:33:51 INFO mapred.LocalJobRunner: map task executor complete.
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Starting task: attempt_local601616696_0001_r_000000_0
14/06/07 15:33:51 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
14/06/07 15:33:51 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
14/06/07 15:33:51 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2d397e5c
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=745517888, maxSingleShuffleLimit=186379472, mergeThreshold=492041824, ioSortFactor=10, memToMemMergeOutputsThreshold=10
14/06/07 15:33:51 INFO reduce.EventFetcher: attempt_local601616696_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
14/06/07 15:33:51 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local601616696_0001_m_000000_0 decomp: 1930 len: 1934 to MEMORY
14/06/07 15:33:51 INFO reduce.InMemoryMapOutput: Read 1930 bytes from map-output for attempt_local601616696_0001_m_000000_0
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1930, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1930
14/06/07 15:33:51 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
14/06/07 15:33:51 INFO mapred.Merger: Merging 1 sorted segments
14/06/07 15:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1922 bytes
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merged 1 segments, 1930 bytes to disk to satisfy reduce memory limit
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merging 1 files, 1934 bytes from disk
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
14/06/07 15:33:51 INFO mapred.Merger: Merging 1 sorted segments
14/06/07 15:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1922 bytes
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/06/07 15:33:51 INFO mapred.Task: Task:attempt_local601616696_0001_r_000000_0 is done. And is in the process of committing
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/06/07 15:33:51 INFO mapred.Task: Task attempt_local601616696_0001_r_000000_0 is allowed to commit now
14/06/07 15:33:51 INFO output.FileOutputCommitter: Saved output of task 'attempt_local601616696_0001_r_000000_0' to file:/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/output/_temporary/0/task_local601616696_0001_r_000000
14/06/07 15:33:51 INFO mapred.LocalJobRunner: attempt_local601616696_0001_r_000000_0 processed 100 > reduce
14/06/07 15:33:51 INFO mapred.Task: Task 'attempt_local601616696_0001_r_000000_0' done.
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local601616696_0001_r_000000_0
14/06/07 15:33:51 INFO mapred.LocalJobRunner: reduce task executor complete.
14/06/07 15:33:52 INFO mapreduce.Job: Job job_local601616696_0001 running in uber mode : false
14/06/07 15:33:52 INFO mapreduce.Job:  map 100% reduce 100%
14/06/07 15:33:52 INFO mapreduce.Job: Job job_local601616696_0001 completed successfully
14/06/07 15:33:52 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=6870
		FILE: Number of bytes written=451235
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=15
		Map output records=214
		Map output bytes=2155
		Map output materialized bytes=1934
		Input split bytes=124
		Combine input records=214
		Combine output records=150
		Reduce input groups=150
		Reduce shuffle bytes=1934
		Reduce input records=150
		Reduce output records=150
		Spilled Records=300
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=4
		Total committed heap usage (bytes)=395567104
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1305
	File Output Format Counters 
		Bytes Written=1349


以下が結果。ストップワードや区切り文字、大文字・小文字といった(実用上での)問題はあるが、まずはOK。

10:30	1
130	1
1920s	1
2014	1
48th	1
49th	1
A	3
According	1
All	1
Associated	1
Center.	1
Copyright	1
Department	1
FDNY	1
Fire	1
Firefighters	1
Flames	1
Friday	1
Friday's	1
Gargantiel	1
Kirby	1
Manhattan,	1
More	1
New	2
One	1
Press.	1
Rockefeller	1
TGI	1
The	3
This	1
Twitter	1
Witness	1
York	1
York's	1
a	5
about	1
according	1
acrid.	1
after	1
air	1
an	1
and	8
area	2
around	2
as	1
basement	2
be	1
blaze	2
broadcast,	1
broke	1
brought	1
building	1
building.	1
built	1
bus	1
but	1
called	1
came	1
cause	1
caused	1
city	1
clouded	1
control	1
darker,	1
delays	1
department	1
dining	1
fifth	1
fire	3
firefighter	1
firefighters	1
first	1
flames	2
floor.	1
floors,	1
fourth	1
from	4
grew	1
has	1
he	1
hours	1
hurt	1
in	6
injury,	1
investigation.	1
it	1
kitchens	1
likely	1
material	1
may	1
midtown	1
minor	1
near	1
nearby	1
no	1
not	1
occupies	1
of	4
offices	1
officials	2
on	3
one	1
or	1
out	1
p.m.,	1
photo	1
published,	1
records,	1
redistributed	1
rerouted	1
reserved.	1
restaurant	2
rewritten	1
rights	1
roof	2
rooftop	1
routes.	1
said	3
said.	1
scene.	1
seriously	1
shooting	1
shot	1
showed	1
six-story	1
sixth.	1
skyline	1
smoke	2
spokesman	1
spokesman.	1
streets	1
sustained	1
system	1
than	1
the	18
thicker	1
third	1
through	1
to	3
traffic	1
transit	1
turning	1
two	1
under	2
vent.	1
ventilation	1
was	3
watched	1
were	1
with	1

続きの記事

Amazon Elastic MapReduceを使う:Hadoop2.4でサンプルを実行してみる。

今回から(覚悟を決めて)Hadoop2のAMIを使ってみる。
Hadoop2系は、昨年の秋、バージョン0.23から2.2というバージョン番号が付与されたプロダクト(Hadoop1系は0.20からメジャーバージョンが付与された)。YARN(Yet Another Resource Negotiator)という分散処理のフレームワークに乗っかっていて、1系とは大きくアーキテクチャが違っている。「MapReduceはその中の1つにすぎない」ということになっている。
MapReduce1もサポートされているようなので少し安心。

2系についてのお勉強には、以下のサイトを参考にさせていただいた(象本が新しくなっているのに今更気がついたので、今度のお小遣いで買わなくちゃ)。

  1. 「Hadoop 2」データ処理とサービスの同時実行が可能な安定版が公開 ITPro
  2. MR2とYARNの手短な解説(cloudera)
  3. Hadoop2 Going beyound MapReduce
  4. 完全分散モードの設計メモとYARNの仕様(Open Groove)
  5. Hadoop0.23 YARN

Amazon Elastic MapReduceでは、1系はHadoop1.0.3だけが選択できて(0.20も可能)、2系の方がAMIが充実している(2系を使ってみる理由)。
2.2以上は、選択できるインスタンス・サイズがmidium以上になっている。節約のため、スポットインスタンスを使う。
スポットインスタンスは、「Amazon Web Services>購入オプション>Amazon EC2 スポットインスタンス」で参照できる。使用するリージョンで適宜調べる。自分の場合、us-east(N.Virginia)を使う。

Hadoop2.4のAMIにSSHで接続する。

クラスタを起動して、SSHでログイン(ユーザー:hadoop)し、ls-alした結果は以下。

[hadoop@ip-10-181-195-85 ~]$ ls -al
合計 36
drwxr-xr-x 6 hadoop hadoop 4096  6月  4 03:01 .
drwxr-xr-x 4 root   root   4096  5月 13 18:24 ..
-rwxrwxr-- 1 hadoop hadoop   94  5月 13 18:25 .bash_profile
-rwxrwxr-- 1 hadoop hadoop 1558  5月 13 18:27 .bashrc
drwx------ 2 hadoop hadoop 4096  6月  4 03:01 .ssh
drwxr-xr-x 9 hadoop hadoop 4096  5月 13 18:27 .versions
lrwxrwxrwx 1 root   root     40  5月 13 18:27 Cascading-2.5-SDK -> /home/hadoop/.versions/Cascading-2.5-SDK
lrwxrwxrwx 1 hadoop hadoop   32  5月 13 18:24 bin -> /home/hadoop/.versions/2.4.0/bin
lrwxrwxrwx 1 hadoop hadoop   39  5月 13 18:24 conf -> /home/hadoop/.versions/2.4.0/etc/hadoop
drwxr-xr-x 3 hadoop hadoop 4096  5月 13 18:24 contrib
lrwxrwxrwx 1 hadoop hadoop   32  5月 13 18:24 etc -> /home/hadoop/.versions/2.4.0/etc
lrwxrwxrwx 1 hadoop hadoop   71  5月 13 18:24 hadoop-examples.jar -> /home/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.0.jar
lrwxrwxrwx 1 root   root     34  5月 13 18:27 hive -> /home/hadoop/.versions/hive-0.11.0
drwxr-xr-x 2 hadoop hadoop 4096  5月 13 18:27 lib
lrwxrwxrwx 1 hadoop hadoop   36  5月 13 18:24 libexec -> /home/hadoop/.versions/2.4.0/libexec
lrwxrwxrwx 1 hadoop hadoop   33  5月 13 18:25 mahout -> /home/hadoop/.versions/mahout-0.9
lrwxrwxrwx 1 root   root     33  5月 13 18:27 pig -> /home/hadoop/.versions/pig-0.12.0
lrwxrwxrwx 1 hadoop hadoop   33  5月 13 18:24 sbin -> /home/hadoop/.versions/2.4.0/sbin
lrwxrwxrwx 1 hadoop hadoop   34  5月 13 18:24 share -> /home/hadoop/.versions/2.4.0/share

Cascading SDKホームページ)の2.5(最新バージョン)が配備されている。
Cascadingは、MapReduceのパイプラインを構築するライブラリー。サイバーエージェントの方がとても親切なPigとの比較レポートを公開してくださっている(こちら)。

Apache Mahout(0.9)もセットアップされている。

javaのバージョンは、1.7.0。

[hadoop@ip-XX-XX-XX-XX ~]$ java -version
java version "1.7.0_60-ea"
Java(TM) SE Runtime Environment (build 1.7.0_60-ea-b02)
Java HotSpot(TM) 64-Bit Server VM (build 24.60-b04, mixed mode)

rubyのバージョンは2.0.0。hadoop1.0.3のAMIでは、ruby1.8.7がインストされていた。s3syncがruby1.9.2以上を要求するので、こちらの方が都合がよさそう。

[hadoop@ip-XX-XX-XX-XX ~]$ ruby -v
ruby 2.0.0p451 (2014-02-24 revision 45167) [x86_64-linux]

Hadoop2.4のAMIのコンフィギュレーション

以下は、m1.mediumのインスタンス2つ(マスタ、コア)のクラスタを起動した場合。2系からyarn-site.xmlが追加。

以下に、hadoopの設定ファイルの内容を列記しておく。Hadoop1と同じ3つの設定ファイルに加えて、yarn-site.xmlが追加されてる。

/home/hadoop/conf/core-site.xmlhadoop全体の設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>hadoop.proxyuser.hadoop.groups</name><value>*</value></property>
  <property><name>fs.s3n.impl</name><value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value></property>
  <property><name>fs.s3.impl</name><value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value></property>
  <property><name>fs.default.name</name><value>hdfs://[your internal address]:9000</value></property>
  <property><name>hadoop.tmp.dir</name><value>/mnt/var/lib/hadoop/tmp</value></property>
  <property><name>fs.s3n.awsSecretAccessKey</name><value>[your was secret access key]</value></property>
  <property><name>fs.s3n.awsAccessKeyId</name><value>[your was access key]</value></property>
  <property><name>fs.s3.buffer.dir</name><value>/mnt/var/lib/hadoop/s3</value></property>
  <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value></property>
  <property><name>fs.s3bfs.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
  <property><name>hadoop.metrics.defaultFile</name><value>/home/hadoop/conf/hadoopDefaultMetricsList</value></property>
  <property><name>fs.s3.awsSecretAccessKey</name><value>[your was secret access key]</value></property>
  <property><name>hadoop.proxyuser.hadoop.hosts</name><value>*</value></property>
  <property><name>fs.s3bfs.awsAccessKeyId</name><value>[your was access key]</value></property>
  <property><name>fs.s3bfs.awsSecretAccessKey</name><value>[your was secret access key]</value></property>
  <property><name>hadoop.metrics.list</name><value>TotalLoad,CapacityTotalGB,UnderReplicatedBlocks,CapacityRemainingGB,PendingDeletionBlocks,PendingReplicationBlocks,CorruptBlocks,CapacityUsedGB,numLiveDataNodes,numDeadDataNodes,MissingBlocks</value></property>
  <property><name>io.compression.codec.lzo.class</name><value>com.hadoop.compression.lzo.LzoCodec</value></property>
  <property><name>fs.s3.awsAccessKeyId</name><value>[your was access key]</value></property>
</configuration>
/home/hadoop/conf/hdfs-site.xmlHDFSの設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>dfs.datanode.max.xcievers</name><value>4096</value></property>
  <property><name>dfs.datanode.https.address</name><value>0.0.0.0:9402</value></property>
  <property><name>dfs.datanode.du.reserved</name><value>536870912</value></property>
  <property><name>dfs.namenode.handler.count</name><value>64</value></property>
  <property><name>io.file.buffer.size</name><value>65536</value></property>
  <property><name>dfs.block.size</name><value>134217728</value></property>
  <property><name>dfs.data.dir</name><value>/mnt/var/lib/hadoop/dfs</value></property>
  <property><name>dfs.secondary.http.address</name><value>0.0.0.0:9104</value></property>
  <property><name>dfs.replication</name><value>1</value></property>
  <property><name>dfs.http.address</name><value>0.0.0.0:9101</value></property>
  <property><name>dfs.https.address</name><value>0.0.0.0:9202</value></property>
  <property><name>dfs.datanode.http.address</name><value>0.0.0.0:9102</value></property>
  <property><name>dfs.datanode.address</name><value>0.0.0.0:9200</value></property>
  <property><name>dfs.name.dir</name><value>/mnt/var/lib/hadoop/dfs-name</value></property>
  <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
  <property><name>dfs.datanode.ipc.address</name><value>0.0.0.0:9201</value></property>
</configuration>
/home/hadoop/conf/mapred-site.xmlMapReduceの設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
  <property><name>mapreduce.task.io.sort.mb</name><value>200</value></property>
  <property><name>mapreduce.reduce.cpu.vcores</name><value>2</value></property>
  <property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
  <property><name>mapreduce.task.io.sort.factor</name><value>48</value></property>
  <property><name>mapreduce.jobhistory.address</name><value>ip-[your internal address].ec2.internal:10020</value></property>
  <property><name>mapreduce.job.userlog.retain.hours</name><value>48</value></property>
  <property><name>mapreduce.framework.name</name><value>yarn</value></property>
  <property><name>mapreduce.reduce.java.opts</name><value>-Xmx768m</value></property>
  <property><name>mapreduce.map.java.opts</name><value>-Xmx512m</value></property>
  <property><name>mapreduce.reduce.shuffle.parallelcopies</name><value>20</value></property>
  <property><name>mapreduce.map.memory.mb</name><value>768</value></property>
  <property><name>hadoop.job.history.user.location</name><value>none</value></property>
  <property><name>mapreduce.reduce.speculative</name><value>true</value></property>
  <property><name>mapreduce.application.classpath</name><value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,/usr/share/aws/emr/emr-fs/lib/*,/usr/share/aws/emr/lib/*</value></property>
  <property><name>mapreduce.map.cpu.vcores</name><value>1</value></property>
  <property><name>mapreduce.map.speculative</name><value>true</value></property>
  <property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
  <property><name>mapreduce.job.maps</name><value>2</value></property>
  <property><name>mapreduce.reduce.memory.mb</name><value>1024</value></property>
  <property><name>mapreduce.map.output.compress</name><value>true</value></property>
  <property><name>mapred.local.dir</name><value>/mnt/var/lib/hadoop/mapred</value></property>
  <property><name>mapreduce.job.reduces</name><value>1</value></property>
  <property><name>mapreduce.jobhistory.webapp.address</name><value>ip-[your internal address].ec2.internal:19888</value></property>
  <property><name>yarn.app.mapreduce.am.job.task.listener.thread-count</name><value>60</value></property>
  <property><name>yarn.app.mapreduce.am.resource.mb</name><value>1024</value></property>
  <property><name>mapreduce.job.jvm.numtasks</name><value>20</value></property>
  <property><name>mapreduce.map.output.compress.codec</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
</configuration>
/home/hadoop/conf/yarn-site.xml(YARNの設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>yarn.nodemanager.container-executor.class</name><value>org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor</value></property>
  <property><name>yarn.resourcemanager.scheduler.client.thread-count</name><value>64</value></property>
  <property><name>yarn.nodemanager.webapp.address</name><value>0.0.0.0:9035</value></property>
  <property><name>yarn.nodemanager.container-manager.thread-count</name><value>64</value></property>
  <property><name>yarn.resourcemanager.webapp.address</name><value>ip-[your internal address].ec2.internal:9026</value></property>
  <property><name>yarn.nodemanager.vmem-pmem-ratio</name><value>5</value></property>
  <property><name>yarn.log.server.url</name><value>http://ip-[your internal address].ec2.internal:19888/jobhistory/logs/</value></property>
  <property><name>yarn.nodemanager.resource.memory-mb</name><value>2048</value></property>
  <property><name>yarn.nodemanager.resource.cpu-vcores</name><value>4</value></property>
  <property><name>yarn.nodemanager.localizer.client.thread-count</name><value>20</value></property>
  <property><name>yarn.web-proxy.address</name><value>[your internal address]:9046</value></property>
  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
  <property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value></property>
  <property><name>yarn.resourcemanager.admin.address</name><value>[your internal address]:9025</value></property>
  <property><name>yarn.resourcemanager.resource-tracker.address</name><value>[your internal address]:9023</value></property>
  <property><name>yarn.resourcemanager.scheduler.address</name><value>[your internal address]:9024</value></property>
  <property><name>yarn.nodemanager.localizer.fetch.thread-count</name><value>20</value></property>
  <property><name>yarn.resourcemanager.address</name><value>[your internal address]:9022</value></property>
  <property><name>yarn.application.classpath</name><value>$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*,/usr/share/aws/emr/emr-fs/lib/*,/usr/share/aws/emr/lib/*</value></property>
  <property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>
  <property><name>yarn.scheduler.minimum-allocation-mb</name><value>256</value></property>
  <property><name>yarn.nodemanager.localizer.address</name><value>0.0.0.0:9033</value></property>
  <property><name>yarn.nodemanager.address</name><value>0.0.0.0:9103</value></property>
  <property><name>yarn.scheduler.maximum-allocation-mb</name><value>2048</value></property>
  <property><name>yarn.resourcemanager.resource-tracker.client.thread-count</name><value>64</value></property>
  <property><name>yarn.resourcemanager.client.thread-count</name><value>64</value></property>
</configuration>

サンプルの実行

ためしに、サンプル(モンテカルロ法でパイを求める)を実行してみる。

[hadoop@ip-10-181-195-85 ~]$ hadoop jar hadoop-examples.jar pi 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
14/06/04 04:43:56 INFO client.RMProxy: Connecting to ResourceManager at /10.181.195.85:9022
14/06/04 04:43:57 INFO input.FileInputFormat: Total input paths to process : 10
14/06/04 04:43:57 INFO mapreduce.JobSubmitter: number of splits:10
14/06/04 04:43:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1401850967351_0001
14/06/04 04:43:58 INFO impl.YarnClientImpl: Submitted application application_1401850967351_0001
14/06/04 04:43:58 INFO mapreduce.Job: The url to track the job: http://10.181.195.85:9046/proxy/application_1401850967351_0001/
14/06/04 04:43:58 INFO mapreduce.Job: Running job: job_1401850967351_0001
14/06/04 04:44:14 INFO mapreduce.Job: Job job_1401850967351_0001 running in uber mode : false
14/06/04 04:44:14 INFO mapreduce.Job:  map 0% reduce 0%
14/06/04 04:44:29 INFO mapreduce.Job:  map 10% reduce 0%
14/06/04 04:44:39 INFO mapreduce.Job:  map 20% reduce 0%
14/06/04 04:44:49 INFO mapreduce.Job:  map 30% reduce 0%
14/06/04 04:45:00 INFO mapreduce.Job:  map 40% reduce 0%
14/06/04 04:45:15 INFO mapreduce.Job:  map 50% reduce 0%
14/06/04 04:45:26 INFO mapreduce.Job:  map 60% reduce 0%
14/06/04 04:45:36 INFO mapreduce.Job:  map 70% reduce 0%
14/06/04 04:45:47 INFO mapreduce.Job:  map 80% reduce 0%
14/06/04 04:46:00 INFO mapreduce.Job:  map 90% reduce 0%
14/06/04 04:46:11 INFO mapreduce.Job:  map 100% reduce 0%
14/06/04 04:46:23 INFO mapreduce.Job:  map 100% reduce 100%
14/06/04 04:46:23 INFO mapreduce.Job: Job job_1401850967351_0001 completed successfully
14/06/04 04:46:23 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=107
		FILE: Number of bytes written=1080316
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=2690
		HDFS: Number of bytes written=215
		HDFS: Number of read operations=43
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=3
	Job Counters 
		Launched map tasks=10
		Launched reduce tasks=1
		Data-local map tasks=10
		Total time spent by all maps in occupied slots (ms)=295995
		Total time spent by all reduces in occupied slots (ms)=42416
		Total time spent by all map tasks (ms)=98665
		Total time spent by all reduce tasks (ms)=10604
		Total vcore-seconds taken by all map tasks=98665
		Total vcore-seconds taken by all reduce tasks=21208
		Total megabyte-seconds taken by all map tasks=75774720
		Total megabyte-seconds taken by all reduce tasks=10858496
	Map-Reduce Framework
		Map input records=10
		Map output records=20
		Map output bytes=180
		Map output materialized bytes=350
		Input split bytes=1510
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=350
		Reduce input records=20
		Reduce output records=0
		Spilled Records=40
		Shuffled Maps =10
		Failed Shuffles=0
		Merged Map outputs=10
		GC time elapsed (ms)=1409
		CPU time spent (ms)=10620
		Physical memory (bytes) snapshot=3488313344
		Virtual memory (bytes) snapshot=13513596928
		Total committed heap usage (bytes)=2752421888
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1180
	File Output Format Counters 
		Bytes Written=97
Job Finished in 147.258 seconds
Estimated value of Pi is 3.14120000000000000000

Name Node(HDFS)のWeb UI

http://[your master public DNS]:9101/

にアクセスすれば、以下の画面が表示される。1系とは随分見た目が違う。

以下は、HDFS上のディレクトリをブラウズしたところ。

ログも見られる。

Resource ManagerのWeb UI

Hadoopの2系では、JobTrackerではなく、ResourceManager(Application Manager)によってジョブが管理されている。Capacity Schedulerの管理UIが見られるはずなのだが、これにはハマってしまった。EC2にかかっているSecurity Groupで9100が開いていたので、てっきりそれだと思い込んだが、ポートは9026(SSHでログインし後にポートが表示されるのをすっかり忘れていた)。

なので、EC2のElasticMapreduce-masterのセキュリティーグループを変更する。

ElasticMapreduce-masterを選択し、画面下のInboundにあるEditボタンをクリックする。とりあえず、以下のように、9026を空ける。

これで以下のURLにアクセスする。

http://[your master public DNS]:9026/

以下がCapacity Schedulerの管理UI(トップ)

サンプル(モンテカルロ法でパイを求める)のジョブを表示したところ。

ログはTools(下)を展開すると出てくる。

こんな感じ。

Amazon Elastic MapReduceを使う:SSHでマスタノードにログインする。sftpでファイルを転送する。

Amazon Elastic MapReduceはEC2インスタンスで構成されている。ということは、SSHなどでログインが可能だし、クライアントから直接データやプログラムが配備できる。
AWSのサイトにも書いてあるし、(日付が古いが)「Amazon Elastic MapReduce ドキュメント」にも書いてある。

1年前の記事を参考に、現在の環境でテストしてみる。

マネージメント・コンソールでEC2を選択して、Instancesをみる。以下が、Runningしているインスタンスだけ表示した画面。

この中にマスタノードがいるわけだが、(1年前と比べて)見つけるのが簡単になっている。
以下は、Elastic MapReduceのコンソール画面だが、画面上にMaster Public DNS(Name)が表示されていて、これがマスタノードのPublic Nameとなっている。

接続する先はこのインスタンスなので、先の3つのEC2インスタンスから選択して、どんなポートが開いているか、Security Groupを調べる。

Security Groupを調べるには、上の画面の右下にある「Security Group」の「view rules」をクリックする。

(不用心な話だが)22番と80番が開いていることが分かる。9100はJobTracker、9101はHDFSの管理画面のため。

試しに、

https://[your public DNS of Master Node]:9100

としてみると、以下のようにJobTrackerが表示される。

次に、

https://[your public DNS of Master Node]:9101

とすると、HDFSの操作画面が表示される。

ただし、「Browse the filesystem」は、リンクの差し先がInternal IPを指していて、ブラウザーで表示できない。普通は、この画面からHDFS上のファイルの操作ができるが、HDFSを操作するには、SSHでログイン後行うことになる(1年前も同じだったように記憶している)。

さて、SSHでマスタノードにログインしてみる。
クライアント・マシンは、Mac Book Pro(OS X 10.9.3)を用いた。
ターミナルを開き、キーペアを指定して、以下のようにマスタノードに接続する。見ての通り、ログインにはhadoopユーザーを使用する。

ssh -i ~/.ssh/[your key-pair name] hadoop@[your public DNS of Master Node]

以下が、ログイン直後の画面。

ログインできたら、環境を軽く見ておく。
Javaは、1.7.0.40。

hadoop@ip-XX-XX-XX-XX:~$ java -version
java version "1.7.0_40"
Java(TM) SE Runtime Environment (build 1.7.0_40-b43)
Java HotSpot(TM) 64-Bit Server VM (build 24.0-b56, mixed mode)

Hadoop関連のバージョンは起動時に指定するが、hadoopのホームディレクトリ/.versionsで調べることができる。

hadoop 1.0.3
hbase 0.92
hive 0.11.0
pig 0.11.1.1

である。

ちなみに、rubyは1.8.7がインストされている。

hadoop@ip-XX-XX-XX-XX:~$ ruby -version
ruby 1.8.7 (2010-08-16 patchlevel 302)

hadoopのホームディレクトリには、hadoopの本体が入っているので、モンテカルロ法による円周率の近似計算を行ってみる。

hadoop@ip-XX-XX-XX-XX:~$ hadoop jar hadoop-examples-1.0.3.jar pi 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
14/06/03 01:55:51 INFO mapred.JobClient: Default number of map tasks: 10
14/06/03 01:55:51 INFO mapred.JobClient: Default number of reduce tasks: 1
14/06/03 01:55:52 INFO security.ShellBasedUnixGroupsMapping: add hadoop to shell userGroupsCache
14/06/03 01:55:52 INFO mapred.JobClient: Setting group to hadoop
14/06/03 01:55:52 INFO mapred.FileInputFormat: Total input paths to process : 10
14/06/03 01:55:53 INFO mapred.JobClient: Running job: job_201406030041_0002
14/06/03 01:55:54 INFO mapred.JobClient:  map 0% reduce 0%
14/06/03 01:56:18 INFO mapred.JobClient:  map 10% reduce 0%
14/06/03 01:56:24 INFO mapred.JobClient:  map 20% reduce 0%
14/06/03 01:56:30 INFO mapred.JobClient:  map 30% reduce 0%
14/06/03 01:56:36 INFO mapred.JobClient:  map 50% reduce 0%
14/06/03 01:56:39 INFO mapred.JobClient:  map 70% reduce 0%
14/06/03 01:56:42 INFO mapred.JobClient:  map 90% reduce 0%
14/06/03 01:56:45 INFO mapred.JobClient:  map 100% reduce 0%
14/06/03 01:56:51 INFO mapred.JobClient:  map 100% reduce 30%
14/06/03 01:56:57 INFO mapred.JobClient:  map 100% reduce 100%
14/06/03 01:57:02 INFO mapred.JobClient: Job complete: job_201406030041_0002
14/06/03 01:57:02 INFO mapred.JobClient: Counters: 31
14/06/03 01:57:02 INFO mapred.JobClient:   Job Counters 
14/06/03 01:57:02 INFO mapred.JobClient:     Launched reduce tasks=1
14/06/03 01:57:02 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=115362
14/06/03 01:57:02 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/06/03 01:57:02 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/06/03 01:57:02 INFO mapred.JobClient:     Rack-local map tasks=2
14/06/03 01:57:02 INFO mapred.JobClient:     Launched map tasks=10
14/06/03 01:57:02 INFO mapred.JobClient:     Data-local map tasks=8
14/06/03 01:57:02 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=37673
14/06/03 01:57:02 INFO mapred.JobClient:   File Input Format Counters 
14/06/03 01:57:02 INFO mapred.JobClient:     Bytes Read=1180
14/06/03 01:57:02 INFO mapred.JobClient:   File Output Format Counters 
14/06/03 01:57:02 INFO mapred.JobClient:     Bytes Written=97
14/06/03 01:57:02 INFO mapred.JobClient:   FileSystemCounters
14/06/03 01:57:02 INFO mapred.JobClient:     FILE_BYTES_READ=113
14/06/03 01:57:02 INFO mapred.JobClient:     HDFS_BYTES_READ=2450
14/06/03 01:57:02 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=275211
14/06/03 01:57:02 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215
14/06/03 01:57:02 INFO mapred.JobClient:   Map-Reduce Framework
14/06/03 01:57:02 INFO mapred.JobClient:     Map output materialized bytes=360
14/06/03 01:57:02 INFO mapred.JobClient:     Map input records=10
14/06/03 01:57:02 INFO mapred.JobClient:     Reduce shuffle bytes=360
14/06/03 01:57:02 INFO mapred.JobClient:     Spilled Records=40
14/06/03 01:57:02 INFO mapred.JobClient:     Map output bytes=180
14/06/03 01:57:02 INFO mapred.JobClient:     Total committed heap usage (bytes)=1529450496
14/06/03 01:57:02 INFO mapred.JobClient:     CPU time spent (ms)=10500
14/06/03 01:57:02 INFO mapred.JobClient:     Map input bytes=240
14/06/03 01:57:02 INFO mapred.JobClient:     SPLIT_RAW_BYTES=1270
14/06/03 01:57:02 INFO mapred.JobClient:     Combine input records=0
14/06/03 01:57:02 INFO mapred.JobClient:     Reduce input records=20
14/06/03 01:57:02 INFO mapred.JobClient:     Reduce input groups=20
14/06/03 01:57:02 INFO mapred.JobClient:     Combine output records=0
14/06/03 01:57:02 INFO mapred.JobClient:     Physical memory (bytes) snapshot=1891930112
14/06/03 01:57:02 INFO mapred.JobClient:     Reduce output records=0
14/06/03 01:57:02 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=6932238336
14/06/03 01:57:02 INFO mapred.JobClient:     Map output records=20
Job Finished in 71.073 seconds
Estimated value of Pi is 3.14120000000000000000

このとき、ブラウザーからJob Trackerを見たものが以下。PiEstimaterが実行されていることが確認できる。

ジョブを監視するときは、このWeb画面が便利である。上の画面でジョブの内容をみると、以下のように実行中のMapプロセス、Reduceプロセスなどを調べることができる。

最後に、マスタノードにsftpでデータを送ってみる。
ローカルにtext.txt(空ファイル)を作成し、以下のようにログイン後に転送する。

MacBook-Pro:~ tetsuya$ sftp -i ~/.ssh/[your key-pair name] hadoop@[public DNS Name of Master Node]
sftp> put test.txt test.txt 
Uploading test.txt to /home/hadoop/test.txt
test.txt                                      100%    0     0.0KB/s   00:00

マスタノード側で確認すると、以下のように転送されていることが確認できる。

hadoop@ip-XX-XX-XX-XX:~$ ls -al
total 100
drwxr-xr-x 15 hadoop hadoop  4096 Jun  3 01:59 .
drwxr-xr-x  3 root   root    4096 Oct  3  2013 ..
-rwxrwxr--  1 hadoop hadoop    57 Oct  3  2013 .bash_profile
-rwxrwxr--  1 hadoop hadoop   321 Oct  3  2013 .bashrc
drwx------  2 hadoop hadoop  4096 Jun  3 00:34 .ssh
drwxr-xr-x  6 hadoop hadoop  4096 Jun  3 00:59 .versions
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 01:01 bin
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 conf
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 contrib
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 etc
lrwxrwxrwx  1 hadoop hadoop    62 Jun  3 00:41 hadoop-ant-1.0.3.jar -> /home/hadoop/.versions/1.0.3/share/hadoop/hadoop-ant-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    33 Jun  3 00:41 hadoop-ant.jar -> /home/hadoop/hadoop-ant-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    65 Jun  3 00:41 hadoop-client-1.0.3.jar -> /home/hadoop/.versions/1.0.3/share/hadoop/hadoop-client-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    63 Jun  3 00:41 hadoop-core-1.0.3.jar -> /home/hadoop/.versions/1.0.3/share/hadoop/hadoop-core-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    34 Jun  3 00:41 hadoop-core.jar -> /home/hadoop/hadoop-core-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    67 Jun  3 00:41 hadoop-examples-1.0.3.jar -> /home/hadoop/.versions/1.0.3/share/hadoop/hadoop-examples-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    38 Jun  3 00:41 hadoop-examples.jar -> /home/hadoop/hadoop-examples-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    70 Jun  3 00:41 hadoop-minicluster-1.0.3.jar -> /home/hadoop/.versions/1.0.3/share/hadoop/hadoop-minicluster-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    64 Jun  3 00:41 hadoop-tools-1.0.3.jar -> /home/hadoop/.versions/1.0.3/share/hadoop/hadoop-tools-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    35 Jun  3 00:41 hadoop-tools.jar -> /home/hadoop/hadoop-tools-1.0.3.jar
lrwxrwxrwx  1 hadoop hadoop    34 Jun  3 00:59 hive -> /home/hadoop/.versions/hive-0.11.0
drwxr-xr-x  3 hadoop hadoop 12288 Jun  3 01:01 lib
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 lib64
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 libexec
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 native
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 sbin
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 templates
-rw-r--r--  1 hadoop hadoop     0 Jun  3 01:59 test.txt
drwxr-xr-x  2 hadoop hadoop  4096 Jun  3 00:41 webapps

これをHDFS上にコピーし、HDFSのlsコマンドを使ってみる。

hadoop@ip-XX-XX-XX-XX:~$ hadoop fs -copyFromLocal test.txt .
hadoop@ip-XX-XX-XX-XX:~$ hadoop fs -ls .
Found 1 items
-rw-r--r--   1 hadoop supergroup          0 2014-06-03 02:01 /user/hadoop/test.txt

まとめると、

  • SSHは(一年前と同様)利用可能
  • sftpも利用可能
  • javaのバージョンはJava7

となっている。

AWS Identity and Access Managementに移行する。

以前より、アカウントのSecurity Credentialsにアクセスすると、「AWS Identity and Access Managementに移行せよ」とか、Secret Access Keyを見ようとすると「見られなくなるよ」とか出てくるのは分かってたんだけど、放っておいた。例えば、Security Credintialsにアクセスすると、以下のような画面がでる。

当初、認証系の統合ツールがなかったので追加されたのだろうか?
細かい設定ができそうだが、

  • 1つのアカウントで(権限の異なる)複数のユーザーが作成できる。
  • Access Key、Secret Access Key、X.509証明書などの管理をユーザーごとにできる(AWS Multi-Factor Authenticationというのも使えるらしい)。
  • Secret Access Keyは生成時に表示・ダウンロードできるが、無くした場合、サイドAccess Keyを作らなくてはならない。
  • 今後は、IAMを通してログインしなくてはならない。

というところが違う(ような雰囲気)。

認証を移行することが目的なので、どんどん進めて行く。
手順としては、

  • ユーザーの作成
  • グループの作成と権限の設定
  • ユーザーへのグループのアサイ

という感じ。


以下は、IAMにサインアップ後の画面。

まず。ユーザーを追加してしまう。左メニューペインにあるUsersをクリックする。

ここで「Create User」ボタンをクリックすると、以下の画面で「ユーザー名(IAMでのログインネーム)」を設定する。デフォルトで、Access Keyを同時に追加する、となっているので、特別必要がなければ、「Create」する。

すると、Userが追加できたことを知らせる画面に遷移する。
ここで、よーく見ると、「Security Credentialがダウンロードできるのは、これが最後だ」と書いてある。

ここで、Downloadすると、cvs形式でAccess KeyとSecret Access Keyが落ちてくる。もしくは、上の画面で「Show User Security Credential」をクリックすると、以下のようにkeyが表示される。

いずれにしても、Secret Access Keyを見られるはこれが最後なので、大切に保管しておく。無くしてしまうと、Access Keyから再作成しなければならず、かなり痛い(後述)。

ユーザーが登録できたら、パスワードを設定しておく。Userの一覧を表示して、パスワードを設定したいユーザーを設定する。ユーザーを作成して状態だと、PasswordがNoになっている。

Manage Passwordをクリックして、Passwordを設定する。自分でパスワードを決めたい場合には、「Assign a custom password」を選択すると設定するためのフィールドが表示される。

パスワードポリシーも設定できる。

このままだと、今登録したユーザーでは重要な機能を使う事ができない。なので、グループを作成し、Admin権限を付与し、ユーザーをそのグループに紐づける。

Groupsの画面で「Create New Group」をクリックする。最初にグループの名を入力する。

次に、Permissionを設定するが、テンプレートが用意されているので、そこから適当に選ぶことができる。

ここで設定したパーミッションは、次の画面でカスタマイズすることができる。設定はjson形式になっているようだ。

次に、(先に作成した)ユーザーを紐づける。

あとは、画面にしたがえばいい。

IAMを通して、AWSにログインするには、

  • (アカウントに与えられた)URLにアクセスする。
  • AWSのマネージメント・コンソールを開いて)サービスの一覧からIAMを選択する。

の方法がある。

前者は、IAMのダッシュボードの下に表記(IAM users sign-in link)されているので、ブックマークしておけばよい。

どちらの場合も、以下のログイン画面が出るので、登録したユーザーでログインする。

これで使える状態になるので、忘れないように、ローカルにあるAccess KeyとSecret Access Keyを含んだ設定ファイルを書き換える。
移行した感じでは、EC2やS3は新しいユーザーえログインしても環境が引き継がれていたが、EMRは新しいユーザーとして認識されてしまった。
いずれにしても、早めにIAMに移行してしまった方が良さそう。

最後に、Secret Access Keyを無くした、忘れた場合について、備忘を残しておく。
先にも書いたが、この場合、Access Key を作り直さなくてはいけない。下図は、AccessKeyを二つ持つユーザーの属性を表示したところ(Access Keyはユーザー当たり2つまで登録できる)。

ここで、Manage Access Keyをクリックする。

Secret Access Keyを無くしてしまったAccess Keyを削除し、その後、Create Access Keyをすれば、新しいAccess KeyとSecret Access Keyを取得できる。

最初、なくしたらどうするんだろう、と思いましたが、「Where's my secret access key?」に助けてもらいました。

Amazon Elastic MapReduceを使う:Rubyクライアント・ツールを使う。

Ruby製のクライアント・ツールのダウンロード、ジョブフローの作成(クラスタの作成)、ステップの追加、ジョブフローの停止を行う。
基本的に1年前の記事を追って行う。

クライアントは、Mac Book Pro(OSX 10.9.3)。Rubyのバージョンは以下。

バージョン 2.0.0p451 (2014-02-24 revision 45167) [universal.x86_64-darwin13]

クライアントツールのダウンロードとクライアントの設定

Amazon Web ServicesDeveloper Tool>Amazon Elastic MapReduce Ruby Clientからダウンロードする。

ダウンロードしたら、ホームディレクトリにおき、.bashrcに以下を追加してパスを通す。

PATH=$HOME/elastic-mapreduce-ruby:$PATH

次に、.credentials.jsonを作成して、以下の内容を記載する。(以下では、リージョンとしてUSA-Standard(us-east-1=N.Virginia)を使っている)

{
'access-id':'[your access id]',
'private-key':'[your private key]',
'keypair':'[your key pair file name]',
'key-pair-file':'[your key pair file name].pem',
'region':'us-east-1',
'log-uri':'s3n://[your bucket]/[log directory name]'
}

ジョブフロー(クラスタ)の作成

以下の構成でクラスタを作成する。

Node Instance Type Quantity
マスタ m1.small 1
コア m1.small 1
タスク m1.small 1

クラスタの名前は、Spot Clusterとし、ステップを後で追加するためWaitingの状態にしておく。

ターミナル(Mac Book)で以下のように入力し、実行。

elastic-mapreduce --create --alive --name Spot-Cluster --instance-group master --instance-type m1.small --instance-count 1 --instance-group core --instance-type m1.small --instance-count 1 --instance-group task --instance-type m1.small --instance-count 1

すると、j-XXXXXXXXXXというJob Flow IDが戻る。
マネージメント・コンソールからも、以下のようにチャンと見える。

ステップの追加:Word Count のサンプルを実行する。

Word Countのサンプルを実行するステップを追加する。
以下のコマンドにより、Job Flow IDを指定して、ステップを追加・実行する。

elastic-mapreduce --stream --step-name WordCount --input s3n://elasticmapreduce/samples/wordcount/input --output s3n://[your bucket]/[your directory] --mapper s3n://elasticmapreduce/samples/wordcount/wordSplitter.py --reducer aggregate --jobflow j-XXXXXXXXXXXXX

以下が、Word Count実行中のコンソール画面。

ステップ名(にあるリンクを)をクリックすると、ステップの詳細を見る事ができる。

処理が終わったら、今度は「outputディレクトリを変えずにステップを追加し」、ジョブを失敗させる。MapReduceジョブでは出力先フォルダーを1つのジョブ(ステップ)につき1つ用意しなくてはならず、よくFailedが帰ってきてがっかりする。

ジョブが失敗すると、上のように赤くFailedと表示される。少し待つと、log filesのカラムにsyslog、stderr、stdoutと表示されて、ログファイルを確認できるようになる(少々じれったい)。
以下は、syslogを表示したもの。URLをみると、S3のURLになっている(=S3にログが転送されるまで、ログの参照を待つ必要がある)。
output directory already existedとなっているのが分かる。

クラスタの終了

Job Flow IDを指定し、以下のようにコマンドを入れると、クラスタ(ジョブフロー)が終了する。

elastic-mapreduce --terminate --jobflow j-XXXXXXXXXXXXX

マネージメント・コンソールを見ると、以下のようにステータスがTerminatingとなり、やがて終了する(ステータスがTerminatedになる)。

Amazon Elastic MapReduceを使う:マネージメント・コンソールでクラスタのクローンを作成する。

前回の記事では、Amazon Elastic MapReduceでWord Countサンプルを実行後、クラスタを停止した。

今回は、そのクローンを作成し、Word Countを再実行する。1回実行したジョブを再実行するのに便利な機能。

Amazon Web Servicesにログインし、Elastic MapReduceを選択すると、過去に作成したクラスタのリストをみることができる。以下は、My cluster。

左端のチェックボックスをチェックすると、画面上のCloneボタンが活性化するので、ボタンをクリックする。すると、ステップもコピーするか?と聞いてくる。

Yesを選択し、ステップ(Word Count)をコピーする。ただし、前回、サンプル実行時、定義を間違ってFailedとなったステップがあったので、これらは削除する(後述)ことにする。
クラスタの属性を変更するため、以下の画面に遷移する。クラスタ名をMy cluster1とする。

Stepsまで、下にスクロールしたのが以下。コピーしたステップの編集・削除ができる。

Failedとなったステップを削除し、残したステップについてoutput先を変更する。

Create Clusterボタンを押すと、クラスターのコピーが完了する。

以下は、上の画面からCluster Listをクリックした画面。新しいクラスタが生成されていることが確認できる。JOD IDが新しく取得されるので注意すること。

My cluster1のDetailをみると、デフォルトで起動する3つのステップ(Hadoop環境の構築、Pig・Hiveの配備)に加え、上でコピーしたWord Countのステップが開始待ちになっていることが確認できる。

しばらくすると、ステップが全て完了する。(StatusがCompletedとなり、Word Countも終了する)

以下は、S3に保存されたクラスタのログ。実行した4ステップ分のログが格納される。

実行結果は上で指定したS3のバケット/フォルダーに保管される。