import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    /*
     * Word count over text files. The input type is TextInputFormat, whose
     * createRecordReader() returns a LineRecordReader, which fixes the
     * mapper's input key/value types:
     *   KEYIN:   byte offset of the line within the file
     *   VALUEIN: one line of text
     * The mapper's output types are decided by the business logic, not by the
     * framework. Since we count occurrences per word:
     *   key:   the word, as Text (the Writable wrapper for String)
     *   value: the count, as LongWritable (the Writable wrapper for long)
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        private final Text keyOut = new Text();
        // each occurrence of a word is emitted with a count of 1
        private final LongWritable valueOut = new LongWritable(1);

        // map() is called once per line
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("map(): keyIn: " + key.get() + "; valueIn: " + value.toString());
            // split the line into words (a single-space delimiter is assumed here)
            String[] splits = line.split(" ");
            for (String word : splits) {
                keyOut.set(word);
                // emit the map() output with context.write()
                context.write(keyOut, valueOut);
                System.out.println("map(): keyOut: " + keyOut.toString() + "; valueOut: " + valueOut.get());
            }
        }
    }

    /*
     * KEYIN, VALUEIN: determined by the map output types.
     * KEYOUT, VALUEOUT: determined by the business logic:
     *   KEYOUT:   the word, as Text (the Writable wrapper for String)
     *   VALUEOUT: the count, as LongWritable (the Writable wrapper for long)
     */
    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        private final LongWritable valueOut = new LongWritable();

        // reduce() is called once per key
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            sb.append("reduce(): keyIn: " + key.toString() + "; valueIn: [");
            long sum = 0;
            for (LongWritable w : values) {
                // get() returns the actual long value wrapped by the LongWritable
                long num = w.get();
                sum += num;
                sb.append(num).append(",");
            }
            sb.deleteCharAt(sb.length() - 1);
            sb.append("]");
            System.out.println(sb.toString());
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // create and configure the job, then submit it
        Configuration conf = getConf();
        // create the Job object
        Job job = Job.getInstance(conf, "wordcount");
        // main class of the job
        job.setJarByClass(WordCount.class);
        // map class of the job
        job.setMapperClass(WordCountMapper.class);
        // reduce class of the job
        job.setReducerClass(WordCountReducer.class);
        // key type of the map output
        job.setMapOutputKeyClass(Text.class);
        // value type of the map output
        job.setMapOutputValueClass(LongWritable.class);
        // key type of the reduce (final) output
        job.setOutputKeyClass(Text.class);
        // value type of the reduce (final) output
        job.setOutputValueClass(LongWritable.class);
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // input directory of the job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        // output directory of the job
        FileOutputFormat.setOutputPath(job, outputPath);
        // automatically delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            // recursive delete
            fs.delete(outputPath, true);
            System.out.println("output dir: " + outputPath.toString() + " deleted SUCCESS!");
        }
        // submit the job; waitForCompletion(false) means counters are not printed
        boolean status = job.waitForCompletion(false);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // the input and output directories are passed as program arguments and
        // received through main()'s args, e.g. /tmp/input /tmp/output
        System.exit(ToolRunner.run(new WordCount(), args));
    }
}
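A minimal way to run this, assuming the class above is packaged into a jar (the jar name and directory paths below are only examples), is to submit it with the standard hadoop launcher and pass the input and output directories as the two program arguments, e.g. hadoop jar wordcount.jar WordCount /tmp/input /tmp/output. Because the class runs through ToolRunner, generic options such as -D configuration properties can also be supplied before the program arguments and will be picked up by getConf() in run().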