你的位置:首页 > 数据库

[数据库][Hadoop in Action] 第6章 编程实践


  • Hadoop程序开发的独门绝技
  • 在本地,伪分布和全分布模式下调试程序
  • 程序输出的完整性检查和回归测试
  • 日志和监控
  • 性能调优
 
1、开发MapReduce程序
 
[本地模式]
 
     本地模式下的hadoop将所有的运行都放在一个单独的Java虚拟机中完成,并且使用的是本地文件系统(非HDFS)。在本地模式中运行的程序将所有的日志和错误信息都输出到控制台,最后它会给出所处理数据的总量。
 
对程序进行正确性检查:
  • 完整性检查
  • 回归测试
  • 考虑使用long而非int
 
 
[伪分布模式]
 
本地模式不具备生产型hadoop集群的分布式特征。一些bug在运行本地模式时是不会出现的。现在是通过日志文件和web界面远程监视它,这些工具和以后在监控生产集群时用的工具是相同的。
 
2、生产集群上的监视和调试
 
[计数器]
 

代码清单 使用计数器统计缺失值个数的MapClass
 
 1 import java.io.IOException; 2 import java.util.regex.PatternSyntaxException; 3 import java.util.Iterator; 4  5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.DoubleWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapred.*; 13 import org.apache.hadoop.util.Tool; 14 import org.apache.hadoop.util.ToolRunner; 15  16  17 public class AveragingWithCombiner extends Configured implements Tool { 18  19   public static class MapClass extends MapReduceBase 20     implements Mapper<LongWritable, Text, Text, Text> { 21  22     static enum ClaimsCounters { MISSING, QUOTED }; 23  24     public void map(LongWritable key, Text value, 25             OutputCollector<Text, Text> output, 26             Reporter reporter) throws IOException { 27  28       String fields[] = value.toString().split(",", -20); 29       String country = fields[4]; 30       String numClaims = fields[8]; 31       if (numClaims.length() == 0) { 32         reporter.incrCounter(ClaimsCounters.MISSING, 1); 33       } else if (numClaims.startsWith("\"")) { 34         reporter.incrCounter(ClaimsCounters.QUOTED, 1); 35       } else { 36         output.collect(new Text(country), new Text(numClaims + ",1")); 37       } 38  39     } 40   } 41  42   public static class Combine extends MapReduceBase 43     implements Reducer<Text, Text, Text, Text> { 44  45     public void reduce(Text key, Iterator<Text> values, 46              OutputCollector<Text, Text> output, 47              Reporter reporter) throws IOException { 48  49       double sum = 0; 50       int count = 0; 51       while (values.hasNext()) { 52         String fields[] = values.next().toString().split(","); 53         sum += Double.parseDouble(fields[0]); 54         count += Integer.parseInt(fields[1]); 55       } 56       output.collect(key, new Text(sum + "," + count)); 57     } 58   } 59  60   public static class Reduce extends MapReduceBase 61     implements Reducer<Text, Text, Text, DoubleWritable> { 62  63     public void reduce(Text key, Iterator<Text> values, 64              OutputCollector<Text, DoubleWritable> output, 65              Reporter reporter) throws IOException { 66  67       double sum = 0; 68       int count = 0; 69       while (values.hasNext()) { 70         String fields[] = values.next().toString().split(","); 71         sum += Double.parseDouble(fields[0]); 72         count += Integer.parseInt(fields[1]); 73       } 74       output.collect(key, new DoubleWritable(sum/count)); 75     } 76   } 77  78   public int run(String[] args) throws Exception { 79     // Configuration processed by ToolRunner 80     Configuration conf = getConf(); 81  82     // Create a JobConf using the processed conf 83     JobConf job = new JobConf(conf, AveragingWithCombiner.class); 84  85     // Process custom command-line options 86     Path in = new Path(args[0]); 87     Path out = new Path(args[1]); 88     FileInputFormat.setInputPaths(job, in); 89     FileOutputFormat.setOutputPath(job, out); 90  91     // Specify various job-specific parameters    92     job.setJobName("AveragingWithCombiner"); 93     job.setMapperClass(MapClass.class); 94     job.setCombinerClass(Combine.class); 95     job.setReducerClass(Reduce.class); 96  97     job.setInputFormat(TextInputFormat.class); 98     job.setOutputFormat(TextOutputFormat.class); 99     job.setOutputKeyClass(Text.class);100     job.setOutputValueClass(Text.class);101 102     // Submit the job, then poll for progress until the job is complete103     JobClient.runJob(job);104 105     return 0;106   }107 108   public static void main(String[] args) throws Exception {109     // Let ToolRunner handle generic command-line options 110     int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);111 112     System.exit(res);113   }114 }


 

 
[跳过坏记录]
 
(1)在Java中配置记录跳读
 
     hadoop从0.19版本起就已经支持skipping特征了,但默认状态是关闭的。在Java中,这个特征由类SkipBadRecords来控制,全部由静态方法组成。作业的driver需要调用如下的一个或全部方法:
     public static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)
     public static void setReducerMaxSkipGroups(Configuration conf, long maxSkipRecs)
来分别为map任务和reduce任务打开记录跳读的设置。如果最大的跳读区域大小被设置为0(默认),那么记录跳读就处于关闭状态。可以使用JobConf.setMaxMapAttempts()和JobConf.setMaxReduceAttempts()方法,或者设置等效的属性mapred.map.max.attempts和mapred.reduce.max.attempts来做到这点。
 
     如果skipping被启用,hadoop在任务失效两次后就进入skipping模式。你可以在SkipBadRecords的setAttemptsToStartSkipping()方法中设置触发skipping模式的任务失效次数:
     public static void setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping)
