登录 dataworks,选择对应工作空间进入,显示无空间找管理员开通
临时查询下新建一个目录,在下面新建 odps sql 节点即可
业务流程 MaxCompute 数据开发目录下新建 ODPS Sql 节点
pythonfrom pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.appName("spark sql").getOrCreate()
spark.sql("SELECT 1").show()
发布上线皆可,sparkui会在日志中打印
阿里云的PySpark是基于Python 3.7 + Spark2的,而且看上去已经疏于维护了。新任务建议直接用PyODPS3,而历史包袱较重的旧任务,如需从AWS快速迁移,则可以参考如下方式:
资源中新建python脚本(和简单模式相同)
数据开发文件夹新建odps spark节点(和简单模式相同),语言选择Python,spark版本选择Spark 3.x,然后添加Python脚本作为主Python资源
点击【配置项】的【添加一条】按钮,依次添加如下配置(全都是必须添加的配置):
| 配置项 | 配置值 |
| spark.hadoop.odps.cupid.resources | talkie_ug.python310-spark3.tar.gz |
| spark.pyspark.python | ./talkie_ug.python310-spark3.tar.gz/python3/bin/python3.10 |
| spark.executorEnv.LD_LIBRARY_PATH | $LD_LIBRARY_PATH:./talkie_ug.python310-spark3.tar.gz/python3/3rdlib |
| spark.yarn.appMasterEnv.LD_LIBRARY_PATH | $LD_LIBRARY_PATH:./talkie_ug.python310-spark3.tar.gz/python3/3rdlib |
添加完成后,配置其他业务参数(如p_date),即可运行。
其他都差不多,主要是参数和第三方包
参数:args['p_date']
第三方包:可以在调度资源组上安装
写表:
pythonwrite_records=[] #二维数组
if len(write_records)>0:
_table = o.get_table('dw_risk_warehouse.ods_meta_cwf_node_info_df')
_table.delete_partition('pt={0}'.format(pt), if_exists=True)
with _table.open_writer(partition='pt={0}'.format(pt), create_partition=True) as writer:
writer.write(write_records)
print('插入数据成功')
读表:
pythonpt='20221025'
query_sql = '''
SELECT *
FROM xxx
WHERE pt = '{0}'
'''.format(pt)
instance = o.run_sql(query_sql, hints={'odps.instance.priority': 0})
logview = instance.get_logview_address()
print(logview)
instance.wait_for_success()
with instance.open_reader(tunnel=True) as reader:
for index,record in enumerate(reader):
try:
node_id = record[0]
database_name = record[1]
node_name = record[2]
except:
pass
pythonfrom odps.udf import annotate
@annotate("string->bigint")
class GetTextLen(object):
def evaluate(self, input_str):
try:
if not input_str or input_str == '' or input_str is None:
return 0
return len(input_str.split())
except:
return 0
写好之后保存并提交
sql SELECT talkie_dialogue.GET_TEXT_LEN('34d dfds'); -- 跨空间使用需要加库名,本空间不需要
python3 版本,这两句选中一起跑
sqlset odps.sql.python.version=cp37; -- cp311
SELECT talkie_dialogue.GET_TEXT_LEN('34d dfds')
点击数据开发界面的运维中心,或者在已上线的任务右键运维中心
主要用下红框的几个模块
全部产品中选中数据地图进入
或者链接
搜索界面可以查看库中有哪些表
可以看到表相关信息
每人独立的设置
主要可以调整下调度设置
pythonFROM_UNIXTIME(create_time / 1000, 'yyyy-MM-dd')
改为
TO_CHAR(FROM_UNIXTIME(BIGINT(create_time / 1000)),'yyyy-mm-dd')
并且这里FROM_UNIXTIME只接受BIGINT类型
已经在 talkie_da 库下建好,应该直接使用 talkie_da.GET_VERSION(app_version) 即可计算
pythonDATEDIFF('${P_DATE}', a.min_connect_date)
改为
DATEDIFF(TO_DATE('${P_DATE}','yyyy-mm-dd'), TO_DATE(a.min_connect_date,'yyyy-mm-dd'), 'dd')
INSERT OVERWRITE 表名 改成 INSERT OVERWRITE TABLE 表名
Spark sql,exploded_table 可以不写,但是阿里云不行
sqlSELECT key_column, value_column
FROM your_table
LATERAL VIEW EXPLODE(map_column) exploded_table AS key_column, value_column;
阿里云表定义数据类型和插入数据类型不一致时候会报错,int 和 bigint 也算作不同的类型。bigint 到 double 也不会做自动转换
sqlcreate external table glue_<aws_table_name>
(
-- 这里字段和aws建表语句保持一致即可
xxx string,
yyy bigint,
...
)
PARTITIONED BY (
ymd string COMMENT ''
)
stored as parquet
location 'oss://minimax-data-analysis/db/<aws_db_name>/<aws_table_name>';
sqlmsck repair table glue_<aws_table_name> add partitions
注意看执行日志,会有 add xxx partitions 的提示,出现这行日志的时候就代表修复成功了
示例任务:ads_talkie_launch_metric_di 同步
十、python udf 使用第三方包
https://help.aliyun.com/zh/maxcompute/user-guide/reference-a-third-party-package-in-a-pyodps-node-1
使用 docker 打包工具
pyodps-pack -o 包名.tar.gz 包名
udf 内容:
pythonimport sys
from odps.udf import annotate
@annotate("double->double")class MyPsi(object):
def __init__(self):
_# 将路径增加到引用路径_
sys.path.insert(0, "work/包名.tar.gz/packages")
def evaluate(self, arg0):
_# 将 import 语句保持在 evaluate 函数内部_from scipy.special import psi
return float(psi(arg0))
本文作者:ender
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!