编辑
2025-07-23
数仓
0
请注意,本文编写于 294 天前,最后修改于 149 天前,其中某些信息可能已经过时。

目录

阿里云个人使用的一些姿势
一、登录
二、临时 Sql 运行
三、周期任务
sql 任务
PySpark 简单模式
PySpark兼容模式
Python pyodps节点
Udf(python)
四、运维中心
五、数据地图
六、空间设置
七、脚本变动
FROM_UNIXTIME
GET_VERSION
DATEDIFF
INSERT OVERWRITE TABLE
LATERAL VIEW EXPLODE 语句,exploded_table 别名不能省略
APPROX_PERCENTILE
数据类型转换
八、AWS 历史数据同步到 MC
在 mc 上建外表
修复分区
九、阿里云导出到 clickhouse

阿里云个人使用的一些姿势

一、登录

登录 dataworks,选择对应工作空间进入,显示无空间找管理员开通

Image Image

二、临时 Sql 运行

临时查询下新建一个目录,在下面新建 odps sql 节点即可

三、周期任务

sql 任务

业务流程 MaxCompute 数据开发目录下新建 ODPS Sql 节点

PySpark 简单模式

官方文档

  1. 资源中新建 python 脚本
Image
python
from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.appName("spark sql").getOrCreate() spark.sql("SELECT 1").show()
  1. 数据开发文件夹新建odps spark节点
Image

发布上线皆可,sparkui会在日志中打印

PySpark兼容模式

阿里云的PySpark是基于Python 3.7 + Spark2的,而且看上去已经疏于维护了。新任务建议直接用PyODPS3,而历史包袱较重的旧任务,如需从AWS快速迁移,则可以参考如下方式:

  1. 资源中新建python脚本(和简单模式相同)

  2. 数据开发文件夹新建odps spark节点(和简单模式相同),语言选择Python,spark版本选择Spark 3.x,然后添加Python脚本作为主Python资源

  3. 点击【配置项】的【添加一条】按钮,依次添加如下配置(全都是必须添加的配置):

配置项
配置值
spark.hadoop.odps.cupid.resources
talkie_ug.python310-spark3.tar.gz
spark.pyspark.python
./talkie_ug.python310-spark3.tar.gz/python3/bin/python3.10
spark.executorEnv.LD_LIBRARY_PATH
$LD_LIBRARY_PATH:./talkie_ug.python310-spark3.tar.gz/python3/3rdlib
spark.yarn.appMasterEnv.LD_LIBRARY_PATH
$LD_LIBRARY_PATH:./talkie_ug.python310-spark3.tar.gz/python3/3rdlib

添加完成后,配置其他业务参数(如p_date),即可运行。

Image

Python pyodps节点

其他都差不多,主要是参数和第三方包

参数:args['p_date']

第三方包:可以在调度资源组上安装

写表:

python
write_records=[] #二维数组 if len(write_records)>0: _table = o.get_table('dw_risk_warehouse.ods_meta_cwf_node_info_df') _table.delete_partition('pt={0}'.format(pt), if_exists=True) with _table.open_writer(partition='pt={0}'.format(pt), create_partition=True) as writer: writer.write(write_records) print('插入数据成功')

读表:

python
pt='20221025' query_sql = ''' SELECT * FROM xxx WHERE pt = '{0}' '''.format(pt) instance = o.run_sql(query_sql, hints={'odps.instance.priority': 0}) logview = instance.get_logview_address() print(logview) instance.wait_for_success() with instance.open_reader(tunnel=True) as reader: for index,record in enumerate(reader): try: node_id = record[0] database_name = record[1] node_name = record[2] except: pass

Udf(python)

  1. 进入数据开发界面,在业务流程下的 maxcompute/资源 下新建资源,类型选择python
Image Image
  1. 编写代码,官方文档,简单示例如下
python
from odps.udf import annotate @annotate("string->bigint") class GetTextLen(object): def evaluate(self, input_str): try: if not input_str or input_str == '' or input_str is None: return 0 return len(input_str.split()) except: return 0

写好之后保存并提交

  1. 在函数下新建函数,内容如图
Image Image
  1. 使用 python2 版本
sql
SELECT talkie_dialogue.GET_TEXT_LEN('34d dfds'); -- 跨空间使用需要加库名,本空间不需要

python3 版本,这两句选中一起跑