hadoop会把被跳过的记录写入HDFS以供以后分析,它们以序列文件的形式写入_log/skip目录,可以用hadoop fs -text <filepath>解压并读取。你可以使用方法SkipBadRecords.setSkipOutputPath(JobConf conf, Path path)修改当前用于存放被跳过记录的目录_log/skip,如果path被设为空,或者一个值为“none”的字符串path,hadoop就会放弃记录被跳过的记录。
 
(2)在Java之外配置记录跳读
 
SkipBadRecords方法
JobConf属性
setAttemptsToStartSkipping()mapred.skip.attempts.to.start.skipping
setMapperMaxSkipRecords()mapred.skip.map.max.skip.records
setReducerMaxSkipGroups()mapred.skip.reduce.max.skip.groups
setSkipOutputPath()mapred.skip.out.dir
setAutoIncrMapperProcCount()mapred.skip.map.auto.incr.proc.count
setAutoIncrReducerProcCount()mapred.skip.reduce.auto.incr.proc.count
 
 
3、性能调优
 
(1)通过combiner来减少网络流量
     Combiner可以减少在map和reduce阶段之间洗牌的数据量,较低的网络流量缩短了执行时间。
 
(2)减少输入数据量
 
(3)使用压缩
     hadoop内置支持压缩与解压。启用对map输出的压缩涉及对两个属性的配置:
 
属性
描述
mapred.compress.map.outputBoolean属性,表示mapper的输出是否被压缩
mapred.map.output.compression.codecClass属性,表示哪种CompressionCodec被用于压缩mapper的输出
 
conf.setBoolean(“mapred.compress.map.output”, true);
conf.setClass(“mapred.map.output.compression.codec”, GzipCodec.calss, CompressionCodec.class);
也可以直接使用JobConf中的便捷方法setCompressionMapOutput()和setMapOutputCompressorClass()。
 
(4)重用JVM
     hadoop从版本0.19.0开始,允许相同作业的多个任务之间重用JVM。因此,启动开销被平摊到多个任务中。一个新属性(mapred.job.reuse.jvm.num.tasks)指定了一个JVM可以运行的最大任务数。它默认值为1,此时JVM不能被重用。你可以增大该属性值来启用JVM重用。如果将其设置为-1,则意味着在可重复使用JVM的任务数量上没有限制。在JobConf对象中有一个便捷方法,setNumTasksToExecutePerJvm(int),可以用它很方便地设置作业的属性。
 
(5)根据猜测执行来运行
     启动和禁止猜测执行的配置属性:
 
属性
描述
mapred.map.tasks.speculative.execution布尔属性,表示是否运行map任务猜测执行
mapred.reduce.tasks.speculative.execution布尔属性,表示是否运行reduce任务猜测执行
 
(6)代码重构与算法重写
     Streaming程序重写为hadoop的Java程序
 
 
 [转载请注明] http://www.cnblogs.com/zhengrunjian/