在 MaxCompute/ODPS 中,经常使用 ROW_NUMBER() OVER(PARTITION BY ...) + WHERE rk = 1 的模式来进行分组取第一条记录的操作。但当分区键存在数据倾斜(某些 key 出现次数过多)时,会导致单个 reducer 处理数据量过大,造成长尾任务,严重影响性能。
sqlSELECT
* EXCEPT(rk)
FROM
(
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY url ORDER BY IF(anchor_text != '', 1, 2)) AS rk
FROM
source_table
)
WHERE
rk = 1
性能问题原因:
MaxCompute 支持 MAX_BY(value, order_key) / MIN_BY(value, order_key) 聚合函数,可以直接获取满足排序条件的行,避免 ROW_NUMBER 的排序开销。
优化前:
sqlSELECT
source_host,
source_url,
url,
url_analyze(url, False) AS url_analyze_result,
url_from,
anchor_text,
source_biz_code,
source_parse_lang,
refer_num,
source_html_classification_tag
FROM
(
SELECT
source_host,
source_url,
url,
url_from,
anchor_text,
source_biz_code,
source_parse_lang,
source_html_classification_tag,
COUNT(1) OVER(PARTITION BY url) AS refer_num,
ROW_NUMBER() OVER(PARTITION BY url ORDER BY IF(anchor_text != '', 1, 2)) AS rk
FROM source_table
)
WHERE rk = 1
优化后:
sqlSELECT
MAX_BY(source_host, IF(anchor_text != '', 1, 0)) AS source_host,
MAX_BY(source_url, IF(anchor_text != '', 1, 0)) AS source_url,
url,
url_analyze(url, False) AS url_analyze_result,
MAX_BY(url_from, IF(anchor_text != '', 1, 0)) AS url_from,
MAX(IF(anchor_text != '', anchor_text, '')) AS anchor_text,
MAX_BY(source_biz_code, IF(anchor_text != '', 1, 0)) AS source_biz_code,
MAX_BY(source_parse_lang, IF(anchor_text != '', 1, 0)) AS source_parse_lang,
COUNT(1) AS refer_num,
MAX_BY(source_html_classification_tag, IF(anchor_text != '', 1, 0)) AS source_html_classification_tag
FROM source_table
GROUP BY url
当只需要去重、不关心取哪一条时,使用 ANY_VALUE 更简单:
优化前:
sqlSELECT * EXCEPT(rk)
FROM (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY url) AS rk
FROM source_table
)
WHERE rk = 1
优化后:
sqlSELECT
ANY_VALUE(host) AS host,
url,
ANY_VALUE(url_analyze_result) AS url_analyze_result,
ANY_VALUE(anchor_text) AS anchor_text
-- ... 其他字段
FROM source_table
GROUP BY url
当排序条件复杂时(如 ORDER BY insert_date ASC, weight DESC, type DESC),可以构造字符串排序键:
优化前:
sqlSELECT * EXCEPT(rk)
FROM (
SELECT
*,
ROW_NUMBER() OVER(
PARTITION BY url
ORDER BY insert_date, weight DESC, IF(outlink_url_from='content', 2, 1) DESC
) AS rk
FROM source_table
)
WHERE rk = 1
优化后:
sqlSELECT
url,
-- 构造排序键:insert_date ASC, weight DESC (用100-weight转升序), content优先
MIN_BY(host, CONCAT(insert_date, LPAD(CAST(100-weight AS STRING), 3, '0'), IF(outlink_url_from='content', '1', '2'))) AS host,
MIN_BY(url_type, CONCAT(insert_date, LPAD(CAST(100-weight AS STRING), 3, '0'), IF(outlink_url_from='content', '1', '2'))) AS url_type,
-- ... 其他字段类似
FROM source_table
GROUP BY url
如果 MAX_BY 不可用,可以使用 STRUCT 排序:
sqlSELECT
url,
MAX(STRUCT(
IF(anchor_text != '', 1, 0), -- 排序键
source_host,
source_url,
url_from,
anchor_text
)).col2 AS source_host,
-- col3, col4, col5 获取其他字段
FROM source_table
GROUP BY url
sql-- 开启 Group By 倾斜优化
set odps.sql.groupby.skewindata=true;
-- 开启 Join 倾斜优化
set odps.sql.skewjoin=true;
-- 调整 reducer 数量和内存
set odps.stage.reducer.num=10000;
set odps.stage.reducer.mem=4096;
优化前日志:
优化前执行时间:
优化后日志:
优化后执行时间:
优化前日志:
优化前执行时间:
优化后日志:
优化后执行时间:
| 原写法 | 优化写法 | 适用场景 |
|---|---|---|
ROW_NUMBER() + WHERE rk=1 | GROUP BY + MAX_BY/MIN_BY | 需要按条件取第一条 |
ROW_NUMBER() + WHERE rk=1 | GROUP BY + ANY_VALUE | 只需去重,不关心取哪条 |
| 复杂多字段排序 | 构造字符串排序键 + MIN_BY | 多字段混合升降序 |
核心优势:
odps.sql.groupby.skewindata=true 可自动处理数据倾斜本文作者:ender
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!