sql
set odps.sql.python.version=cp37; -- cp311 SELECT talkie_dialogue.GET_TEXT_LEN('34d dfds')

四、运维中心

点击数据开发界面的运维中心,或者在已上线的任务右键运维中心

主要用下红框的几个模块

Image
  1. 运维中心可以看到任务概览
  2. 周期任务:即在数据开发界面提交的任务,可进行下线、修改责任人、冻结等操作,也可以补数据
  3. 周期实例:阿里云根据周期任务的调度配置生成的实例,可进行实例状态查看、实例冻结、重跑、置成功等操作
  4. 补数据:查看补数据任务情况和手动补数据
  5. 规则管理:可设置任务出错、超时、未开始等监控

五、数据地图

全部产品中选中数据地图进入

或者链接

Image

搜索界面可以查看库中有哪些表

Image

可以看到表相关信息

Image
  1. 产出任务:有对应空间的访客权限即可看到任务脚本,在需要依赖的时候,可以复制括号中的节点 id 进行依赖
  2. 字段信息中的生成 ddl,可以生成建表语句
  3. 分区信息:可以看到分区的相关信息
  4. 血缘关系:可供参考,可能不准
  5. 数据质量:配置了 DQC 之后可以看到每天检查的数据质量情况
  6. 数据预览
  7. 权限申请:申请表级或者字段级的数据权限,查看申请 select 和 describe 即可
Image Image

六、空间设置

每人独立的设置

主要可以调整下调度设置

Image Image

七、脚本变动

FROM_UNIXTIME

python
FROM_UNIXTIME(create_time / 1000, 'yyyy-MM-dd') 改为 TO_CHAR(FROM_UNIXTIME(BIGINT(create_time / 1000)),'yyyy-mm-dd') 并且这里FROM_UNIXTIME只接受BIGINT类型

GET_VERSION

已经在 talkie_da 库下建好,应该直接使用 talkie_da.GET_VERSION(app_version) 即可计算

DATEDIFF

python
DATEDIFF('${P_DATE}', a.min_connect_date) 改为 DATEDIFF(TO_DATE('${P_DATE}','yyyy-mm-dd'), TO_DATE(a.min_connect_date,'yyyy-mm-dd'), 'dd')

INSERT OVERWRITE TABLE

INSERT OVERWRITE 表名 改成 INSERT OVERWRITE TABLE 表名

LATERAL VIEW EXPLODE 语句,exploded_table 别名不能省略

Spark sql,exploded_table 可以不写,但是阿里云不行

sql
SELECT key_column, value_column FROM your_table LATERAL VIEW EXPLODE(map_column) exploded_table AS key_column, value_column;

APPROX_PERCENTILE

改为 PERCENTILE_APPROX

数据类型转换

阿里云表定义数据类型和插入数据类型不一致时候会报错,int 和 bigint 也算作不同的类型。bigint 到 double 也不会做自动转换

Image

八、AWS 历史数据同步到 MC

在 mc 上建外表

  1. 请表名前面加上 glue_前缀用来区分
  2. oss(阿里云的对象存储)路径: 整体结构和 s3 一致
  3. 数据目前应该是同步到了 05-26,大家先用,我看下能否对关键表搞成例行同步的,这样就能随时读到最新的分区了。
sql
create external table glue_<aws_table_name> ( -- 这里字段和aws建表语句保持一致即可 xxx string, yyy bigint, ... ) PARTITIONED BY ( ymd string COMMENT '' ) stored as parquet location 'oss://minimax-data-analysis/db/<aws_db_name>/<aws_table_name>';

修复分区

sql
msck repair table glue_<aws_table_name> add partitions

注意看执行日志,会有 add xxx partitions 的提示,出现这行日志的时候就代表修复成功了

九、阿里云导出到 clickhouse

示例任务:ads_talkie_launch_metric_di 同步

Image

十、python udf 使用第三方包

https://help.aliyun.com/zh/maxcompute/user-guide/reference-a-third-party-package-in-a-pyodps-node-1

使用 docker 打包工具

pyodps-pack -o 包名.tar.gz 包名

udf 内容:

python
import sys from odps.udf import annotate @annotate("double->double")class MyPsi(object): def __init__(self): _# 将路径增加到引用路径_ sys.path.insert(0, "work/包名.tar.gz/packages") def evaluate(self, arg0): _# 将 import 语句保持在 evaluate 函数内部_from scipy.special import psi return float(psi(arg0))

本文作者:ender

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!