# SPC/app.py
#
# 335 lines
# 11 KiB
# Python
# Raw Normal View History

import os
import sqlite3
import json
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename
import uuid
from datetime import datetime
from dotenv import load_dotenv
from openai import OpenAI
# Load variables from a local .env file before the os.getenv() calls below
# read them.
load_dotenv()

app = Flask(__name__)
app.config.update(
    UPLOAD_FOLDER='uploads',              # directory that receives uploads
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16 MiB
    DATABASE='knowledge_base.db',         # SQLite database file
)

# The DeepSeek endpoint is driven through the OpenAI client by pointing the
# client at a different base URL.
DEEPSEEK_API_KEY = os.getenv('DEEPSEEK_API_KEY')
DEEPSEEK_BASE_URL = os.getenv('DEEPSEEK_BASE_URL', 'https://api.deepseek.com')
client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)

# Lower-case file extensions accepted by the upload endpoint.
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx'}

os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# In-memory copy of the documents table keyed by document id; it is rebound
# by load_documents_from_db().
documents = {}
def init_db():
    """Create the ``conversations`` and ``documents`` tables when missing.

    Connects to the SQLite file named by ``app.config['DATABASE']``, runs
    both ``CREATE TABLE IF NOT EXISTS`` statements and commits. Existing
    tables are left as they are, so the function can be called repeatedly.

    Raises:
        sqlite3.Error: if the database cannot be opened or a statement fails.
    """
    conn = sqlite3.connect(app.config['DATABASE'])
    try:
        cursor = conn.cursor()
        # Question/answer history; ``sources`` holds a JSON-encoded list.
        cursor.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
