"""Streamlit front end for the data extraction and conversion toolkit.

The sidebar selects one of six pages (PDF extraction, image OCR, AI photo
scoring, file-format conversion, web scraping, database export).  Each page
is rendered by its own ``render_*`` function; the actual processing is
delegated to the project's ``utils`` modules.

NOTE(review): uploaded files and generated outputs are written to the system
temp directory and are never removed by this script.
"""
import os
import tempfile
import uuid
from pathlib import Path

import streamlit as st
from dotenv import load_dotenv

# Load environment variables before the utils modules are imported; the
# original ordering is preserved (presumably some of them read API keys at
# import time -- confirm against the utils package).
load_dotenv()

from utils.pdf_extractor import extract_text_from_pdf, pdf_to_excel
from utils.ocr_processor import extract_text_from_image, image_to_excel, image_to_text_file
from utils.format_converter import (
    excel_to_csv, csv_to_excel, json_to_excel,
    excel_to_json, csv_to_json, json_to_csv
)
from utils.web_scraper import scrape_webpage, web_to_excel
from utils.database_exporter import export_sqlite_to_excel, database_to_csv, database_to_json


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

XLSX_MIME = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
CSV_MIME = "text/csv"
JSON_MIME = "application/json"

IMAGE_TYPES = ['jpg', 'jpeg', 'png', 'gif', 'bmp']

# Baseline subtracted from a score to produce the delta shown by st.metric.
SCORE_BASELINE = 75

OCR_MODE_TESSERACT = "传统OCR (Tesseract)"
OCR_MODE_ALIYUN = "AI大模型OCR (阿里云)"

CAPTION_STYLE_LABELS = {
    'creative': '创意文艺',
    'social': '社交媒体',
    'professional': '专业正式',
    'marketing': '营销推广',
    'emotional': '情感表达',
    'simple': '简单描述',
}

CAPTION_LENGTH_LABELS = {
    'short': '简短精炼',
    'medium': '适中长度',
    'long': '详细描述',
}

# Uploaded-file suffix -> source kind used by the conversion tables below.
FORMAT_SOURCE_KINDS = {
    '.xlsx': 'excel',
    '.xls': 'excel',
    '.csv': 'csv',
    '.json': 'json',
}

# Source kind -> target formats offered to the user (order is the UI order).
FORMAT_TARGETS = {
    'excel': ["CSV", "JSON"],
    'csv': ["Excel", "JSON"],
    'json': ["Excel", "CSV"],
}

# (source kind, target format) -> (converter, output suffix, MIME type).
FORMAT_CONVERTERS = {
    ('excel', "CSV"): (excel_to_csv, '.csv', CSV_MIME),
    ('excel', "JSON"): (excel_to_json, '.json', JSON_MIME),
    ('csv', "Excel"): (csv_to_excel, '.xlsx', XLSX_MIME),
    ('csv', "JSON"): (csv_to_json, '.json', JSON_MIME),
    ('json', "Excel"): (json_to_excel, '.xlsx', XLSX_MIME),
    ('json', "CSV"): (json_to_csv, '.csv', CSV_MIME),
}

# Database export target -> (output file ending, MIME type).
DB_EXPORT_TARGETS = {
    "Excel": ('_exported.xlsx', XLSX_MIME),
    "CSV": ('_exported.csv', CSV_MIME),
    "JSON": ('_exported.json', JSON_MIME),
}

BAIDU_SETUP_HELP = """
**百度智能云配置说明:**

1. **注册百度智能云账号**: https://cloud.baidu.com
2. **开通图像分析服务**: 在控制台搜索"图像分析"或"图像识别"
3. **获取API密钥**: 创建应用并获取API Key和Secret Key
4. **在.env文件中配置**:
   ```
   BAIDU_API_KEY=您的API Key
   BAIDU_SECRET_KEY=您的Secret Key
   ```
"""

OCR_SETUP_HELP = """
**OCR功能配置说明:**

**传统OCR (推荐免费):**
1. 下载Tesseract OCR: https://github.com/UB-Mannheim/tesseract/wiki
2. 安装到默认路径并添加到PATH

**AI大模型OCR (高精度):**
1. 注册阿里云账号: https://www.aliyun.com
2. 开通OCR服务并获取AccessKey
3. 在.env文件中配置ALIYUN_ACCESS_KEY_ID和ALIYUN_ACCESS_KEY_SECRET
"""

