EclipseでHadoop2.4の開発環境を作る&ワードカウントのプログラムを作成する。
前回に引き続き、使用するHadoopのバージョンは2.4。1年前の記事をもとに、Eclipseで開発環境を作る。
開発を行うクライアントは、MacBook Pro (OSX 10.9.3)。クライアントのJavaのバージョンは以下。
MacBook-Pro:~ tetsuya$ java -version java version "1.6.0_65" Java(TM) SE Runtime Environment (build 1.6.0_65-b14-462-11M4609) Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-462, mixed mode)
ダウンロード
hadoopのダウンロードページから、hadoop-2.4.0.tar.gzをダウンロードする。
クライアントの設定
ダウンロードしたhadoop-2.4.0.tar.gzをホームに解凍し、hadoopというシンボリックリンクを張る。
lrwxr-xr-x 1 tetsuya staff 13 6 7 11:50 hadoop -> hadoop-2.4.0/ drwxr-xr-x@ 12 tetsuya staff 408 3 31 18:15 hadoop-2.4.0
ワードカウントのプログラムを作成する。
プロジェクトの作成と外部JARの登録
新しくJavaプロジェクトを作成する。
プロパティーのJavaのビルド・パスに、「Add External JARS」で~/hadoop/share/hadoop下に含まれるjarファイルを登録する。
追加するjarファイルが存在するディレクトリは以下。
クラスの作成:ワードカウントのプログラムの作成。
ワードカウントを行うJavaプログラムを作成する。
- Javaアプリケーションとしてバッチで処理するものとする。
- MapReduceは1系(一年前の記事から移植する)
- Hadoop-2.4.0のAPIにしたがう。
やって見た感じでは、MapReduceの処理そのものはそのまま動くし、殆ど変わらない。
以下に具体的にEclipseでの開発手順とプログラム、実行方法と結果を示す。
新規にパッケージを作成して、クラス(FQCN=com.tetsuyaodaka.WordCount)を追加する。
下に示すプログラムは、至る所で見られる代物で、次のロジックになっている。
- Mapperでは、スペース区切りの文書を単語(ワード)に分け、(ワード,1)というキーバリューに分ける。
- Reducerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。
上のロジックは、以下と書いても同じなので、ReduceクラスをCombinerとして使う。
- Mapperでは、スペース区切りの文書を単語(ワード)に分け、(ワード,1)というキーバリューに分ける。
- Combinerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。
- Reducerでは、ワードごとにValueの1を足し合わせることで「単語の出現頻度」をカウントする。
CombinerはMapperの後に(Mapperと同一のマシンで)処理されるが、処理が実行される保証はない。
/* /* Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package com.tetsuyaodaka; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.Tool; /** * Hadoop2.4.0のMapReduceでワードカウントする。 * * author: tetsuya.odaka@gmail.com * */ @SuppressWarnings("unused") public class WordCount{ /* * Map * Text Fileを読んで、スペースで区切って、(ワード,1)で書き出す。 * (注意) Map(K1,V1,OutputCollector(K1,V1))のK1はSequenceなので、 * 必ず、LongWritableにする。 */ public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String mapTaskId; private String inputFile; private int noRecords = 0; /* * Mapperの初期化 */ public void setup(Context context) { mapTaskId = MRJobConfig.TASK_ATTEMPT_ID; inputFile = MRJobConfig.MAP_INPUT_FILE; } /* * Mapper本体 * * 入力ファイルから1行ずつ読み込んで処理を行う。 */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); StringTokenizer tk = new StringTokenizer(line); while(tk.hasMoreTokens()){ ++noRecords; word.set(tk.nextToken()); context.write(word, one); } } } /* * Reduce * (ワード,1)を集約して、ワードの出現回数を出力する。 * * 注記: * Combinerでも使う。 */ public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private String reduceTaskId; private int noKeys = 0; /* * Reducerの初期化 * */ public void setup(Context context) { reduceTaskId = MRJobConfig.TASK_ATTEMPT_ID; } /* * Reducer本体 * * 同一キーの値(value)がItarableに入って、処理が開始。 * Comberで、Wordに対してカウントをとっても大丈夫なので、Combinerとしても使う。 */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int sum = 0; for(IntWritable value: values){ sum += value.get(); } context.write(key, new IntWritable(sum)); ++noKeys; } } public static void main(String[] args) throws Exception { // プログラムの開始時刻をstdoutに書く。 Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); System.out.println("Program Start: "+sdf.format(date)); // Jobインスタンスの生成 Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCount.class); // Mapper, Combiner, Reducerの定義 job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); // OutPutKey, OutPutValueの定義 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 入力・出力ファイルのフォルダー定義:S3バケットからとる。 // インプラントフォルダーは引数からとる。 FileInputFormat.addInputPath(job, new Path(args[0])); // アウトプットフォルダーは動的に作成する。 Long dTmp = Math.round(Math.random()*100000); String subDir = args[1]+"/"+dTmp.toString(); FileOutputFormat.setOutputPath(job, new Path(subDir)); System.out.println("Output folder is "+subDir); // 入力・出力ファイルの形式の定義 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // ジョブの実行 boolean success = job.waitForCompletion(true); if(success){ System.out.println("MapReduce Job endded successfully."); }else{ System.out.println("Error! MapReduce Job abnormally endded."); } // プログラムの終了時刻をstdoutに書く。 date = new Date(); System.out.println("Program END: "+sdf.format(date)); } }
1.0.3の時に使っていたorg.apache.hadoop.mapreduce.Jobのコンストラクタが全てdeprecatedになっている(APIドキュメント)になっているので、staticなFactoryを使うように修正した。
実行時には、
- 入力テキストを保持するフォルダー
- 出力を保持するフォルダー
を指定する。
/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/input /Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/output
入力を保持するフォルダーには、スペース区切りのテキストを突っ込んでおけば良い。出力フォルダーを作成しておくと、「File Already Exists」というIOExceptionがでる。なので、この配下に、乱数を使ってフォルダーを作成するようにする。(このIOExceptionは、「上書きでせっかく作ったデータが消えるのを防止するため」と象本に書いてある)
以下が、プロジェクトのフォルダー構成。
クラスの実行:ワードカウントのプログラムの実行。
サンプルを実行してみる。サンプルには、USA Today紙の記事(3-alarm fire near Rockefeller Center)を使ってみた。
この記事をコピペして、入力フォルダーにinput.txtとして保存する。これで、Javaアプリケーションの実行をすれば動くのだが、log4jのappenderがないよと言って来たり、Macのセキュリティーエラーが出たりするので(こちらの記事を参考にさせていただいた)、Dパラ(VM Arguments)を以下のように設定しておく。
-Xmx1024m -Djava.security.krb5.realm=OX.AC.UK -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk -Dlog4j.configuration="file:/Users/tetsuya/hadoop/etc/hadoop/log4j.properties" -DHadoop.home.dir=/Users/tetsuya/hadoop -Dhadoop.id.str=host -Dhadoop.root.logger=INFO,console -Dhadoop.policy.file=/Users/tetsuya/hadoop/etc/hadoop/hadoop-policy.xml
以下がコンソール・ログ。Nativeのローダーで警告が出たが、動くことは確認できた。
14/06/07 15:33:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/06/07 15:33:50 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id 14/06/07 15:33:50 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 14/06/07 15:33:50 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized 14/06/07 15:33:50 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 14/06/07 15:33:50 WARN mapreduce.JobSubmitter: No job jar file set. User classes may not be found. See Job or Job#setJar(String). 14/06/07 15:33:50 INFO mapred.FileInputFormat: Total input paths to process : 1 14/06/07 15:33:50 INFO mapreduce.JobSubmitter: number of splits:1 14/06/07 15:33:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local601616696_0001 14/06/07 15:33:50 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/staging/tetsuya601616696/.staging/job_local601616696_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring. 14/06/07 15:33:50 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/staging/tetsuya601616696/.staging/job_local601616696_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring. 14/06/07 15:33:51 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/local/localRunner/tetsuya/job_local601616696_0001/job_local601616696_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring. 14/06/07 15:33:51 WARN conf.Configuration: file:/tmp/hadoop-tetsuya/mapred/local/localRunner/tetsuya/job_local601616696_0001/job_local601616696_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring. 14/06/07 15:33:51 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 14/06/07 15:33:51 INFO mapred.LocalJobRunner: OutputCommitter set in config null 14/06/07 15:33:51 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 14/06/07 15:33:51 INFO mapreduce.Job: Running job: job_local601616696_0001 14/06/07 15:33:51 INFO mapred.LocalJobRunner: Waiting for map tasks 14/06/07 15:33:51 INFO mapred.LocalJobRunner: Starting task: attempt_local601616696_0001_m_000000_0 14/06/07 15:33:51 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux. 14/06/07 15:33:51 INFO mapred.Task: Using ResourceCalculatorProcessTree : null 14/06/07 15:33:51 INFO mapred.MapTask: Processing split: file:/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/input/input.txt:0+1305 14/06/07 15:33:51 INFO mapred.MapTask: numReduceTasks: 1 14/06/07 15:33:51 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 14/06/07 15:33:51 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 14/06/07 15:33:51 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 14/06/07 15:33:51 INFO mapred.MapTask: soft limit at 83886080 14/06/07 15:33:51 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 14/06/07 15:33:51 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 14/06/07 15:33:51 INFO mapred.LocalJobRunner: 14/06/07 15:33:51 INFO mapred.MapTask: Starting flush of map output 14/06/07 15:33:51 INFO mapred.MapTask: Spilling map output 14/06/07 15:33:51 INFO mapred.MapTask: bufstart = 0; bufend = 2155; bufvoid = 104857600 14/06/07 15:33:51 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213544(104854176); length = 853/6553600 14/06/07 15:33:51 INFO mapred.MapTask: Finished spill 0 14/06/07 15:33:51 INFO mapred.Task: Task:attempt_local601616696_0001_m_000000_0 is done. And is in the process of committing 14/06/07 15:33:51 INFO mapred.LocalJobRunner: attempt_local601616696_0001_m_000000_0 processed 100 14/06/07 15:33:51 INFO mapred.Task: Task 'attempt_local601616696_0001_m_000000_0' done. 14/06/07 15:33:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local601616696_0001_m_000000_0 14/06/07 15:33:51 INFO mapred.LocalJobRunner: map task executor complete. 14/06/07 15:33:51 INFO mapred.LocalJobRunner: Waiting for reduce tasks 14/06/07 15:33:51 INFO mapred.LocalJobRunner: Starting task: attempt_local601616696_0001_r_000000_0 14/06/07 15:33:51 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux. 14/06/07 15:33:51 INFO mapred.Task: Using ResourceCalculatorProcessTree : null 14/06/07 15:33:51 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2d397e5c 14/06/07 15:33:51 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=745517888, maxSingleShuffleLimit=186379472, mergeThreshold=492041824, ioSortFactor=10, memToMemMergeOutputsThreshold=10 14/06/07 15:33:51 INFO reduce.EventFetcher: attempt_local601616696_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 14/06/07 15:33:51 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local601616696_0001_m_000000_0 decomp: 1930 len: 1934 to MEMORY 14/06/07 15:33:51 INFO reduce.InMemoryMapOutput: Read 1930 bytes from map-output for attempt_local601616696_0001_m_000000_0 14/06/07 15:33:51 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 1930, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1930 14/06/07 15:33:51 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning 14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied. 14/06/07 15:33:51 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 14/06/07 15:33:51 INFO mapred.Merger: Merging 1 sorted segments 14/06/07 15:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1922 bytes 14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merged 1 segments, 1930 bytes to disk to satisfy reduce memory limit 14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merging 1 files, 1934 bytes from disk 14/06/07 15:33:51 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce 14/06/07 15:33:51 INFO mapred.Merger: Merging 1 sorted segments 14/06/07 15:33:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1922 bytes 14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied. 14/06/07 15:33:51 INFO mapred.Task: Task:attempt_local601616696_0001_r_000000_0 is done. And is in the process of committing 14/06/07 15:33:51 INFO mapred.LocalJobRunner: 1 / 1 copied. 14/06/07 15:33:51 INFO mapred.Task: Task attempt_local601616696_0001_r_000000_0 is allowed to commit now 14/06/07 15:33:51 INFO output.FileOutputCommitter: Saved output of task 'attempt_local601616696_0001_r_000000_0' to file:/Users/tetsuya/Documents/workspace_jee/hadoop-2.4.0/output/_temporary/0/task_local601616696_0001_r_000000 14/06/07 15:33:51 INFO mapred.LocalJobRunner: attempt_local601616696_0001_r_000000_0 processed 100 > reduce 14/06/07 15:33:51 INFO mapred.Task: Task 'attempt_local601616696_0001_r_000000_0' done. 14/06/07 15:33:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local601616696_0001_r_000000_0 14/06/07 15:33:51 INFO mapred.LocalJobRunner: reduce task executor complete. 14/06/07 15:33:52 INFO mapreduce.Job: Job job_local601616696_0001 running in uber mode : false 14/06/07 15:33:52 INFO mapreduce.Job: map 100% reduce 100% 14/06/07 15:33:52 INFO mapreduce.Job: Job job_local601616696_0001 completed successfully 14/06/07 15:33:52 INFO mapreduce.Job: Counters: 30 File System Counters FILE: Number of bytes read=6870 FILE: Number of bytes written=451235 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=15 Map output records=214 Map output bytes=2155 Map output materialized bytes=1934 Input split bytes=124 Combine input records=214 Combine output records=150 Reduce input groups=150 Reduce shuffle bytes=1934 Reduce input records=150 Reduce output records=150 Spilled Records=300 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=4 Total committed heap usage (bytes)=395567104 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=1305 File Output Format Counters Bytes Written=1349
以下が結果。ストップワードや区切り文字、大文字・小文字といった(実用上での)問題はあるが、まずはOK。
10:30 1 130 1 1920s 1 2014 1 48th 1 49th 1 A 3 According 1 All 1 Associated 1 Center. 1 Copyright 1 Department 1 FDNY 1 Fire 1 Firefighters 1 Flames 1 Friday 1 Friday's 1 Gargantiel 1 Kirby 1 Manhattan, 1 More 1 New 2 One 1 Press. 1 Rockefeller 1 TGI 1 The 3 This 1 Twitter 1 Witness 1 York 1 York's 1 a 5 about 1 according 1 acrid. 1 after 1 air 1 an 1 and 8 area 2 around 2 as 1 basement 2 be 1 blaze 2 broadcast, 1 broke 1 brought 1 building 1 building. 1 built 1 bus 1 but 1 called 1 came 1 cause 1 caused 1 city 1 clouded 1 control 1 darker, 1 delays 1 department 1 dining 1 fifth 1 fire 3 firefighter 1 firefighters 1 first 1 flames 2 floor. 1 floors, 1 fourth 1 from 4 grew 1 has 1 he 1 hours 1 hurt 1 in 6 injury, 1 investigation. 1 it 1 kitchens 1 likely 1 material 1 may 1 midtown 1 minor 1 near 1 nearby 1 no 1 not 1 occupies 1 of 4 offices 1 officials 2 on 3 one 1 or 1 out 1 p.m., 1 photo 1 published, 1 records, 1 redistributed 1 rerouted 1 reserved. 1 restaurant 2 rewritten 1 rights 1 roof 2 rooftop 1 routes. 1 said 3 said. 1 scene. 1 seriously 1 shooting 1 shot 1 showed 1 six-story 1 sixth. 1 skyline 1 smoke 2 spokesman 1 spokesman. 1 streets 1 sustained 1 system 1 than 1 the 18 thicker 1 third 1 through 1 to 3 traffic 1 transit 1 turning 1 two 1 under 2 vent. 1 ventilation 1 was 3 watched 1 were 1 with 1
続きの記事