##@resource_reference{"lark_utils.py"}
# Make lark_utils.py importable. os.path.abspath() on a bare filename resolves
# against the current working directory, so this effectively appends the CWD
# to sys.path (presumably where DataWorks places referenced resources — confirm).
import sys,os
sys.path.append(os.path.dirname(os.path.abspath('lark_utils.py')))
# Star import: get_tenant_access_token and send_full_msg are not defined in this
# file and presumably come from here. It runs BEFORE the stdlib imports below,
# so those imports win if lark_utils happens to export the same names —
# keep this ordering in mind before rearranging.
from lark_utils import *
# Module-level token. run_sql fetches its own token before each message, so in
# the visible code this global is not read.
token = get_tenant_access_token()
# chat_id of the Lark chat that receives the run_sql notifications.
TARGET_CHAT = 'oc_9923c92de5b3c17952b1be2281107cc4'
import datetime
import traceback
import time
def run_sql(sql, hints=None, title='循环跑数', is_notice_success=False, is_finish=False):
    """Run one SQL statement via ``_run_sql`` and report the outcome to Lark.

    Args:
        sql: SQL text handed to ``_run_sql``.
        hints: Settings dict forwarded to ``_run_sql``. ``None`` (the default)
            means an empty dict; a fresh one is created per call rather than
            sharing a mutable default between calls.
        title: Prefix of the Lark message title.
        is_notice_success: If False, a message is posted only when the SQL
            fails. If True, successful runs are reported too.
        is_finish: If True, ``sql`` is NOT executed; a "task succeeded"
            message is posted and True is returned.

    Returns:
        True if ``_run_sql`` completed without raising (or ``is_finish`` is
        set), False if it raised. The traceback of a failure is included in
        the posted message.
    """
    if is_finish:
        # Was a plain string before, so the literal "{title}" was posted.
        _send_post(f"{title} 任务运行成功", '恭喜')
        return True

    if hints is None:
        hints = {}

    time_start = time.time()
    msg = ''
    try:
        _run_sql(sql, hints=hints)
    except Exception:
        # Exception (not a bare except) so Ctrl-C / sys.exit still propagate.
        msg = traceback.format_exc()
    time_end = time.time()

    succeeded = msg == ''
    if succeeded and not is_notice_success:
        # Quiet success: nothing to report.
        return True

    _send_post(
        f"{title} 任务运行{'成功' if succeeded else '失败'}",
        f'耗时:{time_end - time_start}\n{msg}',
    )
    return succeeded


def _send_post(title, text):
    """Post a one-paragraph Lark "post" message with ``title``/``text`` to TARGET_CHAT."""
    content_body = {
        "zh_cn": {
            "title": title,
            "content": [
                [
                    {
                        "tag": "text",
                        "text": text,
                    }
                ]
            ],
        }
    }
    # A token is fetched for every message, as the original code did.
    token = get_tenant_access_token()
    send_full_msg(token, 'post', content_body, 'chat_id', TARGET_CHAT)
def _run_sql(sql, hints):
    """Submit ``sql`` with ``hints``, print its Logview address, then block until done.

    Uses a module-level object ``o`` that this file does not define
    (presumably the ODPS entry object injected by the PyODPS/DataWorks
    runtime — confirm). Errors from ``wait_for_success`` are not caught here
    and propagate to the caller.
    """
    job = o.run_sql(sql, hints=hints)
    print(job.get_logview_address())
    job.wait_for_success()
# --- Batch export driver ----------------------------------------------------
# Copies rows of dwd_crawler_shennai_1210_data_tmp_15 into the external table
# dwd_crawler_sites_host_crawled_detail_di21 in slices of `size` rows, chosen
# by the `rk` column.
start_num = 0
end_num = 39999  # 938868  NOTE(review): presumably the full-run bound; confirm
size = 10000

# Loop-invariant statements and settings, built once instead of per batch.
delete_sql = 'drop table if exists dwd_crawler_sites_host_crawled_detail_di21;'
create_sql = '''
CREATE EXTERNAL TABLE dwd_crawler_sites_host_crawled_detail_di21
(
host_url string,
url string,
inlink_list array<string>
)
partitioned BY (batchid string,host string)
stored AS parquet
location 'oss://oss-cn-shanghai-internal.aliyuncs.com/xxx/data_delivery/data_type=host_crawled_detail/'
tblproperties( 'mcfed.parquet.block.row.count.limit' = '100', 'mcfed.parquet.page.size.row.check.max' = '100', 'mcfed.parquet.page.size.row.check.min' = '100')
'''
hints = {
    'odps.instance.priority': 0,
    'odps.isolation.session.enable': True,
    'odps.stage.mapper.mem': 512,
    'odps.stage.mapper.split.size': 20,
    'odps.sql.unstructured.oss.commit.mode': True,
    'odps.service.mode': 'off',
    'odps.sql.reshuffle.dynamicpt': True,
    'odps.adaptive.shuffle.desired.partition.size': 50,
}

# range() excludes end_num, but every batch spans a full `size` rows, so the
# last batch may read past end_num (with the values above: rk 1..40000).
for i in range(start_num, end_num, size):
    print(i + 1, i + size)
    # The table is dropped and re-created before every batch, as before.
    # NOTE(review): presumably this resets the external table's partition
    # metadata between batches; confirm it is actually required.
    _run_sql(delete_sql, {})
    _run_sql(create_sql, {})
    sql = f'''
INSERT OVERWRITE TABLE dwd_crawler_sites_host_crawled_detail_di21 PARTITION(batchid,host)
SELECT
a.host_url,
a.url,
a.inlink_list,
'xxx' AS batchid,
host
FROM
dwd_crawler_shennai_1210_data_tmp_15 a
WHERE
rk BETWEEN {i+1} AND {i+size}
'''
    # Hand each batch its own copy, matching the old per-iteration dict, in
    # case the callee modifies the mapping it receives.
    res = run_sql(sql, hints=dict(hints), title=f'sql运行-{i+1}', is_notice_success=False)
    if not res:
        # Stop at the first failed batch; run_sql has already posted the error.
        sys.exit(1)
# 本文作者:ender
# 本文链接:
# 版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 许可协议。转载请注明出处!