Mapper side: extracts the data and computes each mapper's local Top 10, using a TreeMap.
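The core of the mapper-side step is a TreeMap used as a bounded buffer. Each record is inserted keyed by its flow, and once the map holds more than N entries the smallest key is evicted. Below is a minimal plain-Java sketch of that idea outside Hadoop. It assumes input lines of the form `appName<TAB>flow`, and the class and method names (`BoundedTopN`, `offer`, `topDescending`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper illustrating the bounded-TreeMap Top-N technique.
public class BoundedTopN {
    private final int n;
    // TreeMap keeps keys in ascending order, so firstKey() is the smallest flow.
    private final TreeMap<Integer, String> flowToRecord = new TreeMap<>();

    public BoundedTopN(int n) {
        this.n = n;
    }

    // Assumes "appName\tflow"; lines with a different field count are skipped.
    public void offer(String line) {
        String[] fields = line.split("\t");
        if (fields.length != 2) {
            return;
        }
        int flow = Integer.parseInt(fields[1].trim());
        // A record whose flow equals an existing key replaces the earlier record.
        flowToRecord.put(flow, line);
        if (flowToRecord.size() > n) {
            flowToRecord.remove(flowToRecord.firstKey());
        }
    }

    // Returns the retained records, largest flow first.
    public List<String> topDescending() {
        return new ArrayList<>(flowToRecord.descendingMap().values());
    }

    public static void main(String[] args) {
        BoundedTopN top = new BoundedTopN(3);
        String[] lines = {"a\t5", "b\t42", "c\t7", "d\t1", "e\t19"};
        for (String line : lines) {
            top.offer(line);
        }
        System.out.println(top.topDescending());
    }
}
```

With N = 3, the sample input keeps the `b`, `e`, and `c` records, largest flow first. Memory stays bounded by N regardless of how many lines a mapper sees.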
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopTenMapper extends Mapper<Object, Text, NullWritable, Text> {
    // Maps each flow value to its full record line ("appName\tflow").
    // TreeMap keys are unique, so a record whose flow equals an existing key overwrites it.
    private TreeMap<Integer, Text> flowToRecMap = new TreeMap<Integer, Text>();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] recordArray = line.split("\\t");
        if (recordArray.length == 2) {
            Integer flow = Integer.valueOf(recordArray[1].trim());
            // Store a new Text built from the line, not 'value' itself: Hadoop reuses 'value' between calls
            Text records = new Text(line);
            flowToRecMap.put(flow, records);
        }
        // If we have more than ten records, remove the one with the lowest flow.
        // TreeMap sorts keys in ascending order, so the lowest flow is the first key.
        if (flowToRecMap.size() > 10) {
            flowToRecMap.remove(flowToRecMap.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's (at most) ten records with a null key
        for (Text t : flowToRecMap.values()) {
            context.write(NullWritable.get(), t);
        }
    }
}
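Because TreeMap keys are unique, two records with the same flow collide and the earlier one is silently overwritten, so a TreeMap keyed by flow can drop ties. If ties matter, one alternative is a size-bounded min-heap built on `java.util.PriorityQueue`. This is a swapped-in technique, not the approach used in this post. The sketch below is minimal, and the names (`HeapTopN`, `Rec`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical tie-safe Top-N buffer: a min-heap ordered by flow.
public class HeapTopN {
    // Pairs the parsed flow with the original record line.
    static final class Rec {
        final int flow;
        final String line;

        Rec(int flow, String line) {
            this.flow = flow;
            this.line = line;
        }
    }

    private final int n;
    // The head of the heap is always the record with the smallest flow.
    private final PriorityQueue<Rec> heap =
            new PriorityQueue<>(Comparator.comparingInt((Rec r) -> r.flow));

    public HeapTopN(int n) {
        this.n = n;
    }

    // Assumes "appName\tflow"; lines with a different field count are skipped.
    public void offer(String line) {
        String[] fields = line.split("\t");
        if (fields.length != 2) {
            return;
        }
        heap.add(new Rec(Integer.parseInt(fields[1].trim()), line));
        // Evict the smallest once we exceed n; equal flows stay as separate entries.
        if (heap.size() > n) {
            heap.poll();
        }
    }

    // Returns the retained records, largest flow first.
    public List<String> topDescending() {
        List<Rec> recs = new ArrayList<>(heap);
        recs.sort(Comparator.comparingInt((Rec r) -> r.flow).reversed());
        List<String> out = new ArrayList<>();
        for (Rec r : recs) {
            out.add(r.line);
        }
        return out;
    }
}
```

Insertion and eviction are both O(log N). When several records tie at the cut-off, which of them survive depends on heap order.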
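Reduce side: the special point is that NullWritable is used as the key. Every record emitted by the mappers therefore shares the same key and is grouped into a single reduce() call, and there is only one ReduceTask.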
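Merging the mappers' local lists in one place is enough. A record in the global Top 10 has at most nine records with a larger flow anywhere, so it must also be in the local Top 10 of whichever split it came from (assuming distinct flows). The plain-Java sketch below simulates this two-stage process with hypothetical in-memory splits, with no Hadoop involved, and compares it with a single pass over all the data. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical simulation of the two-stage (map-local, then single-reducer) Top-N.
public class TwoStageTopN {
    // Keeps the n largest values, returned in ascending order; equal values collapse.
    static List<Integer> localTop(List<Integer> flows, int n) {
        TreeMap<Integer, Integer> buf = new TreeMap<>();
        for (int f : flows) {
            buf.put(f, f);
            if (buf.size() > n) {
                buf.remove(buf.firstKey());
            }
        }
        return new ArrayList<>(buf.keySet());
    }

    // "Map" stage: one local Top-N per split; "reduce" stage: Top-N over their union.
    static List<Integer> twoStage(List<List<Integer>> splits, int n) {
        List<Integer> shuffled = new ArrayList<>();
        for (List<Integer> split : splits) {
            shuffled.addAll(localTop(split, n));
        }
        return localTop(shuffled, n);
    }

    public static void main(String[] args) {
        List<List<Integer>> splits = Arrays.asList(
                Arrays.asList(50, 3, 48, 47),
                Arrays.asList(1, 2, 49),
                Arrays.asList(10, 20, 30));
        List<Integer> all = new ArrayList<>();
        for (List<Integer> s : splits) {
            all.addAll(s);
        }
        System.out.println(twoStage(splits, 3));
        System.out.println(localTop(all, 3));
    }
}
```

Both lines print `[48, 49, 50]`. The reducer only ever sees at most N records per mapper, rather than the full dataset.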
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopTenRuducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    // Maps each flow value to its full record line; the default comparator sorts keys in ascending order
    private TreeMap<Integer, Text> flowToRecMap = new TreeMap<Integer, Text>();

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            String line = val.toString();
            String[] recordArray = line.split("\\t");
            // Store a new Text built from the line: Hadoop reuses 'val' across iterations
            flowToRecMap.put(Integer.valueOf(recordArray[1].trim()), new Text(line));
            // If we have more than ten records, remove the one with the lowest flow.
            // Keys are ascending, so the lowest flow is the first key.
            if (flowToRecMap.size() > 10) {
                flowToRecMap.remove(flowToRecMap.firstKey());
            }
        }
        // descendingMap() iterates from the largest flow to the smallest
        for (Text t : flowToRecMap.descendingMap().values()) {
            // Output our top ten with a null key
            context.write(NullWritable.get(), t);
        }
    }
}
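Driver: note that the number of reduce tasks is set to 1, so that all the local Top 10 lists are merged into a single global result.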
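import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TopTenRunJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String inputFilePathStr = args[0];
        String outputFilePathStr = args[1];

        Job jobForTopTen = Job.getInstance(conf, "TopTenFlow");
        jobForTopTen.setJarByClass(TopTenRunJob.class);
        jobForTopTen.setMapperClass(TopTenMapper.class);
        jobForTopTen.setReducerClass(TopTenRuducer.class);
        // Map output types default to these job output types, which match what the mapper emits
        jobForTopTen.setOutputKeyClass(NullWritable.class);
        jobForTopTen.setOutputValueClass(Text.class);
        // A single reduce task, so every mapper's local Top 10 is merged in one place
        jobForTopTen.setNumReduceTasks(1);

        FileInputFormat.addInputPath(jobForTopTen, new Path(inputFilePathStr));
        FileOutputFormat.setOutputPath(jobForTopTen, new Path(outputFilePathStr));

        return jobForTopTen.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new TopTenRunJob(), args);
        System.exit(exitCode);
    }
}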
Original title: Implementing Top 10 with MapReduce