DataSkew 数据倾斜
1. Hive 里的数据倾斜
1.1 null值 空值
尽量提前过滤,或者把无效值替换成随机字符串,比如 if(a is null or trim(a) = "" or trim(a) = "NULL", rand(1)+"")
,由于是无效值所以不影响最终的取数,但是因为把关联字段打散了,所以就不会把所有的key发送到同一个reduce上,但是需要控制rand(1)的数据范围,小数点位数过多会导致分配的reduce太多,也会影响最终效率。
打散key的方式同样适用于MapReduce和Spark。
1.2 小文件倾斜
这一点在 Hive 里主要表现在模型汇总程度越高,数据量越小,如果再带有分区,那么在HDFS里就会有很多小文件。HDFS的文件元信息,包括位置、大小、分块信息都保存在NameNode的内存中,文件越多占的内存越大,所以读取小文件就会消耗时间。还有一种产生的可能是MapReduce的reduce数量增加,也会增加输出结果文件的数量,从而导致小文件过多。
最后这种情况主要针对的是 MapReduce。当然Hive也可以通过配置项来尽可能减少reduce输出的小文件。
解决方法有两种:
- map输入端的小文件合并
1 | -- 每个Map最大输入大小,决定合并后的文件数 |
- reduce输出端的小文件合并
1 | hive.merge.mapfiles=True True时在map输出后合并文件,默认true |
1.3 mapjoin 和 streamtable
这两个类似于注解一样,放在 select 的后面即可
1.3.1 mapjoin
适合于:N个较大的表,一个较小的表
mapjoin 会把指定的小表在客户端打成一个哈希表序列化文件的压缩包,通过分布式缓存均匀分发到作业执行的每一个结点上。然后在结点上进行解压,在内存中完成关联,有一个超级小表的情况下,目前hive能做到自动判断并转为map join。Map Join全过程不会使用Reduce,非常均匀,不会存在数据倾斜问题
1 | set hive.auto.convert.join = true; # 默认为false |
1.3.2 streamtable
适合于:N个较小的表,一个超大的表
join一般都是在reduce阶段完成的,因为在map阶段无法使同样key值的分在一个map上。
而在reduce阶段的join,hive默认把左表数据(小表)放在缓存中,右表数据(大表)放入到磁盘中,然后把内存中的表的数据逐条与硬盘上user表的数据做Join。
通过/*+ streamtable(大表表名) */
来指定大表。
最好每次写join时,小表放左边,大表放右边。
1.4 单纯的多对多
主键拼接一个rand(1),放入不同的reduce,group by 的时候把真正的主键做一个截取即可
2. MapReduce 中的数据倾斜
2.1 小文件倾斜
原理和Hive是一样的,太多的小文件,导致在ApplicationMaster在向ResourceManager去申请资源时,要频繁访问HDFS的NameNode,再去DataNode拿数据,准备数据时间太长。
所以可以在 map 端开始之前先把小文件进行合并,再传递给MapReduce。比如客户端产生数据文件时进行合并,或者文件数据进入map之前通过继承CombineFileInputFormat类实现小文件的合并。job.setInputFormatClass(CombineTextInputFormat.class),然后在代码里的Configuration中设置切分的块的最小值最大值等等,在同一节点上的数据块会合并,超过最大值就生产新的分片。
hadoopConf.set(“mapreduce.input.fileinputformat.split.maxsize”, “512000000”)
hadoopConf.set(“mapreduce.input.fileinputformat.split.minsize”, “268435456”)
2.2 key 值倾斜
同一个key的数据太多,导致全部传递到一个reduce上,导致其中一个reduce运行时间过长。
可以先创建一个类,通过 extends Partitioner 来重写 int getPartition()
方法,将key先都拼接一个rand()随机数,但是这个随机数应该和最多能启动起来的reduceNum个数是一样的(感觉是这样,随机数不能太多不一样的值,不然会传递到太多的reduce上,虽然每个reduceTask上的数据相对少了,但是启动过多的reduceTask也会增加时间),然后再创建一个类,通过 extends WritableComparator 来重写 public int compare(WritableComparable a, WritableComparable b)
方法,reduce 会自动调用 WritableComparator 类中的 int compare
方法,来判断循环的key是否和前一个是同一个来做累加,所以如果修改了partition分区方法,一般都是需要再覆写 WritableComparator 类的。
3. Spark 中的数据倾斜
数据源数据文件不均匀
SparkContext.textFile(“”) 读取文件数据的时候底层实现是通过Hadoop读取文件的方式
在 Hadoop 读取文件时,会通过
getPartitions()
方法来获取分区,在这个方法里会调用 InputFormat 这个抽象类的getSplits()
方法,这个方法的具体实现是在 FileInputFormat 类中对getSplits()
方法进行覆写。如何去切分文件数据的思路是:- inputPath 如果是一个文件夹的话,遍历里面的所有文件,累加所有文件的size作为totalSize
- 求一个平均文件大小
goalSize = totalSize / numSplits; // numSplits 指的是 min(想要的分区个数,默认分区个数2个) 在spark里面是这样的,mr可能是读的配置
while(fileSize / goalSize > 1.1L){fileSize -= goalSize}
如果当前文件的大小超过平均大小的1.1倍,那么就从平均大小对应的offset那里进行切分文件- 综上,如果文件不可切分,那么一个分区就是那一整个不可切分文件;如果文件可切分,计算出一个理想文件大小,然后依次判断文件大小和这个理想文件大小的关系,幅度在1.1倍之内的都可以接受,否则就需要切割。
每一个partition都会是一个task,所以如果文件不可分割,那么就会input一个特别大的文件,造成倾斜
适用场景:对于数据源单个spark input read数据量过大,或者单个task 相对于其他task spark input read较大的情况,读取数据源明显不均匀
解决方式:尽量使用可切割的文本存储,生成尽量多的task进行并行计算
优点:从数据源避免倾斜,并且从源头增大并行度,避免倾斜
缺点:需要改造数据源,支持可切割计算过程中key的分布不均(Shuffle导致的数据不均,在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜)
- 单个rdd中进行groupby的时候key分布不均
groupby(customerPart)
map(key+rand()).reducebykey().reducebykey() 执行两次 - 多个rdd进行join过程中key的不均匀
- 一个大表和一个小表的时候,将较小的RDD中的数据存到一个Broadcast变量(即广播变量)中,先广播到各个节点中,即在每个节点的内存中缓存一份,然后对大表RDD进行map算子,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。需要调整一下广播的阈值。
- 单个rdd中进行groupby的时候key分布不均