MDF_SETUP_HELP = """
**MDF文件导出需要SQL Server支持:**

1. **安装SQL Server Express** (免费)
   - 下载: https://www.microsoft.com/en-us/sql-server/sql-server-downloads

2. **确保SQL Server服务运行**
   - 打开"服务"管理器 (services.msc)
   - 启动"SQL Server (MSSQLSERVER)"服务

3. **配置连接权限**
   - 使用Windows身份验证或配置sa密码

安装完成后重启应用即可使用MDF导出功能。
"""

SIDEBAR_HELP = """
### 使用说明
1. 选择功能模块
2. 上传文件或输入URL
3. 点击相应按钮处理
4. 下载处理结果

### 支持格式
- **PDF**: .pdf
- **图片**: .jpg, .jpeg, .png, .gif, .bmp
- **数据文件**: .xlsx, .xls, .csv, .json
- **数据库**: .db, .sqlite, .mdf
"""


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def save_uploaded_file(uploaded_file, file_type):
    """Write an uploaded file to a new temporary file and return its path.

    Args:
        uploaded_file: Streamlit upload object; its ``name`` supplies the
            suffix and ``getvalue()`` supplies the bytes.
        file_type: Category label passed by callers.  It is not used here and
            is kept only so existing call sites keep working.

    Returns:
        The temp file path as a string, or ``None`` if saving failed (an
        error is shown in the UI in that case).
    """
    try:
        suffix = Path(uploaded_file.name).suffix
        # delete=False: the file must outlive this function so the processing
        # helpers can open it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(uploaded_file.getvalue())
        return tmp_file.name
    except Exception as e:
        st.error(f"文件保存失败: {e}")
        return None


def _derived_path(file_path, ending):
    """Return *file_path* with its final extension replaced by *ending*.

    Works on the path stem instead of ``str.replace`` so that an upper-case
    or repeated extension can never leave the result equal to the input path
    (which would make a converter overwrite its own input).
    """
    source = Path(file_path)
    return str(source.with_name(source.stem + ending))


def _offer_download(output_path, label, mime, file_name=None):
    """Render a download button serving the file at *output_path*."""
    with open(output_path, "rb") as file:
        st.download_button(
            label=label,
            data=file,
            file_name=file_name or Path(output_path).name,
            mime=mime
        )


def _write_numbered(items):
    """Write *items* as a 1-based numbered list, one st.write per item."""
    for i, item in enumerate(items, 1):
        st.write(f"{i}. {item}")


# ---------------------------------------------------------------------------
# PDF page
# ---------------------------------------------------------------------------

def render_pdf_page():
    """Render the PDF page: text extraction and export to Excel."""
    st.header("📄 PDF文本/表格提取")

    uploaded_file = st.file_uploader("选择PDF文件", type=['pdf'])
    if uploaded_file is None:
        return

    file_path = save_uploaded_file(uploaded_file, 'pdf')
    col1, col2 = st.columns(2)

    with col1:
        if st.button("提取文本内容", use_container_width=True):
            with st.spinner("正在提取文本..."):
                try:
                    text = extract_text_from_pdf(file_path)
                    st.subheader("提取的文本内容")
                    st.text_area("文本内容", text, height=300)
                    st.success("文本提取完成!")
                except Exception as e:
                    st.error(f"提取失败: {e}")

    with col2:
        if st.button("导出为Excel", use_container_width=True):
            with st.spinner("正在转换为Excel..."):
                try:
                    output_path = _derived_path(file_path, '_converted.xlsx')
                    pdf_to_excel(file_path, output_path)
                    _offer_download(output_path, "下载Excel文件", XLSX_MIME)
                    st.success("PDF转换完成!")
                except Exception as e:
                    st.error(f"转换失败: {e}")


# ---------------------------------------------------------------------------
# AI photo scoring page
# ---------------------------------------------------------------------------

