Hive-性能优化

By timebusker on December 19, 2017
Fetch抓取

Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。 例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。

  • hive.fetch.task.conversion 2.X版本默认是more,1.X版本默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

    • none: disable hive.fetch.task.conversion
    • minimal: SELECT STAR, FILTER on partition columns, LIMIT only
    • more: SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
本地模式

Hive 在集群上查询时,默认是在集群上N台机器上运行, 需要多个机器进行协调运行,这 个方式很好地解决了大数据量的查询问题。 但是当 Hive 查询处理的数据量比较小时,其实没有必要启动分布式模式去执行,因为以分布式方式执行就涉及到跨网络传输、多节点协调等, 并且消耗资源。这个时间可以只使用本地模式来执行mapreduce job,只在一台机器上执行,速度会很快。启动本地模式涉及到三个参数:

hive本地模式

hive.exec.mode.local.autotrue时,启用hive自动判断是否启动本地模式的开关,但同时还需配合其他两个参数使用。

压缩存储
  • 合理利用文件存储格式 创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在物理上是存储在一起的, Hive查询时会只遍历需要列数据,大大减少处理的数据量。

  • 压缩的原因 Hive最终是转为MapReduce程序来执行的,而MapReduce的性能瓶颈在于网络IO磁盘IO,要解决性能瓶颈,最主要的是减少数据量, 对数据进行压缩是个好的方式。压缩虽然是减少了数据量,但是压缩过程要消耗CPU的,但是在Hadoop中, 往往性能瓶颈不在于CPU,CPU压力并不大,所以压缩充分利用了比较空闲的CPU

  • 常用压缩方法对比

压缩存储 压缩存储

  • 压缩方式的选择

压缩比率 --> 压缩解压缩速度 --> 是否支持 Split

Job输出文件按照blockGZip的方式进行压缩:

// 默认值是 false
set mapreduce.output.fileoutputformat.compress=true
// 默认值是 Record
set mapreduce.output.fileoutputformat.compress.type=BLOCK
// 默认值是 org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec

Map输出结果也以Gzip进行压缩:

// 默认值是 false,不压缩
set mapred.map.output.compress=true
// 默认值是 org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec

对 Hive 输出结果和中间都进行压缩:

// 默认值是 false,不压缩
set hive.exec.compress.output=true
// 默认值是 false,为 true 时 MR 设置的压缩才启用
set hive.exec.compress.intermediate=true
``

##### 表的优化

- 小表、大表Join--mapjoin

将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,
可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。`小表放在左边和右边已经没有明显区别。`

- MapJoin
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join。
即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

> 开启MapJoin参数设置:`set hive.auto.convert.join = true;`

> 大表小表的阀值设置:`set hive.mapjoin.smalltable.filesize=25000000;(25M)`

![MapJoin](img/hive/8.png)

首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。

接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。

- 笛卡尔积
`尽量避免笛卡尔积`,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积

当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积, 这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。

- 动态分区调整
关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。

开启动态分区功能(默认true,开启)

hive.exec.dynamic.partition=true

设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)

hive.exec.dynamic.partition.mode=nonstrict

在所有执行MR的节点上,最大一共可以创建多少个动态分区。

hive.exec.max.dynamic.partitions=1000


##### 数据倾斜
- 小文件进行合并
在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能
(系统默认的格式)。HiveInputFormat没有对小文件合并功能。

set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件 set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件 set hive.merge.size.per.task = 25610001000 ##合并文件的大小 set mapred.max.split.size=256000000; ##每个 Map 最大分割大小 set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行Map前进行小文件合并

针对SparkSQL小文件,降低任务运行的并行任务数(默认:200),根据实际情况,配置为worker节点的整数倍。

–conf spark.sql.shuffle.partitions=5
–conf spark.default.parallelism=5


- 复杂文件增加Map数
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,
从而提高任务的执行效率。

增加map的方法为:根据`computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M`公式,
调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。

`set mapreduce.input.fileinputformat.split.maxsize=100;`

- 并行执行
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。
或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,
而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。

通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,
需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。

set hive.exec.parallel=true; //打开任务并行执行 set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。 ```