编辑
2025-09-04
ai
0
请注意,本文编写于 251 天前,最后修改于 251 天前,其中某些信息可能已经过时。

目录

1. app.py
2. requirements.txt
3. readme
SQL下载器
功能特性
安装和使用
1. 安装依赖
2. 运行应用
3. 配置连接
4. 执行查询
项目结构
技术栈
安全说明
开发计划
注意事项

1. app.py

python
import streamlit as st import pandas as pd from odps import ODPS import io import json import os import re from datetime import datetime from streamlit_ace import st_ace import base64 from cryptography.fernet import Fernet from typing import Dict, Optional # 设置页面配置 st.set_page_config( page_title="SQL下载器", page_icon="📊", layout="wide" ) # 自定义CSS样式 st.markdown(""" <style> .main-header { font-size: 2.5rem; font-weight: bold; color: #1f77b4; text-align: center; margin-bottom: 2rem; } .connection-box { background-color: #f0f2f6; padding: 1rem; border-radius: 0.5rem; margin-bottom: 1rem; } .sql-editor-container { margin-top: 1rem; margin-bottom: 2rem; } .status-success { color: #28a745; font-weight: bold; } .status-error { color: #dc3545; font-weight: bold; } .download-section { background-color: #e8f4fd; padding: 1rem; border-radius: 0.5rem; margin-top: 1rem; } </style> """, unsafe_allow_html=True) # ==================== 配置管理类 ==================== class ConfigManager: """配置管理器 - 处理ODPS连接配置的保存、加载和加密""" def __init__(self, config_file: str = '.connection_config.json', key_file: str = '.config_key'): self.config_file = config_file self.key_file = key_file self._key = self._load_or_create_key() self._cipher = Fernet(self._key) def _load_or_create_key(self) -> bytes: """加载或创建加密密钥""" if os.path.exists(self.key_file): with open(self.key_file, 'rb') as f: return f.read() else: key = Fernet.generate_key() with open(self.key_file, 'wb') as f: f.write(key) return key def save_config(self, config: Dict[str, str]) -> bool: """保存配置(加密)""" try: # 加密敏感信息 encrypted_config = {} for key, value in config.items(): if key in ['access_id', 'access_key']: encrypted_config[key] = self._cipher.encrypt(value.encode()).decode() else: encrypted_config[key] = value with open(self.config_file, 'w') as f: json.dump(encrypted_config, f, indent=2) return True except Exception as e: st.error(f"保存配置失败: {e}") return False def load_config(self) -> Dict[str, str]: """加载配置(解密)""" try: if not os.path.exists(self.config_file): return {} with open(self.config_file, 'r') as f: encrypted_config = json.load(f) # 解密敏感信息 config = {} for key, value in encrypted_config.items(): if key in ['access_id', 'access_key']: try: config[key] = self._cipher.decrypt(value.encode()).decode() except: # 如果解密失败,可能是旧格式的配置,直接使用 config[key] = value else: config[key] = value return config except Exception as e: st.error(f"加载配置失败: {e}") return {} def clear_config(self) -> bool: """清除配置文件""" try: if os.path.exists(self.config_file): os.remove(self.config_file) if os.path.exists(self.key_file): os.remove(self.key_file) return True except Exception as e: st.error(f"清除配置失败: {e}") return False # 默认配置 DEFAULT_CONFIG = { 'endpoint': 'http://service.cn.maxcompute.aliyun.com/api', 'project': '', 'access_id': '', 'access_key': '' } # 常用端点 ENDPOINTS = { '公网(经典网络)': 'http://service.cn.maxcompute.aliyun.com/api', '公网(VPC网络)': 'http://service.cn.maxcompute.aliyun-inc.com/api', '华东1(杭州)': 'http://service.cn-hangzhou.maxcompute.aliyun.com/api', '华东2(上海)': 'http://service.cn-shanghai.maxcompute.aliyun.com/api', '华北2(北京)': 'http://service.cn-beijing.maxcompute.aliyun.com/api', '华南1(深圳)': 'http://service.cn-shenzhen.maxcompute.aliyun.com/api' } # ==================== Session State 初始化 ==================== # 初始化session state if 'odps_client' not in st.session_state: st.session_state.odps_client = None if 'connection_status' not in st.session_state: st.session_state.connection_status = False if 'query_result' not in st.session_state: st.session_state.query_result = None if 'sql_history' not in st.session_state: st.session_state.sql_history = [] if 'config_manager' not in st.session_state: st.session_state.config_manager = ConfigManager() # ==================== 配置管理辅助函数 ==================== def save_connection_config(config): """保存连接配置到本地文件(使用加密)""" return st.session_state.config_manager.save_config(config) def load_connection_config(): """从本地文件加载连接配置(自动解密)""" return st.session_state.config_manager.load_config() # ==================== ODPS连接和查询函数 ==================== def connect_to_odps(access_id, access_key, project, endpoint): """连接到ODPS""" try: with st.spinner('正在连接ODPS...'): odps = ODPS(access_id, access_key, project, endpoint=endpoint) # 简单的连接测试:只创建客户端,不执行额外操作 # 如果凭据错误,ODPS构造函数通常会成功,但第一次操作会失败 # 我们将在实际使用时再验证连接 st.session_state.odps_client = odps st.session_state.connection_status = True return True, "连接配置已保存!" except Exception as e: st.session_state.connection_status = False return False, f"连接失败: {str(e)}" def test_odps_connection(): """测试ODPS连接是否有效""" try: if not st.session_state.odps_client: return False, "请先配置连接参数" with st.spinner('正在测试连接...'): # 执行一个简单的查询来测试连接 test_sql = "SELECT 1 as test_col" instance = st.session_state.odps_client.execute_sql(test_sql) instance.wait_for_success() return True, "连接测试成功!" except Exception as e: error_msg = str(e) if "ODPS-0420111" in error_msg or "Invalid credentials" in error_msg: return False, "认证失败,请检查Access ID和Access Key是否正确" elif "ODPS-0130071" in error_msg or "Project not found" in error_msg: return False, "项目不存在,请检查Project名称是否正确" else: return False, f"连接测试失败: {error_msg}" def execute_sql(sql_query): """执行SQL查询""" try: if not st.session_state.odps_client: return False, "请先连接到ODPS", None # 执行查询 with st.spinner('正在执行查询...'): instance = st.session_state.odps_client.execute_sql(sql_query) instance.wait_for_success() # 获取结果 result = instance.open_reader() data = [] for record in result: data.append(record.values) # 获取列名 columns = [field.name for field in result.schema] # 创建DataFrame df = pd.DataFrame(data, columns=columns) # 保存到session state st.session_state.query_result = df # 添加到历史记录 if sql_query not in st.session_state.sql_history: st.session_state.sql_history.append(sql_query) if len(st.session_state.sql_history) > 10: # 保留最近10条 st.session_state.sql_history.pop(0) return True, f"查询成功!返回 {len(df)} 行数据", df except Exception as e: error_msg = str(e) # 提供更友好的错误信息 if "ODPS-0420111" in error_msg or "Invalid credentials" in error_msg: st.session_state.connection_status = False return False, "认证失败,请检查Access ID和Access Key是否正确", None elif "ODPS-0130131" in error_msg or "Table not found" in error_msg: return False, "表不存在,请检查表名是否正确", None elif "ODPS-0130071" in error_msg or "Project not found" in error_msg: st.session_state.connection_status = False return False, "项目不存在,请检查Project名称是否正确", None else: return False, f"查询失败: {error_msg}", None # ==================== 文件下载函数 ==================== def download_csv(df, filename): """生成CSV下载链接""" try: # CSV通常对特殊字符更宽容,但我们仍然可以进行一些清理 csv = df.to_csv(index=False, encoding='utf-8-sig', errors='replace') b64 = base64.b64encode(csv.encode('utf-8-sig')).decode() href = f'<a href="data:text/csv;base64,{b64}" download="{filename}">📥 下载CSV文件</a>' return href except Exception as e: st.error(f"生成CSV文件时出错: {str(e)}") return f'<span style="color: red;">CSV下载失败: {str(e)}</span>' def clean_data_for_excel(df): """清理DataFrame中的非法字符,使其兼容Excel""" def clean_cell_value(value): if pd.isna(value) or value is None: return value if isinstance(value, str): # 移除或替换Excel不支持的控制字符 # Excel不支持ASCII控制字符 (0-31,除了制表符、换行符、回车符) # 也不支持某些Unicode控制字符 # 保留制表符(9)、换行符(10)、回车符(13),移除其他控制字符 clean_value = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]', '', value) # 移除某些特殊的Unicode字符 clean_value = re.sub(r'[\uFFFE\uFFFF]', '', clean_value) # 限制字符串长度(Excel单元格最大32767字符) if len(clean_value) > 32767: clean_value = clean_value[:32764] + "..." return clean_value return value # 创建数据副本 cleaned_df = df.copy() # 清理所有列的数据 for column in cleaned_df.columns: cleaned_df[column] = cleaned_df[column].apply(clean_cell_value) return cleaned_df def download_excel(df, filename): """生成Excel下载链接""" try: # 清理数据中的非法字符 cleaned_df = clean_data_for_excel(df) output = io.BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: cleaned_df.to_excel(writer, index=False, sheet_name='查询结果') output.seek(0) b64 = base64.b64encode(output.read()).decode() href = f'<a href="data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{b64}" download="{filename}">📥 下载Excel文件</a>' return href except Exception as e: st.error(f"生成Excel文件时出错: {str(e)}") return f'<span style="color: red;">Excel下载失败: {str(e)}</span>' # ==================== 主应用函数 ==================== def main(): # 主标题 st.markdown('<h1 class="main-header">📊 SQL下载器</h1>', unsafe_allow_html=True) # 连接配置区域 - 使用可折叠的expander with st.expander("🔗 ODPS连接配置", expanded=not st.session_state.connection_status): # 加载保存的配置 saved_config = load_connection_config() st.markdown('<div class="connection-box">', unsafe_allow_html=True) access_id = st.text_input( "Access ID", value=saved_config.get('access_id', ''), type="password" ) access_key = st.text_input( "Access Key", value=saved_config.get('access_key', ''), type="password" ) project = st.text_input( "Project", value=saved_config.get('project', '') ) # 端点选择 saved_endpoint = saved_config.get('endpoint', ENDPOINTS['公网(经典网络)']) # 检查保存的端点是否在预定义列表中 if saved_endpoint in ENDPOINTS.values(): # 找到对应的键 default_endpoint_key = [k for k, v in ENDPOINTS.items() if v == saved_endpoint][0] default_index = list(ENDPOINTS.keys()).index(default_endpoint_key) else: default_index = 0 endpoint_option = st.selectbox( "选择端点", options=list(ENDPOINTS.keys()), index=default_index ) endpoint = ENDPOINTS[endpoint_option] # 自定义端点选项 use_custom_endpoint = st.checkbox("使用自定义端点") if use_custom_endpoint: endpoint = st.text_input( "自定义端点", value=saved_endpoint if saved_endpoint not in ENDPOINTS.values() else ENDPOINTS['公网(经典网络)'] ) col1, col2, col3 = st.columns(3) with col1: if st.button("🔗 连接", use_container_width=True): if all([access_id, access_key, project, endpoint]): success, message = connect_to_odps(access_id, access_key, project, endpoint) if success: st.success(message) # 保存配置 config = { 'access_id': access_id, 'access_key': access_key, 'project': project, 'endpoint': endpoint } save_connection_config(config) st.rerun() # 重新加载以收起配置区域 else: st.error(message) else: st.error("请填写所有连接参数") with col2: if st.button("🧪 测试", use_container_width=True): if st.session_state.connection_status: success, message = test_odps_connection() if success: st.success(message) else: st.error(message) st.session_state.connection_status = False else: st.error("请先点击连接按钮") with col3: if st.button("🧹 清除", use_container_width=True): st.session_state.config_manager.clear_config() st.session_state.connection_status = False st.rerun() st.markdown('</div>', unsafe_allow_html=True) # 连接状态显示 if st.session_state.connection_status: st.markdown('<p class="status-success">✅ 连接配置已保存</p>', unsafe_allow_html=True) st.info("💡 建议点击'测试'按钮验证连接有效性") else: st.markdown('<p class="status-error">❌ 未连接</p>', unsafe_allow_html=True) # SQL编辑器区域(全宽) st.markdown('<div class="sql-editor-container">', unsafe_allow_html=True) st.header("📝 SQL编辑器") # SQL历史记录选择 if st.session_state.sql_history: selected_history = st.selectbox( "选择历史查询(可选)", [""] + st.session_state.sql_history, key="history_selector" ) if selected_history: st.session_state.current_sql = selected_history # SQL编辑器(增加高度) sql_query = st_ace( value=getattr(st.session_state, 'current_sql', ''), language='sql', theme='monokai', key="sql_editor", height=450, # 进一步增加高度到450 auto_update=True, font_size=14, tab_size=2, annotations=None, markers=None, wrap=True ) # 执行按钮 if st.button("🚀 执行查询", type="primary", use_container_width=True): if sql_query.strip(): if st.session_state.connection_status: success, message, result = execute_sql(sql_query) if success: st.success(message) else: st.error(message) else: st.error("请先连接到ODPS") else: st.error("请输入SQL查询语句") st.markdown('</div>', unsafe_allow_html=True) # 下载区域 if st.session_state.query_result is not None: st.markdown('<div class="download-section">', unsafe_allow_html=True) st.header("📥 下载查询结果") df = st.session_state.query_result # 显示基本信息 col1, col2, col3 = st.columns(3) with col1: st.metric("总行数", len(df)) with col2: st.metric("列数", len(df.columns)) with col3: # 计算大概的文件大小 size_mb = df.memory_usage(deep=True).sum() / 1024 / 1024 st.metric("预估大小", f"{size_mb:.2f} MB") # 检查是否有特殊字符需要清理(针对Excel) try: has_special_chars = False # 只检查字符串类型的列,并限制检查的数据量 for column in df.select_dtypes(include=['object']).columns[:5]: # 最多检查5列 sample_data = df[column].dropna().astype(str).head(50) # 检查前50行 if any(re.search(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F\uFFFE\uFFFF]', str(val)) for val in sample_data): has_special_chars = True break if has_special_chars: st.info("⚠️ 检测到数据中包含特殊字符,Excel下载时将自动清理这些字符以确保兼容性") except Exception: # 如果检测失败,静默忽略,不影响主要功能 pass # 文件名配置 col1, col2 = st.columns(2) with col1: filename_base = st.text_input( "文件名前缀", value=f"query_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}" ) with col2: st.write("") # 占位符,对齐高度 st.write("") # 下载按钮 col1, col2 = st.columns(2) with col1: csv_filename = f"{filename_base}.csv" csv_link = download_csv(df, csv_filename) st.markdown(csv_link, unsafe_allow_html=True) with col2: excel_filename = f"{filename_base}.xlsx" excel_link = download_excel(df, excel_filename) st.markdown(excel_link, unsafe_allow_html=True) st.markdown('</div>', unsafe_allow_html=True) if __name__ == "__main__": main()

