```sql
CREATE TEMPORARY TABLE odps_source (
url VARCHAR,
dt VARCHAR,
page_type VARCHAR,
biz_code VARCHAR,
host VARCHAR,
id VARCHAR,
insert_hour VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = 'https://service.cn-shanghai-vpc.maxcompute.aliyun-inc.com/api',
'tunnelEndpoint' = 'https://dt.cn-shanghai-vpc.maxcompute.aliyun-inc.com',
'project' = 'xxx',
'tableName' = 'xxx',
'accessId' = '${secret_values.ak}',
'accessKey' = '${secret_values.sk}',
'startPartition' = 'ymd=2025-12-04,hour=15' -- start reading from this partition onwards
);
CREATE TEMPORARY TABLE kafka_dynamic_sink(
id string,
url string,
dt string,
page_type string,
biz_code string,
dynamic_topic STRING METADATA FROM 'topic', -- topic metadata column: each row is produced to the topic named here
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic-pattern' = 'xxx.*',
'properties.bootstrap.servers' = 'xxx',
'key.format' = 'raw',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
);
INSERT INTO kafka_dynamic_sink
SELECT
id,
url,
dt,
page_type,
biz_code,
CONCAT('xxx_', insert_hour) AS dynamic_topic -- the value of this field decides which topic each row is produced to
FROM
odps_source;
```
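
To sanity-check what the job above writes out, the dynamically created topics can be read back with a plain kafka source table that subscribes by pattern. This is only a minimal sketch under assumptions: the topics follow the xxx_<insert_hour> naming from the CONCAT above, the table name and group ID (kafka_pattern_source, GID_xxx_check) are placeholders, and the key/value layout mirrors the sink (raw key, JSON value without the key field).

```sql
CREATE TEMPORARY TABLE kafka_pattern_source (
id STRING,
url STRING,
dt STRING,
page_type STRING,
biz_code STRING
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'xxx_.*', -- subscribe to every topic matching the dynamic naming above
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'GID_xxx_check',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'id', -- the raw message key maps back to the id column
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY' -- the JSON value carries every column except id
);
```

Note that reading with the plain kafka connector treats the records as an append-only stream; use upsert-kafka again if the changelog semantics of the sink are needed downstream.
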
```sql
-- step 1: create the Kafka source table
CREATE TEMPORARY TABLE `log_from_kafka` (
url string,
batch_id string,
task_id string,
status string,
`error` string,
status_code bigint,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
-- Kafka connection options
'connector' = 'kafka',
'topic' = 'xxx', -- replace with the topic you want to consume
'scan.startup.mode' = 'latest-offset',
-- one of: latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
-- 'scan.startup.timestamp-millis' = '1764000000000', -- only used with the timestamp startup mode
'properties.bootstrap.servers' = 'xxxx', -- replace with your Kafka broker addresses
'properties.group.id' = 'GID_xxx', -- required: always specify a consumer group ID
-- data format (JSON)
'format' = 'json',
'json.fail-on-missing-field' = 'false', -- when false, missing fields do not cause errors
'json.ignore-parse-errors' = 'true' -- when true, records that fail to parse are skipped
);
-- step 2: create the result table (sink)
CREATE TEMPORARY TABLE `sink_table` (
url string,
batch_id string,
task_id string,
status string,
`error` string,
status_code bigint,
ts TIMESTAMP(3),
`ymd` string COMMENT 'partition key, yyyyMMdd',
`hour` string
) WITH (
'connector' = 'odps'
,'endpoint' = 'https://service.cn-shanghai-vpc.maxcompute.aliyun-inc.com/api'
,'tunnelEndpoint' = 'https://dt.cn-shanghai-vpc.maxcompute.aliyun-inc.com'
,'project' = 'xxx'
,'tableName' = 'xxx'
,'accessId' = '${secret_values.ak}'
,'accessKey' = '${secret_values.sk}'
,'dynamicPartitionLimit' = '10000'
,'partition' = 'ymd,hour'
);
INSERT INTO
`sink_table`
SELECT
url,
batch_id,
task_id,
status,
`error`,
status_code,
CAST(ts AS TIMESTAMP(3)) AS ts, -- cast the TIMESTAMP_LTZ metadata column to the sink column type
DATE_FORMAT(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'yyyyMMdd') as ymd,
DATE_FORMAT(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'HH') as `hour`
FROM
`log_from_kafka`;
```
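
The ymd and hour columns above are derived from the processing-time clock (UNIX_TIMESTAMP() at the moment of writing). If the partitions should instead follow the Kafka record timestamp, the ts metadata column declared on log_from_kafka can be reused; a small variant sketch of the same INSERT:

```sql
INSERT INTO `sink_table`
SELECT
url,
batch_id,
task_id,
status,
`error`,
status_code,
CAST(ts AS TIMESTAMP(3)) AS ts,
DATE_FORMAT(ts, 'yyyyMMdd') AS ymd, -- partition taken from the record timestamp
DATE_FORMAT(ts, 'HH') AS `hour`
FROM
`log_from_kafka`;
```

With late-arriving records this variant writes into older partitions, so the dynamicPartitionLimit set above may need to cover more simultaneously open partitions.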