def _render_quality_score(file_path):
    """Analyse photo quality and render scores plus improvement advice."""
    with st.spinner("正在分析照片质量..."):
        try:
            from utils.baidu_image_analysis import analyze_image_quality
            from utils.photo_advice_generator import (
                get_quality_improvement_advice,
                get_technical_advice,
            )

            quality_result = analyze_image_quality(file_path)

            st.subheader("📊 照片质量评分")

            # Overall score, with the delta relative to the baseline.
            score = quality_result['score']
            st.metric("总体评分", f"{score}/100", f"{score - SCORE_BASELINE}")

            # Per-dimension breakdown: progress bar plus comment.
            st.subheader("质量维度分析")
            quality_scores = {}
            for dimension, info in quality_result['dimensions'].items():
                bar_col, text_col = st.columns([1, 3])
                with bar_col:
                    st.progress(info['score'] / 100)
                with text_col:
                    st.write(f"**{dimension}**: {info['comment']} ({info['score']}/100)")
                quality_scores[dimension] = info['score']

            advice_result = get_quality_improvement_advice(quality_scores)

            # Overall advice.
            st.subheader("💡 总体改进建议")
            for suggestion in advice_result.get('overall', []):
                st.info(f"📌 {suggestion}")

            # Priority advice (section shown only when present).
            if advice_result.get('priority'):
                st.subheader("🎯 优先级改进")
                for priority in advice_result['priority']:
                    st.warning(f"⚠️ {priority}")

            # Advice per quality dimension.
            st.subheader("🔧 具体改进措施")
            for dimension, suggestions in advice_result.get('specific', {}).items():
                with st.expander(f"{dimension}改进建议"):
                    _write_numbered(suggestions)

            # Technical learning advice: first three entries per category.
            st.subheader("📚 技术学习建议")
            tech_advice = get_technical_advice()
            for category, suggestions in tech_advice.items():
                with st.expander(f"{category}技术建议"):
                    _write_numbered(suggestions[:3])

            st.success("照片质量分析完成!已生成详细改进建议")
        except Exception as e:
            st.error(f"质量评分失败: {e}")


def _render_content_analysis(file_path):
    """Analyse photo content and render the recognised objects and summary."""
    with st.spinner("正在分析照片内容..."):
        try:
            from utils.baidu_image_analysis import analyze_image_content

            content_result = analyze_image_content(file_path)

            st.subheader("🔍 照片内容分析")

            if content_result['objects']:
                st.write("**识别到的对象:**")
                # Only the first five objects are listed.
                for i, obj in enumerate(content_result['objects'][:5], 1):
                    st.write(f"{i}. **{obj['name']}** (置信度: {obj['confidence']:.2%})")
                    if obj.get('baike_info'):
                        st.write(f" 描述: {obj['baike_info'].get('description', '无描述')}")

            if content_result['summary']:
                st.write(f"**内容摘要:** {content_result['summary']}")

            st.success("照片内容分析完成!")
        except Exception as e:
            st.error(f"内容分析失败: {e}")


def _render_aesthetic_score(file_path):
    """Score photo aesthetics and render aesthetic and personalised advice."""
    with st.spinner("正在评估照片美学..."):
        try:
            from utils.baidu_image_analysis import (
                analyze_image_content,
                analyze_image_quality,
                get_image_aesthetic_score,
            )
            from utils.photo_advice_generator import (
                get_aesthetic_improvement_advice,
                get_personalized_advice,
            )

            aesthetic_result = get_image_aesthetic_score(file_path)

            st.subheader("🎨 照片美学评分")

            aesthetic_score = aesthetic_result['aesthetic_score']
            st.metric("美学评分", f"{aesthetic_score}/100", f"{aesthetic_score - SCORE_BASELINE}")

            # One metric per aesthetic dimension, side by side.
            st.subheader("美学维度分析")
            dimension_metrics = [
                ("构图", 'composition'),
                ("色彩和谐", 'color_harmony'),
                ("光线", 'lighting'),
                ("对焦", 'focus'),
            ]
            for column, (label, key) in zip(st.columns(4), dimension_metrics):
                with column:
                    st.metric(label, aesthetic_result[key])

            advice_result = get_aesthetic_improvement_advice(aesthetic_score)

            # General aesthetic advice.
            st.subheader("💡 总体美学建议")
            for suggestion in advice_result.get('general', []):
                st.info(f"🎨 {suggestion}")

            # Specific advice, one expander per non-empty topic.
            st.subheader("🔧 具体美学改进")
            advice_sections = [
                ('composition', "构图改进建议"),
                ('lighting', "用光改进建议"),
                ('subject', "主体表现建议"),
            ]
            for key, title in advice_sections:
                if advice_result.get(key):
                    with st.expander(title):
                        _write_numbered(advice_result[key])

            # Creative advice (section shown only when present).
            if advice_result.get('creative'):
                st.subheader("🌟 创意提升建议")
                for suggestion in advice_result['creative']:
                    st.success(f"✨ {suggestion}")

            # Personalised plan: combines content summary, quality scores and
            # the aesthetic score computed above.
            st.subheader("📋 个性化学习计划")
            content_result = analyze_image_content(file_path)
            photo_content = content_result.get('summary', '一般照片')

            quality_result = analyze_image_quality(file_path)
            quality_scores = {
                dim: info['score']
                for dim, info in quality_result['dimensions'].items()
            }

            personalized_advice = get_personalized_advice(
                quality_scores, aesthetic_score, photo_content
            )
            for category, suggestions in personalized_advice.items():
                if suggestions:
                    with st.expander(f"{category}"):
                        _write_numbered(suggestions)

            st.success("照片美学评估完成!已生成详细改进建议")
        except Exception as e:
            st.error(f"美学评分失败: {e}")


