Hive的数据倾斜解决方案

By timebusker on January 9, 2018
什么是数据倾斜

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点

Hadoop框架的特性
  • 不怕数据大,怕数据倾斜
  • Jobs 数比较多的作业运行效率相对比较低,如子查询比较多
  • sum,count,max,min 等聚集函数,通常不会有数据倾斜问题
主要表现

任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大 于平均时长。

容易数据倾斜情况
  • 小表关联超大表join
  • 大表关联大表join,但key比较集中到一个值上,如null
  • count(distinct),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组,按distinct字段排序
  • group by,数据量大,维度太小
空值产生的数据倾斜

在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。

** 方法 1:user_id为空的不参与join关联 **

select * from log a join user b on a.user_id is not null and a.user_id = b.user_id
union all
select * from log c where c.user_id is null;

** 方法2:赋予空值新的随机key值 **

select * from log a left outer join user b on
case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id

方法 2 比方法 1 效率更好,不但 IO 少了,而且作业数也少了,方案 1 中,log 表 读了两次,jobs 肯定是 2,而方案 2 是 1。这个优化适合无效 id(比如-99,’’,null)产 生的数据倾斜,把空值的 key 变 成一个字符串加上一个随机数,就能把造成数据倾斜的 数据分到不同的 reduce 上解决数据倾斜的问题。 改变之处:使本身为 null 的所有记录不会拥挤在同一个 reduceTask 了,会由于有替代的 随机字符串值,而分散到了多个 reduceTask 中了,由于 null 值关联不上,处理后并不影响最终结果。

不同数据类型关联产生数据倾斜

用户表中user_id字段为int,log 表中user_id为既有string也有int的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的hash操作会按照int类型的id进行分配, 这样就会导致所有的string类型的id就被分到同一个reducer当中

** 统一数据类型 **

select * from user a left outer join log b on b.user_id = cast(a.user_id as string)
大表与小表关联查询产生数据倾斜(目前版本已做足够好的优化)

使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。

map join 概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从 而避免了 reduceTask,前提要求是内存足以装下该全量数据。

# /* +mapjoin(内存中加载小表) */
select /* +mapjoin(a) */ a.id aid, a.name, b.age from a join b on a.id = b.id;

hive0.11版本以后会自动开启 map join 优化,由两个参数控制:

// 设置 MapJoin 优化自动开启
set hive.auto.convert.join=true; 

// 设置小表不超过多大时开启 mapjoin 优化
set hive.mapjoin.smalltable.filesize=25000000 
大表与大表之间数据倾斜

业务逻辑的拆分

大表与大表之间数据倾斜主要原因还是某一个key比较集中导致,此时可以参考大表拆分小表,把大表中集中的key取出作为小表,使用 map join。

集群任务调优

针对集群运行中可能存在数据倾斜的任务进行集群任务参数优化配置。

  • 开启map端部分聚合功能
hive.map.aggr=true

将key相同的归到一起,减少数据量,这样就可以相对地减少进入reduce的数据量,在一定程度上可以提高性能,当然,如果数据的减少量微乎其微,那对性能的影响几乎没啥变。

  • 任务负载均衡
hive.groupby.skewindata=true 

如果发生了数据倾斜就可以通过它来进行负载均衡。当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中, 每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的; 第二个 MR Job 再根据预处理的数据结果按照Key 分布到 Reduce 中(这个过程是按照key的hash值进行分区的,不同于mr job1的随机分配, 这次可以保证相同的Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。所以它主要就是先通过第一个mr job将key随机分配到reduce, 使得会造成数据倾斜的key可能被分配到不同的reduce上,从而达到负载均衡的目的。到第二个mr job中,因为第一个mr job已经在reduce中对这些数据进行了部分聚合。

就像单词统计的例子,a这个字母在不同的reduce中,已经算出它在每个reduce中的个数,但是最终的总的个数还没算出来,那么就将它传到第二个mr job,这样就可以得到总的单词个数,所以这里直接进行最后的聚合就可以了。

  • 任务数据量控制

位/比特(bit)–>字节(byte)–>千字节(kb)–>兆字节(mb)–>吉字节(gb)–>太字节(tb)–>拍字节(pb)

hive.exec.reducers.bytes.per.reducer=1024*1024*1024 (单位是字节,1GM)

控制每个reduce任务计算的数据量。

  • 任务数量控制
hive.exec.reducers.max=999

最大可以开启的reduce个数,默认是999个

在只配了hive.exec.reducers.bytes.per.reducer以及hive.exec.reducers.max的情况下,实际的reduce个数会根据实际的数据总量/每个reduce处理的数据量来决定。

  • 指定reduce任务数
mapred.reduce.tasks=-1

实际运行的reduce个数,默认是-1,会自动根据集群参数计算任务数,但如果在此指定了,那么就不会通过实际的总数据量/hive.exec.reducers.bytes.per.reducer来决定reduce的个数