2. requirements.txt

streamlit>=1.28.0 pyodps>=0.11.0 pandas>=1.5.0 openpyxl>=3.1.0 streamlit-ace>=0.1.1 numpy>=1.24.0 cryptography>=3.4.8 python-dotenv>=1.0.0

3. readme

SQL下载器

基于pyodps和streamlit构建的MaxCompute SQL查询结果下载器,提供简洁易用的Web界面。

功能特性

  • 🔗 ODPS连接管理: 安全的连接配置保存和加载
  • 📝 SQL编辑器: 支持语法高亮的代码编辑器
  • 🚀 查询执行: 实时执行SQL查询并显示进度
  • 📥 数据下载: 支持CSV和Excel格式导出
  • 📋 查询模板: 内置常用SQL模板
  • 📚 历史记录: 自动保存查询历史

安装和使用

1. 安装依赖

python3.8以上

bash
pip install -r requirements.txt
2. 运行应用
bash
streamlit run app.py
3. 配置连接

在侧边栏填入ODPS连接信息:

  • Access ID: 阿里云Access Key ID
  • Access Key: 阿里云Access Key Secret
  • Project: MaxCompute项目名称
  • Endpoint: MaxCompute服务端点(默认为公网端点)
4. 执行查询
  1. 在SQL编辑器中输入查询语句
  2. 点击"执行查询"按钮
  3. 等待查询完成
  4. 在下载区域选择下载格式

项目结构

sql_download/ ├── app.py # 主应用文件(包含所有功能) ├── requirements.txt # 项目依赖 ├── environment.yml # Conda环境配置 ├── start.sh # 快速启动脚本 ├── .gitignore # Git忽略文件 ├── README.md # 项目说明 ├── INSTALL.md # 安装指南 └── TODO.md # 开发计划

技术栈

  • streamlit: Web应用框架
  • pyodps: MaxCompute Python SDK
  • pandas: 数据处理
  • streamlit-ace: 代码编辑器组件
  • openpyxl: Excel文件处理

安全说明

  • 连接配置保存在本地文件中,请注意保护
  • Access Key等敏感信息在界面中以密码形式显示
  • 建议在生产环境中使用更安全的认证方式

开发计划

详细的开发计划请查看 TODO.md

注意事项

  1. 确保网络可以访问MaxCompute服务
  2. 大数据集下载可能需要较长时间
  3. 注意MaxCompute的查询配额限制

本文作者:ender

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!