你的位置:首页 > 数据库

[数据库]hive on spark


背景

何为hive on spark

hive现在已经成为各大公司进行大数据分析处理的宠儿了。由于hive对SQL语法更全面的支持,数据分析人员可以很容易地把原有数据分析应用方便地迁移到hive上并进行大数据量的计算。Hive最初的计算引擎为MapReduce,受限于其自身的Map+Reduce计算过程中是落地到磁盘中的,以及不够充分的大内利用,MapReduce的性能难以得到提升。所以hive on spark 也应运而生。那么何为hive on spark?简单来说就是把Spark作为Hive的一个计算引擎,将Hive的 查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供 了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

hive 的实现过程

在介绍Hive on Spark的具体设计之前,先简单介绍一下Hive的工作原理,以便于大家理解如何把Spark作为新的计算引擎供给Hive使用。

  • 语法分析阶段,Hive将用户提交的SQL语句解析成一棵抽象语法树(Abstract Syntax Tree,AST)。
  • 生成逻辑计划阶段,获取相关的元数据,以及对AST进行语义分析,得到一棵由Hive操作符组成的树(Operator Tree)
  • 逻辑优化阶段,这个阶段的优化不针对特定的计算引擎比如谓词下推就是一个逻辑优化:尽早的对底层数据进行过滤以减少后续需要处理的数据量。
  • 生成物理计划阶段,针对不同的引擎,将Operator Tree划分为若干个Task,并按照依赖关系生成一棵Task的树
  • 物理优化阶段,根据各计算引擎根据自身的特点,对Task Tree进行优化
  • 最后按照依赖关系,依次执行Task Tree中的各个Task,并将结果返回给用户。每个Task按照不同的实现,会把任务提交到不同的计算引擎上执行。

总结一下,就是hive把sql解析成AST,再生成Operator Tree,针对不同的计算引擎划分成若干个task并执行

举个栗子

SparkTask的生成和执行

hive如何把spark当做计算引擎并进行计算呢?我们通过一个例子来看一下一个简单的两表JOIN查询如何被转换为SparkTask并被执行。

下图左半部分展示了这个查询的Operator Tree,以及该Operator Tree如何被转化成SparkTask;右半部分展示了该SparkTask执行时如何得到最终的RDD并通过foreachAsync提交Spark 任务。

首先明白几个概念:

Hive操作符

Hive对表数据的处理逻辑,比如对表进行扫描的TableScanOperator,对表做Group的GroupByOperator等。

使用Hive原语

这 里主要是指使用Hive的操作符对数据进行处理。Spark为RDD提供了一系列的转换(Transformation),其中有些转换也是面向SQL 的,如groupByKey、join等。但如果使用这些转换(就如Shark所做的那样),就意味着我们要重新实现一些Hive已有的功能;而且当 Hive增加新的功能时,我们需要相应地修改Hive on Spark模式。有鉴于此,我们选择将Hive的操作符包装为Function,然后应用到RDD上。这样,我们只需要依赖较少的几种RDD的转换,而主 要的计算逻辑仍由Hive提供。由于使用了Hive的原语,因此我们需要显式地调用一些Transformation来实现Shuffle的功能。下表中列举了Hive on Spark使用的所有转换

物理执行计划

通 过SparkCompiler将Operator Tree转换为Task Tree,其中需要提交给Spark执行的任务即为SparkTask。不同于MapReduce中Map+Reduce的两阶段执行模式,Spark采 用DAG执行模式,因此一个SparkTask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。执行SparkTask 时,就根据SparkWork所表示的DAG计算出最终的RDD,然后通过RDD的foreachAsync来触发运算。使用foreachAsync是 因为我们使用了Hive原语,因此不需要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。

开始说明那个栗子

SparkCompiler 遍历Operator Tree,将其划分为不同的MapWork和ReduceWork。MapWork为根节点,总是由TableScanOperator(Hive中对表 进行扫描的操作符)开始;后续的Work均为ReduceWork。ReduceSinkOperator(Hive中进行Shuffle输出的操作符) 用来标记两个Work之间的界线,出现ReduceSinkOperator表示当前Work到下一个Work之间的数据需要进行Shuffle。因此, 当我们发现ReduceSinkOperator时,就会创建一个新的ReduceWork并作为当前Work的子节点。包含了 FileSinkOperator(Hive中将结果输出到文件的操作符)的Work为叶子节点。与MapReduce最大的不同在于,我们并不要求 ReduceWork一定是叶子节点,即ReduceWork之后可以链接更多的ReduceWork,并在同一个SparkTask中执行。

从 该图可以看出,这个查询的Operator Tree被转化成了两个MapWork和一个ReduceWork。在执行SparkTask时,首先根据MapWork来生成最底层的 HadoopRDD,然后将各个MapWork和ReduceWork包装成Function应用到RDD上。在有依赖的Work之间,需要显式地调用 Shuffle转换,具体选用哪种Shuffle则要根据查询的类型来确定。另外,由于这个例子涉及多表查询,因此在Shuffle之前还要对RDD进行 Union。经过这一系列转换后,得到最终的RDD,并通过foreachAsync提交到Spark集群上进行计算。

使用hive on spark

一般来说除了hadoop平台的开发人员,我们更多的是学会去使用hive on spark这个工具,而如果你已经用过hive,那么对你来说hive on spark的使用是小菜一碟:Hive 的用户可以通过hive.execution.engine来设置计算引擎,目前该参数可选的值为mr和tez。为了实现Hive on Spark,我们将spark作为该参数的第三个选项。要开启Hive on Spark模式,用户仅需将这个参数设置为spark即可。怎么样简单吧,当然首先你的spark集群和hive工具要先准备好,接下来的文章我会对这些一一介绍~