Hive优化器原理与源码解析—统计信息Parallelism并行度计算

目录

背景

Parallelism并行度

  • Hive执行计划Stage类型
  • PhaseTransition过渡阶段判断
  • SplitCount拆分数
  • Repartition重新分区数

总结

背景

Parallelism是有关RelNode关系表达式的并行度以及如何将其Opeartor运算符分配给具有独立资源池的进程的元数据。同一个Operator操作符,并行执行和串性执行相比,在成本优化器CBO看来,并行执行的成本更低。

从并行性的概念来来讲,就是将大任务划分为较小的任务,其中每个小任务被分配分配给特定处理器,以完成部分主要任务。最后,从每个小任务中获得的部分结果将合并为一个最终结果。与串行执行的一个大任务相比,并行执行多个任务可以获得性能大幅度提升!

在Hive中,Parallelism并行度计算,除了参数指定,CPU cores硬件限制,Operator算法是否可以并行执行等因素的影响,主要与如TableScan、Sort、Join等等Operator的数据大小的拆分个数splitCount计算有关。

Parallelism并行度

讲述并行度之前先熟悉执行计划中Stage划分、Phase阶段定义和PhaseTransition过渡阶段判断的定义。

代码语言:javascript
复制
      isPhaseTransition方法返回一个实现了RelNode的物理操作符Operator相对于其输入RelNode是否属于不同的进程。在查询管道中,在一个特定Stage中,处理所有拆分Split的操作符Operators集合,称为Phase阶段。

一个Phase从一个叶子节点如TableScan或phase变换节点如Exchange开始,如Hadoop的shuffle操作符跨网络发送数据是一种Sort-Exchange的形式。

  • Hive执行计划Stage类型

在优化HiveQL时,都会查看执行计划,这些信息含有开头Stage依赖信息说明,操作符树,统计信息记录数、数据大小等,如图

那么这些Stage大致分为几类:

  • MAP/REDUCE STAGE
  • FILESYSTEM文件系统MOVE或RENAME操作STAGE。
  • METASTORE STAGE 元存储,统计信息收集操作等 等等

MAP/REDUCE STAGE里还有TabelScan、Sort、Filter、Project、Aggreate、Join等等各种Oprerator操作符树构成。

METASTORE STAGE 元存储,统计信息收集操作,如上图

Stage: Stage-2 Stats-Aggr Operator

统计信息的收集设置相关参数,在参数为true的前提下,并在执行DML语句才会收集。强调的是, LOAD DATA数据的加载不会触发统计信息的收集。

hive.stats.autogather
  • Default Value: true
  • Added In: Hive 0.7 with HIVE-1361

This flag enables gathering and updating statistics automatically during Hive DML operations.

Statistics are not gathered for LOAD DATA statements.

  • PhaseTransition过渡阶段判断

判读Operator操作符的输入RelNode和自己是否跨进程,即父Operator与子Operator是否在一个相同的进程里。

1)HiveJoin是否为PhaseTransition的判断

是依据Join Operator的具体实现来判断的,不能的Join 算法会返回不同结果。

HiveDefaultCostModel的Join的isPhaseTransition默认是false。

HiveTezCostModel分为四种Join算法,每种算法都有isPhaseTransition判断方法,isPhaseTransition返回值如下

Common Join:true

Map Join:false

Bucket Map Join:false

Sort Merge Bucket Join:false

代码语言:javascript
复制
public Boolean isPhaseTransition(HiveJoin join, RelMetadataQuery mq) {
  return join.isPhaseTransition();
}

2)Sort Limit是否为PhaseTransition的判断

代码语言:javascript
复制
public Boolean isPhaseTransition(HiveSortLimit sort, RelMetadataQuery mq) {  //HiveSortLimit 默认是true
  // As Exchange operator is introduced later on, we make a
  // sort operator create a new stage for the moment
  return true;
}

3)TableScan、Values、Exchange等RelNode的PhaseTransition的判断,默认值True。

  • SplitCount拆分数

返回数据非重复拆分数,注意splits必须是非重复的,如广播broadcast方式,其每个拷贝都是相同的,所有splitCount为1。因此,split count拆分数与由每个Operator实例发送的数据成倍数关系。

Parallelism并行处理就是对Split数据进行并行处理,在不考虑硬件CPU core和参数限制等因素影响的情况下,Split拆分数就是并行任务的个数。

1)Join的SplitCount拆分个数计算

是依据Join Operator的具体实现来判断的,不能的Join 算法会返回split count。

HiveDefaultCostModel的Join的split count为1。

