EclipseでHadoop2.4の開発環境を作る&ワードカウントのプログラムを作成する。

前回に引き続き、使用するHadoopのバージョンは2.4。1年前の記事をもとに、Eclipseで開発環境を作る。

開発を行うクライアントは、MacBook Pro (OSX 10.9.3)。クライアントのJavaのバージョンは以下。

MacBook-Pro:~ tetsuya$ java -version
java version "1.6.0_65"
Java(TM) SE Runtime Environment (build 1.6.0_65-b14-462-11M4609)
Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-462, mixed mode)

ダウンロード

hadoopのダウンロードページから、hadoop-2.4.0.tar.gzをダウンロードする。

クライアントの設定

ダウンロードしたhadoop-2.4.0.tar.gzをホームに解凍し、hadoopというシンボリックリンクを張る。

lrwxr-xr-x    1 tetsuya  staff        13  6  7 11:50 hadoop -> hadoop-2.4.0/
drwxr-xr-x@  12 tetsuya  staff       408  3 31 18:15 hadoop-2.4.0

ワードカウントのプログラムを作成する。

プロジェクトの作成と外部JARの登録

新しくJavaプロジェクトを作成する。
プロパティーJavaのビルド・パスに、「Add External JARS」で~/hadoop/share/hadoop下に含まれるjarファイルを登録する。

追加するjarファイルが存在するディレクトリは以下。

  • common
  • common/lib
  • tools/lib(common/libとjarがだぶってる)
  • hdfs
  • mapreduce
  • yarn
クラスの作成:ワードカウントのプログラムの作成。

ワードカウントを行うJavaプログラムを作成する。

やって見た感じでは、MapReduceの処理そのものはそのまま動くし、殆ど変わらない。
以下に具体的にEclipseでの開発手順とプログラム、実行方法と結果を示す。


新規にパッケージを作成して、クラス(FQCN=com.tetsuyaodaka.WordCount)を追加する。


下に示すプログラムは、至る所で見られる代物で、次のロジックになっている。

  • Mapperでは、スペース区切りの文書を単語(ワード)に分け、(ワード,1)というキーバリューに分ける。
  • Reducerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。

上のロジックは、以下と書いても同じなので、ReduceクラスをCombinerとして使う。

  • Mapperでは、スペース区切りの文書を単語(ワード)に分け、(ワード,1)というキーバリューに分ける。
  • Combinerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。
  • Reducerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。

CombinerはMapperの後に(Mapperと同一のマシンで)処理されるが、処理が実行される保証はない。

/*
/*
                                 Apache License
                           Version 2.0, January 2004
                        http://www.apache.org/licenses/
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

 */
package com.tetsuyaodaka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;

/**
 * Hadoop2.4.0のMapReduceでワードカウントする。
 * 
 * author: tetsuya.odaka@gmail.com
 * 
 */

@SuppressWarnings("unused")
public class WordCount{

	/*
	 * Map
	 * Text Fileを読んで、スペースで区切って、(ワード,1)で書き出す。
	 * (注意)	Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、
	 *			必ず、LongWritableにする。
	 */
	public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{
    	private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

		private String mapTaskId;
		private String inputFile;
		private int noRecords = 0;
        
		/*
		 * Mapperの初期化
		 */
        public void setup(Context context) {
            mapTaskId = MRJobConfig.TASK_ATTEMPT_ID;
            inputFile = MRJobConfig.MAP_INPUT_FILE;
          }

        /*
         * Mapper本体
         * 
         * 入力ファイルから1行ずつ読み込んで処理を行う。
         */
        public void map(LongWritable key, Text value, Context context) 
        		throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tk = new StringTokenizer(line);
            while(tk.hasMoreTokens()){
            	++noRecords;
                word.set(tk.nextToken());
                context.write(word, one);
            }
        }
    }

	/*
	 * Reduce
	 * (ワード,1)を集約して、ワードの出現回数を出力する。
	 * 
	 * 注記:
	 * Combinerでも使う。
	 */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    	private String reduceTaskId;
		private int noKeys = 0;
        
		/*
		 * Reducerの初期化
		 * 
		 */
        public void setup(Context context) {
        	reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID;
          }
        
        /*
         * Reducer本体
         * 
         * 同一キーの値(value)がItarableに入って、処理が開始。
         * Comberで、Wordに対してカウントをとっても大丈夫なので、Combinerとしても使う。
         */
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        		throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable value: values){
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
            ++noKeys;
        }
    }

    public static void main(String[] args) throws Exception {

        // プログラムの開始時刻をstdoutに書く。
    	Date date = new Date();
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        System.out.println("Program Start: "+sdf.format(date));

        // Jobインスタンスの生成
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCount.class);
        
        // Mapper, Combiner, Reducerの定義
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // OutPutKey, OutPutValueの定義
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // 入力・出力ファイルのフォルダー定義:S3バケットからとる。
        // インプラントフォルダーは引数からとる。
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // アウトプットフォルダーは動的に作成する。
        Long dTmp = Math.round(Math.random()*100000);
        String subDir = args[1]+"/"+dTmp.toString();
        FileOutputFormat.setOutputPath(job, new Path(subDir));
        System.out.println("Output folder is "+subDir);

        // 入力・出力ファイルの形式の定義
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // ジョブの実行
        boolean success = job.waitForCompletion(true);
        if(success){
        	System.out.println("MapReduce Job endded successfully.");
        }else{
        	System.out.println("Error! MapReduce Job abnormally endded.");
        }

        // プログラムの終了時刻をstdoutに書く。
        date = new Date();
        System.out.println("Program END: "+sdf.format(date));
    }
}


