你的位置:首页 > 数据库

[数据库]Spark读HBase多表组成一个RDD

环境:Spark-1.5.0 HBase-1.0.0。

场景:HBase中按天分表存数据,要求将任意时间段的数据合并成一个RDD以做后续计算。

尝试1: 寻找一次读取多个表的API,找到最接近的是一个叫MultiTableInputFormat的东西,它在MapReduce中使用良好,

  但没有找到用于RDD读HBase的方法。

尝试2: 每个表生成一个RDD,再用union合并,代码逻辑如下:

var totalRDD = xxx  // 读取第一张表for {  // 循环读表并合并到totalRDD  val sRDD = xxx
  totalRDD.union(sRDD)}

代码放到集群上执行,totalRDD并不是正确的union结果,用var还真是不行。

尝试3: 思路类似2,但使用SparkContext.union来一次合并多个RDD,代码逻辑如下:

var rddSet: xxx = Set()  // 创建RDD列表dateSet.foreach(date => {  // 将所有表的RDD放入列表中  val sRDD = xxx  rddSet += sRDD}val totalRDD = sc.union(rddSet.toSeq)  // 合并列表中的所有RDD

完整代码如下:

import java.text.SimpleDateFormatimport org.apache.hadoop.hbase.client.Resultimport org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkContext, SparkConf}import org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.mapreduce.TableInputFormatimport scala.collection.mutable.Set/** * 时间处理类 */object Htime { /**  * 根据起止日期获取日期列表  * 例如起止时间为20160118,20160120,那么日期列表为(20160118,20160119,20160120)  *  * @param sDate 开始日期  * @param eDate 结束日期  * @return 日期列表  */ def getDateSet(sDate:String, eDate:String): Set[String] = {  // 定义要生成的日期列表  var dateSet: Set[String] = Set()  // 定义日期格式  val sdf = new SimpleDateFormat("yyyyMMdd")  // 按照上边定义的日期格式将起止时间转化成毫秒数  val sDate_ms = sdf.parse(sDate).getTime  val eDate_ms = sdf.parse(eDate).getTime  // 计算一天的毫秒数用于后续迭代  val day_ms = 24*60*60*1000  // 循环生成日期列表  var tm = sDate_ms  while (tm <= eDate_ms) {   val dateStr = sdf.format(tm)   dateSet += dateStr   tm = tm + day_ms  }  // 日期列表作为返回  dateSet }}/** * 从HBase中读取行为数据计算人群分类 */object Classify { /**  * @param args 命令行参数,第一个参数为行为数据开始日期,第二个为结束日期,例如20160118  */ def main(args: Array[String]) {  // 命令行参数个数必须为2  if (args.length != 2) {   System.err.println("参数个数错误")   System.err.println("Usage: Classify <开始日期> <结束日期>")   System.exit(1)  }  // 获取命令行参数中的行为数据起止日期  val startDate = args(0)  val endDate  = args(1)  // 根据起止日志获取日期列表  // 例如起止时间为20160118,20160120,那么日期列表为(20160118,20160119,20160120)  val dateSet = Htime.getDateSet(startDate, endDate)  // Spark上下文  val sparkConf = new SparkConf().setAppName("Classify")  val sc = new SparkContext(sparkConf)  // 初始化HBase配置  val conf = HBaseConfiguration.create()  // 按照日期列表读出多个RDD存在一个Set中,再用SparkContext.union()合并成一个RDD  var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set()  dateSet.foreach(date => {   conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // 设置表名   val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],    classOf[org.apache.hadoop.hbase.client.Result])   rddSet += bRdd  })    val behavRdd = sc.union(rddSet.toSeq)    behavRdd.collect().foreach(println) }}