HiveTezCostModel分为四种Join算法Common Join、Map Join、Bucket Map Join和Sort Merge Bucket的split count计算逻辑相同:

都用HiveAlgorithmsUtil.getSplitCountWithoutRepartition(join)方法实现的,

splitCount = (总行数*平均记录大小) / maxSplitSize,其中maxSplitSize是HiveAlgorithmsConf算法配置项初始化的每个split大小的最大值。

代码语言:javascript
复制
public Integer splitCount(HiveJoin join, RelMetadataQuery mq) {
  return join.getSplitCount();
}//默认值

2)TableScan的SplitCount拆分个数计算

Hive中实现的StorageDescriptor存储类中方法,判断分桶个数,如果bucketCols分桶集合为null,则为0,否则分桶个数和分桶列集合

代码语言:javascript
复制
public List<String> getBucketCols() {
        return this.bucketCols;
    }//分桶列集合
public int getBucketColsSize() {
    return this.bucketCols == null ? 0 : this.bucketCols.size();
}

如果分桶列列表bucketCols不为null,使用getNumBuckets()获取分桶数作为splitCount拆分数。否则使用splitCountRepartition方法通过元数据统计信息计算出splitCount拆分数(splitCount为null,则抛出异常)。splitCountRepartition的计算逻辑在下文有讲解。

代码语言:javascript
复制
public Integer splitCount(HiveTableScan scan, RelMetadataQuery mq) {
  Integer splitCount;

RelOptHiveTable table = (RelOptHiveTable) scan.getTable();
List<String> bucketCols = table.getHiveTableMD().getBucketCols();//从表的元数据信息中,获取分桶的列的列表
if (bucketCols != null && !bucketCols.isEmpty()) { //如果桶列的列表为空,则取桶个数,作为拆分个数
splitCount = table.getHiveTableMD().getNumBuckets();
} else {
splitCount = splitCountRepartition(scan, mq); //否则取重新分区数,作为拆分个数
if (splitCount == null) {
throw new RuntimeException("Could not get split count for table: "
+ scan.getTable().getQualifiedName());
}
}
return splitCount;
}

3)RelNode的SplitCount拆分个数计算

首先判断此RelNode的是否为过渡阶段Phase,如果是过渡阶段Phase,则使用splitCountRepartition方法访问元数据统计信息计算拆分数(此方法在下面有介绍)。

其次,如果不是过渡阶段Phase,则遍历此RelNode的所有输入RelNode,通过RelMetadataQuery对象获取元数据统计信息splitCount并进行累加。

总SplitCount = splitCount1 + splitCount2 + splitCount3...

代码语言:javascript
复制
public Integer splitCount(RelNode rel, RelMetadataQuery mq) {
Boolean newPhase = mq.isPhaseTransition(rel);

if (newPhase == null) {
return null;
}
if (newPhase) {
// We repartition: new number of splits 从新分区,并返回重新分区数
return splitCountRepartition(rel, mq);
}
// We do not repartition: take number of splits from children
//如果不是分区的,则从子节点中获取分区,并累加返回,拆分个数
Integer splitCount = 0;
for (RelNode input : rel.getInputs()) {
splitCount += mq.splitCount(input);
}
return splitCount;
}

  • Repartition重新分区数计算

根据RelMetadataQuery对象获取指定RelNode的统计信息。记录数RowCount、平均记录大小等统计信息。

计算逻辑如下:

  • Step 1:平均记录大小AverageRowSize
  • Step 2:总行数RowCount
  • Step 3:总大小TotalSize = 每行大小 * 总行数
  • Step 4:重新分区个数splitCount = TotalSize / maxSplitSize

其中maxSplitSize是HiveRelMDParallelism的属性生成对象时需初始化的每个split大小的最大值。

代码语言:javascript
复制
public Integer splitCountRepartition(RelNode rel, RelMetadataQuery mq) {
// We repartition: new number of splits
final Double averageRowSize = mq.getAverageRowSize(rel);//通过元数据信息,获取平均行记录大小
final Double rowCount = mq.getRowCount(rel); //获取记录数
if (averageRowSize == null || rowCount == null) {
return null;
}
final Double totalSize = averageRowSize * rowCount; //计算出RelNode的总大小
final Double splitCount = totalSize / maxSplitSize; //。总大小 / 最大拆分大小 = 拆分数
return splitCount.intValue();
}

总结

在不考虑并行度参数设置和硬件的情况下,一个Operator操作符的并行度,在允许并行执行的前提下,由splitCount拆分个数决定的,上述主要讲解了几个常用Operator的并行度的计算。