def _render_copywriting(file_path):
    """Generate caption options for the photo and render them.

    NOTE(review): the "copy" and "regenerate" buttons below are created
    inside the branch of the outer "AI写文案" button.  Clicking either one
    reruns the script with the outer button reporting False, so their
    branches are not reached on that rerun.  Making them usable needs the
    generated state to be kept in st.session_state; left as-is here.
    """
    with st.spinner("正在生成创意文案..."):
        try:
            # Content analysis supplies the photo description for the prompts.
            from utils.baidu_image_analysis import analyze_image_content
            from utils.ai_copywriter import generate_multiple_captions, analyze_photo_suitability

            content_result = analyze_image_content(file_path)
            image_description = content_result.get('summary', '一张美丽的照片')

            # Determine which caption styles suit the photo.
            suitability_result = analyze_photo_suitability(image_description)

            st.subheader("✍️ AI创意文案生成")
            st.write(f"**照片描述**: {image_description}")
            st.write(f"**推荐风格**: {', '.join(suitability_result['recommended_styles'][:3])}")

            captions = generate_multiple_captions(
                image_description, count=3, style=suitability_result['most_suitable']
            )

            st.subheader("📝 文案选项")
            for caption_info in captions:
                with st.expander(f"选项 {caption_info['option']} ({caption_info.get('length', '适中')} - {caption_info['char_count']}字)"):
                    st.write(caption_info['caption'])

                    if st.button(f"复制文案 {caption_info['option']}", key=f"copy_{caption_info['option']}"):
                        st.code(caption_info['caption'], language='text')
                        st.success("文案已复制到剪贴板!")

            st.subheader("🎨 文案风格选择")

            selected_style = st.selectbox(
                "选择文案风格",
                list(CAPTION_STYLE_LABELS),
                format_func=CAPTION_STYLE_LABELS.get
            )

            selected_length = st.selectbox(
                "选择文案长度",
                list(CAPTION_LENGTH_LABELS),
                format_func=CAPTION_LENGTH_LABELS.get
            )

            if st.button("重新生成文案", use_container_width=True):
                with st.spinner("正在重新生成文案..."):
                    # This name was previously used without being imported
                    # (NameError).  Imported here so a failure stays limited
                    # to the regenerate action.
                    # NOTE(review): assumed to live in utils.ai_copywriter
                    # next to generate_multiple_captions -- confirm.
                    from utils.ai_copywriter import generate_photo_caption

                    new_caption = generate_photo_caption(
                        image_description, selected_style, selected_length
                    )
                    st.subheader("🆕 新生成文案")
                    st.write(new_caption)
                    st.success("新文案生成完成!")

            st.success("AI文案生成完成!")
        except Exception as e:
            st.error(f"AI文案生成失败: {e}")