1.0.3の時に使っていたorg.apache.hadoop.mapreduce.Jobのコンストラクタが全てdeprecatedになっている(APIドキュメント)になっているので、staticなFactoryを使うように修正した。

実行時には、

  1. 入力テキストを保持するフォルダー
  2. 出力を保持するフォルダー

を指定する。

/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/input /Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/output

入力を保持するフォルダーには、スペース区切りのテキストを突っ込んでおけば良い。出力フォルダーを作成しておくと、「File Already Exists」というIOExceptionがでる。なので、この配下に、乱数を使ってフォルダーを作成するようにする。(このIOExceptionは、「上書きでせっかく作ったデータが消えるのを防止するため」と象本に書いてある)

以下が、プロジェクトのフォルダー構成。

クラスの実行:ワードカウントのプログラムの実行。

サンプルを実行してみる。サンプルには、USA Today紙の記事(3-alarm fire near Rockefeller Center)を使ってみた。

この記事をコピペして、入力フォルダーにinput.txtとして保存する。これで、Javaアプリケーションの実行をすれば動くのだが、log4jのappenderがないよと言って来たり、Macのセキュリティーエラーが出たりするので(こちらの記事を参考にさせていただいた)、Dパラ(VM Arguments)を以下のように設定しておく。

-Xmx1024m 
-Djava.security.krb5.realm=OX.AC.UK 
-Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk 
-Dlog4j.configuration="file:/Users/tetsuya/hadoop/etc/hadoop/log4j.properties" 
-DHadoop.home.dir=/Users/tetsuya/hadoop 
-Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console 
-Dhadoop.policy.file=/Users/tetsuya/hadoop/etc/hadoop/hadoop-policy.xml


以下がコンソール・ログ。Nativeのローダーで警告が出たが、動くことは確認できた。