question TEXT NOT NULL,
answer TEXT NOT NULL,
sources TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        # One row per uploaded document.
        cursor.execute('''
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL,
chunks INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        conn.commit()
    finally:
        # Close even when a statement raises so the handle is not leaked.
        conn.close()
def get_db_connection():
    """Open a new connection to the application's SQLite database.

    The connection uses ``sqlite3.Row`` as its row factory, so result rows
    can be indexed by column name and converted with ``dict(row)``.

    Returns:
        sqlite3.Connection: an open connection; the caller must close it.
    """
    connection = sqlite3.connect(app.config['DATABASE'])
    connection.row_factory = sqlite3.Row
    return connection
def load_documents_from_db():
    """Rebuild the module-level ``documents`` cache from the database.

    Reads every row of the ``documents`` table (newest first) and rebinds
    the global ``documents`` dict to ``{id: row-as-dict}``.

    Raises:
        sqlite3.Error: if the query fails; the previous cache is kept.
    """
    global documents
    conn = get_db_connection()
    try:
        rows = conn.execute(
            'SELECT * FROM documents ORDER BY created_at DESC'
        ).fetchall()
    finally:
        # Release the handle even if the query raises.
        conn.close()
    documents = {row['id']: dict(row) for row in rows}
def allowed_file(filename):
    """Tell whether ``filename`` ends in an extension from ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot and is compared
    case-insensitively. A name without any dot is rejected.

    Returns:
        bool: True when the extension is accepted, False otherwise.
    """
    if '.' not in filename:
        return False
    extension = filename.rpartition('.')[2]
    return extension.lower() in ALLOWED_EXTENSIONS
def _read_txt_file(filepath):
    """Return the full contents of a UTF-8 encoded text file."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return f.read()


def _read_pdf_file(filepath):
    """Return the extracted text of every PDF page, each followed by a newline."""
    import pypdf  # imported lazily so the dependency is only needed for PDFs
    with open(filepath, 'rb') as f:
        reader = pypdf.PdfReader(f)
        # Defensive ``or ''``: tolerate a page that yields no text at all.
        return ''.join(
            (page.extract_text() or '') + '\n' for page in reader.pages
        )


def _read_docx_file(filepath):
    """Return the text of every paragraph of a .docx file, newline-terminated."""
    from docx import Document  # imported lazily, only needed for .docx files
    doc = Document(filepath)
    return ''.join(paragraph.text + '\n' for paragraph in doc.paragraphs)


def _read_unknown_file(filepath):
    """Probe a file without a known extension as docx, then txt, then pdf.

    Returns the first non-blank text a reader produces, or None when every
    reader fails or yields only whitespace.
    """
    for reader in (_read_docx_file, _read_txt_file, _read_pdf_file):
        try:
            text = reader(filepath)
        except Exception:
            # Expected whenever the file is not in this reader's format;
            # move on to the next candidate format.
            continue
        if text.strip():
            return text
    return None


def read_document_content(doc_id):
    """Return the text of the uploaded file that belongs to ``doc_id``.

    Scans the upload folder for a file whose name starts with ``doc_id`` and
    reads it according to its extension (.txt, .pdf, .docx). A file with none
    of those extensions is probed as docx, txt and pdf in that order.

    Args:
        doc_id: identifier used as the stored file's name prefix.

    Returns:
        str | None: the document text, or None when ``doc_id`` is empty, no
        matching file exists, or reading fails (the error is printed).
    """
    try:
        if not doc_id:
            # An empty prefix would match every file in the folder.
            return None
        upload_dir = app.config['UPLOAD_FOLDER']
        for file in os.listdir(upload_dir):
            if not file.startswith(doc_id):
                continue
            filepath = os.path.join(upload_dir, file)
            lowered = file.lower()
            if lowered.endswith('.txt'):
                return _read_txt_file(filepath)
            if lowered.endswith('.pdf'):
                return _read_pdf_file(filepath)
            if lowered.endswith('.docx'):
                return _read_docx_file(filepath)
            text = _read_unknown_file(filepath)
            if text is not None:
                return text
        return None
    except Exception as e:
        print(f"Error reading document: {e}")
        import traceback
        traceback.print_exc()
        return None
@app.route('/')
def index():
    """Render the ``index.html`` page after refreshing the document cache."""
    load_documents_from_db()
    page = render_template('index.html')
    return page
@app.route('/api/upload', methods=['POST'])
def upload_document():
    """Store an uploaded file and register it in the ``documents`` table.

    Expects a multipart form field named ``file``. The file is saved in the
    upload folder as ``<doc_id>_<filename>`` and a row with status
    ``completed`` is inserted.

    Returns:
        JSON ``{id, name, status}`` on success; ``{error}`` with status 400
        for a missing, unnamed or unsupported file, or 500 on any other
        failure.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': '没有文件'}), 400
        file = request.files['file']
        # ``not`` also covers a filename of None, not just the empty string.
        if not file.filename:
            return jsonify({'error': '没有选择文件'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': '不支持的文件格式'}), 400

        doc_id = str(uuid.uuid4())
        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            # secure_filename() keeps ASCII only, so a name like "文档.txt"
            # collapses to "txt" and loses its extension. Fall back to a
            # neutral stem that keeps the extension validated above.
            filename = f"document.{extension}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{doc_id}_{filename}")
        file.save(filepath)

        try:
            conn = get_db_connection()
            try:
                conn.execute(
                    'INSERT INTO documents (id, name, status, chunks) VALUES (?, ?, ?, ?)',
                    (doc_id, filename, 'completed', 1)
                )
                conn.commit()
            finally:
                conn.close()
        except Exception:
            # The document was not recorded, so do not leave an orphaned
            # file behind. Cleanup is best-effort; the original error wins.
            try:
                os.remove(filepath)
            except OSError:
                pass
            raise

        load_documents_from_db()
        return jsonify({
            'id': doc_id,
            'name': filename,
            'status': 'completed'
        })
    except Exception as e:
        return jsonify({'error': f'上传失败:{str(e)}'}), 500
def _collect_document_context(max_chars_per_doc=3000):
    """Gather prompt snippets and source records from completed documents.

    Args:
        max_chars_per_doc: number of leading characters of each document
            that are copied into the prompt.

    Returns:
        tuple[list[str], list[dict]]: prompt snippets and the matching
        ``{doc_id, name, page}`` source entries, in cache order.
    """
    context_parts = []
    sources = []
    for doc_id, doc_info in documents.items():
        if doc_info['status'] != 'completed':
            continue
        content = read_document_content(doc_id)
        if not content:
            continue
        context_parts.append(
            f"文档:{doc_info['name']}\n内容:{content[:max_chars_per_doc]}"
        )
        sources.append({
            'doc_id': doc_id,
            'name': doc_info['name'],
            'page': 1
        })
    return context_parts, sources


@app.route('/api/ask', methods=['POST'])
def ask_question():
    """Answer a question from the uploaded documents via the chat model.

    Expects a JSON body ``{"question": "..."}``. The leading part of every
    readable completed document is placed in the prompt, the model's answer
    is stored in the ``conversations`` table and returned.

    Returns:
        JSON ``{question, answer, sources}`` on success; ``{error}`` with
        status 400 for invalid input or missing documents, or 500 when the
        model call or the database write fails.
    """
    try:
        # silent=True yields None instead of raising on a non-JSON body, so
        # bad input gets the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        question = data.get('question', '')
        if not isinstance(question, str) or not question.strip():
            return jsonify({'error': '请输入问题'}), 400
        if len(question) > 1000:
            return jsonify({'error': '问题长度不能超过1000字'}), 400

        load_documents_from_db()
        if not documents:
            return jsonify({'error': '请先上传文档'}), 400

        context_parts, sources = _collect_document_context()
        if not context_parts:
            return jsonify({'error': '没有可用的文档内容'}), 400
        context = '\n\n'.join(context_parts)

        system_prompt = """你是一个智能知识库问答助手。请基于提供的文档内容回答用户的问题。
要求
1. 只使用文档中的信息回答问题
2. 如果文档中没有相关信息请明确说明
3. 回答要准确简洁有条理
4. 使用中文回答"""
        user_prompt = f"""文档内容:
{context}
用户问题{question}
请基于以上文档内容回答用户的问题"""

        # Only the model call is guarded here, so a later database failure
        # is not misreported as an AI outage.
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.7,
                max_tokens=2000
            )
            # The answer column is NOT NULL; store '' if no text came back.
            answer = response.choices[0].message.content or ''
        except Exception as api_error:
            print(f"DeepSeek API Error: {api_error}")
            return jsonify({'error': f'AI服务暂时不可用{str(api_error)}'}), 500

        result = {
            'question': question,
            'answer': answer,
            'sources': sources
        }
        conn = get_db_connection()
        try:
            conn.execute(
                'INSERT INTO conversations (question, answer, sources) VALUES (?, ?, ?)',
                (question, result['answer'], json.dumps(result['sources']))
            )
            conn.commit()
        finally:
            conn.close()
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': f'回答问题时出错:{str(e)}'}), 500
@app.route('/api/documents', methods=['GET'])
def get_documents():
    """Return every known document as a JSON array.

    The cache is refreshed from the database first. Any failure produces
    ``{error}`` with status 500.
    """
    try:
        load_documents_from_db()
        payload = list(documents.values())
        return jsonify(payload)
    except Exception as exc:
        message = f'获取文档列表失败:{str(exc)}'
        return jsonify({'error': message}), 500
def _remove_uploaded_files(doc_id):
    """Best-effort removal of the stored upload(s) named ``<doc_id>_*``."""
    upload_dir = app.config['UPLOAD_FOLDER']
    prefix = f"{doc_id}_"
    try:
        for stored in os.listdir(upload_dir):
            if stored.startswith(prefix):
                os.remove(os.path.join(upload_dir, stored))
    except OSError as cleanup_error:
        # The database row is already gone; a leftover file must not turn
        # the request into a failure.
        print(f"Could not remove uploaded file for {doc_id}: {cleanup_error}")


@app.route('/api/documents/<doc_id>', methods=['DELETE'])
def delete_document(doc_id):
    """Delete a document row and its uploaded file.

    Args:
        doc_id: id of the document, taken from the URL.

    Returns:
        JSON ``{success: true}`` on success, ``{error}`` with status 404
        when no such row exists, or ``{error}`` with status 500 on failure.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.execute('DELETE FROM documents WHERE id = ?', (doc_id,))
            removed = cursor.rowcount > 0
            if removed:
                conn.commit()
        finally:
            # Close on every path, including when the statement raises.
            conn.close()
        if not removed:
            return jsonify({'error': '文档不存在'}), 404

        # Files are only touched after the row was confirmed to exist, so
        # the prefix is a real document id.
        _remove_uploaded_files(doc_id)
        load_documents_from_db()
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': f'删除文档失败:{str(e)}'}), 500
def _decode_sources(raw):
    """Decode the JSON ``sources`` column; empty or malformed values give []."""
    if not raw:
        return []
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # One corrupt row should not make the whole history unavailable.
        return []


@app.route('/api/conversations', methods=['GET'])
def get_conversations():
    """Return the 50 most recent conversations as a JSON array.

    Each entry is the table row as a dict with ``sources`` decoded from its
    stored JSON text. Any failure produces ``{error}`` with status 500.
    """
    try:
        conn = get_db_connection()
        try:
            conversations = conn.execute(
                'SELECT * FROM conversations ORDER BY created_at DESC LIMIT 50'
            ).fetchall()
        finally:
            # Release the handle even if the query raises.
            conn.close()
        result = []
        for conv in conversations:
            conv_dict = dict(conv)
            conv_dict['sources'] = _decode_sources(conv_dict['sources'])
            result.append(conv_dict)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': f'获取对话历史失败:{str(e)}'}), 500
@app.route('/api/conversations', methods=['DELETE'])
def clear_conversations():
    """Delete every row of the ``conversations`` table.

    Returns:
        JSON ``{success: true}`` on success, or ``{error}`` with status 500
        on failure.
    """
    try:
        conn = get_db_connection()
        try:
            conn.execute('DELETE FROM conversations')
            conn.commit()
        finally:
            # Close on every path so a failed delete does not leak the handle.
            conn.close()
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': f'清除对话历史失败:{str(e)}'}), 500
if __name__ == '__main__':
    # Create the tables and prime the document cache before serving.
    init_db()
    load_documents_from_db()
    # Debug mode stays on by default, as before, but can now be switched off
    # with FLASK_DEBUG=0 (or false/no/off).
    # NOTE(review): debug mode enables the interactive debugger and must be
    # disabled on any deployment reachable by untrusted users.
    debug_enabled = os.getenv('FLASK_DEBUG', '1').strip().lower() not in (
        '', '0', 'false', 'no', 'off'
    )
    app.run(debug=debug_enabled, port=5000)