def render_photo_scoring_page():
    """Render the AI photo page: quality, content, aesthetics and captions."""
    st.header("📸 AI照片质量评分")

    # Availability check for the Baidu image analysis integration.
    try:
        from utils.baidu_image_analysis import check_baidu_config
        baidu_available, baidu_message = check_baidu_config()
    except Exception:
        baidu_available = False
        baidu_message = "百度智能云未配置"

    if baidu_available:
        st.success("✅ 百度智能云AI照片评分可用")
    else:
        st.warning(f"⚠️ 百度智能云AI照片评分: {baidu_message}")
        st.info(BAIDU_SETUP_HELP)

    uploaded_file = st.file_uploader("选择照片文件", type=IMAGE_TYPES)
    if uploaded_file is None:
        return

    file_path = save_uploaded_file(uploaded_file, 'image')

    # Availability check for the AI copywriting integration.
    try:
        from utils.ai_copywriter import check_copywriter_config
        copywriter_available, copywriter_message = check_copywriter_config()
    except Exception:
        copywriter_available = False
        copywriter_message = "AI文案生成未配置"

    if copywriter_available:
        st.success("✅ AI文案生成可用")
    else:
        st.warning(f"⚠️ AI文案生成: {copywriter_message}")

    col1, col2, col3, col4 = st.columns(4)

    with col1:
        if st.button("质量评分", use_container_width=True, disabled=not baidu_available):
            _render_quality_score(file_path)

    with col2:
        if st.button("内容分析", use_container_width=True, disabled=not baidu_available):
            _render_content_analysis(file_path)

    with col3:
        if st.button("美学评分", use_container_width=True, disabled=not baidu_available):
            _render_aesthetic_score(file_path)

    with col4:
        if st.button("AI写文案", use_container_width=True, disabled=not copywriter_available):
            _render_copywriting(file_path)

    # Photo preview.
    st.subheader("📷 照片预览")
    st.image(uploaded_file, caption="上传的照片", use_column_width=True)


# ---------------------------------------------------------------------------
# Image OCR page
# ---------------------------------------------------------------------------

def _ai_ocr_text(file_path):
    """Return the text recognised by the Aliyun AI OCR for *file_path*."""
    from utils.ocr_processor import extract_text_with_ai
    return extract_text_with_ai(file_path, 'aliyun', 'general')


def render_ocr_page():
    """Render the OCR page: recognise text and export to Excel or text."""
    st.header("🖼️ 图片文字识别 (OCR)")

    # Tesseract is considered available when its version can be queried.
    try:
        import pytesseract
        pytesseract.get_tesseract_version()
        tesseract_available = True
    except Exception:
        tesseract_available = False

    # Availability check for the Aliyun AI OCR integration.
    try:
        from utils.aliyun_ocr import check_aliyun_config
        ai_available, ai_message = check_aliyun_config()
    except Exception:
        ai_available = False
        ai_message = "阿里云OCR未配置"

    col_status1, col_status2 = st.columns(2)
    with col_status1:
        if tesseract_available:
            st.success("✅ Tesseract OCR可用")
        else:
            st.warning("⚠️ Tesseract OCR未安装")
    with col_status2:
        if ai_available:
            st.success("✅ AI大模型OCR可用")
        else:
            st.warning(f"⚠️ AI大模型OCR: {ai_message}")

    # The mode selector is disabled when neither engine is available.
    ocr_mode = st.radio(
        "选择OCR模式",
        [OCR_MODE_TESSERACT, OCR_MODE_ALIYUN],
        disabled=not (tesseract_available or ai_available)
    )

    if not tesseract_available and not ai_available:
        st.info(OCR_SETUP_HELP)

    uploaded_file = st.file_uploader("选择图片文件", type=IMAGE_TYPES)
    if uploaded_file is None:
        return

    file_path = save_uploaded_file(uploaded_file, 'image')

    # Action buttons are disabled when the selected engine is unavailable.
    use_ai = ocr_mode == OCR_MODE_ALIYUN
    button_disabled = (use_ai and not ai_available) or (not use_ai and not tesseract_available)

    col1, col2, col3 = st.columns(3)

    with col1:
        if st.button("识别文字", use_container_width=True, disabled=button_disabled):
            with st.spinner("正在识别文字..."):
                try:
                    if use_ai:
                        text = extract_text_from_image(file_path, use_ai=True, ai_provider='aliyun')
                    else:
                        text = extract_text_from_image(file_path)

                    st.subheader("识别的文字内容")
                    st.text_area("文字内容", text, height=300)
                    st.success("文字识别完成!")
                except Exception as e:
                    st.error(f"识别失败: {e}")

    with col2:
        if st.button("导出为Excel", use_container_width=True, disabled=button_disabled):
            with st.spinner("正在转换为Excel..."):
                try:
                    output_path = _derived_path(file_path, '_converted.xlsx')

                    if use_ai:
                        # AI OCR path: one spreadsheet row per non-empty line.
                        import pandas as pd

                        text = _ai_ocr_text(file_path)
                        lines = [line.strip() for line in text.split('\n') if line.strip()]
                        df = pd.DataFrame({
                            '行号': range(1, len(lines) + 1),
                            '内容': lines
                        })
                        df.to_excel(output_path, index=False)
                    else:
                        image_to_excel(file_path, output_path)

                    _offer_download(output_path, "下载Excel文件", XLSX_MIME)
                    st.success("图片转换完成!")
                except Exception as e:
                    st.error(f"转换失败: {e}")

    with col3:
        if st.button("导出为文本", use_container_width=True, disabled=button_disabled):
            with st.spinner("正在转换为文本..."):
                try:
                    output_path = _derived_path(file_path, '_converted.txt')

                    if use_ai:
                        text = _ai_ocr_text(file_path)
                        with open(output_path, 'w', encoding='utf-8') as f:
                            f.write(text)
                    else:
                        image_to_text_file(file_path, output_path)

                    _offer_download(output_path, "下载文本文件", "text/plain")
                    st.success("图片转换完成!")
                except Exception as e:
                    st.error(f"转换失败: {e}")

    # Image preview.
    st.subheader("图片预览")
    st.image(uploaded_file, caption="上传的图片", use_column_width=True)

    # Show which OCR mode is selected.
    st.info(f"当前使用: {ocr_mode}")