14/06/07 15:33:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/07 15:33:50 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/06/07 15:33:50 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/06/07 15:33:50 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
14/06/07 15:33:50 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/07 15:33:50 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
14/06/07 15:33:50 INFO mapred.FileInputFormat: Total input paths to process : 1
14/06/07 15:33:50 INFO mapreduce.JobSubmitter: number of splits:1
14/06/07 15:33:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local601616696_0001
14/06/07 15:33:50 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/staging/tetsuya601616696/.staging/job_local601616696_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/06/07 15:33:50 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/staging/tetsuya601616696/.staging/job_local601616696_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/06/07 15:33:51 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/local/localRunner/tetsuya/job_local601616696_0001/job_local601616696_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/06/07 15:33:51 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/local/localRunner/tetsuya/job_local601616696_0001/job_local601616696_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/06/07 15:33:51 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/06/07 15:33:51 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/06/07 15:33:51 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/06/07 15:33:51 INFO mapreduce.Job: Running job: job_local601616696_0001
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Waiting for map tasks
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Starting task: attempt_local601616696_0001_m_000000_0
14/06/07 15:33:51 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
14/06/07 15:33:51 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
14/06/07 15:33:51 INFO mapred.MapTask: Processing split: file:/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/input/input.txt:0+1305
14/06/07 15:33:51 INFO mapred.MapTask: numReduceTasks: 1
14/06/07 15:33:51 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/06/07 15:33:51 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/06/07 15:33:51 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/06/07 15:33:51 INFO mapred.MapTask: soft limit at 83886080
14/06/07 15:33:51 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
14/06/07 15:33:51 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 
14/06/07 15:33:51 INFO mapred.MapTask: Starting flush of map output
14/06/07 15:33:51 INFO mapred.MapTask: Spilling map output
14/06/07 15:33:51 INFO mapred.MapTask: bufstart = 0; bufend = 2155; bufvoid = 104857600
14/06/07 15:33:51 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213544(104854176); length = 853/6553600
14/06/07 15:33:51 INFO mapred.MapTask: Finished spill 0
14/06/07 15:33:51 INFO mapred.Task: Task:attempt_local601616696_0001_m_000000_0 is done. And is in the process of committing
14/06/07 15:33:51 INFO mapred.LocalJobRunner: attempt_local601616696_0001_m_000000_0 processed 100
14/06/07 15:33:51 INFO mapred.Task: Task 'attempt_local601616696_0001_m_000000_0' done.
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local601616696_0001_m_000000_0
14/06/07 15:33:51 INFO mapred.LocalJobRunner: map task executor complete.
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Starting task: attempt_local601616696_0001_r_000000_0
14/06/07 15:33:51 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
14/06/07 15:33:51 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
14/06/07 15:33:51 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2d397e5c
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=745517888, maxSingleShuffleLimit=186379472, mergeThreshold=492041824, ioSortFactor=10, memToMemMergeOutputsThreshold=10
14/06/07 15:33:51 INFO reduce.EventFetcher: attempt_local601616696_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
14/06/07 15:33:51 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local601616696_0001_m_000000_0 decomp: 1930 len: 1934 to MEMORY
14/06/07 15:33:51 INFO reduce.InMemoryMapOutput: Read 1930 bytes from map-output for attempt_local601616696_0001_m_000000_0
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1930, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1930
14/06/07 15:33:51 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
14/06/07 15:33:51 INFO mapred.Merger: Merging 1 sorted segments
14/06/07 15:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1922 bytes
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merged 1 segments, 1930 bytes to disk to satisfy reduce memory limit
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merging 1 files, 1934 bytes from disk
14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
14/06/07 15:33:51 INFO mapred.Merger: Merging 1 sorted segments
14/06/07 15:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1922 bytes
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/06/07 15:33:51 INFO mapred.Task: Task:attempt_local601616696_0001_r_000000_0 is done. And is in the process of committing
14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/06/07 15:33:51 INFO mapred.Task: Task attempt_local601616696_0001_r_000000_0 is allowed to commit now
14/06/07 15:33:51 INFO output.FileOutputCommitter: Saved output of task 'attempt_local601616696_0001_r_000000_0' to file:/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/output/_temporary/0/task_local601616696_0001_r_000000
14/06/07 15:33:51 INFO mapred.LocalJobRunner: attempt_local601616696_0001_r_000000_0 processed 100 > reduce
14/06/07 15:33:51 INFO mapred.Task: Task 'attempt_local601616696_0001_r_000000_0' done.
14/06/07 15:33:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local601616696_0001_r_000000_0
14/06/07 15:33:51 INFO mapred.LocalJobRunner: reduce task executor complete.
14/06/07 15:33:52 INFO mapreduce.Job: Job job_local601616696_0001 running in uber mode : false
14/06/07 15:33:52 INFO mapreduce.Job:  map 100% reduce 100%
14/06/07 15:33:52 INFO mapreduce.Job: Job job_local601616696_0001 completed successfully
14/06/07 15:33:52 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=6870
		FILE: Number of bytes written=451235
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=15
		Map output records=214
		Map output bytes=2155
		Map output materialized bytes=1934
		Input split bytes=124
		Combine input records=214
		Combine output records=150
		Reduce input groups=150
		Reduce shuffle bytes=1934
		Reduce input records=150
		Reduce output records=150
		Spilled Records=300
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=4
		Total committed heap usage (bytes)=395567104
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1305
	File Output Format Counters 
		Bytes Written=1349


以下が結果。ストップワードや区切り文字、大文字・小文字といった(実用上での)問題はあるが、まずはOK。

10:30	1
130	1
1920s	1
2014	1
48th	1
49th	1
A	3
According	1
All	1
Associated	1
Center.	1
Copyright	1
Department	1
FDNY	1
Fire	1
Firefighters	1
Flames	1
Friday	1
Friday's	1
Gargantiel	1
Kirby	1
Manhattan,	1
More	1
New	2
One	1
Press.	1
Rockefeller	1
TGI	1
The	3
This	1
Twitter	1
Witness	1
York	1
York's	1
a	5
about	1
according	1
acrid.	1
after	1
air	1
an	1
and	8
area	2
around	2
as	1
basement	2
be	1
blaze	2
broadcast,	1
broke	1
brought	1
building	1
building.	1
built	1
bus	1
but	1
called	1
came	1
cause	1
caused	1
city	1
clouded	1
control	1
darker,	1
delays	1
department	1
dining	1
fifth	1
fire	3
firefighter	1
firefighters	1
first	1
flames	2
floor.	1
floors,	1
fourth	1
from	4
grew	1
has	1
he	1
hours	1
hurt	1
in	6
injury,	1
investigation.	1
it	1
kitchens	1
likely	1
material	1
may	1
midtown	1
minor	1
near	1
nearby	1
no	1
not	1
occupies	1
of	4
offices	1
officials	2
on	3
one	1
or	1
out	1
p.m.,	1
photo	1
published,	1
records,	1
redistributed	1
rerouted	1
reserved.	1
restaurant	2
rewritten	1
rights	1
roof	2
rooftop	1
routes.	1
said	3
said.	1
scene.	1
seriously	1
shooting	1
shot	1
showed	1
six-story	1
sixth.	1
skyline	1
smoke	2
spokesman	1
spokesman.	1
streets	1
sustained	1
system	1
than	1
the	18
thicker	1
third	1
through	1
to	3
traffic	1
transit	1
turning	1
two	1
under	2
vent.	1
ventilation	1
was	3
watched	1
were	1
with	1

続きの記事