import base64
import html
import io
import json
import os
import re
from datetime import datetime
from typing import Dict, Optional

import pandas as pd
import streamlit as st
from cryptography.fernet import Fernet
from odps import ODPS
from streamlit_ace import st_ace
# Page configuration
_PAGE_SETTINGS = {
    "page_title": "SQL下载器",
    "page_icon": "📊",
    "layout": "wide",
}
st.set_page_config(**_PAGE_SETTINGS)

# Custom CSS: defines the classes referenced by the raw HTML snippets
# (main-header, connection-box, status-success, ...) rendered by the app
_CUSTOM_CSS = """
<style>
.main-header {
font-size: 2.5rem;
font-weight: bold;
color: #1f77b4;
text-align: center;
margin-bottom: 2rem;
}
.connection-box {
background-color: #f0f2f6;
padding: 1rem;
border-radius: 0.5rem;
margin-bottom: 1rem;
}
.sql-editor-container {
margin-top: 1rem;
margin-bottom: 2rem;
}
.status-success {
color: #28a745;
font-weight: bold;
}
.status-error {
color: #dc3545;
font-weight: bold;
}
.download-section {
background-color: #e8f4fd;
padding: 1rem;
border-radius: 0.5rem;
margin-top: 1rem;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# ==================== 配置管理类 ====================
class ConfigManager:
    """Save, load and clear the ODPS connection settings on disk.

    The settings are stored as JSON in ``config_file``. The ``access_id`` and
    ``access_key`` fields are encrypted with a Fernet key that lives in
    ``key_file``; every other field is stored as-is.
    """

    # Config fields that are encrypted before being written to disk.
    _SENSITIVE_FIELDS = ('access_id', 'access_key')

    def __init__(self, config_file: str = '.connection_config.json',
                 key_file: str = '.config_key'):
        self.config_file = config_file
        self.key_file = key_file
        self._key = self._load_or_create_key()
        self._cipher = Fernet(self._key)

    def _write_key(self, key: bytes) -> None:
        """Write *key* to the key file, creating it with owner-only permissions."""
        # The key protects the stored credentials, so do not rely on the
        # process umask when the file is first created.
        fd = os.open(self.key_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(key)

    def _load_or_create_key(self) -> bytes:
        """Return the key stored in the key file, generating and saving one if absent."""
        if os.path.exists(self.key_file):
            with open(self.key_file, 'rb') as f:
                return f.read()
        key = Fernet.generate_key()
        self._write_key(key)
        return key

    def save_config(self, config: Dict[str, str]) -> bool:
        """Write *config* to the config file, encrypting the sensitive fields.

        Returns:
            True on success; False (after reporting via ``st.error``) on failure.
        """
        try:
            encrypted_config = {}
            for name, value in config.items():
                if name in self._SENSITIVE_FIELDS:
                    encrypted_config[name] = self._cipher.encrypt(value.encode()).decode()
                else:
                    encrypted_config[name] = value
            # clear_config() deletes the key file while this instance keeps
            # using its in-memory key. Persist the key again so that what is
            # written here can still be decrypted by a later instance.
            if not os.path.exists(self.key_file):
                self._write_key(self._key)
            with open(self.config_file, 'w') as f:
                json.dump(encrypted_config, f, indent=2)
            return True
        except Exception as e:
            st.error(f"保存配置失败: {e}")
            return False

    def load_config(self) -> Dict[str, str]:
        """Read the config file and decrypt the sensitive fields.

        Returns:
            The stored settings, or an empty dict when the file is missing or
            cannot be read (the error is reported via ``st.error``).
        """
        try:
            if not os.path.exists(self.config_file):
                return {}
            with open(self.config_file, 'r') as f:
                encrypted_config = json.load(f)
            config = {}
            for name, value in encrypted_config.items():
                if name in self._SENSITIVE_FIELDS:
                    try:
                        config[name] = self._cipher.decrypt(value.encode()).decode()
                    except Exception:
                        # Deliberate best effort: a value that cannot be
                        # decrypted may come from an older plain-text config,
                        # so it is used unchanged.
                        config[name] = value
                else:
                    config[name] = value
            return config
        except Exception as e:
            st.error(f"加载配置失败: {e}")
            return {}

    def clear_config(self) -> bool:
        """Delete the config file and the key file if they exist.

        Returns:
            True on success; False (after reporting via ``st.error``) on failure.
        """
        try:
            for path in (self.config_file, self.key_file):
                if os.path.exists(path):
                    os.remove(path)
            return True
        except Exception as e:
            st.error(f"清除配置失败: {e}")
            return False
# Default connection settings
DEFAULT_CONFIG = dict(
    endpoint='http://service.cn.maxcompute.aliyun.com/api',
    project='',
    access_id='',
    access_key='',
)

# Common endpoints as (display label, URL) pairs; the dict keeps this order
_ENDPOINT_TABLE = (
    ('公网(经典网络)', 'http://service.cn.maxcompute.aliyun.com/api'),
    ('公网(VPC网络)', 'http://service.cn.maxcompute.aliyun-inc.com/api'),
    ('华东1(杭州)', 'http://service.cn-hangzhou.maxcompute.aliyun.com/api'),
    ('华东2(上海)', 'http://service.cn-shanghai.maxcompute.aliyun.com/api'),
    ('华北2(北京)', 'http://service.cn-beijing.maxcompute.aliyun.com/api'),
    ('华南1(深圳)', 'http://service.cn-shenzhen.maxcompute.aliyun.com/api'),
)
ENDPOINTS = dict(_ENDPOINT_TABLE)
# ==================== Session State 初始化 ====================
# Initialize session state: each factory runs only when its key is missing,
# so values already stored in the session are left untouched.
_SESSION_DEFAULTS = {
    'odps_client': lambda: None,
    'connection_status': lambda: False,
    'query_result': lambda: None,
    'sql_history': lambda: [],
    'config_manager': lambda: ConfigManager(),
}
for _state_key, _make_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# ==================== 配置管理辅助函数 ====================
def save_connection_config(config):
    """Persist the connection settings via the session's ConfigManager.

    Returns the result of ``ConfigManager.save_config`` for *config*.
    """
    manager = st.session_state.config_manager
    return manager.save_config(config)
def load_connection_config():
    """Load the stored connection settings via the session's ConfigManager.

    Returns the result of ``ConfigManager.load_config``.
    """
    manager = st.session_state.config_manager
    return manager.load_config()
# ==================== ODPS连接和查询函数 ====================
def connect_to_odps(access_id, access_key, project, endpoint):
    """Create an ODPS client and store it in the session state.

    No request is sent here: the client is only constructed. Per the original
    author's note, construction usually succeeds even with wrong credentials
    and the first real operation is what fails, so the connection is
    validated later, when it is actually used.

    Returns:
        A ``(success, message)`` tuple.
    """
    try:
        with st.spinner('正在连接ODPS...'):
            client = ODPS(access_id, access_key, project, endpoint=endpoint)
            st.session_state.odps_client = client
            st.session_state.connection_status = True
    except Exception as exc:
        st.session_state.connection_status = False
        return False, f"连接失败: {str(exc)}"
    return True, "连接配置已保存!"
def test_odps_connection():
    """Check the stored ODPS client by running a trivial query.

    Returns:
        A ``(success, message)`` tuple. Known ODPS error codes are mapped to
        friendlier messages; anything else is reported verbatim.
    """
    # (substrings to look for in the error text, message shown to the user)
    known_failures = (
        (("ODPS-0420111", "Invalid credentials"),
         "认证失败,请检查Access ID和Access Key是否正确"),
        (("ODPS-0130071", "Project not found"),
         "项目不存在,请检查Project名称是否正确"),
    )
    try:
        client = st.session_state.odps_client
        if not client:
            return False, "请先配置连接参数"
        with st.spinner('正在测试连接...'):
            # A minimal query is enough to exercise the connection
            probe = client.execute_sql("SELECT 1 as test_col")
            probe.wait_for_success()
            return True, "连接测试成功!"
    except Exception as exc:
        detail = str(exc)
        for markers, friendly in known_failures:
            if any(marker in detail for marker in markers):
                return False, friendly
        return False, f"连接测试失败: {detail}"
def execute_sql(sql_query):
    """Run *sql_query* on the stored ODPS client and collect the result.

    On success the result DataFrame is stored in
    ``st.session_state.query_result`` and the statement is appended to
    ``st.session_state.sql_history`` (at most 10 entries are kept).

    Returns:
        A ``(success, message, dataframe)`` tuple; ``dataframe`` is None on
        failure. Authentication and project errors also reset
        ``st.session_state.connection_status`` to False.
    """
    # (substrings to look for, user-facing message, whether the connection
    # should be marked as broken)
    known_failures = (
        (("ODPS-0420111", "Invalid credentials"),
         "认证失败,请检查Access ID和Access Key是否正确", True),
        (("ODPS-0130131", "Table not found"),
         "表不存在,请检查表名是否正确", False),
        (("ODPS-0130071", "Project not found"),
         "项目不存在,请检查Project名称是否正确", True),
    )
    try:
        if not st.session_state.odps_client:
            return False, "请先连接到ODPS", None
        with st.spinner('正在执行查询...'):
            instance = st.session_state.odps_client.execute_sql(sql_query)
            instance.wait_for_success()
            # Use the reader as a context manager so it is closed even when
            # reading a record fails part-way through.
            with instance.open_reader() as reader:
                rows = [record.values for record in reader]
                columns = [field.name for field in reader.schema]
            df = pd.DataFrame(rows, columns=columns)
        st.session_state.query_result = df
        history = st.session_state.sql_history
        if sql_query not in history:
            history.append(sql_query)
            # Keep only the 10 most recent statements
            del history[:-10]
        return True, f"查询成功!返回 {len(df)} 行数据", df
    except Exception as e:
        error_msg = str(e)
        for markers, friendly, drops_connection in known_failures:
            if any(marker in error_msg for marker in markers):
                if drops_connection:
                    st.session_state.connection_status = False
                return False, friendly, None
        return False, f"查询失败: {error_msg}", None
# ==================== 文件下载函数 ====================
def download_csv(df, filename):
    """Build an HTML download link that embeds *df* as a base64-encoded CSV.

    Args:
        df: DataFrame to export (the index is not written).
        filename: File name suggested to the browser via the ``download``
            attribute.

    Returns:
        An ``<a>`` element as a string, or a red ``<span>`` holding the error
        message when the CSV cannot be produced (the error is also reported
        via ``st.error``).
    """
    try:
        csv_text = df.to_csv(index=False)
        # utf-8-sig prepends a BOM. errors='replace' is applied at the actual
        # encoding step so that unencodable characters (e.g. lone surrogates)
        # do not abort the download.
        csv_bytes = csv_text.encode('utf-8-sig', errors='replace')
        b64 = base64.b64encode(csv_bytes).decode()
        # The file name comes from user input; escape it so it cannot break
        # out of the attribute.
        safe_name = html.escape(str(filename), quote=True)
        return f'<a href="data:text/csv;base64,{b64}" download="{safe_name}">📥 下载CSV文件</a>'
    except Exception as e:
        st.error(f"生成CSV文件时出错: {str(e)}")
        return f'<span style="color: red;">CSV下载失败: {html.escape(str(e))}</span>'
def clean_data_for_excel(df):
    """Return a copy of *df* whose string cells are safe to write to Excel.

    For every string cell:
      * ASCII control characters other than tab (9), newline (10) and
        carriage return (13), the C1 range 0x7F-0x9F, and U+FFFE / U+FFFF
        are removed;
      * the text is truncated to 32767 characters (the Excel cell limit
        stated by the original author), ending in "..." when truncated.

    Non-string cells (numbers, None/NaN, lists, dicts, ...) are returned
    unchanged. The input DataFrame is not modified.
    """
    illegal_chars = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F\uFFFE\uFFFF]')
    max_cell_length = 32767
    ellipsis = "..."

    def clean_cell_value(value):
        # Only strings need cleaning. Testing the type first also avoids
        # calling pd.isna() on list/array cells, whose element-wise result
        # cannot be used as a boolean.
        if not isinstance(value, str):
            return value
        cleaned = illegal_chars.sub('', value)
        if len(cleaned) > max_cell_length:
            cleaned = cleaned[:max_cell_length - len(ellipsis)] + ellipsis
        return cleaned

    # Work on a copy so the caller's DataFrame stays untouched
    cleaned_df = df.copy()
    for column in cleaned_df.columns:
        cleaned_df[column] = cleaned_df[column].apply(clean_cell_value)
    return cleaned_df
def download_excel(df, filename):
    """Build an HTML download link that embeds *df* as a base64-encoded .xlsx file.

    The data is first passed through ``clean_data_for_excel`` and then written
    with the openpyxl engine to a sheet named "查询结果" (without the index).

    Args:
        df: DataFrame to export.
        filename: File name suggested to the browser via the ``download``
            attribute.

    Returns:
        An ``<a>`` element as a string, or a red ``<span>`` holding the error
        message when the workbook cannot be produced (the error is also
        reported via ``st.error``).
    """
    try:
        cleaned_df = clean_data_for_excel(df)
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            cleaned_df.to_excel(writer, index=False, sheet_name='查询结果')
        b64 = base64.b64encode(output.getvalue()).decode()
        # The file name comes from user input; escape it so it cannot break
        # out of the attribute.
        safe_name = html.escape(str(filename), quote=True)
        return f'<a href="data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{b64}" download="{safe_name}">📥 下载Excel文件</a>'
    except Exception as e:
        st.error(f"生成Excel文件时出错: {str(e)}")
        return f'<span style="color: red;">Excel下载失败: {html.escape(str(e))}</span>'
# ==================== 主应用函数 ====================
# Session-state key that backs the download file-name input.
_FILENAME_STATE_KEY = 'download_filename_base'

# Characters that the Excel export strips; used only to warn the user.
_EXCEL_UNSAFE_CHARS = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F\uFFFE\uFFFF]')


def _default_filename_base():
    """Return a timestamped default prefix for download file names."""
    return f"query_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}"


def _render_connection_panel():
    """Render the collapsible connection form and handle its three buttons."""
    with st.expander("🔗 ODPS连接配置", expanded=not st.session_state.connection_status):
        saved_config = load_connection_config()
        st.markdown('<div class="connection-box">', unsafe_allow_html=True)
        access_id = st.text_input(
            "Access ID",
            value=saved_config.get('access_id', ''),
            type="password"
        )
        access_key = st.text_input(
            "Access Key",
            value=saved_config.get('access_key', ''),
            type="password"
        )
        project = st.text_input(
            "Project",
            value=saved_config.get('project', '')
        )

        # Endpoint selection: preselect the saved endpoint when it is one of
        # the predefined ones, otherwise fall back to the first entry.
        endpoint_labels = list(ENDPOINTS.keys())
        endpoint_urls = list(ENDPOINTS.values())
        saved_endpoint = saved_config.get('endpoint', ENDPOINTS['公网(经典网络)'])
        saved_is_custom = bool(saved_endpoint) and saved_endpoint not in endpoint_urls
        default_index = 0 if saved_endpoint not in endpoint_urls else endpoint_urls.index(saved_endpoint)
        endpoint_option = st.selectbox(
            "选择端点",
            options=endpoint_labels,
            index=default_index
        )
        endpoint = ENDPOINTS[endpoint_option]

        # Pre-tick the box when the saved endpoint is a custom one; otherwise
        # a saved custom endpoint would silently be replaced by the first
        # predefined endpoint after a reload.
        use_custom_endpoint = st.checkbox("使用自定义端点", value=saved_is_custom)
        if use_custom_endpoint:
            endpoint = st.text_input(
                "自定义端点",
                value=saved_endpoint if saved_endpoint not in endpoint_urls
                else ENDPOINTS['公网(经典网络)']
            )

        connect_col, test_col, clear_col = st.columns(3)
        with connect_col:
            if st.button("🔗 连接", use_container_width=True):
                if all([access_id, access_key, project, endpoint]):
                    success, message = connect_to_odps(access_id, access_key, project, endpoint)
                    if success:
                        st.success(message)
                        save_connection_config({
                            'access_id': access_id,
                            'access_key': access_key,
                            'project': project,
                            'endpoint': endpoint
                        })
                        st.rerun()  # rerun so the config panel collapses
                    else:
                        st.error(message)
                else:
                    st.error("请填写所有连接参数")
        with test_col:
            if st.button("🧪 测试", use_container_width=True):
                if st.session_state.connection_status:
                    success, message = test_odps_connection()
                    if success:
                        st.success(message)
                    else:
                        st.error(message)
                        st.session_state.connection_status = False
                else:
                    st.error("请先点击连接按钮")
        with clear_col:
            if st.button("🧹 清除", use_container_width=True):
                st.session_state.config_manager.clear_config()
                # Also drop the client so the cleared credentials do not
                # linger in the session.
                st.session_state.odps_client = None
                st.session_state.connection_status = False
                st.rerun()
        st.markdown('</div>', unsafe_allow_html=True)


def _render_connection_status():
    """Show whether a connection is currently configured."""
    if st.session_state.connection_status:
        st.markdown('<p class="status-success">✅ 连接配置已保存</p>', unsafe_allow_html=True)
        st.info("💡 建议点击'测试'按钮验证连接有效性")
    else:
        st.markdown('<p class="status-error">❌ 未连接</p>', unsafe_allow_html=True)


def _render_sql_editor():
    """Render the history selector, the SQL editor and the execute button."""
    st.markdown('<div class="sql-editor-container">', unsafe_allow_html=True)
    st.header("📝 SQL编辑器")
    if st.session_state.sql_history:
        selected_history = st.selectbox(
            "选择历史查询(可选)",
            [""] + st.session_state.sql_history,
            key="history_selector"
        )
        if selected_history:
            st.session_state.current_sql = selected_history
    sql_query = st_ace(
        value=st.session_state.get('current_sql', ''),
        language='sql',
        theme='monokai',
        key="sql_editor",
        height=450,
        auto_update=True,
        font_size=14,
        tab_size=2,
        annotations=None,
        markers=None,
        wrap=True
    )
    if st.button("🚀 执行查询", type="primary", use_container_width=True):
        # Guard against the editor yielding None instead of a string
        if not (sql_query or "").strip():
            st.error("请输入SQL查询语句")
        elif not st.session_state.connection_status:
            st.error("请先连接到ODPS")
        else:
            success, message, _ = execute_sql(sql_query)
            if success:
                # A new result gets a fresh default file name. This runs
                # before the file-name input is rendered further down.
                st.session_state[_FILENAME_STATE_KEY] = _default_filename_base()
                st.success(message)
            else:
                st.error(message)
    st.markdown('</div>', unsafe_allow_html=True)


def _has_excel_unsafe_chars(df):
    """Sample up to 5 object columns x 50 rows for characters Excel export strips."""
    try:
        for column in df.select_dtypes(include=['object']).columns[:5]:
            sample = df[column].dropna().astype(str).head(50)
            if any(_EXCEL_UNSAFE_CHARS.search(str(val)) for val in sample):
                return True
    except Exception:
        # Deliberate best effort: this check only drives an informational
        # hint, so a failure here must not block the download section.
        pass
    return False


def _render_download_section(df):
    """Render result metrics, the file-name input and the CSV/Excel links for *df*."""
    st.markdown('<div class="download-section">', unsafe_allow_html=True)
    st.header("📥 下载查询结果")

    rows_col, cols_col, size_col = st.columns(3)
    with rows_col:
        st.metric("总行数", len(df))
    with cols_col:
        st.metric("列数", len(df.columns))
    with size_col:
        # Rough size estimate based on the in-memory footprint
        size_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
        st.metric("预估大小", f"{size_mb:.2f} MB")

    if _has_excel_unsafe_chars(df):
        st.info("⚠️ 检测到数据中包含特殊字符,Excel下载时将自动清理这些字符以确保兼容性")

    name_col, spacer_col = st.columns(2)
    with name_col:
        # The default is kept in session state under a fixed key rather than
        # passed as `value=`: a timestamp default changes on every rerun,
        # which would reset a keyless input and discard the user's edit.
        if _FILENAME_STATE_KEY not in st.session_state:
            st.session_state[_FILENAME_STATE_KEY] = _default_filename_base()
        filename_base = st.text_input("文件名前缀", key=_FILENAME_STATE_KEY)
    with spacer_col:
        st.write("")  # placeholders to align heights
        st.write("")

    csv_col, excel_col = st.columns(2)
    with csv_col:
        csv_link = download_csv(df, f"{filename_base}.csv")
        st.markdown(csv_link, unsafe_allow_html=True)
    with excel_col:
        excel_link = download_excel(df, f"{filename_base}.xlsx")
        st.markdown(excel_link, unsafe_allow_html=True)
    st.markdown('</div>', unsafe_allow_html=True)


def main():
    """Render the page: title, connection panel, SQL editor and, once a
    query result exists in the session, the download section."""
    st.markdown('<h1 class="main-header">📊 SQL下载器</h1>', unsafe_allow_html=True)
    _render_connection_panel()
    _render_connection_status()
    _render_sql_editor()
    if st.session_state.query_result is not None:
        _render_download_section(st.session_state.query_result)
# Render the app only when this file is executed as a script.
if __name__ == "__main__":
    main()
streamlit>=1.28.0
pyodps>=0.11.0
pandas>=1.5.0
openpyxl>=3.1.0
streamlit-ace>=0.1.1
numpy>=1.24.0
cryptography>=3.4.8
python-dotenv>=1.0.0
基于pyodps和streamlit构建的MaxCompute SQL查询结果下载器,提供简洁易用的Web界面。
Python 3.8 及以上版本
pip install -r requirements.txt
streamlit run app.py
在页面顶部的“🔗 ODPS连接配置”折叠面板中填入ODPS连接信息:
sql_download/
├── app.py            # 主应用文件(包含所有功能)
├── requirements.txt  # 项目依赖
├── environment.yml   # Conda环境配置
├── start.sh          # 快速启动脚本
├── .gitignore        # Git忽略文件
├── README.md         # 项目说明
├── INSTALL.md        # 安装指南
└── TODO.md           # 开发计划
详细的开发计划请查看 TODO.md
本文作者:ender
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!