# ---------------------------------------------------------------------------
# Format conversion page
# ---------------------------------------------------------------------------

def render_format_conversion_page():
    """Render the conversion page for Excel, CSV and JSON files."""
    st.header("🔄 文件格式转换")

    uploaded_file = st.file_uploader("选择文件", type=['xlsx', 'xls', 'csv', 'json'])
    if uploaded_file is None:
        return

    file_path = save_uploaded_file(uploaded_file, 'format')
    file_ext = Path(uploaded_file.name).suffix.lower()

    source_kind = FORMAT_SOURCE_KINDS.get(file_ext)
    if source_kind is None:
        # Not expected given the uploader's type filter; previously this case
        # left target_format undefined.
        st.error("不支持的文件格式")
        return

    # Offer only the targets that make sense for the uploaded file type.
    target_format = st.selectbox("转换为", FORMAT_TARGETS[source_kind])

    if st.button("开始转换", use_container_width=True):
        with st.spinner("正在转换格式..."):
            try:
                convert, new_suffix, mime_type = FORMAT_CONVERTERS[(source_kind, target_format)]
                output_path = _derived_path(file_path, new_suffix)
                convert(file_path, output_path)

                _offer_download(output_path, f"下载{target_format}文件", mime_type)
                st.success("格式转换完成!")
            except Exception as e:
                st.error(f"转换失败: {e}")


# ---------------------------------------------------------------------------
# Web scraping page
# ---------------------------------------------------------------------------

def render_web_scraping_page():
    """Render the scraping page: show page content or export it to Excel."""
    st.header("🌐 网页数据抓取")

    url = st.text_input("网页URL", placeholder="https://example.com")
    selector = st.text_input("CSS选择器 (可选)", placeholder="例如: .content, #main, p")

    col1, col2 = st.columns(2)

    with col1:
        if st.button("抓取内容", use_container_width=True):
            if not url:
                st.error("请输入网页URL")
            else:
                with st.spinner("正在抓取网页内容..."):
                    try:
                        # An empty selector is passed on as None.
                        content = scrape_webpage(url, selector or None)
                        st.subheader("抓取的内容")
                        st.text_area("网页内容", content, height=300)
                        st.success("网页抓取完成!")
                    except Exception as e:
                        st.error(f"抓取失败: {e}")

    with col2:
        if st.button("导出为Excel", use_container_width=True):
            if not url:
                st.error("请输入网页URL")
            else:
                with st.spinner("正在导出为Excel..."):
                    try:
                        # Random short id keeps concurrent exports apart.
                        output_filename = f"web_content_{uuid.uuid4().hex[:8]}.xlsx"
                        output_path = os.path.join(tempfile.gettempdir(), output_filename)
                        web_to_excel(url, output_path, selector or None)

                        _offer_download(
                            output_path, "下载Excel文件", XLSX_MIME,
                            file_name=output_filename
                        )
                        st.success("网页导出完成!")
                    except Exception as e:
                        st.error(f"导出失败: {e}")


