import streamlit as st
import os
import uuid
import tempfile
from pathlib import Path
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 导入工具模块
from utils.pdf_extractor import extract_text_from_pdf, pdf_to_excel
from utils.ocr_processor import extract_text_from_image, image_to_excel, image_to_text_file
from utils.format_converter import (
excel_to_csv, csv_to_excel, json_to_excel,
excel_to_json, csv_to_json, json_to_csv
)
from utils.web_scraper import scrape_webpage, web_to_excel
from utils.database_exporter import export_sqlite_to_excel, database_to_csv, database_to_json
# Page configuration: browser tab title/icon, wide layout, sidebar expanded.
_PAGE_SETTINGS = {
    "page_title": "数据提取与转换器",
    "page_icon": "🔧",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# Custom CSS hook; the markup payload here is only a newline.
st.markdown("""
""", unsafe_allow_html=True)

# Page title hook; the markup payload here is only a newline.
st.markdown("""
""", unsafe_allow_html=True)

# Sidebar navigation: the selected label drives the if/elif page dispatch below.
_NAV_PAGES = [
    "📄 PDF处理",
    "🖼️ 图片OCR",
    "📸 AI照片评分",
    "🔄 格式转换",
    "🌐 网页抓取",
    "🗄️ 数据库导出",
]
st.sidebar.title("功能导航")
page = st.sidebar.radio("选择功能", _NAV_PAGES)
# File upload helper
def save_uploaded_file(uploaded_file, file_type):
    """Write an uploaded file to a named temporary file and return its path.

    Args:
        uploaded_file: Object exposing ``name`` and ``getvalue()``; only those
            two members are used here.
        file_type: Category label passed by callers. It is not used by this
            function and is kept for interface compatibility.

    Returns:
        The temporary file's path as a string, or ``None`` if saving failed
        (the failure is reported to the user via ``st.error``).
    """
    try:
        # Normalise the extension to lower case: callers derive output paths
        # with case-sensitive string replacement on this path, so a '.PDF' or
        # '.DB' upload would otherwise leave the path unchanged and overwrite
        # the input file.
        suffix = Path(uploaded_file.name).suffix.lower()
        # delete=False: the path is handed to other tools after this returns.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(uploaded_file.getvalue())
        # Return only after the handle is closed so the content is flushed.
        return tmp_file.name
    except Exception as e:
        st.error(f"文件保存失败: {str(e)}")
        return None
# PDF processing page: extract text, or convert the PDF to an Excel workbook.
if page == "📄 PDF处理":
    st.header("📄 PDF文本/表格提取")
    uploaded_file = st.file_uploader("选择PDF文件", type=['pdf'])
    if uploaded_file is not None:
        file_path = save_uploaded_file(uploaded_file, 'pdf')
        # save_uploaded_file returns None (after reporting the error itself)
        # when the temp file could not be written; offer no actions then.
        if file_path is not None:
            col1, col2 = st.columns(2)
            with col1:
                if st.button("提取文本内容", use_container_width=True):
                    with st.spinner("正在提取文本..."):
                        try:
                            text = extract_text_from_pdf(file_path)
                            st.subheader("提取的文本内容")
                            st.text_area("文本内容", text, height=300)
                            st.success("文本提取完成!")
                        except Exception as e:
                            st.error(f"提取失败: {str(e)}")
            with col2:
                if st.button("导出为Excel", use_container_width=True):
                    with st.spinner("正在转换为Excel..."):
                        try:
                            # Build the output name from the path stem rather
                            # than str.replace('.pdf', ...): replace is
                            # case-sensitive and matches '.pdf' anywhere in
                            # the path, so it could return the input path
                            # unchanged and overwrite the source file.
                            output_path = str(Path(file_path).with_suffix('')) + '_converted.xlsx'
                            pdf_to_excel(file_path, output_path)
                            with open(output_path, "rb") as file:
                                st.download_button(
                                    label="下载Excel文件",
                                    data=file,
                                    file_name=Path(output_path).name,
                                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                                )
                            st.success("PDF转换完成!")
                        except Exception as e:
                            st.error(f"转换失败: {str(e)}")
# AI照片评分页面
elif page == "📸 AI照片评分":
st.header("📸 AI照片质量评分")
# 百度智能云功能状态检查
try:
from utils.baidu_image_analysis import check_baidu_config
baidu_available, baidu_message = check_baidu_config()
except:
baidu_available = False
baidu_message = "百度智能云未配置"
# 显示状态
if baidu_available:
st.success("✅ 百度智能云AI照片评分可用")
else:
st.warning(f"⚠️ 百度智能云AI照片评分: {baidu_message}")
if not baidu_available:
st.info("""
**百度智能云配置说明:**
1. **注册百度智能云账号**: https://cloud.baidu.com
2. **开通图像分析服务**: 在控制台搜索"图像分析"或"图像识别"
3. **获取API密钥**: 创建应用并获取API Key和Secret Key
4. **在.env文件中配置**:
```
BAIDU_API_KEY=您的API Key
BAIDU_SECRET_KEY=您的Secret Key
```
""")
uploaded_file = st.file_uploader("选择照片文件", type=['jpg', 'jpeg', 'png', 'gif', 'bmp'])
if uploaded_file is not None:
file_path = save_uploaded_file(uploaded_file, 'image')
# AI文案生成功能状态检查
try:
from utils.ai_copywriter import check_copywriter_config
copywriter_available, copywriter_message = check_copywriter_config()
except:
copywriter_available = False
copywriter_message = "AI文案生成未配置"
# 显示AI文案生成状态
if copywriter_available:
st.success("✅ AI文案生成可用")
else:
st.warning(f"⚠️ AI文案生成: {copywriter_message}")
col1, col2, col3, col4 = st.columns(4)
with col1:
if st.button("质量评分", use_container_width=True, disabled=not baidu_available):
with st.spinner("正在分析照片质量..."):
try:
from utils.baidu_image_analysis import analyze_image_quality
from utils.photo_advice_generator import get_quality_improvement_advice
quality_result = analyze_image_quality(file_path)
st.subheader("📊 照片质量评分")
# 显示总体评分
score = quality_result['score']
st.metric("总体评分", f"{score}/100", f"{score - 75}")
# 显示质量维度
st.subheader("质量维度分析")
quality_scores = {}
for dimension, info in quality_result['dimensions'].items():
col_dim1, col_dim2 = st.columns([1, 3])
with col_dim1:
st.progress(info['score'] / 100)
with col_dim2:
st.write(f"**{dimension}**: {info['comment']} ({info['score']}/100)")
quality_scores[dimension] = info['score']
# 生成详细改进建议
advice_result = get_quality_improvement_advice(quality_scores)
# 显示总体建议
st.subheader("💡 总体改进建议")
for suggestion in advice_result.get('overall', []):
st.info(f"📌 {suggestion}")
# 显示优先级建议
if advice_result.get('priority'):
st.subheader("🎯 优先级改进")
for priority in advice_result['priority']:
st.warning(f"⚠️ {priority}")
# 显示具体维度建议
st.subheader("🔧 具体改进措施")
for dimension, suggestions in advice_result.get('specific', {}).items():
with st.expander(f"{dimension}改进建议"):
for i, suggestion in enumerate(suggestions, 1):
st.write(f"{i}. {suggestion}")
# 显示技术建议
st.subheader("📚 技术学习建议")
from utils.photo_advice_generator import get_technical_advice
tech_advice = get_technical_advice()
for category, suggestions in tech_advice.items():
with st.expander(f"{category}技术建议"):
for i, suggestion in enumerate(suggestions[:3], 1):
st.write(f"{i}. {suggestion}")
st.success("照片质量分析完成!已生成详细改进建议")
except Exception as e:
st.error(f"质量评分失败: {str(e)}")
with col2:
if st.button("内容分析", use_container_width=True, disabled=not baidu_available):
with st.spinner("正在分析照片内容..."):
try:
from utils.baidu_image_analysis import analyze_image_content
content_result = analyze_image_content(file_path)
st.subheader("🔍 照片内容分析")
if content_result['objects']:
st.write("**识别到的对象:**")
for i, obj in enumerate(content_result['objects'][:5], 1):
st.write(f"{i}. **{obj['name']}** (置信度: {obj['confidence']:.2%})")
if obj.get('baike_info'):
st.write(f" 描述: {obj['baike_info'].get('description', '无描述')}")
if content_result['summary']:
st.write(f"**内容摘要:** {content_result['summary']}")
st.success("照片内容分析完成!")
except Exception as e:
st.error(f"内容分析失败: {str(e)}")
with col3:
if st.button("美学评分", use_container_width=True, disabled=not baidu_available):
with st.spinner("正在评估照片美学..."):
try:
from utils.baidu_image_analysis import get_image_aesthetic_score
from utils.photo_advice_generator import get_aesthetic_improvement_advice
aesthetic_result = get_image_aesthetic_score(file_path)
st.subheader("🎨 照片美学评分")
# 显示美学评分
aesthetic_score = aesthetic_result['aesthetic_score']
st.metric("美学评分", f"{aesthetic_score}/100", f"{aesthetic_score - 75}")
# 显示美学维度
st.subheader("美学维度分析")
col_comp, col_color, col_light, col_focus = st.columns(4)
with col_comp:
st.metric("构图", aesthetic_result['composition'])
with col_color:
st.metric("色彩和谐", aesthetic_result['color_harmony'])
with col_light:
st.metric("光线", aesthetic_result['lighting'])
with col_focus:
st.metric("对焦", aesthetic_result['focus'])
# 生成详细美学建议
advice_result = get_aesthetic_improvement_advice(aesthetic_score)
# 显示总体美学建议
st.subheader("💡 总体美学建议")
for suggestion in advice_result.get('general', []):
st.info(f"🎨 {suggestion}")
# 显示具体美学建议
st.subheader("🔧 具体美学改进")
if advice_result.get('composition'):
with st.expander("构图改进建议"):
for i, suggestion in enumerate(advice_result['composition'], 1):
st.write(f"{i}. {suggestion}")
if advice_result.get('lighting'):
with st.expander("用光改进建议"):
for i, suggestion in enumerate(advice_result['lighting'], 1):
st.write(f"{i}. {suggestion}")
if advice_result.get('subject'):
with st.expander("主体表现建议"):
for i, suggestion in enumerate(advice_result['subject'], 1):
st.write(f"{i}. {suggestion}")
# 显示创意建议
if advice_result.get('creative'):
st.subheader("🌟 创意提升建议")
for suggestion in advice_result['creative']:
st.success(f"✨ {suggestion}")
# 显示个性化建议
st.subheader("📋 个性化学习计划")
from utils.photo_advice_generator import get_personalized_advice
# 获取照片内容用于个性化建议
from utils.baidu_image_analysis import analyze_image_content
content_result = analyze_image_content(file_path)
photo_content = content_result.get('summary', '一般照片')
# 生成质量分数用于个性化建议
from utils.baidu_image_analysis import analyze_image_quality
quality_result = analyze_image_quality(file_path)
quality_scores = {dim: info['score'] for dim, info in quality_result['dimensions'].items()}
personalized_advice = get_personalized_advice(quality_scores, aesthetic_score, photo_content)
for category, suggestions in personalized_advice.items():
if suggestions:
with st.expander(f"{category}"):
for i, suggestion in enumerate(suggestions, 1):
st.write(f"{i}. {suggestion}")
st.success("照片美学评估完成!已生成详细改进建议")
except Exception as e:
st.error(f"美学评分失败: {str(e)}")
with col4:
if st.button("AI写文案", use_container_width=True, disabled=not copywriter_available):
with st.spinner("正在生成创意文案..."):
try:
# 先进行内容分析获取照片描述
from utils.baidu_image_analysis import analyze_image_content
content_result = analyze_image_content(file_path)
# 使用AI生成文案
from utils.ai_copywriter import generate_multiple_captions, analyze_photo_suitability
# 获取照片描述
image_description = content_result.get('summary', '一张美丽的照片')
# 分析适合的文案风格
suitability_result = analyze_photo_suitability(image_description)
st.subheader("✍️ AI创意文案生成")
# 显示照片描述
st.write(f"**照片描述**: {image_description}")
# 显示推荐风格
st.write(f"**推荐风格**: {', '.join(suitability_result['recommended_styles'][:3])}")
# 生成多个文案选项
captions = generate_multiple_captions(image_description, count=3, style=suitability_result['most_suitable'])
st.subheader("📝 文案选项")
for caption_info in captions:
with st.expander(f"选项 {caption_info['option']} ({caption_info.get('length', '适中')} - {caption_info['char_count']}字)"):
st.write(caption_info['caption'])
# 复制按钮
if st.button(f"复制文案 {caption_info['option']}", key=f"copy_{caption_info['option']}"):
st.code(caption_info['caption'], language='text')
st.success("文案已复制到剪贴板!")
st.subheader("🎨 文案风格选择")
# 风格选择
selected_style = st.selectbox(
"选择文案风格",
['creative', 'social', 'professional', 'marketing', 'emotional', 'simple'],
format_func=lambda x: {
'creative': '创意文艺',
'social': '社交媒体',
'professional': '专业正式',
'marketing': '营销推广',
'emotional': '情感表达',
'simple': '简单描述'
}[x]
)
# 长度选择
selected_length = st.selectbox(
"选择文案长度",
['short', 'medium', 'long'],
format_func=lambda x: {
'short': '简短精炼',
'medium': '适中长度',
'long': '详细描述'
}[x]
)
if st.button("重新生成文案", use_container_width=True):
with st.spinner("正在重新生成文案..."):
new_caption = generate_photo_caption(image_description, selected_style, selected_length)
st.subheader("🆕 新生成文案")
st.write(new_caption)
st.success("新文案生成完成!")
st.success("AI文案生成完成!")
except Exception as e:
st.error(f"AI文案生成失败: {str(e)}")
# 显示图片预览
st.subheader("📷 照片预览")
st.image(uploaded_file, caption="上传的照片", use_column_width=True)
# 图片OCR页面
elif page == "🖼️ 图片OCR":
st.header("🖼️ 图片文字识别 (OCR)")
# OCR功能状态检查
try:
import pytesseract
# 测试Tesseract是否可用
pytesseract.get_tesseract_version()
tesseract_available = True
except:
tesseract_available = False
# AI OCR功能状态检查
try:
from utils.aliyun_ocr import check_aliyun_config
ai_available, ai_message = check_aliyun_config()
except:
ai_available = False
ai_message = "阿里云OCR未配置"
# 显示OCR状态
col_status1, col_status2 = st.columns(2)
with col_status1:
if tesseract_available:
st.success("✅ Tesseract OCR可用")
else:
st.warning("⚠️ Tesseract OCR未安装")
with col_status2:
if ai_available:
st.success("✅ AI大模型OCR可用")
else:
st.warning(f"⚠️ AI大模型OCR: {ai_message}")
# OCR模式选择
ocr_mode = st.radio("选择OCR模式",
["传统OCR (Tesseract)", "AI大模型OCR (阿里云)"],
disabled=not (tesseract_available or ai_available))
if not tesseract_available and not ai_available:
st.info("""
**OCR功能配置说明:**
**传统OCR (推荐免费):**
1. 下载Tesseract OCR: https://github.com/UB-Mannheim/tesseract/wiki
2. 安装到默认路径并添加到PATH
**AI大模型OCR (高精度):**
1. 注册阿里云账号: https://www.aliyun.com
2. 开通OCR服务并获取AccessKey
3. 在.env文件中配置ALIYUN_ACCESS_KEY_ID和ALIYUN_ACCESS_KEY_SECRET
""")
uploaded_file = st.file_uploader("选择图片文件", type=['jpg', 'jpeg', 'png', 'gif', 'bmp'])
if uploaded_file is not None:
file_path = save_uploaded_file(uploaded_file, 'image')
# 根据选择的模式启用/禁用按钮
use_ai = ocr_mode == "AI大模型OCR (阿里云)"
button_disabled = (use_ai and not ai_available) or (not use_ai and not tesseract_available)
col1, col2, col3 = st.columns(3)
with col1:
if st.button("识别文字", use_container_width=True, disabled=button_disabled):
with st.spinner("正在识别文字..."):
try:
if use_ai:
text = extract_text_from_image(file_path, use_ai=True, ai_provider='aliyun')
else:
text = extract_text_from_image(file_path)
st.subheader("识别的文字内容")
st.text_area("文字内容", text, height=300)
st.success("文字识别完成!")
except Exception as e:
st.error(f"识别失败: {str(e)}")
with col2:
if st.button("导出为Excel", use_container_width=True, disabled=button_disabled):
with st.spinner("正在转换为Excel..."):
try:
output_path = file_path.rsplit('.', 1)[0] + '_converted.xlsx'
if use_ai:
# 使用AI OCR导出到Excel
from utils.ocr_processor import extract_text_with_ai
text = extract_text_with_ai(file_path, 'aliyun', 'general')
import pandas as pd
lines = [line.strip() for line in text.split('\n') if line.strip()]
df = pd.DataFrame({
'行号': range(1, len(lines) + 1),
'内容': lines
})
df.to_excel(output_path, index=False)
else:
image_to_excel(file_path, output_path)
with open(output_path, "rb") as file:
st.download_button(
label="下载Excel文件",
data=file,
file_name=Path(output_path).name,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
st.success("图片转换完成!")
except Exception as e:
st.error(f"转换失败: {str(e)}")
with col3:
if st.button("导出为文本", use_container_width=True, disabled=button_disabled):
with st.spinner("正在转换为文本..."):
try:
output_path = file_path.rsplit('.', 1)[0] + '_converted.txt'
if use_ai:
# 使用AI OCR导出到文本
from utils.ocr_processor import extract_text_with_ai
text = extract_text_with_ai(file_path, 'aliyun', 'general')
with open(output_path, 'w', encoding='utf-8') as f:
f.write(text)
else:
image_to_text_file(file_path, output_path)
with open(output_path, "rb") as file:
st.download_button(
label="下载文本文件",
data=file,
file_name=Path(output_path).name,
mime="text/plain"
)
st.success("图片转换完成!")
except Exception as e:
st.error(f"转换失败: {str(e)}")
# 显示图片预览
st.subheader("图片预览")
st.image(uploaded_file, caption="上传的图片", use_column_width=True)
# 显示OCR模式信息
st.info(f"当前使用: {ocr_mode}")
# 格式转换页面
elif page == "🔄 格式转换":
st.header("🔄 文件格式转换")
uploaded_file = st.file_uploader("选择文件", type=['xlsx', 'xls', 'csv', 'json'])
if uploaded_file is not None:
file_path = save_uploaded_file(uploaded_file, 'format')
file_ext = Path(uploaded_file.name).suffix.lower()
# 根据文件类型显示可转换的格式
if file_ext in ['.xlsx', '.xls']:
target_format = st.selectbox("转换为", ["CSV", "JSON"])
elif file_ext == '.csv':
target_format = st.selectbox("转换为", ["Excel", "JSON"])
elif file_ext == '.json':
target_format = st.selectbox("转换为", ["Excel", "CSV"])
if st.button("开始转换", use_container_width=True):
with st.spinner("正在转换格式..."):
try:
if file_ext in ['.xlsx', '.xls'] and target_format == "CSV":
output_path = file_path.replace(file_ext, '.csv')
excel_to_csv(file_path, output_path)
mime_type = "text/csv"
elif file_ext in ['.xlsx', '.xls'] and target_format == "JSON":
output_path = file_path.replace(file_ext, '.json')
excel_to_json(file_path, output_path)
mime_type = "application/json"
elif file_ext == '.csv' and target_format == "Excel":
output_path = file_path.replace('.csv', '.xlsx')
csv_to_excel(file_path, output_path)
mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
elif file_ext == '.csv' and target_format == "JSON":
output_path = file_path.replace('.csv', '.json')
csv_to_json(file_path, output_path)
mime_type = "application/json"
elif file_ext == '.json' and target_format == "Excel":
output_path = file_path.replace('.json', '.xlsx')
json_to_excel(file_path, output_path)
mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
elif file_ext == '.json' and target_format == "CSV":
output_path = file_path.replace('.json', '.csv')
json_to_csv(file_path, output_path)
mime_type = "text/csv"
with open(output_path, "rb") as file:
st.download_button(
label=f"下载{target_format}文件",
data=file,
file_name=Path(output_path).name,
mime=mime_type
)
st.success("格式转换完成!")
except Exception as e:
st.error(f"转换失败: {str(e)}")
# 网页抓取页面
elif page == "🌐 网页抓取":
st.header("🌐 网页数据抓取")
url = st.text_input("网页URL", placeholder="https://example.com")
selector = st.text_input("CSS选择器 (可选)", placeholder="例如: .content, #main, p")
col1, col2 = st.columns(2)
with col1:
if st.button("抓取内容", use_container_width=True):
if not url:
st.error("请输入网页URL")
else:
with st.spinner("正在抓取网页内容..."):
try:
content = scrape_webpage(url, selector if selector else None)
st.subheader("抓取的内容")
st.text_area("网页内容", content, height=300)
st.success("网页抓取完成!")
except Exception as e:
st.error(f"抓取失败: {str(e)}")
with col2:
if st.button("导出为Excel", use_container_width=True):
if not url:
st.error("请输入网页URL")
else:
with st.spinner("正在导出为Excel..."):
try:
output_filename = f"web_content_{uuid.uuid4().hex[:8]}.xlsx"
output_path = os.path.join(tempfile.gettempdir(), output_filename)
web_to_excel(url, output_path, selector if selector else None)
with open(output_path, "rb") as file:
st.download_button(
label="下载Excel文件",
data=file,
file_name=output_filename,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
st.success("网页导出完成!")
except Exception as e:
st.error(f"导出失败: {str(e)}")
# 数据库导出页面
elif page == "🗄️ 数据库导出":
st.header("🗄️ 数据库导出")
uploaded_file = st.file_uploader("选择数据库文件", type=['db', 'sqlite', 'mdf'])
table_name = st.text_input("表名 (可选)", placeholder="留空则导出所有表")
if uploaded_file is not None:
file_path = save_uploaded_file(uploaded_file, 'database')
target_format = st.selectbox("导出为", ["Excel", "CSV", "JSON"])
if st.button("开始导出", use_container_width=True):
with st.spinner("正在导出数据库..."):
try:
file_ext = Path(file_path).suffix.lower()
continue_processing = True # 默认继续处理
if file_ext in ['.db', '.sqlite']:
if target_format == "Excel":
output_path = file_path.replace(file_ext, '_exported.xlsx')
export_sqlite_to_excel(file_path, output_path, table_name if table_name else None)
mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
elif target_format == "CSV":
output_path = file_path.replace(file_ext, '_exported.csv')
database_to_csv(file_path, output_path, table_name if table_name else None)
mime_type = "text/csv"
elif target_format == "JSON":
output_path = file_path.replace(file_ext, '_exported.json')
database_to_json(file_path, output_path, table_name if table_name else None)
mime_type = "application/json"
elif file_ext == '.mdf':
# MDF文件处理
try:
import pyodbc
# 测试SQL Server连接
test_conn = pyodbc.connect("DRIVER={SQL Server};SERVER=localhost;Trusted_Connection=yes;timeout=3")
test_conn.close()
sql_server_available = True
except:
sql_server_available = False
st.warning("⚠️ SQL Server未运行或无法连接")
st.info("""
**MDF文件导出需要SQL Server支持:**
1. **安装SQL Server Express** (免费)
- 下载: https://www.microsoft.com/en-us/sql-server/sql-server-downloads
2. **确保SQL Server服务运行**
- 打开"服务"管理器 (services.msc)
- 启动"SQL Server (MSSQLSERVER)"服务
3. **配置连接权限**
- 使用Windows身份验证或配置sa密码
安装完成后重启应用即可使用MDF导出功能。
""")
# 不执行后续操作
if sql_server_available:
if target_format == "Excel":
output_path = file_path.replace(file_ext, '_exported.xlsx')
from utils.database_exporter import export_mssql_mdf_to_excel
export_mssql_mdf_to_excel(file_path, output_path, table_name if table_name else None)
mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
elif target_format == "CSV":
output_path = file_path.replace(file_ext, '_exported.csv')
database_to_csv(file_path, output_path, table_name if table_name else None)
mime_type = "text/csv"
elif target_format == "JSON":
output_path = file_path.replace(file_ext, '_exported.json')
database_to_json(file_path, output_path, table_name if table_name else None)
mime_type = "application/json"
else:
st.error("不支持的数据库格式")
# 不执行后续操作
continue_processing = False
# 只有在成功处理时才执行下载操作
if continue_processing and 'output_path' in locals() and os.path.exists(output_path):
with open(output_path, "rb") as file:
st.download_button(
label=f"下载{target_format}文件",
data=file,
file_name=Path(output_path).name,
mime=mime_type
)
st.success("数据库导出完成!")
elif not continue_processing:
# 不支持的格式,不显示下载按钮
pass
else:
st.error("导出文件创建失败")
except Exception as e:
st.error(f"导出失败: {str(e)}")
# Sidebar footer: a divider followed by usage steps and supported formats.
_SIDEBAR_HELP = """
### 使用说明
1. 选择功能模块
2. 上传文件或输入URL
3. 点击相应按钮处理
4. 下载处理结果
### 支持格式
- **PDF**: .pdf
- **图片**: .jpg, .jpeg, .png, .gif, .bmp
- **数据文件**: .xlsx, .xls, .csv, .json
- **数据库**: .db, .sqlite, .mdf
"""
st.sidebar.markdown("---")
st.sidebar.markdown(_SIDEBAR_HELP)