
[Database] 5. Writing a WordCount MapReduce Program


1. Input File

hadoop mapreduce
hadoop mapreduce
yarn jps

2. Expected Output

hadoop 2
jps 1
mapreduce 2
yarn 1

3. Analysis

1) Map side

Input:

hadoop mapreduce
hadoop mapreduce
yarn jps

Output:

hadoop 1
mapreduce 1
hadoop 1
mapreduce 1
yarn 1
jps 1

2) Reduce side

Input (the framework groups the map output by key and sorts the keys before calling reduce):

hadoop <1,1>
jps <1>
mapreduce <1,1>
yarn <1>

Output:

hadoop 2
jps 1
mapreduce 2
yarn 1
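To make this data flow concrete before looking at the real job, here is a minimal plain-Java sketch (no Hadoop dependency; the class and variable names are illustrative only) that mimics the map, shuffle, and reduce steps on the sample input:

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountFlowDemo {
    public static void main(String[] args) {
        String[] lines = {"hadoop mapreduce", "hadoop mapreduce", "yarn jps"};

        // map: emit a <word, 1> pair for every word of every line
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<Map.Entry<String, Integer>>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                mapOutput.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
            }
        }

        // shuffle: group the 1s by word; TreeMap keeps the keys sorted, like the framework does
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            if (!grouped.containsKey(kv.getKey())) {
                grouped.put(kv.getKey(), new ArrayList<Integer>());
            }
            grouped.get(kv.getKey()).add(kv.getValue());
        }

        // reduce: sum the grouped values for each word
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            System.out.println(e.getKey() + "\t" + sum);
        }
    }
}

Running it prints exactly the result shown in section 2.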

4. Code Implementation

 

package org.apache.hadoop.studyhdfs.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountMapReduce extends Configured implements Tool {

    // 1. map
    // Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        private IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // the sample input is whitespace-separated, so split each line on whitespace
            // and emit <word, 1> for every word
            String lineValue = value.toString();
            String[] strs = lineValue.split("\\s+");
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // 2. reduce
    public static class WordCountReduce
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable reduceOutputValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the 1s collected for each word and emit <word, count>
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            reduceOutputValue.set(sum);
            context.write(key, reduceOutputValue);
        }
    }

    public int run(String[] args) throws Exception {
        // 1. get Configuration
        Configuration conf = super.getConf();

        // 2. create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(WordCountMapReduce.class);

        // 3. set job
        // 3.1 set input
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // 3.2 set mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3.3 set reducer
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4 set output
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 4. submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[]{"hdfs://Hadoop-senior02.beifeng.com:8020/input/sort.txt",
                "hdfs://Hadoop-senior02.beifeng.com:8020/output8/"};
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCountMapReduce(), args);
        System.exit(status);
    }
}
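To run the job on a cluster, the class is packaged into a jar and submitted with the yarn jar command. A minimal sketch, assuming the project is exported as wordcount.jar (the jar name and paths here are examples, not part of the original article):

bin/yarn jar wordcount.jar org.apache.hadoop.studyhdfs.mapreduce.WordCountMapReduce /input/sort.txt /output/

Note that main() above hard-codes the HDFS input and output paths and overwrites args, so the two command-line arguments only take effect once that hard-coded assignment is removed.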

 

5. Run Results

1) View the output: bin/hdfs dfs -text /output/part*

2) Result:

hadoop 2
jps 1
mapreduce 2
yarn 1
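A successful run also leaves a _SUCCESS marker next to the part-r-00000 file in the output directory; it can be checked with the following command (using the output path assumed above):

bin/hdfs dfs -ls /output/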