"""Flask front end for a file-conversion service.

Files are uploaded into ``UPLOAD_FOLDER``, converted by the helpers in
``utils`` and handed back through ``/download/<filename>``.
"""

import os
import uuid

from flask import Flask, render_template, request, jsonify, send_file, redirect, url_for
from werkzeug.utils import secure_filename

from config import Config

# Conversion helpers
from utils.pdf_extractor import extract_text_from_pdf, pdf_to_excel
from utils.ocr_processor import extract_text_from_image, image_to_excel, image_to_text_file
from utils.format_converter import (
    excel_to_csv, csv_to_excel, json_to_excel,
    excel_to_json, csv_to_json, json_to_csv
)
from utils.web_scraper import scrape_webpage, web_to_excel
from utils.database_exporter import export_sqlite_to_excel, database_to_csv, database_to_json

app = Flask(__name__)
app.config.from_object(Config)

# Make sure the upload directory exists before any request is served.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# (source extension, requested target format) -> (converter, output suffix)
_FORMAT_CONVERTERS = {
    ('xlsx', 'csv'): (excel_to_csv, '.csv'),
    ('csv', 'excel'): (csv_to_excel, '.xlsx'),
    ('json', 'excel'): (json_to_excel, '.xlsx'),
    ('xlsx', 'json'): (excel_to_json, '.json'),
    ('csv', 'json'): (csv_to_json, '.json'),
    ('json', 'csv'): (json_to_csv, '.csv'),
}

# requested target format -> (exporter, output suffix) for SQLite files
_DATABASE_EXPORTERS = {
    'excel': (export_sqlite_to_excel, '_exported.xlsx'),
    'csv': (database_to_csv, '_exported.csv'),
    'json': (database_to_json, '_exported.json'),
}

# OCR action -> (writer, output suffix); 'extract' is handled separately
# because it returns text instead of producing a file.
_IMAGE_EXPORTERS = {
    'to_excel': (image_to_excel, '_converted.xlsx'),
    'to_text': (image_to_text_file, '_converted.txt'),
}


