G3/src/deepseek_agent_optimized.py

493 lines
18 KiB
Python
Raw Normal View History

2026-01-16 05:58:32 +08:00
"""优化版 DeepSeek API 驱动的智能情感分析 Agent"""
import asyncio
import json
import re
from typing import Optional, Dict, Any
from functools import lru_cache
import httpx
from pydantic import BaseModel, Field
from src.config import Config
class APIError(Exception):
"""API 错误异常类"""
def __init__(self, message: str, status_code: Optional[int] = None):
self.message = message
self.status_code = status_code
super().__init__(self.message)
class DeepSeekClient:
"""优化版 DeepSeek API 客户端"""
def __init__(self):
api_key = Config.get_api_key()
if not api_key:
raise ValueError("DeepSeek API Key 未配置")
self.api_key = api_key
self.base_url = Config.get_base_url()
self.model = Config.DEEPSEEK_MODEL
async def chat_completion_with_retry(
self,
messages: list,
temperature: float = 0.3,
max_tokens: int = 500
) -> str:
"""带重试机制的 API 调用"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
data = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": False
}
last_error = None
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
async with httpx.AsyncClient(timeout=Config.REQUEST_TIMEOUT) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=data
)
if response.status_code == 200:
result = response.json()
return result["choices"][0]["message"]["content"]
elif response.status_code == 401:
raise APIError("API 密钥无效", response.status_code)
elif response.status_code == 429:
# 限流,等待后重试
wait_time = 2 ** attempt # 指数退避
await asyncio.sleep(wait_time)
continue
else:
raise APIError(f"API 调用失败: {response.status_code}", response.status_code)
except (httpx.ConnectError, httpx.TimeoutException) as e:
last_error = e
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
await asyncio.sleep(1) # 等待1秒后重试
continue
else:
raise APIError(f"网络连接失败: {str(e)}")
except Exception as e:
last_error = e
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
await asyncio.sleep(1)
continue
else:
raise APIError(f"API 调用异常: {str(e)}")
raise last_error or APIError("未知错误")
class SentimentAnalysisResult(BaseModel):
"""情感分析结果"""
sentiment: str = Field(description="情感类别: negative/neutral/positive")
confidence: float = Field(description="置信度 (0-1)")
reasoning: str = Field(description="情感判断的推理过程")
key_factors: list[str] = Field(description="影响情感判断的关键因素")
intensity: str = Field(description="情感强度: mild/moderate/strong")
class DisposalPlan(BaseModel):
"""处置方案"""
priority: str = Field(description="处理优先级: high/medium/low")
action_type: str = Field(description="行动类型: response/investigate/monitor/ignore")
suggested_response: Optional[str] = Field(description="建议回复内容", default=None)
follow_up_actions: list[str] = Field(description="后续行动建议")
reasoning: str = Field(description="处置方案制定的理由")
urgency_level: str = Field(description="紧急程度: immediate/soon/normal")
class TweetAnalysisResult(BaseModel):
"""推文分析完整结果"""
tweet_text: str = Field(description="原始推文文本")
airline: str = Field(description="航空公司")
sentiment_analysis: SentimentAnalysisResult = Field(description="情感分析结果")
disposal_plan: DisposalPlan = Field(description="处置方案")
processing_time: float = Field(description="处理耗时(秒)")
api_used: bool = Field(description="是否使用了 API")
class ResponseParser:
"""API 响应解析器"""
@staticmethod
def parse_sentiment_response(response: str) -> Dict[str, Any]:
"""解析情感分析响应"""
# 使用正则表达式进行更精确的解析
patterns = {
"sentiment": r"情感类别[:]\s*(negative|neutral|positive)",
"confidence": r"置信度[:]\s*([0-9]*\.?[0-9]+)",
"intensity": r"情感强度[:]\s*(mild|moderate|strong)",
}
result = {}
for key, pattern in patterns.items():
match = re.search(pattern, response, re.IGNORECASE)
if match:
result[key] = match.group(1).lower() if key != "confidence" else float(match.group(1))
# 解析关键因素
factors_match = re.search(r"关键因素[:]([^\n]*)(?:\n|$)", response)
if factors_match:
factors_text = factors_match.group(1).strip()
result["key_factors"] = [f.strip() for f in factors_text.split(",") if f.strip()]
else:
result["key_factors"] = []
# 提取推理过程
reasoning_match = re.search(r"推理过程[:]([^\n]*)(?:\n|$)", response)
if reasoning_match:
result["reasoning"] = reasoning_match.group(1).strip()
else:
# 如果找不到,使用默认推理
result["reasoning"] = "基于推文内容和航空行业特点进行综合分析"
return result
@staticmethod
def parse_disposal_response(response: str) -> Dict[str, Any]:
"""解析处置方案响应"""
patterns = {
"priority": r"优先级[:]\s*(high|medium|low)",
"action_type": r"行动类型[:]\s*(response|investigate|monitor|ignore)",
"urgency_level": r"紧急程度[:]\s*(immediate|soon|normal)",
}
result = {}
for key, pattern in patterns.items():
match = re.search(pattern, response, re.IGNORECASE)
if match:
result[key] = match.group(1).lower()
# 解析建议回复
response_match = re.search(r"建议回复[:]([^\n]*)(?:\n|$)", response)
if response_match:
result["suggested_response"] = response_match.group(1).strip()
# 解析后续行动
actions_match = re.search(r"后续行动[:]([^\n]*)(?:\n|$)", response)
if actions_match:
actions_text = actions_match.group(1).strip()
result["follow_up_actions"] = [a.strip() for a in actions_text.split(",") if a.strip()]
else:
result["follow_up_actions"] = []
# 解析制定理由
reasoning_match = re.search(r"制定理由[:]([^\n]*)(?:\n|$)", response)
if reasoning_match:
result["reasoning"] = reasoning_match.group(1).strip()
else:
result["reasoning"] = "基于情感分析结果制定"
return result
class OptimizedDeepSeekTweetAgent:
"""优化版 DeepSeek 推文分析 Agent"""
def __init__(self):
self.client = DeepSeekClient()
self.parser = ResponseParser()
async def analyze_sentiment(self, text: str, airline: str) -> SentimentAnalysisResult:
"""优化版情感分析"""
prompt = f"""
你是一位专业的航空行业情感分析专家请分析以下推文的情感倾向
推文内容"{text}"
航空公司{airline}
请严格按照以下JSON格式输出分析结果
{{
"sentiment": "negative/neutral/positive",
"confidence": 0.0-1.0之间的数值,
"intensity": "mild/moderate/strong",
"key_factors": ["因素1", "因素2", "因素3"],
"reasoning": "详细的情感判断推理过程"
}}
分析要求
1. 情感判断要准确反映推文的真实情感
2. 置信度要基于推文的明确程度和情感强度
3. 关键因素要具体相关
4. 推理过程要详细有逻辑
请只输出JSON格式的结果不要有其他内容
"""
messages = [
{
"role": "system",
"content": "你是一位专业的航空行业情感分析专家,擅长准确识别推文中的情感倾向。"
},
{"role": "user", "content": prompt}
]
try:
response = await self.client.chat_completion_with_retry(messages, temperature=0.1)
# 清理响应文本,移除可能的标记和空白
cleaned_response = response.strip()
# 尝试解析JSON响应
try:
# 尝试提取JSON部分如果响应包含其他文本
json_match = re.search(r'\{[^}]+\}', cleaned_response)
if json_match:
json_text = json_match.group(0)
result_data = json.loads(json_text)
else:
result_data = json.loads(cleaned_response)
# 验证必需字段
required_fields = ["sentiment", "confidence", "intensity"]
for field in required_fields:
if field not in result_data:
raise ValueError(f"缺少必需字段: {field}")
return SentimentAnalysisResult(**result_data)
except (json.JSONDecodeError, ValueError) as json_error:
# JSON解析失败使用正则解析
print(f"JSON解析失败使用正则解析: {json_error}")
parsed_data = self.parser.parse_sentiment_response(response)
# 确保必需字段有默认值
default_values = {
"sentiment": "neutral",
"confidence": 0.5,
"intensity": "moderate"
}
for field, default_value in default_values.items():
if field not in parsed_data or not parsed_data[field]:
parsed_data[field] = default_value
return SentimentAnalysisResult(**parsed_data)
except APIError as e:
# API调用失败返回默认结果
print(f"API调用失败: {e.message}")
return SentimentAnalysisResult(
sentiment="neutral",
confidence=0.5,
intensity="moderate",
key_factors=["API调用失败使用默认分析"],
reasoning=f"API调用失败: {e.message}"
)
async def generate_disposal_plan(
self,
text: str,
airline: str,
sentiment_result: SentimentAnalysisResult
) -> DisposalPlan:
"""生成优化版处置方案"""
prompt = f"""
基于以下推文分析和情感判断结果为航空公司制定一个合理的处置方案
推文内容"{text}"
航空公司{airline}
情感分析结果
- 情感类别{sentiment_result.sentiment}
- 置信度{sentiment_result.confidence:.1%}
- 情感强度{sentiment_result.intensity}
- 关键因素{', '.join(sentiment_result.key_factors)}
请严格按照以下JSON格式输出处置方案
{{
"priority": "high/medium/low",
"action_type": "response/investigate/monitor/ignore",
"suggested_response": "具体的回复建议(如适用)",
"follow_up_actions": ["行动1", "行动2"],
"reasoning": "制定此方案的理由",
"urgency_level": "immediate/soon/normal"
}}
要求
1. 优先级要基于情感强度和置信度
2. 行动类型要符合航空行业最佳实践
3. 建议回复要专业有同理心
4. 后续行动要具体可执行
请只输出JSON格式的结果
"""
messages = [
{
"role": "system",
"content": "你是一位航空公司的客户服务专家,擅长制定合理的客户反馈处置方案。"
},
{"role": "user", "content": prompt}
]
try:
response = await self.client.chat_completion_with_retry(messages, temperature=0.3)
# 清理响应文本
cleaned_response = response.strip()
try:
# 尝试提取JSON部分
json_match = re.search(r'\{[^}]+\}', cleaned_response)
if json_match:
json_text = json_match.group(0)
result_data = json.loads(json_text)
else:
result_data = json.loads(cleaned_response)
# 验证必需字段
required_fields = ["priority", "action_type", "reasoning"]
for field in required_fields:
if field not in result_data:
raise ValueError(f"缺少必需字段: {field}")
return DisposalPlan(**result_data)
except (json.JSONDecodeError, ValueError) as json_error:
# JSON解析失败使用正则解析
print(f"处置方案JSON解析失败使用正则解析: {json_error}")
parsed_data = self.parser.parse_disposal_response(response)
# 确保必需字段有默认值
default_values = {
"priority": "medium",
"action_type": "monitor",
"reasoning": "基于情感分析结果制定",
"follow_up_actions": [],
"urgency_level": "normal"
}
for field, default_value in default_values.items():
if field not in parsed_data or not parsed_data[field]:
parsed_data[field] = default_value
return DisposalPlan(**parsed_data)
except APIError as e:
# API调用失败返回默认处置方案
print(f"处置方案API调用失败: {e.message}")
return self._generate_default_disposal_plan(sentiment_result)
def _generate_default_disposal_plan(self, sentiment_result: SentimentAnalysisResult) -> DisposalPlan:
"""生成默认处置方案"""
if sentiment_result.sentiment == "negative":
return DisposalPlan(
priority="medium",
action_type="investigate",
suggested_response=None,
follow_up_actions=["进一步核实情况", "根据核实结果决定行动"],
reasoning="负面情感需要进一步调查",
urgency_level="soon"
)
elif sentiment_result.sentiment == "positive":
return DisposalPlan(
priority="low",
action_type="monitor",
suggested_response=None,
follow_up_actions=["持续关注用户动态"],
reasoning="正面情感保持关注即可",
urgency_level="normal"
)
else:
return DisposalPlan(
priority="low",
action_type="monitor",
suggested_response=None,
follow_up_actions=["常规关注"],
reasoning="中性情感常规处理",
urgency_level="normal"
)
async def analyze_tweet(self, text: str, airline: str) -> TweetAnalysisResult:
"""完整的推文分析流程"""
import time
start_time = time.time()
# 1. 情感分析
sentiment_result = await self.analyze_sentiment(text, airline)
# 2. 生成处置方案
disposal_plan = await self.generate_disposal_plan(text, airline, sentiment_result)
# 3. 计算处理时间
processing_time = time.time() - start_time
# 返回完整结果
return TweetAnalysisResult(
tweet_text=text,
airline=airline,
sentiment_analysis=sentiment_result,
disposal_plan=disposal_plan,
processing_time=processing_time,
api_used=True
)
# 同步版本的包装函数
async def analyze_tweet_async(text: str, airline: str) -> TweetAnalysisResult:
"""异步版本的推文分析"""
agent = OptimizedDeepSeekTweetAgent()
return await agent.analyze_tweet(text, airline)
def analyze_tweet_sync(text: str, airline: str) -> TweetAnalysisResult:
"""同步版本的推文分析函数"""
return asyncio.run(analyze_tweet_async(text, airline))
# 终极版本 - 完全不需要航空公司参数
async def analyze_tweet_ultimate_async(text: str) -> TweetAnalysisResult:
"""终极版本异步推文分析 - 无需航空公司参数"""
agent = OptimizedDeepSeekTweetAgent()
# 自动检测航空公司或使用通用标识
airline = "通用航空公司"
# 简单的航空公司检测逻辑
airline_keywords = {
"united": "United Airlines",
"delta": "Delta Air Lines",
"american": "American Airlines",
"southwest": "Southwest Airlines",
"jetblue": "JetBlue Airways",
"air china": "中国国际航空",
"china eastern": "中国东方航空",
"china southern": "中国南方航空"
}
text_lower = text.lower()
for keyword, airline_name in airline_keywords.items():
if keyword in text_lower:
airline = airline_name
break
return await agent.analyze_tweet(text, airline)
def analyze_tweet_sync_ultimate(text: str) -> TweetAnalysisResult:
"""终极版本同步推文分析 - 完全无需航空公司参数"""
return asyncio.run(analyze_tweet_ultimate_async(text))