# ---------------------------------------------------------------------------
# Database export page
# ---------------------------------------------------------------------------

def _sql_server_reachable():
    """Return True if a local SQL Server accepts a trusted connection."""
    try:
        import pyodbc
        test_conn = pyodbc.connect(
            "DRIVER={SQL Server};SERVER=localhost;Trusted_Connection=yes;timeout=3"
        )
        test_conn.close()
        return True
    except Exception:
        return False


def _run_database_export(file_path, file_ext, target_format, table):
    """Export the database and return ``(output_path, mime_type)``.

    CSV and JSON use the same exporters for both SQLite and MDF inputs; only
    the Excel exporter depends on the database type.
    """
    ending, mime_type = DB_EXPORT_TARGETS[target_format]
    output_path = _derived_path(file_path, ending)

    if target_format == "CSV":
        database_to_csv(file_path, output_path, table)
    elif target_format == "JSON":
        database_to_json(file_path, output_path, table)
    elif file_ext == '.mdf':
        # Imported lazily so it is only required for MDF-to-Excel exports.
        from utils.database_exporter import export_mssql_mdf_to_excel
        export_mssql_mdf_to_excel(file_path, output_path, table)
    else:
        export_sqlite_to_excel(file_path, output_path, table)

    return output_path, mime_type


def render_database_export_page():
    """Render the export page for SQLite and MDF database files."""
    st.header("🗄️ 数据库导出")

    uploaded_file = st.file_uploader("选择数据库文件", type=['db', 'sqlite', 'mdf'])
    table_name = st.text_input("表名 (可选)", placeholder="留空则导出所有表")

    if uploaded_file is None:
        return

    file_path = save_uploaded_file(uploaded_file, 'database')
    target_format = st.selectbox("导出为", ["Excel", "CSV", "JSON"])

    if not st.button("开始导出", use_container_width=True):
        return

    with st.spinner("正在导出数据库..."):
        try:
            file_ext = Path(file_path).suffix.lower()

            if file_ext == '.mdf':
                if not _sql_server_reachable():
                    st.warning("⚠️ SQL Server未运行或无法连接")
                    st.info(MDF_SETUP_HELP)
                    # Stop here: previously execution fell through and also
                    # reported a misleading "export file creation failed".
                    return
            elif file_ext not in ('.db', '.sqlite'):
                st.error("不支持的数据库格式")
                return

            output_path, mime_type = _run_database_export(
                file_path, file_ext, target_format, table_name or None
            )

            # Offer the download only if the exporter produced a file.
            if os.path.exists(output_path):
                _offer_download(output_path, f"下载{target_format}文件", mime_type)
                st.success("数据库导出完成!")
            else:
                st.error("导出文件创建失败")
        except Exception as e:
            st.error(f"导出失败: {e}")


# ---------------------------------------------------------------------------
# Page assembly
# ---------------------------------------------------------------------------

# Sidebar label -> render function; the order is the order shown in the UI.
PAGES = {
    "📄 PDF处理": render_pdf_page,
    "🖼️ 图片OCR": render_ocr_page,
    "📸 AI照片评分": render_photo_scoring_page,
    "🔄 格式转换": render_format_conversion_page,
    "🌐 网页抓取": render_web_scraping_page,
    "🗄️ 数据库导出": render_database_export_page,
}

# Page configuration (must be the first Streamlit command executed).
st.set_page_config(
    page_title="数据提取与转换器",
    page_icon="🔧",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS styles.
# NOTE(review): this literal contains no CSS rules in the reviewed source.
st.markdown(""" """, unsafe_allow_html=True)

# Page title.
st.markdown("""

🔧 数据提取与转换器

多功能数据处理工具

""", unsafe_allow_html=True)

# Sidebar navigation.
st.sidebar.title("功能导航")
page = st.sidebar.radio("选择功能", list(PAGES))

PAGES[page]()

# Sidebar footer.
st.sidebar.markdown("---")
st.sidebar.markdown(SIDEBAR_HELP)