def _json_payload():
    """Return the request's JSON object, or ``{}`` if it is absent or not an object.

    Keeps a missing/malformed body on the normal validation path (HTTP 400)
    instead of letting it surface as an unrelated exception.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _resolve_upload_path(filepath):
    """Return the absolute path of *filepath* if it is a file inside UPLOAD_FOLDER.

    Returns ``None`` for empty/non-string values, paths that escape the upload
    directory (``..``, absolute paths, symlinks) and paths that are not
    existing regular files. The value comes straight from the client, so it
    must never be used without this check.
    """
    if not filepath or not isinstance(filepath, str):
        return None
    try:
        upload_root = os.path.realpath(app.config['UPLOAD_FOLDER'])
        candidate = os.path.realpath(filepath)
        inside = os.path.commonpath([upload_root, candidate]) == upload_root
    except ValueError:
        # Embedded NUL byte, or paths on different drives (Windows).
        return None
    if not inside or not os.path.isfile(candidate):
        return None
    return candidate


def _extension(path):
    """Return the lower-cased extension of *path* without the dot ('' if none)."""
    return os.path.splitext(path)[1][1:].lower()


def _with_suffix(path, suffix):
    """Return *path* with its final extension replaced by *suffix*.

    Only the last extension is touched, regardless of its case, so the result
    can never equal the input path the way ``str.replace`` could.
    """
    return os.path.splitext(path)[0] + suffix


def _download_response(output_path):
    """Build the success payload pointing at the generated file."""
    return jsonify({
        'success': True,
        'download_url': f'/download/{os.path.basename(output_path)}'
    })


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']


@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file under a unique name and describe it as JSON.

    Responds with 400 when no file is supplied or its extension is not allowed.
    """
    if 'file' not in request.files:
        return jsonify({'error': '没有选择文件'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': '没有选择文件'}), 400

    if file and allowed_file(file.filename):
        # Take the extension from the original name: secure_filename() drops
        # non-ASCII characters, so e.g. '文件.pdf' collapses to 'pdf' and
        # would otherwise lose its extension.
        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = secure_filename(file.filename)
        if _extension(filename) != extension:
            filename = f'upload.{extension}'

        filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{uuid.uuid4()}_{filename}")
        file.save(filepath)

        return jsonify({
            'success': True,
            'filename': filename,
            'filepath': filepath,
            'file_type': extension
        })

    return jsonify({'error': '不支持的文件类型'}), 400


@app.route('/process/pdf', methods=['POST'])
def process_pdf():
    """Extract text from an uploaded PDF or convert it to Excel.

    JSON body: ``filepath`` (as returned by ``/upload``) and ``action``
    (``extract`` by default, or ``to_excel``).
    """
    try:
        data = _json_payload()
        filepath = _resolve_upload_path(data.get('filepath'))
        action = data.get('action', 'extract')  # extract, to_excel

        if filepath is None:
            return jsonify({'error': '文件不存在'}), 400

        if action == 'extract':
            text = extract_text_from_pdf(filepath)
            return jsonify({'success': True, 'text': text})

        if action == 'to_excel':
            output_path = _with_suffix(filepath, '_converted.xlsx')
            pdf_to_excel(filepath, output_path)
            return _download_response(output_path)

        return jsonify({'error': '不支持的操作'}), 400

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/process/image', methods=['POST'])
def process_image():
    """Run OCR on an uploaded image and return text or a generated file.

    JSON body: ``filepath`` and ``action`` (``extract`` by default,
    ``to_excel`` or ``to_text``).
    """
    try:
        data = _json_payload()
        filepath = _resolve_upload_path(data.get('filepath'))
        action = data.get('action', 'extract')  # extract, to_excel, to_text

        if filepath is None:
            return jsonify({'error': '文件不存在'}), 400

        if action == 'extract':
            text = extract_text_from_image(filepath)
            return jsonify({'success': True, 'text': text})

        # Non-string actions cannot be dictionary keys; treat them as unsupported.
        exporter = _IMAGE_EXPORTERS.get(action) if isinstance(action, str) else None
        if exporter is None:
            return jsonify({'error': '不支持的操作'}), 400

        write_output, suffix = exporter
        output_path = _with_suffix(filepath, suffix)
        write_output(filepath, output_path)
        return _download_response(output_path)

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/process/format', methods=['POST'])
def process_format():
    """Convert an uploaded xlsx/csv/json file to another of those formats.

    JSON body: ``filepath`` and ``target_format`` (``excel``, ``csv`` or
    ``json``). Unsupported source/target pairs yield a 400.
    """
    try:
        data = _json_payload()
        filepath = _resolve_upload_path(data.get('filepath'))
        target_format = data.get('target_format')  # excel, csv, json

        if filepath is None:
            return jsonify({'error': '文件不存在'}), 400

        # Pick the converter from the (source, target) pair.
        conversion = None
        if isinstance(target_format, str):
            conversion = _FORMAT_CONVERTERS.get((_extension(filepath), target_format))
        if conversion is None:
            return jsonify({'error': '不支持的格式转换'}), 400

        convert, suffix = conversion
        output_path = _with_suffix(filepath, suffix)
        convert(filepath, output_path)
        return _download_response(output_path)

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/process/web', methods=['POST'])
def process_web():
    """Scrape a web page and export the result to an Excel file.

    JSON body: ``url`` (required) and an optional ``selector``.
    """
    try:
        data = _json_payload()
        url = data.get('url')
        selector = data.get('selector', '')

        if not url:
            return jsonify({'error': '请输入URL'}), 400

        # NOTE(review): the URL is client-controlled and fetched server-side;
        # consider restricting schemes/hosts inside the scraper (SSRF).
        content = scrape_webpage(url, selector if selector else None)

        # Export to Excel under a random name inside the upload directory.
        output_filename = f"web_content_{uuid.uuid4().hex[:8]}.xlsx"
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        web_to_excel(url, output_path, selector)

        return jsonify({
            'success': True,
            'content': content if isinstance(content, str) else '内容已提取',
            'download_url': f'/download/{output_filename}'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/process/database', methods=['POST'])
def process_database():
    """Export an uploaded SQLite database to Excel, CSV or JSON.

    JSON body: ``filepath``, ``target_format`` (``excel`` by default, ``csv``
    or ``json``) and an optional ``table_name`` passed through to the exporter.
    """
    try:
        data = _json_payload()
        filepath = _resolve_upload_path(data.get('filepath'))
        target_format = data.get('target_format', 'excel')  # excel, csv, json
        table_name = data.get('table_name', '')  # optional: restrict to one table

        if filepath is None:
            return jsonify({'error': '文件不存在'}), 400

        if _extension(filepath) not in ('db', 'sqlite'):
            return jsonify({'error': '不支持的数据库格式'}), 400

        exporter = _DATABASE_EXPORTERS.get(target_format) if isinstance(target_format, str) else None
        if exporter is None:
            return jsonify({'error': '不支持的导出格式'}), 400

        export, suffix = exporter
        output_path = _with_suffix(filepath, suffix)
        export(filepath, output_path, table_name)
        return _download_response(output_path)

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/download/<filename>')
def download_file(filename):
    """Send a file from the upload directory as an attachment, or 404 as JSON."""
    # Resolve to an absolute path inside UPLOAD_FOLDER: this blocks traversal
    # and avoids send_file() interpreting a relative path against
    # app.root_path instead of the working directory.
    filepath = _resolve_upload_path(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    if filepath is not None:
        return send_file(filepath, as_attachment=True)
    return jsonify({'error': '文件不存在'}), 404


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must not
    # be enabled by default on a server bound to every interface. Opt in with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)