##@resource_reference{"config.py"}
# PyODPS node script: the resource_reference line above makes DataWorks download
# config.py into the working directory so that it can be imported below.
import sys, os
sys.path.append(os.path.dirname(os.path.abspath('config.py')))
from config import OSS_CONFIG
import oss2
import json
from oss2 import defaults
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import asyncio
import aiohttp
from typing import List, Generator, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

OSS_ACCESS_ID = OSS_CONFIG['sh']['id']
OSS_SECRET_ACCESS_KEY = OSS_CONFIG['sh']['key']
OSS_ENDPOINT = OSS_CONFIG['sh']['endpoint']

# Greatly enlarge the connection pool
defaults.connection_pool_size = 500


def get_oss_files_fast(bucket, prefix: str, batch_size: int = 10000) -> Generator[List[str], None, None]:
    """
    Approach 1: full scan in one pass (without first listing the host directories).
    Suitable when files do not need to be grouped by host.
    Optimizations:
    1. No delimiter: list every object directly, greatly reducing the number of API calls.
    2. max_keys=1000 (the OSS per-request maximum) to return as many objects per request as possible.
    """
    print(f"Starting full scan of {prefix}...", flush=True)
    batch_buffer = []
    total_files = 0
    start_time = time.time()
    try:
        # No delimiter: list all objects directly
        for obj in oss2.ObjectIteratorV2(bucket=bucket, prefix=prefix, max_keys=1000):
            if not obj.is_prefix():
                batch_buffer.append(obj.key)
                total_files += 1
                if len(batch_buffer) >= batch_size:
                    elapsed = time.time() - start_time
                    rate = total_files / elapsed if elapsed > 0 else 0
                    print(f"Scanned {total_files} files, rate: {rate:.0f} files/s", flush=True)
                    yield batch_buffer
                    batch_buffer = []
    except Exception as e:
        print(f"Scan failed: {e}", flush=True)
        raise
    # Flush whatever is left in the buffer
    if batch_buffer:
        yield batch_buffer
    elapsed = time.time() - start_time
    print(f"Scan finished! {total_files} files in {elapsed:.2f} s", flush=True)


if __name__ == "__main__":
    # `args` (scheduling parameters) and `o` (the ODPS entry object) are
    # injected automatically by the DataWorks PyODPS node runtime.
    p_date = args['p_date']
    P_DATE = args['P_DATE']
    biz_code = args['biz_code']
    is_need_scan_file = args['is_need_scan_file']
    oss_bucket_name = "bucket_name"
    oss_bucket_endpoint = f"{oss_bucket_name}.{OSS_ENDPOINT}:80"
    auth = oss2.Auth(OSS_ACCESS_ID, OSS_SECRET_ACCESS_KEY)
    bucket = oss2.Bucket(auth, OSS_ENDPOINT, oss_bucket_name)

    if is_need_scan_file == '1':
        path = f"{biz_code}/pr_html/ymd={p_date}/"
        print(f'Scanning files under: {oss_bucket_name}/{path}', flush=True)
        # Rebuild the target partition from scratch
        _table = o.get_table('table_name')
        _table.delete_partition(f'ymd={P_DATE},biz_code={biz_code}', if_exists=True)
        batch_count = 0
        write_datas = []
        file_generator = get_oss_files_fast(bucket, prefix=path, batch_size=10000)
        for file_batch in file_generator:
            batch_count += 1
            print(f'Processing batch {batch_count}, file count: {len(file_batch)}', flush=True)
            write_datas.extend([[i] for i in file_batch])
            # Flush to the table once more than one million rows have accumulated
            if len(write_datas) > 1000000:
                with _table.open_writer(partition=f'ymd={P_DATE},biz_code={biz_code}', create_partition=True) as writer:
                    writer.write(write_datas)
                write_datas = []
                print('Rows written successfully', flush=True)
        # Write the final partial batch
        if len(write_datas) > 0:
            with _table.open_writer(partition=f'ymd={P_DATE},biz_code={biz_code}', create_partition=True) as writer:
                writer.write(write_datas)
            write_datas = []
            print('Rows written successfully', flush=True)
    else:
        print('No scan needed; fetching detailed info instead', flush=True)
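
For reference, the import at the top expects config.py to sit next to the script and export an OSS_CONFIG dict keyed by region. A minimal sketch of the expected shape, inferred from the lookups above; all values are placeholders and the endpoint string is only an assumed example, not taken from the original:

# config.py -- illustrative shape only, placeholder values
OSS_CONFIG = {
    'sh': {
        'id': '<access-key-id>',            # AccessKey ID (placeholder)
        'key': '<access-key-secret>',       # AccessKey secret (placeholder)
        'endpoint': 'oss-cn-shanghai.aliyuncs.com',  # assumed endpoint form
    },
}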
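The unused ThreadPoolExecutor/as_completed imports hint at the per-host alternative that the docstring of get_oss_files_fast contrasts itself with: first list the host "directories" with a delimiter, then scan each sub-prefix concurrently (the enlarged connection pool set above is what makes that safe). A minimal sketch of that approach, reusing the bucket object from the script; list_host_prefixes and scan_prefixes_parallel are illustrative names, not part of the original:

from concurrent.futures import ThreadPoolExecutor, as_completed
import oss2

def list_host_prefixes(bucket, root_prefix):
    # First-level "directories" under root_prefix, obtained via a delimiter
    # listing; these are the host dirs that Approach 1 deliberately skips.
    return [obj.key
            for obj in oss2.ObjectIteratorV2(bucket=bucket, prefix=root_prefix,
                                             delimiter='/', max_keys=1000)
            if obj.is_prefix()]

def scan_prefixes_parallel(bucket, prefixes, max_workers=16):
    # Scan each sub-prefix in its own thread and collect its object keys.
    # Unlike get_oss_files_fast, this holds each prefix's keys in memory,
    # so it fits best when individual host directories are small.
    def list_keys(prefix):
        return [obj.key
                for obj in oss2.ObjectIteratorV2(bucket=bucket, prefix=prefix,
                                                 max_keys=1000)
                if not obj.is_prefix()]

    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(list_keys, p): p for p in prefixes}
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()
    return results

Under these assumptions, scan_prefixes_parallel(bucket, list_host_prefixes(bucket, path)) would stand in for get_oss_files_fast when per-host grouping matters more than streaming the keys in fixed-size batches.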