
[Database] Implementing Top 10 with MapReduce


Mapper side: extracts the records and computes each mapper's local top ten, using a TreeMap.
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopTenMapper extends Mapper<Object, Text, NullWritable, Text> {

  // Maps each record's flow to the full input line.
  // TreeMap keeps keys in ascending order, so firstKey() is the lowest flow.
  // Note: records with the same flow overwrite each other.
  private TreeMap<Integer, Text> flowToRecMap = new TreeMap<Integer, Text>();

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    // Expected input: appName<TAB>flow
    String[] recordArray = line.split("\\t");
    if (recordArray.length == 2) {
      try {
        Integer flow = Integer.valueOf(recordArray[1].trim());
        // Store a copy: Hadoop reuses the 'value' object between map() calls
        flowToRecMap.put(flow, new Text(line));
      } catch (NumberFormatException e) {
        // Skip lines whose flow field is not an integer
      }
    }
    // If we hold more than ten records, drop the one with the lowest flow,
    // which is the first (smallest) key of the ascending TreeMap
    if (flowToRecMap.size() > 10) {
      flowToRecMap.remove(flowToRecMap.firstKey());
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // After all input is processed, emit this mapper's local top ten
    // under a NullWritable key so they all reach the same reduce() call
    for (Text t : flowToRecMap.values()) {
      context.write(NullWritable.get(), t);
    }
  }
}
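To check the trimming logic without a cluster, the sketch below mirrors the map()/cleanup() loop of TopTenMapper in plain Java. The class LocalTopN, its method localTopN, the parameter n and the sample lines are all hypothetical, and records are assumed to look like "appName<TAB>flow" as in the mapper. It shows that TreeMap keeps its keys in ascending order, so firstKey() is the smallest flow, and it also exposes a side effect of keying by flow alone: a later record with the same flow replaces the earlier one.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper that mirrors TopTenMapper's trimming logic without Hadoop.
class LocalTopN {

  // Keeps at most n records with the largest flow, keyed by flow in ascending order.
  // Lines are assumed to be "appName<TAB>flow"; lines without two fields are skipped.
  static TreeMap<Integer, String> localTopN(List<String> lines, int n) {
    TreeMap<Integer, String> flowToRec = new TreeMap<>();
    for (String line : lines) {
      String[] parts = line.split("\\t");
      if (parts.length != 2) {
        continue;
      }
      int flow = Integer.parseInt(parts[1].trim());
      flowToRec.put(flow, line);               // an equal flow overwrites the earlier record
      if (flowToRec.size() > n) {
        flowToRec.remove(flowToRec.firstKey()); // firstKey() is the smallest flow
      }
    }
    return flowToRec;
  }

  public static void main(String[] args) {
    List<String> split = List.of("a\t5", "b\t42", "c\t7", "d\t42", "e\t1", "f\t19");
    TreeMap<Integer, String> top3 = localTopN(split, 3);
    System.out.println(top3.keySet()); // prints [7, 19, 42]
  }
}
```

With this sample input and n = 3, the surviving keys are 7, 19 and 42, and the value stored under 42 is the line for "d": the line for "b", which had the same flow, was overwritten.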


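Reducer side: what is special here is that NullWritable is used as the key and there is only one reduce task. Because every mapper emits its local top ten under the same NullWritable key, all of those candidates arrive in a single reduce() call, which merges them into the global top ten.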
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopTenRuducer extends Reducer<NullWritable, Text, NullWritable, Text> {

  // Maps flow to the record, in ascending flow order (TreeMap's natural ordering)
  private TreeMap<Integer, Text> flowToRecMap = new TreeMap<Integer, Text>();

  @Override
  public void reduce(NullWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // Every mapper's local top ten arrives here under the single NullWritable key
    for (Text val : values) {
      String line = val.toString();
      String[] recordArray = line.split("\\t");
      // Copy into a new Text: Hadoop reuses 'val' while iterating over values
      flowToRecMap.put(Integer.valueOf(recordArray[1].trim()), new Text(line));
      // Keep at most ten records: drop the lowest flow, i.e. the first key
      if (flowToRecMap.size() > 10) {
        flowToRecMap.remove(flowToRecMap.firstKey());
      }
    }
    // descendingMap() iterates from the highest flow to the lowest
    for (Text t : flowToRecMap.descendingMap().values()) {
      context.write(NullWritable.get(), t);
    }
  }
}
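A single reducer can produce the global answer because any value in the overall top ten must also be in the top ten of the split it came from, so the union of the mappers' local top tens always contains the global top ten. The Hadoop-free simulation below is a sketch that checks this on random data. The class TopTenSimulation and its methods are hypothetical, each inner list stands for one mapper's input split, plain integers stand in for flow values, and sorting is used for brevity instead of a TreeMap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

// Hypothetical simulation of the job's data flow: one list per mapper, one merge step as the reducer.
class TopTenSimulation {

  // Largest n values of one list, highest first.
  static List<Integer> topN(List<Integer> values, int n) {
    return values.stream()
        .sorted(Comparator.reverseOrder())
        .limit(n)
        .collect(Collectors.toList());
  }

  // "Map" phase: local top n per split. "Reduce" phase: top n over everything the single reducer receives.
  static List<Integer> twoPhaseTopN(List<List<Integer>> splits, int n) {
    List<Integer> received = new ArrayList<>();
    for (List<Integer> split : splits) {
      received.addAll(topN(split, n));
    }
    return topN(received, n);
  }

  // Baseline: top n over all data at once.
  static List<Integer> globalTopN(List<List<Integer>> splits, int n) {
    List<Integer> all = new ArrayList<>();
    for (List<Integer> split : splits) {
      all.addAll(split);
    }
    return topN(all, n);
  }

  public static void main(String[] args) {
    Random rnd = new Random(42);
    List<List<Integer>> splits = new ArrayList<>();
    for (int s = 0; s < 4; s++) {          // four simulated mappers
      List<Integer> split = new ArrayList<>();
      for (int i = 0; i < 1000; i++) {
        split.add(rnd.nextInt(1_000_000));
      }
      splits.add(split);
    }
    System.out.println(twoPhaseTopN(splits, 10).equals(globalTopN(splits, 10))); // prints true
  }
}
```

Each mapper ships at most ten values, so the single reducer handles at most ten times the number of map tasks, regardless of the input size.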

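Driver: note that the number of reduce tasks is set to 1 with setNumReduceTasks(1), so that one reducer sees every mapper's candidates.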

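import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TopTenRunJob extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String inputFilePathStr = args[0];   // input file or directory
    String outputFilePathStr = args[1];  // output directory (must not exist yet)

    Job jobForTopTen = Job.getInstance(conf, "TopTenFlow");
    jobForTopTen.setJarByClass(TopTenRunJob.class);
    jobForTopTen.setMapperClass(TopTenMapper.class);
    jobForTopTen.setReducerClass(TopTenRuducer.class);

    // Mapper and reducer both emit (NullWritable, Text)
    jobForTopTen.setOutputKeyClass(NullWritable.class);
    jobForTopTen.setOutputValueClass(Text.class);

    // One reduce task merges all local top tens into the global top ten
    jobForTopTen.setNumReduceTasks(1);

    FileInputFormat.addInputPath(jobForTopTen, new Path(inputFilePathStr));
    FileOutputFormat.setOutputPath(jobForTopTen, new Path(outputFilePathStr));

    return jobForTopTen.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new TopTenRunJob(), args);
    System.exit(exitCode);
  }
}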
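Both TopTenMapper and TopTenRuducer key flowToRecMap by flow alone, so two apps with the same flow overwrite each other: a tied app can be silently dropped, and with small inputs the output may hold fewer than ten lines. A bounded min-heap (java.util.PriorityQueue) is one common way to keep ties; this is a swapped-in technique, not the method used above. The sketch works on plain strings, and the class HeapTopN, the method flowOf and the comparator BY_FLOW are hypothetical. Inside Hadoop the same loop would go in map() and reduce(), storing copies of each record because Hadoop reuses Text objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical tie-safe alternative to TreeMap<Integer, Text>:
// a bounded min-heap that keeps records with equal flow side by side.
class HeapTopN {

  // Flow is assumed to be the second tab-separated field, as in the original input.
  static int flowOf(String record) {
    return Integer.parseInt(record.split("\\t")[1].trim());
  }

  static final Comparator<String> BY_FLOW = Comparator.comparingInt(HeapTopN::flowOf);

  // Returns the n records with the largest flow, highest flow first.
  static List<String> topN(Iterable<String> records, int n) {
    // Min-heap by flow: the head is always the smallest flow currently kept
    PriorityQueue<String> heap = new PriorityQueue<>(BY_FLOW);
    for (String rec : records) {
      heap.offer(rec);
      if (heap.size() > n) {
        heap.poll(); // evict the smallest flow, like remove(firstKey()) in the TreeMap version
      }
    }
    List<String> result = new ArrayList<>(heap);
    result.sort(BY_FLOW.reversed());
    return result;
  }

  public static void main(String[] args) {
    List<String> records = List.of("a\t5", "b\t42", "c\t7", "d\t42", "e\t1", "f\t19");
    // Prints the two flow-42 apps (b and d), then f
    for (String r : topN(records, 3)) {
      System.out.println(r.split("\\t")[0]);
    }
  }
}
```

Ties at the cut-off boundary are still resolved arbitrarily (whichever tied record the heap evicts first), but records that fit within the top n are no longer lost.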