Amazon Elastic MapReduceを使う:Hadoop2.4でサンプルを実行してみる。

今回から(覚悟を決めて)Hadoop2のAMIを使ってみる。
Hadoop2系は、昨年の秋、バージョン0.23から2.2というバージョン番号が付与されたプロダクト(Hadoop1系は0.20からメジャーバージョンが付与された)。YARN(Yet Another Resource Negotiator)という分散処理のフレームワークに乗っかっていて、1系とは大きくアーキテクチャが違っている。「MapReduceはその中の1つにすぎない」ということになっている。
MapReduce1もサポートされているようなので少し安心。

2系についてのお勉強には、以下のサイトを参考にさせていただいた(象本が新しくなっているのに今更気がついたので、今度のお小遣いで買わなくちゃ)。

  1. 「Hadoop 2」データ処理とサービスの同時実行が可能な安定版が公開 ITPro
  2. MR2とYARNの手短な解説(cloudera)
  3. Hadoop2 Going beyound MapReduce
  4. 完全分散モードの設計メモとYARNの仕様(Open Groove)
  5. Hadoop0.23 YARN

Amazon Elastic MapReduceでは、1系はHadoop1.0.3だけが選択できて(0.20も可能)、2系の方がAMIが充実している(2系を使ってみる理由)。
2.2以上は、選択できるインスタンス・サイズがmidium以上になっている。節約のため、スポットインスタンスを使う。
スポットインスタンスは、「Amazon Web Services>購入オプション>Amazon EC2 スポットインスタンス」で参照できる。使用するリージョンで適宜調べる。自分の場合、us-east(N.Virginia)を使う。

Hadoop2.4のAMIにSSHで接続する。

クラスタを起動して、SSHでログイン(ユーザー:hadoop)し、ls-alした結果は以下。

[hadoop@ip-10-181-195-85 ~]$ ls -al
合計 36
drwxr-xr-x 6 hadoop hadoop 4096  6月  4 03:01 .
drwxr-xr-x 4 root   root   4096  5月 13 18:24 ..
-rwxrwxr-- 1 hadoop hadoop   94  5月 13 18:25 .bash_profile
-rwxrwxr-- 1 hadoop hadoop 1558  5月 13 18:27 .bashrc
drwx------ 2 hadoop hadoop 4096  6月  4 03:01 .ssh
drwxr-xr-x 9 hadoop hadoop 4096  5月 13 18:27 .versions
lrwxrwxrwx 1 root   root     40  5月 13 18:27 Cascading-2.5-SDK -> /home/hadoop/.versions/Cascading-2.5-SDK
lrwxrwxrwx 1 hadoop hadoop   32  5月 13 18:24 bin -> /home/hadoop/.versions/2.4.0/bin
lrwxrwxrwx 1 hadoop hadoop   39  5月 13 18:24 conf -> /home/hadoop/.versions/2.4.0/etc/hadoop
drwxr-xr-x 3 hadoop hadoop 4096  5月 13 18:24 contrib
lrwxrwxrwx 1 hadoop hadoop   32  5月 13 18:24 etc -> /home/hadoop/.versions/2.4.0/etc
lrwxrwxrwx 1 hadoop hadoop   71  5月 13 18:24 hadoop-examples.jar -> /home/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.0.jar
lrwxrwxrwx 1 root   root     34  5月 13 18:27 hive -> /home/hadoop/.versions/hive-0.11.0
drwxr-xr-x 2 hadoop hadoop 4096  5月 13 18:27 lib
lrwxrwxrwx 1 hadoop hadoop   36  5月 13 18:24 libexec -> /home/hadoop/.versions/2.4.0/libexec
lrwxrwxrwx 1 hadoop hadoop   33  5月 13 18:25 mahout -> /home/hadoop/.versions/mahout-0.9
lrwxrwxrwx 1 root   root     33  5月 13 18:27 pig -> /home/hadoop/.versions/pig-0.12.0
lrwxrwxrwx 1 hadoop hadoop   33  5月 13 18:24 sbin -> /home/hadoop/.versions/2.4.0/sbin
lrwxrwxrwx 1 hadoop hadoop   34  5月 13 18:24 share -> /home/hadoop/.versions/2.4.0/share

Cascading SDKホームページ)の2.5(最新バージョン)が配備されている。
Cascadingは、MapReduceのパイプラインを構築するライブラリー。サイバーエージェントの方がとても親切なPigとの比較レポートを公開してくださっている(こちら)。

Apache Mahout(0.9)もセットアップされている。

javaのバージョンは、1.7.0。

