数据倾斜的各种原因及处理方案
数据倾斜的本质是Shuffle 过程中 key 分布极度不均导致个别 Task 处理的数据量远超其他 Task成为整个作业的短板。一、业务数据本身分布不均热点 Key例子搜索日志中统计每个搜索词的点击量像热门词如“天气”、“淘宝”每天上亿次搜索而其他的只有几十次。处理1. 加盐两阶段聚合人为将同一个热点 Key 拆成多个“子 Key”使它们分散到不同的 Reduce 分区先局部聚合然后再将各局部结果合并为最终结果。局部聚合加盐分散 全局聚合-- 第一阶段加盐局部聚合 WITH salted_agg AS ( select salted_query, COUNT(1) AS local_cnt from ( SELECT -- 热点词加随机后缀 (0~99)非热点词保持不变 CASE WHEN query IN (天气, 淘宝) THEN CONCAT(query, _, CAST(FLOOR(RAND() * 100) AS INT)) ELSE query END AS salted_query FROM search_log ) as tmp group by salted_query ) -- 第二阶段去后缀全局聚合 SELECT CASE WHEN salted_query LIKE %\\_% THEN split(salted_query,_)[0] ELSE salted_query END AS query, SUM(local_cnt) AS total_click FROM salted_agg GROUP BY CASE WHEN salted_query LIKE %\\_% THEN split(salted_query,_)[0] ELSE salted_query END;2. Combiner在 Map 端预先聚合减少传输量但不能根治倾斜。3. 优化 Reduce 端内存与并行度增大 Reduce Task 的内存堆空间、提高 shuffle 缓冲区比例、增加并行拷贝线程数等只能让倾斜任务跑得“不那么容易 OOM”但无法改变它处理的数据量是别的 Task 几十倍的事实所以是辅助手段。二、Join 时连接键严重偏斜例子订单表10 亿行关联用户表其中用户user_id10086拥有 5 亿条订单其余用户只有几十条。处理1. 小表广播大表join小表如果是大表join小表的情况可以将小表进行广播到各节点在 Map 端完成 Join消除 Shuffle 和倾斜。维度HiveMap JoinSpark SQLBroadcast Hash Join自动转换开关hive.auto.convert.jointruespark.sql.autoBroadcastJoinThreshold非负值阈值控制hive.mapjoin.smalltable.filesize(25MB)autoBroadcastJoinThreshold(10MB)手动 Hint/* MAPJOIN(t) *//* BROADCAST(t) */合并多个小表hive.auto.convert.join.noconditionaltask如果多张表的大小总和不超过阈值允许将多个 Map Join 合并成一个 Map-Only 任务进一步提升效率。每个 Join 单独广播但 AQE 可优化底层实现Map 端内存哈希表Executor 内存广播变量 哈希表2. 倾斜键分离大表 Join 大表步骤如下步骤 1识别倾斜 Key通过采样找出出现次数远超平均值的连接键。例如发现user_id 10086是倾斜 Key。步骤 2分离数据将 orders 表和 user 表中的记录按“是否包含倾斜 Key”分别过滤形成倾斜部分orders_skew (user_id10086) 和 user_skew (user_id10086)正常部分orders_normal (user_id≠10086) 和 user_normal (user_id≠10086)步骤 3正常部分常规 Join正常部分没有热点直接走普通的 Shuffle Join 或者 Map Join如果 user_normal 足够小不会产生倾斜。步骤 4倾斜部分特殊处理——加盐膨胀对于倾斜部分采用加盐的思路但针对 Join 的特点需要将小表膨胀对 user_skew 中的那条user_id10086记录复制 N 份N 如 100每份加上一个随机后缀变成user_id_salt 10086_0, 10086_1, ..., 10086_99。这样小表由 1 条膨胀为 100 条。同时对 orders_skew 中user_id10086的订单记录每条也加上 0~99 之间的随机后缀生成user_id_salt。现在两表在user_id_salt字段上 Join由于加上了随机后缀原来的 3 亿条订单被均匀打散到 100 个 Reduce 上每个 Reduce 处理约 300 万条订单和 1 条用户记录负载均衡。步骤 5合并结果将 Step 3 和 Step 4 的输出用 Union All 合并得到完整的 Join 结果。3. Hive参数调优hive.optimize.skewjointrue自动检测倾斜键将其分离并启动独立的 Task 处理。hive.skewjoin.key指定倾斜键阈值默认 100000超过的记录数视为倾斜。4. Spark AQE 自适应优化spark.sql.adaptive.enabledtrue作用开启 AQE 总开关。原理Spark 在 Shuffle 完成后不再使用固定的并行度和计划而是根据每个分区的实际数据量动态进行运行时优化包括合并小分区将多个数据量很小的分区合并成一个减少 Task 数量避免资源浪费。自动切换 Join 策略如果发现某张表的数据量在 Shuffle 后变得很小可实时将 Sort Merge Join 切换为 Broadcast Hash Join。倾斜分区拆分允许下面的skewJoin优化生效。效果无需人工调参让 Spark 根据真实数据流动态决策提高稳定性和性能。spark.sql.adaptive.skewJoin.enabledtrue作用开启 AQE 下的倾斜 Join 自动拆分优化依赖总开关已开启。原理当 Spark 检测到某个 Join 分区的大小明显超过中位数分区大小的5倍默认可配置时会将该倾斜分区拆分成多个小分区并与另一张表对应的数据分别 Join然后将结果合并。效果自动解决 Join 时由于热点键导致某个 Task 数据量过大的数据倾斜问题无需手动加盐或设计分桶表。5. 分桶 SMB JoinSort-Merge Bucket Join分桶 SMB Join 是 Hive 中专门为大表 Join 大表设计的一种极致优化手段。它的核心思想是提前把数据按连接键分桶且排序Join 时直接按桶进行归并彻底消除 Shuffle 和 Reduce 阶段。SMB Join 是一种Map-Only 操作没有 Reduce它的执行过程很直接表必须是分桶表且按 Join Key 排序建表时用CLUSTERED BY (join_key) SORTED BY (join_key) INTO N BUCKETS定义。这样每个桶内部的数据已经按连接键排好序了。两表的分桶数必须一致或成比例最常见是两个表桶数相同这样每个桶 ID 一一对应直接配对 Join。Join 时Hive 启动 Map Task每个 Map Task 会读取两张表中相同桶 ID 的文件如 orders 的 bucket 0 和 users 的 bucket 0。因为桶内数据已按 key 排序Map Task 只需将它们进行归并排序式的归并连接就像归并两个有序链表边读边匹配内存占用很低且纯顺序 I/O。示意图orders 桶 0 和 users 桶 0 → Map Task 0orders 桶 1 和 users 桶 1 → Map Task 1...没有 Shuffle没有 Reduce所有工作都在 Map 端完成。注意两张表都必须是分桶排序表且分桶键和排序键需是 Join 键。桶的数量必须相等或者一方的桶数是另一方的整数倍此时多个小桶对应一个大桶。生产上通常设为相同桶数。如果桶数相等Join 的桶列数据类型和分桶算法必须一致避免逻辑桶号不匹配。表的数据不能频繁更新/追加否则需要重新分桶排序维护。三、分组聚合Group By时 Key 不均例子按城市分组统计人口大城如北京、上海的人口是普通县城的千倍聚合时对应 Reduce 数据量巨大。处理1. Combiner 预聚合Map 端合并Combiner 在 Map 端对province, 1做局部求和输出province, 局部总数。这样每个 Map Task 只发少量汇总记录给 Reducer大幅减少 Shuffle 数据量和倾斜 Reducer 的输入。2. 加盐两阶段聚合第一阶段 city 加随机后缀做局部聚合第二阶段去掉后缀做全局聚合。3. Hive参数调优hive.groupby.skewindata是 Hive 中一个专门用于应对Group By 聚合操作造成的数据倾斜的优化参数。当它被设置为true默认是false时Hive 会自动将存在倾斜的聚合操作拆分成两个 MapReduce Job来执行通过“分步聚合”的方式平衡 Reduce 端的负载4.Spark AQE 自适应优化spark.sql.adaptive.enabledtrue开启 AQE 总开关。四、去重/计数Distinct产生的倾斜例子SELECT COUNT(DISTINCT user_id) FROM orders;在大表上执行所有同一 user_id 的去重任务会聚集到一个 Reduce数据量大时严重倾斜。处理1. 改写为 Group ByCOUNT(DISTINCT col)之所以能通过改写为GROUP BY解决倾斜核心原因在于执行计划的本质差异前者通常限制在单任务做全局去重后者可以自然分散到多任务并行并利用 Map 端预聚合减少数据量。方式典型执行计划数据流向是否可预聚合COUNT(DISTINCT col)所有数据汇集到一个 Reducer严格去重后输出总条数全量数据单点汇集否去重必须看到全部数据SELECT COUNT(*) FROM (SELECT col FROM t GROUP BY col)Map 端先局部去重组合并Shuffle 到多个 Reducer 做全局分组最后再统计分组个数数据分散到多 Reducer是Map 端 Combiner 可局部去重2. 加盐两阶段聚合同前面局部聚合加盐分散 全局聚合3. Hive参数调优hive.optimize.distinct.rewritetrue(Hive 1.2.0)这个参数是为了解决早期版本中COUNT(DISTINCT)将所有数据拉到单个 Reducer 导致性能瓶颈的问题。作用它会对查询逻辑进行重写效果类似于手动将COUNT(DISTINCT col)改写为SELECT COUNT(*) FROM (SELECT col FROM table GROUP BY col) t。↔️分组场景如果GROUP BY字段的基数很高查询会被拆分为多个 MapReduce 任务虽然能避免单点瓶颈但也可能因任务增加而引入额外开销。✅适用场景主要针对没有 Group By或分组粒度较粗的查询可以有效缓解单Reducer过载问题。hive.optimize.countdistincttrue(Hive 3.0.0默认开启)这是Hive 3.0引入的更高级优化是对前者的补充和增强。作用它会根据数据量和基数信息进行基于成本的优化自动选择最优执行计划。即使原始查询中有多个count(distinct col)它也能智能地进行多次重写和聚合。多维优化能够处理多列去重、多Grouping Sets等复杂场景通过多级聚合模式实现比单一重写规则更好的优化效果。兼容性已知在部分版本中同时开启hive.optimize.countdistinct与hive.groupby.skewindata可能导致结果错误如果遇到问题可考虑关闭其中一个参数。4. Spark AQE自适应优化spark.sql.adaptive.skewJoin.enabledtrue5.approx_count_distinct(近似函数)业务允许误差如1-2%时使用approx_count_distinct()完全避开 Shuffle 和倾斜在 Hive 和 Spark 中均适用且性能极佳。五、大量空值或脏数据引起倾斜例子日志中user_id字段很多为 NULL导致所有 NULL 记录进入同一 Reduce数据量巨大。处理1. 直接过滤空值或脏数据若业务允许直接过滤掉空值或脏数据。2.加盐两阶段聚合若还是需要这部分数据还是采用加盐两阶段聚合的方式。六、总结口诀能过滤的直接过滤空值、脏数据能预聚合的先 Combiner求和、计数小表直接广播Map Join 消除 Shuffle大表加盐打散两阶段聚合、倾斜键分离膨胀引擎有自动优化的优先用Hive skewindata/skewjoin、Spark AQE