编辑
2026-01-23
数仓
0
请注意,本文编写于 110 天前,最后修改于 110 天前,其中某些信息可能已经过时。

目录

生产
消费

生产

sql
-- Produce job: read a fixed ODPS (MaxCompute) partition and fan rows out
-- to per-hour Kafka topics chosen at runtime from a column value.

-- Source: MaxCompute batch read starting at one partition.
CREATE TEMPORARY TABLE odps_source (
    url         VARCHAR,
    dt          VARCHAR,
    page_type   VARCHAR,
    biz_code    VARCHAR,
    host        VARCHAR,
    id          VARCHAR,
    insert_hour VARCHAR
) WITH (
    'connector' = 'odps',
    'endpoint' = 'https://service.cn-shanghai-vpc.maxcompute.aliyun-inc.com/api',
    'tunnelEndpoint' = 'https://dt.cn-shanghai-vpc.maxcompute.aliyun-inc.com',
    'project' = 'xxx',
    'tableName' = 'xxx',
    'accessId' = '${secret_values.ak}',
    'accessKey' = '${secret_values.sk}',
    'startPartition' = 'ymd=2025-12-04,hour=15' -- start reading from this partition onwards
);

-- Sink: upsert-kafka; the actual target topic of each row comes from the
-- writable 'topic' metadata column, matched against 'topic-pattern'.
CREATE TEMPORARY TABLE kafka_dynamic_sink (
    id            STRING,
    url           STRING,
    dt            STRING,
    page_type     STRING,
    biz_code      STRING,
    dynamic_topic STRING METADATA FROM 'topic', -- per-row destination topic
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic-pattern' = 'xxx.*',
    'properties.bootstrap.servers' = 'xxx',
    'key.format' = 'raw',
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY' -- key column is not repeated in the value
);

INSERT INTO kafka_dynamic_sink
SELECT
    id,
    url,
    dt,
    page_type,
    biz_code,
    -- route each row to an hour-suffixed topic, e.g. xxx_15
    CONCAT('xxx_', insert_hour) AS dynamic_topic
FROM odps_source;

消费

sql
-- Consume job: read JSON records from Kafka and land them into a
-- dynamically partitioned ODPS (MaxCompute) table.

-- Source: Kafka topic, JSON payload, lenient parsing.
CREATE TEMPORARY TABLE `log_from_kafka` (
    url         STRING,
    batch_id    STRING,
    task_id     STRING,
    status      STRING,
    `error`     STRING,
    status_code BIGINT,
    `ts`        TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL -- Kafka record timestamp
) WITH (
    'connector' = 'kafka',
    'topic' = 'xxx',                           -- topic to consume
    'scan.startup.mode' = 'latest-offset',     -- latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
    -- 'scan.startup.timestamp-millis' = '1764000000000',
    'properties.bootstrap.servers' = 'xxxx',   -- Kafka broker address
    'properties.group.id' = 'GID_xxx',         -- required: consumer group id
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',    -- missing fields become NULL instead of failing
    'json.ignore-parse-errors' = 'true'        -- malformed records are skipped
);

-- Sink: MaxCompute table partitioned dynamically by ymd + hour.
CREATE TEMPORARY TABLE `sink_table` (
    url         STRING,
    batch_id    STRING,
    task_id     STRING,
    status      STRING,
    `error`     STRING,
    status_code BIGINT,
    ts          TIMESTAMP(3),
    `ymd`       STRING COMMENT '分区,yyyymmdd',
    `hour`      STRING
) WITH (
    'connector' = 'odps',
    'endpoint' = 'https://service.cn-shanghai-vpc.maxcompute.aliyun-inc.com/api',
    'tunnelEndpoint' = 'https://dt.cn-shanghai-vpc.maxcompute.aliyun-inc.com',
    'project' = 'xxx',
    'tableName' = 'xxx',
    'accessId' = '${secret_values.ak}',
    'accessKey' = '${secret_values.sk}',
    'dynamicPartitionLimit' = '10000',
    'partition' = 'ymd,hour' -- dynamic partition columns
);

INSERT INTO `sink_table`
SELECT
    url,
    batch_id,
    task_id,
    status,
    `error`,
    status_code,
    ts,
    -- partition values derived from the current processing time
    DATE_FORMAT(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'yyyyMMdd') AS ymd,
    DATE_FORMAT(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'HH') AS `hour`
FROM `log_from_kafka`;

本文作者:ender

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!