[hadoop@ip-XX-XX-XX-XX ~]$ java -version
java version "1.7.0_60-ea"
Java(TM) SE Runtime Environment (build 1.7.0_60-ea-b02)
Java HotSpot(TM) 64-Bit Server VM (build 24.60-b04, mixed mode)

rubyのバージョンは2.0.0。hadoop1.0.3のAMIでは、ruby1.8.7がインストされていた。s3syncがruby1.9.2以上を要求するので、こちらの方が都合がよさそう。

[hadoop@ip-XX-XX-XX-XX ~]$ ruby -v
ruby 2.0.0p451 (2014-02-24 revision 45167) [x86_64-linux]

Hadoop2.4のAMIのコンフィギュレーション

以下は、m1.mediumのインスタンス2つ(マスタ、コア)のクラスタを起動した場合。2系からyarn-site.xmlが追加。

以下に、hadoopの設定ファイルの内容を列記しておく。Hadoop1と同じ3つの設定ファイルに加えて、yarn-site.xmlが追加されてる。

/home/hadoop/conf/core-site.xmlhadoop全体の設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>hadoop.proxyuser.hadoop.groups</name><value>*</value></property>
  <property><name>fs.s3n.impl</name><value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value></property>
  <property><name>fs.s3.impl</name><value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value></property>
  <property><name>fs.default.name</name><value>hdfs://[your internal address]:9000</value></property>
  <property><name>hadoop.tmp.dir</name><value>/mnt/var/lib/hadoop/tmp</value></property>
  <property><name>fs.s3n.awsSecretAccessKey</name><value>[your was secret access key]</value></property>
  <property><name>fs.s3n.awsAccessKeyId</name><value>[your was access key]</value></property>
  <property><name>fs.s3.buffer.dir</name><value>/mnt/var/lib/hadoop/s3</value></property>
  <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value></property>
  <property><name>fs.s3bfs.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
  <property><name>hadoop.metrics.defaultFile</name><value>/home/hadoop/conf/hadoopDefaultMetricsList</value></property>
  <property><name>fs.s3.awsSecretAccessKey</name><value>[your was secret access key]</value></property>
  <property><name>hadoop.proxyuser.hadoop.hosts</name><value>*</value></property>
  <property><name>fs.s3bfs.awsAccessKeyId</name><value>[your was access key]</value></property>
  <property><name>fs.s3bfs.awsSecretAccessKey</name><value>[your was secret access key]</value></property>
  <property><name>hadoop.metrics.list</name><value>TotalLoad,CapacityTotalGB,UnderReplicatedBlocks,CapacityRemainingGB,PendingDeletionBlocks,PendingReplicationBlocks,CorruptBlocks,CapacityUsedGB,numLiveDataNodes,numDeadDataNodes,MissingBlocks</value></property>
  <property><name>io.compression.codec.lzo.class</name><value>com.hadoop.compression.lzo.LzoCodec</value></property>
  <property><name>fs.s3.awsAccessKeyId</name><value>[your was access key]</value></property>
</configuration>
/home/hadoop/conf/hdfs-site.xmlHDFSの設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>dfs.datanode.max.xcievers</name><value>4096</value></property>
  <property><name>dfs.datanode.https.address</name><value>0.0.0.0:9402</value></property>
  <property><name>dfs.datanode.du.reserved</name><value>536870912</value></property>
  <property><name>dfs.namenode.handler.count</name><value>64</value></property>
  <property><name>io.file.buffer.size</name><value>65536</value></property>
  <property><name>dfs.block.size</name><value>134217728</value></property>
  <property><name>dfs.data.dir</name><value>/mnt/var/lib/hadoop/dfs</value></property>
  <property><name>dfs.secondary.http.address</name><value>0.0.0.0:9104</value></property>
  <property><name>dfs.replication</name><value>1</value></property>
  <property><name>dfs.http.address</name><value>0.0.0.0:9101</value></property>
  <property><name>dfs.https.address</name><value>0.0.0.0:9202</value></property>
  <property><name>dfs.datanode.http.address</name><value>0.0.0.0:9102</value></property>
  <property><name>dfs.datanode.address</name><value>0.0.0.0:9200</value></property>
  <property><name>dfs.name.dir</name><value>/mnt/var/lib/hadoop/dfs-name</value></property>
  <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
  <property><name>dfs.datanode.ipc.address</name><value>0.0.0.0:9201</value></property>
