编辑
2026-01-12
数仓
0
请注意,本文编写于 122 天前,最后修改于 121 天前,其中某些信息可能已经过时。
  • 一共需要生成 938868 个动态分区(对应 OSS 上的文件夹)
  • 用 ODPS 一次性直接写入时,任务一直停留在“SQLTask is updating meta information”状态
  • 因此进行拆分:经过测试,每批写入 1w 个分区耗时较短,也便于把控
python
##@resource_reference{"lark_utils.py"}
# Batch-write crawled detail rows into OSS as dynamic partitions
# (batchid, host) through an ODPS external table, posting failures to a
# Lark chat via lark_utils. A single INSERT that creates every partition at
# once stalled on "SQLTask is updating meta information", so rows are
# written in rk ranges of `size` per batch.
import sys, os

# Make the DataWorks resource lark_utils.py importable. abspath() of a bare
# file name resolves against the current working directory, so this adds
# the CWD to sys.path.
sys.path.append(os.path.dirname(os.path.abspath('lark_utils.py')))
from lark_utils import *  # presumably provides get_tenant_access_token / send_full_msg

token = get_tenant_access_token()
TARGET_CHAT = 'oc_9923c92de5b3c17952b1be2281107cc4'

# Kept after the star import (as in the original order) so nothing exported
# by lark_utils can shadow these module names.
import datetime
import traceback
import time


def _notify(title, text):
    """Post a one-line rich-text ("post") message to TARGET_CHAT."""
    content_body = {
        "zh_cn": {
            "title": title,
            "content": [
                [
                    {
                        "tag": "text",
                        "text": text
                    }
                ]
            ]
        }
    }
    # A fresh token is fetched per message, as the original did
    # (presumably because tenant tokens expire during long runs).
    token = get_tenant_access_token()
    send_full_msg(token, 'post', content_body, 'chat_id', TARGET_CHAT)


def run_sql(sql, hints=None, title='循环跑数', is_notice_success=False, is_finish=False):
    """Run *sql* on ODPS and report the outcome to the chat TARGET_CHAT.

    Args:
        sql: SQL text to execute (ignored when is_finish is True).
        hints: ODPS settings forwarded to ``o.run_sql``; None means no hints.
        title: prefix of the notification title.
        is_notice_success: also post a message when the SQL succeeds;
            failures are always posted.
        is_finish: do not run anything, only post a "finished" message.

    Returns:
        True if the SQL succeeded (or is_finish is set), False if running it
        raised. The traceback is posted to the chat instead of re-raised.
    """
    if hints is None:
        hints = {}

    if is_finish:
        # The original literal lacked the f-prefix and showed a raw "{title}".
        _notify(f"{title} 任务运行成功", '恭喜')
        return True

    time_start = time.time()
    msg = ''
    try:
        _run_sql(sql, hints=hints)
    except Exception:
        # Report the failure instead of crashing; unlike a bare except this
        # lets KeyboardInterrupt / SystemExit through.
        msg = traceback.format_exc()
    time_end = time.time()

    ok = msg == ''
    if ok and not is_notice_success:
        return True

    _notify(
        f"{title} 任务运行{'成功' if ok else '失败'}",
        f'耗时:{time_end - time_start}\n{msg}',
    )
    return ok


def _run_sql(sql, hints):
    """Submit *sql*, print its Logview URL and block until it finishes.

    NOTE(review): `o` is not defined in this script; presumably it is the
    ODPS entry object injected by the DataWorks PyODPS node. run_sql relies on
    wait_for_success() raising when the instance fails.
    """
    instance = o.run_sql(sql, hints=hints)
    logview = instance.get_logview_address()
    print(logview)
    instance.wait_for_success()


def _run_or_exit(sql, hints, title):
    """Run *sql* via run_sql (failures are posted to the chat); exit(1) on failure."""
    if not run_sql(sql, hints=hints, title=title, is_notice_success=False):
        sys.exit(1)


# Batches cover rk in [i + 1, i + size] for i in range(start_num, end_num, size),
# so the final batch may extend past end_num. Set end_num = 938868 for the full
# data set (assumes rk is a dense 1..N row number — TODO confirm).
start_num = 0
end_num = 39999  # 938868
size = 10000

# Loop-invariant statements and settings, hoisted out of the batch loop.
delete_sql = 'drop table if exists dwd_crawler_sites_host_crawled_detail_di21;'
create_sql = '''
CREATE EXTERNAL TABLE dwd_crawler_sites_host_crawled_detail_di21
(
    host_url string,
    url string,
    inlink_list array<string>
)
partitioned BY (batchid string, host string)
stored AS parquet
location 'oss://oss-cn-shanghai-internal.aliyuncs.com/xxx/data_delivery/data_type=host_crawled_detail/'
tblproperties(
    'mcfed.parquet.block.row.count.limit' = '100',
    'mcfed.parquet.page.size.row.check.max' = '100',
    'mcfed.parquet.page.size.row.check.min' = '100')
'''
hints = {
    'odps.instance.priority': 0,
    'odps.isolation.session.enable': True,
    'odps.stage.mapper.mem': 512,
    'odps.stage.mapper.split.size': 20,
    'odps.sql.unstructured.oss.commit.mode': True,
    'odps.service.mode': 'off',
    'odps.sql.reshuffle.dynamicpt': True,
    'odps.adaptive.shuffle.desired.partition.size': 50,
}

for i in range(start_num, end_num, size):
    print(i + 1, i + size)

    # Recreate the external table for every batch, presumably so each commit
    # only has to register this batch's partitions. Being EXTERNAL, dropping
    # it is expected to leave the OSS files in place — confirm.
    _run_or_exit(delete_sql, {}, f'drop表-{i+1}')
    _run_or_exit(create_sql, {}, f'建表-{i+1}')

    sql = f'''
    INSERT OVERWRITE TABLE dwd_crawler_sites_host_crawled_detail_di21 PARTITION(batchid, host)
    SELECT
        a.host_url,
        a.url,
        a.inlink_list,
        'xxx' AS batchid,
        host
    FROM dwd_crawler_shennai_1210_data_tmp_15 a
    WHERE rk BETWEEN {i+1} AND {i+size}
    '''
    _run_or_exit(sql, hints, f'sql运行-{i+1}')

本文作者:ender

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!