</configuration>
/home/hadoop/conf/mapred-site.xmlMapReduceの設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
  <property><name>mapreduce.task.io.sort.mb</name><value>200</value></property>
  <property><name>mapreduce.reduce.cpu.vcores</name><value>2</value></property>
  <property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
  <property><name>mapreduce.task.io.sort.factor</name><value>48</value></property>
  <property><name>mapreduce.jobhistory.address</name><value>ip-[your internal address].ec2.internal:10020</value></property>
  <property><name>mapreduce.job.userlog.retain.hours</name><value>48</value></property>
  <property><name>mapreduce.framework.name</name><value>yarn</value></property>
  <property><name>mapreduce.reduce.java.opts</name><value>-Xmx768m</value></property>
  <property><name>mapreduce.map.java.opts</name><value>-Xmx512m</value></property>
  <property><name>mapreduce.reduce.shuffle.parallelcopies</name><value>20</value></property>
  <property><name>mapreduce.map.memory.mb</name><value>768</value></property>
  <property><name>hadoop.job.history.user.location</name><value>none</value></property>
  <property><name>mapreduce.reduce.speculative</name><value>true</value></property>
  <property><name>mapreduce.application.classpath</name><value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,/usr/share/aws/emr/emr-fs/lib/*,/usr/share/aws/emr/lib/*</value></property>
  <property><name>mapreduce.map.cpu.vcores</name><value>1</value></property>
  <property><name>mapreduce.map.speculative</name><value>true</value></property>
  <property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
  <property><name>mapreduce.job.maps</name><value>2</value></property>
  <property><name>mapreduce.reduce.memory.mb</name><value>1024</value></property>
  <property><name>mapreduce.map.output.compress</name><value>true</value></property>
  <property><name>mapred.local.dir</name><value>/mnt/var/lib/hadoop/mapred</value></property>
  <property><name>mapreduce.job.reduces</name><value>1</value></property>
  <property><name>mapreduce.jobhistory.webapp.address</name><value>ip-[your internal address].ec2.internal:19888</value></property>
  <property><name>yarn.app.mapreduce.am.job.task.listener.thread-count</name><value>60</value></property>
  <property><name>yarn.app.mapreduce.am.resource.mb</name><value>1024</value></property>
  <property><name>mapreduce.job.jvm.numtasks</name><value>20</value></property>
  <property><name>mapreduce.map.output.compress.codec</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
</configuration>
/home/hadoop/conf/yarn-site.xml(YARNの設定)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>yarn.nodemanager.container-executor.class</name><value>org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor</value></property>
  <property><name>yarn.resourcemanager.scheduler.client.thread-count</name><value>64</value></property>
  <property><name>yarn.nodemanager.webapp.address</name><value>0.0.0.0:9035</value></property>
  <property><name>yarn.nodemanager.container-manager.thread-count</name><value>64</value></property>
  <property><name>yarn.resourcemanager.webapp.address</name><value>ip-[your internal address].ec2.internal:9026</value></property>
  <property><name>yarn.nodemanager.vmem-pmem-ratio</name><value>5</value></property>
  <property><name>yarn.log.server.url</name><value>http://ip-[your internal address].ec2.internal:19888/jobhistory/logs/</value></property>
  <property><name>yarn.nodemanager.resource.memory-mb</name><value>2048</value></property>
  <property><name>yarn.nodemanager.resource.cpu-vcores</name><value>4</value></property>
  <property><name>yarn.nodemanager.localizer.client.thread-count</name><value>20</value></property>
  <property><name>yarn.web-proxy.address</name><value>[your internal address]:9046</value></property>
  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
  <property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value></property>
  <property><name>yarn.resourcemanager.admin.address</name><value>[your internal address]:9025</value></property>
  <property><name>yarn.resourcemanager.resource-tracker.address</name><value>[your internal address]:9023</value></property>
  <property><name>yarn.resourcemanager.scheduler.address</name><value>[your internal address]:9024</value></property>
  <property><name>yarn.nodemanager.localizer.fetch.thread-count</name><value>20</value></property>
  <property><name>yarn.resourcemanager.address</name><value>[your internal address]:9022</value></property>
  <property><name>yarn.application.classpath</name><value>$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*,/usr/share/aws/emr/emr-fs/lib/*,/usr/share/aws/emr/lib/*</value></property>
  <property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>
  <property><name>yarn.scheduler.minimum-allocation-mb</name><value>256</value></property>
  <property><name>yarn.nodemanager.localizer.address</name><value>0.0.0.0:9033</value></property>
  <property><name>yarn.nodemanager.address</name><value>0.0.0.0:9103</value></property>
  <property><name>yarn.scheduler.maximum-allocation-mb</name><value>2048</value></property>
  <property><name>yarn.resourcemanager.resource-tracker.client.thread-count</name><value>64</value></property>
  <property><name>yarn.resourcemanager.client.thread-count</name><value>64</value></property>
</configuration>

サンプルの実行

ためしに、サンプル(モンテカルロ法でパイを求める)を実行してみる。

[hadoop@ip-10-181-195-85 ~]$ hadoop jar hadoop-examples.jar pi 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
14/06/04 04:43:56 INFO client.RMProxy: Connecting to ResourceManager at /10.181.195.85:9022
14/06/04 04:43:57 INFO input.FileInputFormat: Total input paths to process : 10
14/06/04 04:43:57 INFO mapreduce.JobSubmitter: number of splits:10
14/06/04 04:43:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1401850967351_0001
14/06/04 04:43:58 INFO impl.YarnClientImpl: Submitted application application_1401850967351_0001
14/06/04 04:43:58 INFO mapreduce.Job: The url to track the job: http://10.181.195.85:9046/proxy/application_1401850967351_0001/
14/06/04 04:43:58 INFO mapreduce.Job: Running job: job_1401850967351_0001
14/06/04 04:44:14 INFO mapreduce.Job: Job job_1401850967351_0001 running in uber mode : false
14/06/04 04:44:14 INFO mapreduce.Job:  map 0% reduce 0%
14/06/04 04:44:29 INFO mapreduce.Job:  map 10% reduce 0%
14/06/04 04:44:39 INFO mapreduce.Job:  map 20% reduce 0%
14/06/04 04:44:49 INFO mapreduce.Job:  map 30% reduce 0%
14/06/04 04:45:00 INFO mapreduce.Job:  map 40% reduce 0%
14/06/04 04:45:15 INFO mapreduce.Job:  map 50% reduce 0%
14/06/04 04:45:26 INFO mapreduce.Job:  map 60% reduce 0%
14/06/04 04:45:36 INFO mapreduce.Job:  map 70% reduce 0%
14/06/04 04:45:47 INFO mapreduce.Job:  map 80% reduce 0%
14/06/04 04:46:00 INFO mapreduce.Job:  map 90% reduce 0%
14/06/04 04:46:11 INFO mapreduce.Job:  map 100% reduce 0%
14/06/04 04:46:23 INFO mapreduce.Job:  map 100% reduce 100%
14/06/04 04:46:23 INFO mapreduce.Job: Job job_1401850967351_0001 completed successfully
14/06/04 04:46:23 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=107
		FILE: Number of bytes written=1080316
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=2690
		HDFS: Number of bytes written=215
		HDFS: Number of read operations=43
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=3
	Job Counters 
		Launched map tasks=10
		Launched reduce tasks=1
		Data-local map tasks=10
		Total time spent by all maps in occupied slots (ms)=295995
		Total time spent by all reduces in occupied slots (ms)=42416
		Total time spent by all map tasks (ms)=98665
		Total time spent by all reduce tasks (ms)=10604
		Total vcore-seconds taken by all map tasks=98665
		Total vcore-seconds taken by all reduce tasks=21208
		Total megabyte-seconds taken by all map tasks=75774720
		Total megabyte-seconds taken by all reduce tasks=10858496
	Map-Reduce Framework
		Map input records=10
		Map output records=20
		Map output bytes=180
		Map output materialized bytes=350
		Input split bytes=1510
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=350
		Reduce input records=20
		Reduce output records=0
		Spilled Records=40
		Shuffled Maps =10
		Failed Shuffles=0
		Merged Map outputs=10
		GC time elapsed (ms)=1409
		CPU time spent (ms)=10620
		Physical memory (bytes) snapshot=3488313344
		Virtual memory (bytes) snapshot=13513596928
		Total committed heap usage (bytes)=2752421888
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1180
	File Output Format Counters 
		Bytes Written=97
Job Finished in 147.258 seconds
Estimated value of Pi is 3.14120000000000000000

Name Node(HDFS)のWeb UI

http://[your master public DNS]:9101/

にアクセスすれば、以下の画面が表示される。1系とは随分見た目が違う。

以下は、HDFS上のディレクトリをブラウズしたところ。

ログも見られる。

Resource ManagerのWeb UI

Hadoopの2系では、JobTrackerではなく、ResourceManager(Application Manager)によってジョブが管理されている。Capacity Schedulerの管理UIが見られるはずなのだが、これにはハマってしまった。EC2にかかっているSecurity Groupで9100が開いていたので、てっきりそれだと思い込んだが、ポートは9026(SSHでログインし後にポートが表示されるのをすっかり忘れていた)。

なので、EC2のElasticMapreduce-masterのセキュリティーグループを変更する。

ElasticMapreduce-masterを選択し、画面下のInboundにあるEditボタンをクリックする。とりあえず、以下のように、9026を空ける。

これで以下のURLにアクセスする。

http://[your master public DNS]:9026/

以下がCapacity Schedulerの管理UI(トップ)

サンプル(モンテカルロ法でパイを求める)のジョブを表示したところ。

ログはTools(下)を展開すると出てくる。

こんな感じ。