feat: 添加航空公司情感分析与智能客服系统初始代码

- 实现数据预处理模块(data.py)和模型训练模块(train.py)
- 添加智能客服Agent应用(agent_app.py)和DNA解码系统(dna_decoder.py)
- 包含补偿推荐系统(compensation_recommender.py)和可视化支持
- 添加项目配置文件(pyproject.toml)和README文档
- 提供多种启动脚本(start_app.*, fix_path_and_run.bat等)
This commit is contained in:
Your Name 2026-01-13 00:43:15 +08:00
commit bd5d8d108c
24 changed files with 23451 additions and 0 deletions

1
CourseDesign Submodule

@ -0,0 +1 @@
Subproject commit dda07904bba2be7157b1edefd4d0f4a2d49e3fd9

6
bigwork/.env.example Normal file
View File

@ -0,0 +1,6 @@
# DeepSeek API Key
DEEPSEEK_API_KEY=your-key-here
# 项目配置
DATA_FILE_PATH=data/Tweets.csv
MODEL_SAVE_PATH=models/sentiment_model.pkl

10
bigwork/.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
__pycache__/
*.py[cod]
*$py.class
.venv/
.env
.DS_Store
models/*.pkl
.pytest_cache/
*.log
.ipynb_checkpoints/

1
bigwork/.python-version Normal file
View File

@ -0,0 +1 @@
3.12.5

55
bigwork/README.md Normal file
View File

@ -0,0 +1,55 @@
# 航空公司情感分析与智能客服优先级系统
基于社交媒体数据的航空公司服务质量监控与客户服务优先级排序系统。
## 项目特色
- 🎯 **情感分析**:对航空公司推文进行情感分类
- 🤖 **智能Agent**:生成结构化客服处置方案
- 📊 **优先级排序**:自动识别紧急服务请求
- 📈 **质量监控**:实时监测服务质量波动
## 快速开始
### 1. 环境配置
```bash
# 安装依赖
uv sync
# 配置API密钥
cp .env.example .env
# 编辑.env文件填入DeepSeek API Key
```
### 2. 运行应用
```bash
# 训练模型
uv run python src/train.py
# 启动可视化界面
uv run streamlit run src/streamlit_app.py
# 运行Agent应用
uv run python src/agent_app.py
```
## 项目结构
```
├── src/ # 源代码
│ ├── data.py # 数据处理模块
│ ├── train.py # 模型训练
│ ├── streamlit_app.py # 可视化界面
│ └── agent_app.py # Agent应用
├── data/ # 数据文件
├── models/ # 保存的模型
└── tests/ # 测试文件
```
## 技术栈
- **机器学习**:scikit-learn, XGBoost
- **数据处理**:polars, pandas, pandera
- **可视化**:streamlit, seaborn
- **Agent框架**:pydantic-ai
- **API**:DeepSeek LLM

33
bigwork/auto_fix_path.ps1 Normal file
View File

@ -0,0 +1,33 @@
# PowerShell PATH auto-repair script.
# Copy this file's contents into your PowerShell profile to run on startup.
function Fix-Path {
    Write-Host "🔧 自动修复 PATH 环境变量..." -ForegroundColor Yellow
    # Prepend the per-user Python Scripts directory if it is not on PATH yet.
    $pythonScriptsPath = "C:\Users\马艺洁\AppData\Local\Programs\Python\Python312\Scripts"
    if (-not ($env:PATH -like "*$pythonScriptsPath*")) {
        Write-Host "✅ 添加正确的 Python Scripts 路径到 PATH" -ForegroundColor Green
        $env:PATH = "$pythonScriptsPath;" + $env:PATH
    }
    # Repair known corruption pattern: "C:;Users" -> "C:\Users".
    if ($env:PATH -like "*C:;Users*") {
        Write-Host "✅ 修复 C:;Users 格式错误" -ForegroundColor Green
        $env:PATH = $env:PATH.Replace('C:;Users', 'C:\Users')
    }
    # Repair known corruption pattern: ";%S;stemRoot%" -> ";%SystemRoot%".
    if ($env:PATH -like "*;%S;stemRoot%*") {
        Write-Host "✅ 修复 ;%S;stemRoot% 格式错误" -ForegroundColor Green
        $env:PATH = $env:PATH.Replace(';%S;stemRoot%', ';%SystemRoot%')
    }
    Write-Host "✅ PATH 修复完成!现在可以直接使用 'uv' 命令" -ForegroundColor Green
}
# Run the repair immediately on load.
Fix-Path
# BUG FIX: `alias uv-fix Fix-Path` is not valid PowerShell — there is no
# `alias` command; aliases are created with Set-Alias.
Set-Alias -Name uv-fix -Value Fix-Path

14873
bigwork/data/Tweets.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,21 @@
@echo off
rem Repair two known PATH corruptions, then launch the Streamlit app via uv
rem (falling back to `python -m uv` when uv is not resolvable).
setlocal enabledelayedexpansion
echo ========================================
echo 自动修复 PATH 并启动应用
echo ========================================
echo.
echo [1/3] 检查并修复 PATH 环境变量...
rem Plain %-substitution works here because the search text has no % signs.
set "PATH=%PATH:C:;Users=C:\Users%"
rem BUG FIX: the original line
rem   set PATH=%PATH:;%S;stemRoot%=;%SystemRoot%%
rem cannot work - the embedded % signs terminate the %PATH:...% expansion
rem early. With delayed expansion, %% escapes to a literal % so the intended
rem substring replacement ";%S;stemRoot%" -> ";%SystemRoot%" is performed.
set "PATH=!PATH:;%%S;stemRoot%%=;%%SystemRoot%%!"
echo [2/3] 验证 UV 命令是否可用...
uv --version >nul 2>&1
if %errorlevel% neq 0 (
    echo 错误: UV 命令不可用,尝试使用 python -m uv...
    python -m uv run streamlit run src/streamlit_app.py
) else (
    echo [3/3] 使用 UV 启动 Streamlit 应用...
    uv run streamlit run src/streamlit_app.py
)
endlocal
pause

51
bigwork/pyproject.toml Normal file
View File

@ -0,0 +1,51 @@
[project]
name = "airline-sentiment-analysis"
version = "1.0.0"
description = "航空公司情感分析与智能客服优先级系统"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pydantic>=2.10",
"pandera>=0.21",
"pydantic-ai>=0.7",
"polars>=1.0",
"pandas>=2.2",
"scikit-learn>=1.5",
"xgboost>=3.1",
"seaborn>=0.13",
"joblib>=1.4",
"python-dotenv>=1.0",
"streamlit>=1.40",
"plotly>=5.0",
"networkx>=3.0",
"wordcloud>=1.9",
"jieba>=0.42",
]
[[tool.uv.index]]
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true
[dependency-groups]
dev = [
"pytest>=8.0",
"pytest-asyncio>=1.3",
"ruff>=0.8",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.ruff]
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I"]
[tool.pytest.ini_options]
testpaths = ["tests"]

36
bigwork/quick_start.ps1 Normal file
View File

@ -0,0 +1,36 @@
# Quick-start script - repair PATH corruption, then launch the Streamlit app.
Write-Host "🚀 快速启动脚本" -ForegroundColor Cyan
Write-Host "========================================" -ForegroundColor Cyan
# Repair the two known PATH corruption patterns ("C:;Users" and ";%S;stemRoot%") in one pass.
if ($env:PATH -like "*C:;Users*" -or $env:PATH -like "*;%S;stemRoot%*") {
Write-Host "🔧 检测到 PATH 格式错误,正在修复..." -ForegroundColor Yellow
$env:PATH = $env:PATH.Replace('C:;Users', 'C:\Users').Replace(';%S;stemRoot%', ';%SystemRoot%')
Write-Host "✅ PATH 修复完成" -ForegroundColor Green
}
# Change to the project directory (hard-coded to this machine's checkout).
Set-Location "d:\HuaweiMoveData\Users\马艺洁\Desktop\MLwork\bigwork"
# Probe whether a native `uv` binary works; otherwise fall back to `python -m uv`.
Write-Host "🔍 检查 UV 命令可用性..." -ForegroundColor Cyan
$uvAvailable = $false
try {
$uvVersion = uv --version 2>$null
if ($LASTEXITCODE -eq 0) {
$uvAvailable = $true
Write-Host "✅ UV 命令可用: $uvVersion" -ForegroundColor Green
}
} catch {
# NOTE(review): a missing native command is not always a terminating error in
# PowerShell; the $LASTEXITCODE check above is the effective guard - confirm.
Write-Host "⚠️ UV 命令不可用,将使用 python -m uv" -ForegroundColor Yellow
}
# Launch the app with whichever runner was selected.
if ($uvAvailable) {
Write-Host "🚀 使用 UV 启动 Streamlit 应用..." -ForegroundColor Green
uv run streamlit run src/streamlit_app.py
} else {
Write-Host "🚀 使用 python -m uv 启动 Streamlit 应用..." -ForegroundColor Green
python -m uv run streamlit run src/streamlit_app.py
}

240
bigwork/src/agent_app.py Normal file
View File

@ -0,0 +1,240 @@
"""航空公司情感分析智能Agent应用"""
import os
import sys
from pathlib import Path
from typing import List, Dict, Literal
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from dotenv import load_dotenv
import joblib
import pandas as pd
# 添加项目根目录到Python路径
sys.path.append(str(Path(__file__).parent.parent))
from src.data import DataProcessor
# 加载环境变量
load_dotenv()
class DisposalPlan(BaseModel):
    """Structured customer-service disposal plan emitted by the agent."""

    问题分类: str = Field(description="问题类型,如'航班延误''行李问题''服务态度'")
    严重程度: Literal["紧急", "重要", "常规"] = Field(description="问题紧急程度")
    建议措施: List[str] = Field(description="具体的处理建议和行动项")
    时间线: Dict[str, str] = Field(description="各阶段处理时间安排")
    责任部门: str = Field(description="主要负责部门")
    预期解决时间: str = Field(description="预计问题解决时间")
class SentimentAnalyzer:
    """情感分析器.

    Chinese text is scored with a keyword heuristic; English text is routed
    through the persisted model pipeline under ``models/``.
    """

    def __init__(self):
        # Artefacts produced by the training script; fail fast when absent.
        self.model_path = Path("models/sentiment_model.pkl")
        self.feature_names_path = Path("models/feature_names.pkl")
        if self.model_path.exists():
            self.model = joblib.load(self.model_path)
            self.feature_names = joblib.load(self.feature_names_path)
        else:
            raise FileNotFoundError("模型文件不存在,请先运行训练脚本")

    def is_chinese_text(self, text: str) -> bool:
        """Return True when more than 30% of the characters are CJK ideographs."""
        chinese_chars = sum(1 for char in str(text) if '\u4e00' <= char <= '\u9fff')
        return chinese_chars / max(len(str(text)), 1) > 0.3

    def analyze_chinese_sentiment(self, text: str) -> Dict:
        """Keyword-based Chinese sentiment analysis.

        Returns ``{'sentiment', 'confidence', 'probabilities'}`` where the
        probability distribution is consistent with the confidence.
        """
        text_lower = str(text).lower()
        # BUG FIX: both original lists contained an empty string '', which is
        # a substring of every text, so both counters were always inflated by
        # one. The empty entries look like single characters lost in transit;
        # restored as '差' / '好' / '棒' to match the neighbouring phrases.
        negative_keywords_cn = [
            '难吃', '太难吃', '糟糕', '太糟糕', '差', '很差', '非常差',
            '失望', '很失望', '非常失望', '生气', '很生气', '非常生气',
            '讨厌', '很讨厌', '非常讨厌', '恶心', '很恶心', '非常恶心',
            '延误', '取消', '丢失', '损坏', '破损', '故障', '问题'
        ]
        positive_keywords_cn = [
            '好吃', '很好吃', '美味', '很美味', '好', '很好', '非常好',
            '满意', '很满意', '非常满意', '喜欢', '很喜欢', '非常喜欢',
            '棒', '很棒', '非常棒', '优秀', '很优秀', '非常优秀',
            '准时', '顺利', '舒适', '干净', '专业', '热情'
        ]
        negative_count = sum(1 for word in negative_keywords_cn if word in text_lower)
        positive_count = sum(1 for word in positive_keywords_cn if word in text_lower)
        # More keyword hits -> higher confidence, capped at 0.95.
        if negative_count > positive_count:
            sentiment = 'negative'
            confidence = min(0.95, 0.7 + negative_count * 0.1)
        elif positive_count > negative_count:
            sentiment = 'positive'
            confidence = min(0.95, 0.7 + positive_count * 0.1)
        else:
            sentiment = 'neutral'
            confidence = 0.8
        # BUG FIX: the original returned fixed 0.33/0.1 pseudo-probabilities
        # that neither summed to 1 nor reflected the confidence. Assign the
        # confidence to the predicted class and split the remainder evenly.
        rest = (1.0 - confidence) / 2
        return {
            'sentiment': sentiment,
            'confidence': confidence,
            'probabilities': {
                label: (confidence if label == sentiment else rest)
                for label in ('negative', 'neutral', 'positive')
            }
        }

    def predict_sentiment(self, text: str, airline: str = "unknown") -> Dict:
        """预测推文情感 — route Chinese text to the keyword analyser, else use the model."""
        if self.is_chinese_text(text):
            return self.analyze_chinese_sentiment(text)
        processor = DataProcessor("data/Tweets.csv")
        # Minimal one-row frame containing every column extract_features expects.
        test_data = {
            'text': [text],
            'airline': [airline],
            'airline_sentiment_confidence': [0.8],  # default annotation confidence
            'negativereason': [None],
            'tweet_created': [pd.Timestamp.now()]
        }
        df_test = pd.DataFrame(test_data)
        df_processed = processor.extract_features(df_test)
        # Column order must match what the model was trained on.
        feature_cols = (self.feature_names['numeric_features'] +
                        self.feature_names['categorical_features'] +
                        [self.feature_names['text_feature']])
        X_test = df_processed[feature_cols]
        prediction = self.model.predict(X_test)[0]
        probability = self.model.predict_proba(X_test)[0]
        return {
            'sentiment': prediction,
            'confidence': max(probability),
            'probabilities': dict(zip(self.model.classes_, probability))
        }
# Module-level customer-service agent backed by the DeepSeek chat model via
# pydantic-ai; `output_type=DisposalPlan` forces structured replies that parse
# into the DisposalPlan schema defined above.
# NOTE(review): needs a DeepSeek API key in the environment (.env is loaded at
# import time above) — confirm deployment configuration.
agent = Agent(
'deepseek:deepseek-chat',
system_prompt="""
你是一个航空公司客户服务专家专门处理社交媒体上的客户反馈
根据推文内容和情感分析结果生成结构化的客服处置方案
请根据以下信息生成处置方案
1. 推文内容反映的具体问题
2. 情感分析结果正面/中性/负面
3. 航空公司和问题上下文
4. 行业最佳实践
输出必须是结构化的JSON格式
""",
output_type=DisposalPlan
)
@agent.tool
async def analyze_tweet_sentiment(ctx, text: str, airline: str = "unknown") -> Dict:
    """Agent tool: score a tweet's sentiment with the local classifier."""
    return SentimentAnalyzer().predict_sentiment(text, airline)
@agent.tool
async def get_airline_info(ctx, airline: str) -> Dict:
    """Agent tool: look up (mock) contact and escalation info for an airline."""
    # Static directory of per-airline support channels; keys are lowercase.
    directory = {
        "united": {"客服电话": "1-800-864-8331", "紧急联系人": "客服经理", "响应时间": "2小时内"},
        "american": {"客服电话": "1-800-433-7300", "紧急联系人": "运营总监", "响应时间": "1小时内"},
        "delta": {"客服电话": "1-800-221-1212", "紧急联系人": "客户关系部", "响应时间": "4小时内"},
        "southwest": {"客服电话": "1-800-435-9792", "紧急联系人": "社交媒体团队", "响应时间": "6小时内"},
        "us airways": {"客服电话": "1-800-428-4322", "紧急联系人": "服务质量部", "响应时间": "3小时内"}
    }
    # Unknown airlines fall back to a generic escalation contact.
    fallback = {
        "客服电话": "待确认",
        "紧急联系人": "客服主管",
        "响应时间": "24小时内"
    }
    return directory.get(airline.lower(), fallback)
async def generate_disposal_plan(tweet_text: str, airline: str = "unknown") -> DisposalPlan:
    """生成客服处置方案 — run the agent and return the structured plan.

    BUG FIX: ``Agent.run`` returns an ``AgentRunResult`` wrapper, not the
    ``DisposalPlan`` itself, yet callers access ``plan.问题分类`` etc. directly.
    Unwrap ``.output`` (or ``.data`` on older pydantic-ai releases) before
    returning.
    """
    result = await agent.run(
        f"""
请为以下推文生成客服处置方案
推文内容: "{tweet_text}"
航空公司: {airline}
请分析问题严重程度制定具体的处理措施和时间安排
"""
    )
    # Prefer the modern attribute; fall back for older library versions.
    for attr in ("output", "data"):
        if hasattr(result, attr):
            return getattr(result, attr)
    return result
async def main():
    """主函数 - 演示Agent功能: run three canned tweets end to end."""
    print("=== 航空公司情感分析智能Agent ===\n")
    # (tweet text, airline) demo fixtures.
    test_tweets = [
        ("@United my flight was delayed for 3 hours and no one gave us any information! This is terrible service!", "united"),
        ("Just had a wonderful flight with @Delta. The crew was amazing and everything was perfect!", "delta"),
        ("@AmericanAir lost my luggage and now I have no clothes for my business meeting. Very disappointed.", "american")
    ]
    for i, (tweet, airline) in enumerate(test_tweets, 1):
        print(f"\n--- 测试用例 {i} ---")
        print(f"推文: {tweet}")
        print(f"航空公司: {airline}")
        try:
            # Local sentiment classification first, then the LLM disposal plan.
            analyzer = SentimentAnalyzer()
            sentiment_result = analyzer.predict_sentiment(tweet, airline)
            print(f"情感分析: {sentiment_result['sentiment']} (置信度: {sentiment_result['confidence']:.3f})")
            disposal_plan = await generate_disposal_plan(tweet, airline)
            print("\n处置方案:")
            print(f"问题分类: {disposal_plan.问题分类}")
            print(f"严重程度: {disposal_plan.严重程度}")
            print(f"责任部门: {disposal_plan.责任部门}")
            print(f"预期解决时间: {disposal_plan.预期解决时间}")
            print("建议措施:")
            for j, measure in enumerate(disposal_plan.建议措施, 1):
                print(f" {j}. {measure}")
            print("时间线:")
            for stage, action in disposal_plan.时间线.items():
                print(f" {stage}: {action}")
        except Exception as e:
            # Demo driver: report and continue with the next fixture.
            print(f"处理错误: {e}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

View File

@ -0,0 +1,380 @@
"""个性化服务恢复与补偿推荐系统"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from pydantic import BaseModel, Field
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import random
from datetime import datetime, timedelta
class CompensationEffect(BaseModel):
    """Predicted effect metrics for one (problem, compensation) candidate."""

    补偿类型: str = Field(description="补偿类型")
    问题类型: str = Field(description="问题类型")
    情感强度: float = Field(description="投诉情感强度")
    预期满意度提升: float = Field(description="预期满意度提升")
    成本效益比: float = Field(description="成本效益比")
    推荐指数: float = Field(description="推荐指数(0-1)")
    历史成功率: float = Field(description="历史成功率")
class CompensationRecommendation(BaseModel):
    """Final recommendation bundle returned to callers."""

    最优补偿方案: str = Field(description="最优补偿方案")
    备选方案: List[str] = Field(description="备选补偿方案")
    预期效果: CompensationEffect = Field(description="预期效果分析")
    推荐理由: str = Field(description="推荐理由")
    实施建议: str = Field(description="实施建议")
    A_B测试建议: str = Field(description="A/B测试建议")
class ABTestResult(BaseModel):
    """Outcome of a simulated A/B comparison between two compensation options."""

    测试组: str = Field(description="测试组")
    对照组: str = Field(description="对照组")
    满意度提升差异: float = Field(description="满意度提升差异")
    统计显著性: float = Field(description="统计显著性(p值)")
    推荐结论: str = Field(description="推荐结论")
class CompensationRecommender:
    """个性化服务恢复与补偿推荐系统.

    Fits a RandomForest regressor on (simulated) historical compensation
    outcomes, then ranks compensation types for a given problem/sentiment by
    a weighted blend of predicted satisfaction lift, cost-benefit ratio and
    historical success rate.
    """

    def __init__(self, data_path: str = "data/Tweets.csv"):
        self.data_path = data_path
        self.compensation_types = [
            "里程积分补偿", "代金券补偿", "升舱服务", "优先登机",
            "免费行李额度", "贵宾室体验", "快速安检", "延误保险理赔",
            "餐饮补偿", "酒店住宿", "交通补贴", "下次旅行折扣"
        ]
        self.problem_types = [
            "航班延误", "行李丢失", "服务态度", "座位问题",
            "餐饮质量", "登机流程", "取消航班", "超售问题"
        ]
        self.model = None
        # Training-time design-matrix columns; set by train_recommendation_model
        # and used to build prediction rows with an identical layout.
        self.feature_columns: Optional[List[str]] = None
        self.effect_data = self._generate_historical_effect_data()

    def _generate_historical_effect_data(self) -> pd.DataFrame:
        """生成历史补偿效果数据(模拟数据) — 50 samples per (problem, compensation) pair."""
        np.random.seed(42)  # deterministic simulation
        data = []
        for problem in self.problem_types:
            for compensation in self.compensation_types:
                for _ in range(50):
                    sentiment_intensity = np.random.uniform(-1, 0)  # complaints only
                    base_effect = self._calculate_base_effect(problem, compensation)
                    # Stronger negative emotion leaves more room for recovery.
                    sentiment_multiplier = 1 + abs(sentiment_intensity) * 0.5
                    random_factor = np.random.normal(1, 0.1)
                    satisfaction_improvement = base_effect * sentiment_multiplier * random_factor
                    cost_benefit_ratio = self._calculate_cost_benefit(compensation, satisfaction_improvement)
                    success_rate = np.random.uniform(0.6, 0.95)
                    data.append({
                        'problem_type': problem,
                        'compensation_type': compensation,
                        'sentiment_intensity': sentiment_intensity,
                        'satisfaction_improvement': max(0, min(1, satisfaction_improvement)),
                        'cost_benefit_ratio': cost_benefit_ratio,
                        'success_rate': success_rate
                    })
        return pd.DataFrame(data)

    def _calculate_base_effect(self, problem: str, compensation: str) -> float:
        """计算基础补偿效果 — problem/compensation affinity matrix, 0.5 default."""
        effect_matrix = {
            "航班延误": {"里程积分补偿": 0.8, "代金券补偿": 0.7, "延误保险理赔": 0.9},
            "行李丢失": {"里程积分补偿": 0.6, "代金券补偿": 0.8, "快速安检": 0.4},
            "服务态度": {"升舱服务": 0.7, "贵宾室体验": 0.6, "下次旅行折扣": 0.8},
            "餐饮质量": {"餐饮补偿": 0.9, "代金券补偿": 0.7, "里程积分补偿": 0.6}
        }
        default_effect = 0.5
        return effect_matrix.get(problem, {}).get(compensation, default_effect)

    def _calculate_cost_benefit(self, compensation: str, improvement: float) -> float:
        """计算成本效益比 — monetised satisfaction lift divided by estimated cost."""
        cost_estimates = {
            "里程积分补偿": 50, "代金券补偿": 100, "升舱服务": 200,
            "优先登机": 30, "免费行李额度": 40, "贵宾室体验": 60,
            "快速安检": 20, "延误保险理赔": 150, "餐饮补偿": 25,
            "酒店住宿": 120, "交通补贴": 80, "下次旅行折扣": 90
        }
        cost = cost_estimates.get(compensation, 50)
        benefit = improvement * 200  # assumed monetary value of one unit of satisfaction
        return benefit / cost if cost > 0 else 0

    def train_recommendation_model(self) -> None:
        """训练补偿推荐模型 on the simulated historical effect data."""
        X = pd.get_dummies(self.effect_data[['problem_type', 'compensation_type']])
        X['sentiment_intensity'] = self.effect_data['sentiment_intensity']
        y = self.effect_data['satisfaction_improvement']
        # BUG FIX: remember the exact training column layout. The original
        # _prepare_features built one-hot rows in self.problem_types /
        # self.compensation_types declaration order, which does not match the
        # (alphabetically sorted) pd.get_dummies layout — every prediction
        # therefore used scrambled features.
        self.feature_columns = list(X.columns)
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.model.fit(X, y)

    def recommend_compensation(self, problem_type: str, sentiment_intensity: float,
                               user_history: Optional[Dict] = None) -> CompensationRecommendation:
        """推荐补偿方案 — rank all compensation types and package the best one."""
        if self.model is None:
            self.train_recommendation_model()
        compensation_effects = []
        for compensation in self.compensation_types:
            features = self._prepare_features(problem_type, compensation, sentiment_intensity)
            # Single-row DataFrame with training column names avoids both the
            # feature-order bug and sklearn's feature-name warning.
            predicted_improvement = self.model.predict(features)[0]
            cost_benefit = self._calculate_cost_benefit(compensation, predicted_improvement)
            historical_data = self.effect_data[
                (self.effect_data['problem_type'] == problem_type) &
                (self.effect_data['compensation_type'] == compensation)
            ]
            success_rate = historical_data['success_rate'].mean() if not historical_data.empty else 0.7
            recommendation_score = self._calculate_recommendation_score(
                predicted_improvement, cost_benefit, success_rate
            )
            compensation_effects.append(CompensationEffect(
                补偿类型=compensation,
                问题类型=problem_type,
                情感强度=sentiment_intensity,
                预期满意度提升=predicted_improvement,
                成本效益比=cost_benefit,
                推荐指数=recommendation_score,
                历史成功率=success_rate
            ))
        # Highest recommendation score wins; next two become alternatives.
        compensation_effects.sort(key=lambda x: x.推荐指数, reverse=True)
        best_compensation = compensation_effects[0]
        recommendation_reason = self._generate_recommendation_reason(best_compensation)
        implementation_advice = self._generate_implementation_advice(best_compensation.补偿类型)
        ab_test_suggestion = self._generate_ab_test_suggestion(best_compensation, compensation_effects[1:3])
        return CompensationRecommendation(
            最优补偿方案=best_compensation.补偿类型,
            备选方案=[ce.补偿类型 for ce in compensation_effects[1:3]],
            预期效果=best_compensation,
            推荐理由=recommendation_reason,
            实施建议=implementation_advice,
            A_B测试建议=ab_test_suggestion
        )

    def _prepare_features(self, problem_type: str, compensation: str,
                          sentiment_intensity: float) -> pd.DataFrame:
        """准备特征数据 — one-row frame aligned to the training column layout."""
        row = {
            f'problem_type_{problem_type}': 1,
            f'compensation_type_{compensation}': 1,
            'sentiment_intensity': sentiment_intensity,
        }
        # Reindex guarantees the same columns, in the same order, as training.
        return pd.DataFrame([row]).reindex(columns=self.feature_columns, fill_value=0)

    def _calculate_recommendation_score(self, improvement: float, cost_benefit: float, success_rate: float) -> float:
        """计算推荐指数 — weighted blend; cost-benefit capped at 5 and damped."""
        improvement_weight = 0.4
        cost_benefit_weight = 0.4
        success_rate_weight = 0.2
        # NOTE(review): the extra 0.2 damping factor means cost-benefit
        # effectively contributes 0.08 per unit, not the nominal 0.4 weight —
        # kept as-is to preserve existing rankings.
        return (improvement * improvement_weight +
                min(cost_benefit, 5) * 0.2 * cost_benefit_weight +
                success_rate * success_rate_weight)

    def _generate_recommendation_reason(self, effect: CompensationEffect) -> str:
        """生成推荐理由 from the strongest effect metrics."""
        reasons = []
        if effect.预期满意度提升 > 0.7:
            reasons.append("预期满意度提升效果显著")
        elif effect.预期满意度提升 > 0.5:
            reasons.append("预期满意度提升效果良好")
        if effect.成本效益比 > 3:
            reasons.append("成本效益比优秀")
        elif effect.成本效益比 > 2:
            reasons.append("成本效益比良好")
        if effect.历史成功率 > 0.8:
            reasons.append("历史成功率较高")
        return "; ".join(reasons) if reasons else "基于综合评估推荐"

    def _generate_implementation_advice(self, compensation_type: str) -> str:
        """生成实施建议 — canned operational advice per compensation type."""
        advice_map = {
            "里程积分补偿": "建议立即发放积分,并发送确认邮件",
            "代金券补偿": "生成电子代金券有效期建议30天",
            "升舱服务": "为下次旅行提供升舱机会,需提前确认",
            "优先登机": "自动添加到乘客档案,下次生效",
            "延误保险理赔": "启动保险理赔流程,需收集相关证明"
        }
        return advice_map.get(compensation_type, "按照标准流程实施补偿")

    def _generate_ab_test_suggestion(self, best_compensation: CompensationEffect,
                                     alternatives: List[CompensationEffect]) -> str:
        """生成A/B测试建议 pairing the winner against the runner-up."""
        if not alternatives:
            return "暂无合适的A/B测试方案"
        alternative = alternatives[0]
        return (
            f"建议进行A/B测试{best_compensation.补偿类型}{alternative.补偿类型}进行对比,"
            f"预期效果差异为{abs(best_compensation.预期满意度提升 - alternative.预期满意度提升):.2f}"
        )

    def run_ab_test(self, problem_type: str, sentiment_intensity: float,
                    test_groups: List[str], sample_size: int = 100) -> ABTestResult:
        """运行A/B测试 — simulated comparison of the first two test groups."""
        if len(test_groups) < 2:
            raise ValueError("A/B测试需要至少2个测试组")
        group_a = test_groups[0]
        group_b = test_groups[1]
        # Simulated satisfaction-improvement samples for each group.
        group_a_improvement = np.random.normal(0.6, 0.1, sample_size)
        group_b_improvement = np.random.normal(0.65, 0.1, sample_size)
        mean_diff = group_b_improvement.mean() - group_a_improvement.mean()
        # Toy significance rule, not a real statistical test.
        p_value = 0.03 if abs(mean_diff) > 0.05 else 0.15
        conclusion = "B组效果显著优于A组" if p_value < 0.05 else "两组效果无显著差异"
        return ABTestResult(
            测试组=group_b,
            对照组=group_a,
            满意度提升差异=mean_diff,
            统计显著性=p_value,
            推荐结论=conclusion
        )
# 创建补偿推荐Agent
from pydantic_ai import Agent
class CompensationAnalysis(BaseModel):
    """Structured output schema for the compensation-strategy agent."""

    问题诊断: str = Field(description="问题诊断")
    补偿策略: str = Field(description="补偿策略")
    预期ROI: float = Field(description="预期投资回报率")
    风险分析: str = Field(description="风险分析")
    长期影响: str = Field(description="长期影响评估")
# Module-level compensation-strategy agent (DeepSeek via pydantic-ai);
# `output_type=CompensationAnalysis` forces structured replies parsing into
# the CompensationAnalysis schema above.
compensation_agent = Agent(
'deepseek:deepseek-chat',
system_prompt="""
你是航空公司补偿策略专家基于行为经济学和因果推断原理分析补偿方案
你的任务
1. 分析客户问题的严重程度和影响范围
2. 评估不同补偿方案的有效性
3. 预测补偿方案的长期影响
4. 提供风险分析和优化建议
分析要点
- 考虑客户的情感强度和问题类型
- 评估补偿方案的成本效益比
- 预测客户满意度的恢复程度
- 分析长期客户关系影响
输出必须是结构化的JSON格式
""",
output_type=CompensationAnalysis
)
async def analyze_compensation_strategy(problem_description: str, sentiment_intensity: float) -> CompensationAnalysis:
    """分析补偿策略 — run the compensation agent and unwrap its structured result.

    pydantic-ai versions differ in where the structured payload lives on the
    run result (``.data`` / ``.result`` / ``.output`` / ``.value``). The
    original code duplicated the probing ladder twice; this version probes the
    same attributes, in the same order, up to two wrapper levels deep.
    """
    result = await compensation_agent.run(f"""
请分析以下客户问题的补偿策略
问题描述: {problem_description}
情感强度: {sentiment_intensity:.2f}
请提供
1. 问题诊断和严重程度评估
2. 推荐的补偿策略
3. 预期投资回报率分析
4. 风险分析和优化建议
5. 长期客户关系影响评估
""")
    candidate = result
    for _ in range(2):  # at most two levels of wrapper unwrapping
        if hasattr(candidate, '问题诊断'):
            return candidate
        for attr in ('data', 'result', 'output', 'value'):
            if hasattr(candidate, attr):
                candidate = getattr(candidate, attr)
                break
        else:
            break  # no known wrapper attribute left to follow
    if not hasattr(candidate, '问题诊断'):
        raise AttributeError(f"无法访问CompensationAnalysis对象的问题诊断属性对象类型: {type(candidate)}")
    return candidate

View File

@ -0,0 +1,372 @@
"""竞争情报与差异化定位系统"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from pydantic import BaseModel, Field
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.preprocessing import StandardScaler
from scipy import stats
class AirlineComparison(BaseModel):
    """One target-vs-competitor score row for a single problem type."""

    航空公司: str = Field(description="航空公司名称")
    问题类型: str = Field(description="问题类型")
    投诉数量: int = Field(description="投诉数量")
    平均情感强度: float = Field(description="平均情感强度")
    满意度得分: float = Field(description="满意度得分(0-100)")
    相对表现: str = Field(description="相对表现(优于/劣于/持平)")
class CompetitiveAdvantage(BaseModel):
    """A problem area where the target airline outperforms the field."""

    优势领域: str = Field(description="优势领域")
    相对优势度: float = Field(description="相对优势度")
    关键指标: List[str] = Field(description="关键指标")
    改进建议: str = Field(description="改进建议")
class OpportunitySpace(BaseModel):
    """A market gap derived from a competitor's weakest problem area."""

    机会领域: str = Field(description="机会领域")
    未满足需求: str = Field(description="未满足需求")
    潜在市场规模: float = Field(description="潜在市场规模(0-1)")
    竞争对手弱点: List[str] = Field(description="竞争对手弱点")
    差异化建议: str = Field(description="差异化建议")
class CompetitiveIntelligence:
    """竞争情报与差异化定位系统.

    Reads the raw tweets CSV and derives airline-vs-airline comparisons,
    competitive advantages, opportunity spaces and competitor-improvement
    signals from complaint counts and sentiment-annotation confidence.
    """

    def __init__(self, data_path: str = "data/Tweets.csv"):
        self.data_path = data_path
        self.airlines = ["united", "american", "delta", "southwest", "us airways"]
        # negativereason categories as they appear in the dataset.
        self.problem_types = [
            "Bad Flight", "Can't Tell", "Late Flight", "Customer Service Issue",
            "Flight Booking Problems", "Lost Luggage", "Flight Attendant Complaints",
            "Cancelled Flight", "Damaged Luggage", "longlines"
        ]

    def load_data(self) -> pd.DataFrame:
        """加载数据 — read the raw tweets CSV."""
        df = pd.read_csv(self.data_path)
        return df

    def analyze_airline_comparison(self, target_airline: str, competitor_airlines: List[str]) -> List[AirlineComparison]:
        """分析航空公司对比 — pairwise target-vs-competitor scores per problem type."""
        df = self.load_data()
        comparisons = []
        for problem in self.problem_types:
            target_data = df[(df['airline'] == target_airline) & (df['negativereason'] == problem)]
            if len(target_data) > 0:
                target_complaints = len(target_data)
                target_sentiment = target_data['airline_sentiment_confidence'].mean()
                target_score = self._calculate_satisfaction_score(target_sentiment, target_complaints)
                for competitor in competitor_airlines:
                    if competitor != target_airline:
                        competitor_data = df[(df['airline'] == competitor) & (df['negativereason'] == problem)]
                        if len(competitor_data) > 0:
                            comp_complaints = len(competitor_data)
                            comp_sentiment = competitor_data['airline_sentiment_confidence'].mean()
                            comp_score = self._calculate_satisfaction_score(comp_sentiment, comp_complaints)
                            relative_performance = self._determine_relative_performance(target_score, comp_score)
                            comparisons.append(AirlineComparison(
                                航空公司=f"{target_airline} vs {competitor}",
                                问题类型=problem,
                                投诉数量=target_complaints,
                                平均情感强度=target_sentiment,
                                满意度得分=target_score,
                                相对表现=relative_performance
                            ))
        return comparisons

    def identify_competitive_advantages(self, target_airline: str, competitor_airlines: List[str]) -> List[CompetitiveAdvantage]:
        """识别竞争优势 — problems where the target beats the competitor average by >5 points."""
        df = self.load_data()
        advantages = []
        for problem in self.problem_types:
            target_data = df[(df['airline'] == target_airline) & (df['negativereason'] == problem)]
            if len(target_data) > 0:
                target_score = self._calculate_satisfaction_score(
                    target_data['airline_sentiment_confidence'].mean(),
                    len(target_data)
                )
                competitor_scores = []
                for competitor in competitor_airlines:
                    if competitor != target_airline:
                        comp_data = df[(df['airline'] == competitor) & (df['negativereason'] == problem)]
                        if len(comp_data) > 0:
                            competitor_scores.append(self._calculate_satisfaction_score(
                                comp_data['airline_sentiment_confidence'].mean(),
                                len(comp_data)
                            ))
                if competitor_scores:
                    avg_competitor_score = np.mean(competitor_scores)
                    advantage_degree = target_score - avg_competitor_score
                    if advantage_degree > 5:  # advantage threshold, in score points
                        advantages.append(CompetitiveAdvantage(
                            优势领域=problem,
                            相对优势度=advantage_degree,
                            关键指标=[f"满意度得分: {target_score:.1f}", f"行业平均: {avg_competitor_score:.1f}"],
                            改进建议=self._generate_improvement_suggestion(problem, advantage_degree)
                        ))
        return advantages

    def discover_opportunity_spaces(self, target_airline: str, competitor_airlines: List[str]) -> List[OpportunitySpace]:
        """发现机会空间 — each competitor's weakest problem where the target does better."""
        df = self.load_data()
        opportunities = []
        for competitor in competitor_airlines:
            if competitor != target_airline:
                # Score the competitor on every problem type with data.
                competitor_problems = []
                for problem in self.problem_types:
                    comp_data = df[(df['airline'] == competitor) & (df['negativereason'] == problem)]
                    if len(comp_data) > 0:
                        score = self._calculate_satisfaction_score(
                            comp_data['airline_sentiment_confidence'].mean(),
                            len(comp_data)
                        )
                        competitor_problems.append((problem, score))
                if competitor_problems:
                    # The competitor's lowest-scoring problem area.
                    worst_problem = min(competitor_problems, key=lambda x: x[1])
                    target_data = df[(df['airline'] == target_airline) & (df['negativereason'] == worst_problem[0])]
                    if len(target_data) > 0:
                        target_score = self._calculate_satisfaction_score(
                            target_data['airline_sentiment_confidence'].mean(),
                            len(target_data)
                        )
                        # Opportunity only when the target already outperforms there.
                        if target_score > worst_problem[1]:
                            market_size = self._estimate_market_size(worst_problem[0], df)
                            opportunities.append(OpportunitySpace(
                                机会领域=worst_problem[0],
                                未满足需求=f"{competitor}{worst_problem[0]}问题上表现不佳",
                                潜在市场规模=market_size,
                                竞争对手弱点=[f"{competitor}满意度得分: {worst_problem[1]:.1f}"],
                                差异化建议=self._generate_differentiation_suggestion(worst_problem[0], competitor)
                            ))
        return opportunities

    def monitor_competitor_improvements(self, competitor_airlines: List[str]) -> Dict[str, List[Dict]]:
        """监控竞争对手改进 — early-vs-late score deltas per problem type."""
        df = self.load_data()
        improvements = {}
        for competitor in competitor_airlines:
            competitor_improvements = []
            for problem in self.problem_types:
                problem_data = df[(df['airline'] == competitor) & (df['negativereason'] == problem)]
                if len(problem_data) > 10:  # require a minimum sample
                    # Crude trend proxy: sort by tweet_id, assuming ids are
                    # roughly chronological, and compare first half vs second.
                    problem_data_sorted = problem_data.sort_values('tweet_id')
                    split_point = len(problem_data_sorted) // 2
                    early_period = problem_data_sorted.iloc[:split_point]
                    late_period = problem_data_sorted.iloc[split_point:]
                    if len(early_period) > 0 and len(late_period) > 0:
                        early_score = self._calculate_satisfaction_score(
                            early_period['airline_sentiment_confidence'].mean(),
                            len(early_period)
                        )
                        late_score = self._calculate_satisfaction_score(
                            late_period['airline_sentiment_confidence'].mean(),
                            len(late_period)
                        )
                        improvement = late_score - early_score
                        if improvement > 2:  # significant-improvement threshold
                            competitor_improvements.append({
                                '问题类型': problem,
                                '改进幅度': improvement,
                                '前期表现': early_score,
                                '后期表现': late_score,
                                '改进措施': self._infer_improvement_measures(problem, improvement)
                            })
            improvements[competitor] = competitor_improvements
        return improvements

    def _calculate_satisfaction_score(self, sentiment_confidence: float, complaint_count: int) -> float:
        """计算满意度得分 (0-100) from confidence and complaint volume.

        NOTE(review): ``airline_sentiment_confidence`` is annotation
        confidence, not customer satisfaction — this proxy is questionable
        but retained as-is; confirm with the data owner.
        """
        if pd.isna(sentiment_confidence):
            sentiment_confidence = 0.5
        base_score = sentiment_confidence * 100  # scale to 0-100
        # More complaints lower the score, capped at a 20-point penalty.
        complaint_penalty = min(complaint_count * 0.1, 20)
        final_score = max(0, base_score - complaint_penalty)
        return final_score

    def _determine_relative_performance(self, target_score: float, competitor_score: float) -> str:
        """确定相对表现 — >5 points better / worse / otherwise even."""
        difference = target_score - competitor_score
        if difference > 5:
            return "优于"
        elif difference < -5:
            return "劣于"
        else:
            return "持平"

    def _generate_improvement_suggestion(self, problem: str, advantage_degree: float) -> str:
        """生成改进建议 per problem area; large advantages get a marketing nudge."""
        suggestions = {
            "Bad Flight": "继续保持航班质量监控,加强机组人员培训",
            "Late Flight": "优化航班调度,提高准点率",
            "Customer Service Issue": "加强客服培训,提升服务响应速度",
            "Lost Luggage": "改进行李追踪系统,加强行李处理流程"
        }
        base_suggestion = suggestions.get(problem, "持续优化相关服务流程")
        if advantage_degree > 10:
            return f"{base_suggestion},考虑将这一优势作为品牌差异化点进行宣传"
        else:
            return f"{base_suggestion},保持现有优势"

    def _estimate_market_size(self, problem: str, df: pd.DataFrame) -> float:
        """估计市场规模 — share of this problem among all complaints."""
        total_complaints = len(df[df['negativereason'] == problem])
        total_all_complaints = len(df[df['negativereason'].notna()])
        if total_all_complaints > 0:
            return total_complaints / total_all_complaints
        else:
            return 0.1  # arbitrary default when there are no complaints at all

    def _generate_differentiation_suggestion(self, problem: str, competitor: str) -> str:
        """生成差异化建议 targeting a specific competitor weakness."""
        suggestions = {
            "Bad Flight": f"针对{competitor}在航班体验上的弱点,推出'舒适飞行保证'计划",
            "Late Flight": f"利用{competitor}准点率问题,强调自身的准点承诺",
            "Customer Service Issue": f"针对{competitor}的服务问题,推出'24小时客服响应'服务",
            "Lost Luggage": f"针对{competitor}行李问题,提供'行李实时追踪'功能"
        }
        return suggestions.get(problem, f"针对{competitor}的弱点,推出差异化服务方案")

    def _infer_improvement_measures(self, problem: str, improvement: float) -> str:
        """推断改进措施 — heuristic description of what likely changed."""
        measures = {
            "Bad Flight": "可能改进了航班服务流程或机组培训",
            "Late Flight": "可能优化了航班调度或地面服务",
            "Customer Service Issue": "可能加强了客服培训或投诉处理流程",
            "Lost Luggage": "可能升级了行李处理系统或追踪技术"
        }
        base_measure = measures.get(problem, "实施了相关服务改进措施")
        if improvement > 5:
            return f"显著{base_measure}"
        else:
            return f"轻微{base_measure}"

    def generate_competitive_insights_report(self, target_airline: str, competitor_airlines: List[str]) -> Dict:
        """生成竞争洞察报告 — bundle all four analyses into one dict."""
        comparisons = self.analyze_airline_comparison(target_airline, competitor_airlines)
        advantages = self.identify_competitive_advantages(target_airline, competitor_airlines)
        opportunities = self.discover_opportunity_spaces(target_airline, competitor_airlines)
        improvements = self.monitor_competitor_improvements(competitor_airlines)
        return {
            'comparisons': comparisons,
            'advantages': advantages,
            'opportunities': opportunities,
            'improvements': improvements
        }
# 创建可视化函数
def create_competitive_analysis_charts(insights_report: Dict) -> Dict:
    """创建竞争分析图表 for a generate_competitive_insights_report() payload.

    Returns a dict with up to three plotly figures; empty sections are skipped.
    """
    charts = {}
    # Airline satisfaction comparison (grouped bars).
    if insights_report['comparisons']:
        # pydantic v2 (pyproject pins pydantic>=2.10): model_dump() replaces
        # the deprecated .dict().
        comparisons_df = pd.DataFrame([c.model_dump() for c in insights_report['comparisons']])
        charts['comparison_chart'] = px.bar(
            comparisons_df,
            x='问题类型',
            y='满意度得分',
            color='航空公司',
            title='航空公司满意度对比',
            barmode='group'
        )
    # Competitive advantage magnitudes.
    if insights_report['advantages']:
        advantages_df = pd.DataFrame([a.model_dump() for a in insights_report['advantages']])
        charts['advantages_chart'] = px.bar(
            advantages_df,
            x='优势领域',
            y='相对优势度',
            title='竞争优势分析',
            color='相对优势度',
            color_continuous_scale='Viridis'
        )
    # Opportunity spaces sized by estimated market share.
    if insights_report['opportunities']:
        opportunities_df = pd.DataFrame([o.model_dump() for o in insights_report['opportunities']])
        charts['opportunities_chart'] = px.scatter(
            opportunities_df,
            x='机会领域',
            y='潜在市场规模',
            size='潜在市场规模',
            title='机会空间发现',
            hover_data=['未满足需求']
        )
    return charts

175
bigwork/src/data.py Normal file
View File

@ -0,0 +1,175 @@
"""航空公司情感分析数据预处理模块"""
import pandas as pd
from pathlib import Path
from typing import Tuple, Dict, List
import re
def validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Lightweight schema check for the raw tweets frame.

    Raises ValueError when any required column is missing; prints warnings
    for invalid sentiment labels or out-of-range confidence values; returns
    the frame unchanged.
    """
    required = ['tweet_id', 'airline_sentiment', 'airline_sentiment_confidence',
                'negativereason', 'airline', 'text', 'tweet_created']
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"数据缺失必需列: {missing}")
    # Sentiment labels must be one of the three known classes.
    allowed_labels = ["positive", "neutral", "negative"]
    bad_labels = df.loc[~df['airline_sentiment'].isin(allowed_labels), 'airline_sentiment'].unique()
    if len(bad_labels) > 0:
        print(f"警告: 发现无效的情感值: {bad_labels}")
    # Confidence scores should lie in [0, 1].
    conf = df['airline_sentiment_confidence']
    out_of_range = df[(conf < 0) | (conf > 1)]
    if len(out_of_range) > 0:
        print(f"警告: 发现置信度超出范围 [0,1] 的数据行")
    return df
class DataProcessor:
    """Loads the airline-tweets CSV and derives model-ready features.

    Text cleaning is deliberately conservative: @mentions, #hashtags and URLs
    are kept because they carry social-media context; only emoji are removed.

    Fixes vs. the previous revision:
    - mention/hashtag regexes used a double backslash (``r'@\\w+'``), which
      matches a literal backslash and made both counts always 0; corrected to
      ``\w``.
    - the negative-keyword list contained duplicates that double-counted hits;
      matching is now done against the de-duplicated set of keywords.
    """

    def __init__(self, data_path: str):
        # Path to the raw Tweets.csv file.
        self.data_path = Path(data_path)
        # Slang/abbreviation expansion table used by preprocess_text().
        self.abb_dict = self._load_abbreviation_dict()

    def _load_abbreviation_dict(self) -> Dict[str, str]:
        """Return the abbreviation -> expansion map for common chat slang."""
        return {
            'pls': 'please', 'thx': 'thanks', 'ty': 'thank you',
            'u': 'you', 'r': 'are', 'ur': 'your', 'btw': 'by the way',
            'asap': 'as soon as possible', 'fyi': 'for your information',
            'imo': 'in my opinion', 'lol': 'laughing out loud',
            'omg': 'oh my god', 'idk': 'i don\'t know', 'tbh': 'to be honest',
            'afaik': 'as far as i know', 'brb': 'be right back'
        }

    def load_data(self) -> pd.DataFrame:
        """Read the CSV at self.data_path and run basic schema validation."""
        df = pd.read_csv(self.data_path)
        return validate_dataframe(df)

    def preprocess_text(self, text: str) -> str:
        """Conservatively clean one tweet; returns '' for NaN input.

        Lowercases, squeezes whitespace, strips emoji, and annotates known
        abbreviations in place as ``abbr(expansion)``.
        """
        if pd.isna(text):
            return ""
        # Basic cleaning: lowercase + whitespace normalization.
        text = str(text).lower().strip()
        text = re.sub(r'\s+', ' ', text)
        # Remove emoji (supplementary-plane code points) to reduce noise.
        text = re.sub(r'[\U00010000-\U0010ffff]', '', text)
        # @mentions, #hashtags and URLs are intentionally preserved.
        # Expand known abbreviations, keeping the original token visible.
        processed_words = []
        for word in text.split():
            expansion = self.abb_dict.get(word, word)
            if word in self.abb_dict and word != expansion:
                # Annotate rather than replace so the original token survives.
                processed_words.append(f"{word}({expansion})")
            else:
                processed_words.append(expansion)
        return ' '.join(processed_words)

    def extract_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of `df` enriched with text/social/time features."""
        df_processed = df.copy()
        # Cleaned text column used by most downstream features.
        df_processed['cleaned_text'] = df_processed['text'].apply(self.preprocess_text)
        # Plain text statistics.
        df_processed['text_length'] = df_processed['cleaned_text'].str.len()
        df_processed['word_count'] = df_processed['cleaned_text'].str.split().str.len()
        df_processed['punctuation_ratio'] = df_processed['cleaned_text'].str.count(r'[!?.,;:]') / (df_processed['text_length'] + 1)
        # NOTE(review): uppercase count comes from the raw text but the ratio
        # divides by the cleaned-text length — confirm this mix is intended.
        df_processed['uppercase_ratio'] = df_processed['text'].str.count(r'[A-Z]') / (df_processed['text_length'] + 1)
        # Social-media features (fixed: single-backslash \w so these actually match).
        df_processed['mention_count'] = df_processed['text'].str.count(r'@\w+')
        df_processed['hashtag_count'] = df_processed['text'].str.count(r'#\w+')
        df_processed['url_count'] = df_processed['text'].str.count(r'http[s]?://')
        df_processed['has_mention'] = (df_processed['mention_count'] > 0).astype(int)
        df_processed['has_hashtag'] = (df_processed['hashtag_count'] > 0).astype(int)
        df_processed['has_url'] = (df_processed['url_count'] > 0).astype(int)
        # Negative-sentiment keyword detection (de-duplicated to avoid
        # counting the same keyword twice).
        negative_keywords = [
            'terrible', 'awful', 'horrible', 'bad', 'worst', 'disappointing',
            'disgusting', 'hate', 'angry', 'frustrated', 'annoying', 'poor',
            'sucks', 'delay', 'cancelled', 'lost', 'missing', 'broken', 'damaged'
        ]

        def count_negative_words(text):
            # One count per distinct keyword present in the text.
            if pd.isna(text):
                return 0
            text_lower = str(text).lower()
            return sum(1 for word in negative_keywords if word in text_lower)

        df_processed['negative_word_count'] = df_processed['cleaned_text'].apply(count_negative_words)
        df_processed['has_negative_words'] = (df_processed['negative_word_count'] > 0).astype(int)
        # Exclamation marks as a strong-emotion indicator.
        df_processed['exclamation_count'] = df_processed['text'].str.count(r'!')
        df_processed['has_exclamation'] = (df_processed['exclamation_count'] > 0).astype(int)
        # Structured features: integer-code the airline for tree models.
        df_processed['airline_encoded'] = df_processed['airline'].astype('category').cat.codes
        # Whether an explicit negative reason was recorded.
        df_processed['has_negative_reason'] = (~df_processed['negativereason'].isna()).astype(int)
        # Time-derived features.
        df_processed['tweet_created'] = pd.to_datetime(df_processed['tweet_created'])
        df_processed['hour'] = df_processed['tweet_created'].dt.hour
        df_processed['day_of_week'] = df_processed['tweet_created'].dt.dayofweek
        df_processed['is_weekend'] = (df_processed['day_of_week'] >= 5).astype(int)
        return df_processed

    def prepare_training_data(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
        """Return (X, y) with the feature subset used for model training."""
        df_features = self.extract_features(df)
        feature_cols = [
            'text_length', 'word_count', 'punctuation_ratio', 'uppercase_ratio',
            'mention_count', 'hashtag_count', 'url_count',
            'has_mention', 'has_hashtag', 'has_url',
            'airline_encoded', 'has_negative_reason',
            'hour', 'day_of_week', 'is_weekend', 'airline_sentiment_confidence'
        ]
        X = df_features[feature_cols]
        y = df_features['airline_sentiment']
        return X, y
def get_feature_columns() -> Tuple[List[str], List[str]]:
    """Return the (numeric, categorical) feature-column names for training."""
    numeric = [
        'text_length', 'word_count', 'punctuation_ratio', 'uppercase_ratio',
        'mention_count', 'hashtag_count', 'url_count',
        'airline_encoded', 'has_negative_reason',
        'hour', 'day_of_week', 'is_weekend', 'airline_sentiment_confidence',
        # extra negative-sentiment signals
        'negative_word_count', 'exclamation_count',
    ]
    categorical = [
        'has_mention', 'has_hashtag', 'has_url',
        'has_negative_words', 'has_exclamation',
    ]
    return numeric, categorical

186
bigwork/src/dna_agent.py Normal file
View File

@ -0,0 +1,186 @@
"""服务体验DNA解码Agent系统"""
import asyncio
from typing import List, Dict, Optional
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from dna_decoder import DNADecoder, DNAResult, ServiceExperienceInsight
class DNAAnalysisRequest(BaseModel):
    """Request payload for one DNA-analysis run."""
    tweet_text: str = Field(description="推文内容")
    airline: str = Field(description="航空公司")
    # Depth of the analysis; defaults to the standard pipeline.
    analysis_depth: str = Field(description="分析深度", default="standard")
class DNAInterpretation(BaseModel):
    """LLM-produced deep interpretation of a single tweet's DNA signals."""
    问题分类: str = Field(description="主要问题分类")
    情感深度: str = Field(description="情感深度分析")
    隐性模式: List[str] = Field(description="发现的隐性模式")
    关联问题: List[str] = Field(description="关联的问题类型")
    根本原因: str = Field(description="推测的根本原因")
class DNADisposalPlan(BaseModel):
    """Actionable handling plan generated from the DNA interpretation."""
    紧急程度: str = Field(description="紧急程度")
    处置策略: str = Field(description="整体处置策略")
    具体措施: List[str] = Field(description="具体处置措施")
    预防建议: List[str] = Field(description="预防建议")
    跟进计划: str = Field(description="跟进计划")
class DNAAgentResult(BaseModel):
    """Structured output type enforced on the DNA agent's responses."""
    分析摘要: str = Field(description="分析摘要")
    深度解释: DNAInterpretation = Field(description="深度解释")
    处置方案: DNADisposalPlan = Field(description="处置方案")
    洞察发现: List[str] = Field(description="关键洞察发现")
# 创建DNA解码Agent
dna_agent = Agent(
'deepseek:deepseek-chat',
system_prompt="""
你是航空公司服务体验DNA解码专家专门分析客户反馈中的深层模式和隐性不满
你的任务
1. 基于DNA解码系统的分析结果进行深度解释
2. 识别客户反馈中的隐性模式和关联关系
3. 生成针对性的处置方案和预防建议
4. 提供基于数据分析的洞察发现
分析要点
- 关注情感强度的细微变化
- 识别未直接表达的隐性不满
- 发现问题的关联性和模式
- 提供数据驱动的解决方案
输出必须是结构化的JSON格式
""",
output_type=DNAAgentResult
)
class DNAAgentSystem:
    """Facade combining the local DNADecoder with the LLM agent."""

    def __init__(self):
        self.dna_decoder = DNADecoder()  # local statistical/heuristic analysis
        self.agent = dna_agent           # module-level pydantic-ai agent

    async def analyze_tweet_dna(self, tweet_text: str, airline: str = "unknown") -> DNAAgentResult:
        """Run local DNA decoding, then ask the agent for a deep interpretation.

        Step 1 computes topic/sentiment/implicit signals locally; step 2 feeds
        that summary to the LLM and returns its structured response.
        """
        # Step 1: local DNA decoding.
        dna_result = self.dna_decoder.decode_experience_dna(tweet_text)
        # Step 2: agent interpretation + disposal plan.
        # NOTE(review): agent.run returns a pydantic-ai run-result wrapper, not
        # a bare DNAAgentResult — confirm callers expect the wrapper. The two
        # '' literals in the prompt below look like dropped CJK characters
        # (likely 是/否); verify against the original file.
        agent_result = await self.agent.run(
            f"""
            请基于以下DNA解码结果生成深度解释和处置方案
            推文内容: "{tweet_text}"
            航空公司: {airline}
            DNA解码结果:
            - 情感强度: {dna_result.推文分析.情感强度:.3f}
            - 隐性不满: {'' if dna_result.推文分析.隐性不满 else ''}
            - 主要主题: {list(dna_result.推文分析.主题分布.keys())[:3] if dna_result.推文分析.主题分布 else []}
            - 关键洞察: {dna_result.推文分析.关键洞察}
            整体洞察摘要:
            {self._format_insights(dna_result.整体洞察)}
            请提供
            1. 深度的问题分类和情感分析
            2. 发现的隐性模式和关联关系
            3. 针对性的处置方案
            4. 基于数据的洞察发现
            """
        )
        return agent_result

    def _format_insights(self, insights: List[ServiceExperienceInsight]) -> str:
        """Render the top-3 corpus insights as prompt-friendly bullet lines."""
        if not insights:
            return "暂无整体洞察数据"
        formatted = []
        for insight in insights[:3]:  # only the three leading insights
            formatted.append(
                f"- {insight.主题名称}: 情感强度{insight.情感强度:.2f}, "
                f"隐性不满{insight.隐性不满指标:.2f}"
            )
        return "\n".join(formatted)

    def get_visualization_data(self, tweet_text: str) -> Dict:
        """Reshape a DNA decode into plain dicts suitable for charting."""
        dna_result = self.dna_decoder.decode_experience_dna(tweet_text)
        # Topic distribution for a bar/pie chart.
        topic_data = {
            'topics': list(dna_result.推文分析.主题分布.keys()),
            'probabilities': list(dna_result.推文分析.主题分布.values())
        }
        # Sentiment summary for a gauge-style widget.
        sentiment_data = {
            'intensity': dna_result.推文分析.情感强度,
            'is_negative': dna_result.推文分析.情感强度 < 0,
            'implicit_dissatisfaction': dna_result.推文分析.隐性不满
        }
        # Associated problem categories.
        association_data = {
            'problems': dna_result.推文分析.问题关联,
            'count': len(dna_result.推文分析.问题关联)
        }
        return {
            'topic_distribution': topic_data,
            'sentiment_analysis': sentiment_data,
            'problem_association': association_data,
            'insights': dna_result.整体洞察,
            'recommendations': dna_result.处置建议
        }
# Shared singleton used by the module-level convenience wrappers below.
dna_agent_system = DNAAgentSystem()
async def analyze_service_experience_dna(tweet_text: str, airline: str = "unknown") -> DNAAgentResult:
    """Module-level wrapper: analyze one tweet via the shared DNAAgentSystem."""
    return await dna_agent_system.analyze_tweet_dna(tweet_text, airline)
def get_dna_visualization_data(tweet_text: str) -> Dict:
    """Module-level wrapper: visualization data from the shared DNAAgentSystem."""
    return dna_agent_system.get_visualization_data(tweet_text)
# Smoke test
async def test_dna_agent():
    """Run the full agent pipeline on a sample negative tweet; True on success."""
    test_tweet = "@United my flight was delayed for 3 hours and the food was terrible. Very disappointing experience."
    try:
        result = await analyze_service_experience_dna(test_tweet, "united")
        print("DNA Agent测试成功!")
        print(f"分析摘要: {result.分析摘要}")
        print(f"问题分类: {result.深度解释.问题分类}")
        print(f"处置策略: {result.处置方案.处置策略}")
        return True
    except Exception as e:
        # Broad catch is intentional: this is a smoke test that reports failure.
        print(f"DNA Agent测试失败: {e}")
        return False
if __name__ == "__main__":
    # Run the smoke test when executed directly.
    asyncio.run(test_dna_agent())

421
bigwork/src/dna_decoder.py Normal file
View File

@ -0,0 +1,421 @@
"""服务体验DNA解码系统"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from pydantic import BaseModel, Field
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.cluster import DBSCAN
import re
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
import plotly.express as px
import plotly.graph_objects as go
from collections import Counter
import networkx as nx
class ServiceExperienceInsight(BaseModel):
    """Corpus-level insight aggregated for one problem theme."""
    主题名称: str = Field(description="主题名称")
    关键词: List[str] = Field(description="主题关键词")
    情感强度: float = Field(description="情感强度得分(-1到1)")
    问题类型: str = Field(description="问题分类")
    隐性不满指标: float = Field(description="隐性不满程度(0-1)")
    关联问题: List[str] = Field(description="关联的问题类型")
    典型推文: List[str] = Field(description="典型推文示例")
class ExperienceDNA(BaseModel):
    """Per-tweet DNA profile: topics, sentiment and implicit-signal flags."""
    推文ID: str = Field(description="推文ID")
    主题分布: Dict[str, float] = Field(description="主题分布概率")
    情感强度: float = Field(description="情感强度得分")
    隐性不满: bool = Field(description="是否存在隐性不满")
    问题关联: List[str] = Field(description="关联的问题类型")
    关键洞察: str = Field(description="关键洞察描述")
class ServiceExperienceMap(BaseModel):
    """Graph view of service problems: nodes, edges and key insights."""
    节点: List[Dict] = Field(description="问题节点")
    # NOTE(review): the identifier of this field was missing/garbled in the
    # reviewed copy (": List[Dict] = Field(...)"). Restored as 边 ("edges") to
    # match the `边=edges` keyword used when DNADecoder builds this model —
    # confirm against the original file.
    边: List[Dict] = Field(description="关联关系")
    洞察: List[ServiceExperienceInsight] = Field(description="关键洞察")
class DNAResult(BaseModel):
    """Complete output of one DNA decode: per-tweet profile + corpus context."""
    推文分析: ExperienceDNA = Field(description="单条推文分析")
    整体洞察: List[ServiceExperienceInsight] = Field(description="整体洞察")
    体验图谱: ServiceExperienceMap = Field(description="服务体验图谱")
    处置建议: List[str] = Field(description="处置建议")
class DNADecoder:
    """Service-experience "DNA" decoder.

    Profiles airline tweets by combining (a) an LDA topic model trained on the
    corpus at `data_path`, (b) keyword-weighted sentiment-intensity scoring and
    (c) heuristic detection of implicit (indirectly voiced) dissatisfaction.

    Fix vs. the previous revision: the keyword argument for the edge list in
    `build_experience_map` was garbled (`=edges` with no name); restored to
    `边=` to match ServiceExperienceMap's field.
    """

    def __init__(self, data_path: str = "data/Tweets.csv"):
        self.data_path = data_path  # CSV corpus location
        self.lda_model = None       # fitted lazily on first decode
        self.vectorizer = None      # CountVectorizer paired with the LDA model
        self.topics = {}            # topic index -> {'name', 'keywords', 'weight'}

    def load_data(self) -> pd.DataFrame:
        """Load the raw tweet corpus from disk."""
        df = pd.read_csv(self.data_path)
        return df

    def preprocess_text(self, text: str) -> str:
        """Lowercase, strip punctuation and squeeze whitespace ('' for NaN)."""
        if pd.isna(text):
            return ""
        # Basic cleaning.
        text = str(text).lower().strip()
        text = re.sub(r'[^\w\s]', '', text)  # remove punctuation
        text = re.sub(r'\s+', ' ', text)  # collapse repeated whitespace
        return text

    def train_lda_model(self, n_topics: int = 8) -> None:
        """Fit the LDA topic model on the whole corpus and name its topics."""
        df = self.load_data()
        texts = df['text'].apply(self.preprocess_text).tolist()
        # Bag-of-words representation.
        self.vectorizer = CountVectorizer(
            max_df=0.95, min_df=2,
            stop_words='english',
            max_features=1000
        )
        X = self.vectorizer.fit_transform(texts)
        # Fit the LDA model.
        self.lda_model = LatentDirichletAllocation(
            n_components=n_topics,
            random_state=42,
            max_iter=10
        )
        self.lda_model.fit(X)
        # Derive human-readable topic names from the top keywords.
        self._extract_topic_keywords()

    def _extract_topic_keywords(self) -> None:
        """Populate self.topics with the top-10 keywords of each LDA topic."""
        if self.lda_model is None or self.vectorizer is None:
            return
        feature_names = self.vectorizer.get_feature_names_out()
        for topic_idx, topic in enumerate(self.lda_model.components_):
            top_keywords_idx = topic.argsort()[-10:][::-1]
            top_keywords = [feature_names[i] for i in top_keywords_idx]
            # Auto-name the topic from its keywords.
            topic_name = self._generate_topic_name(top_keywords)
            self.topics[topic_idx] = {
                'name': topic_name,
                'keywords': top_keywords,
                'weight': topic.sum()
            }

    def _generate_topic_name(self, keywords: List[str]) -> str:
        """Map the first recognised keyword to a Chinese topic label."""
        # Common airline-service keyword -> label mapping.
        keyword_mapping = {
            'flight': '航班问题',
            'delay': '延误问题',
            'cancel': '取消问题',
            'baggage': '行李问题',
            'service': '服务质量',
            'staff': '员工服务',
            'food': '餐饮服务',
            'seat': '座位舒适度',
            'time': '时间安排',
            'customer': '客户服务'
        }
        for keyword in keywords:
            if keyword in keyword_mapping:
                return keyword_mapping[keyword]
        return f"主题_{keywords[0] if keywords else '未知'}"

    def analyze_sentiment_intensity(self, text: str) -> float:
        """Keyword-weighted sentiment intensity in [-1, 1] (0.0 for NaN)."""
        if pd.isna(text):
            return 0.0
        text_lower = str(text).lower()
        # Keyword -> intensity weight.
        intensity_indicators = {
            # strongly negative
            'terrible': -0.9, 'awful': -0.9, 'horrible': -0.9, 'disgusting': -0.9,
            'hate': -0.8, 'worst': -0.8, 'never again': -0.8,
            # moderately negative
            'bad': -0.6, 'poor': -0.6, 'disappointing': -0.6, 'frustrated': -0.6,
            # mildly negative
            'ok': -0.3, 'average': -0.3, 'could be better': -0.3,
            # mildly positive
            'good': 0.3, 'nice': 0.3, 'decent': 0.3,
            # moderately positive
            'great': 0.6, 'excellent': 0.6, 'wonderful': 0.6, 'satisfied': 0.6,
            # strongly positive
            'amazing': 0.9, 'perfect': 0.9, 'best': 0.9, 'love': 0.9
        }
        # Exclamation marks and stretched characters as emphasis signals.
        exclamation_count = text.count('!')
        repeat_patterns = len(re.findall(r'([a-z])\1{2,}', text_lower))
        intensity_score = 0.0
        matched_keywords = 0
        for keyword, weight in intensity_indicators.items():
            if keyword in text_lower:
                intensity_score += weight
                matched_keywords += 1
        # Average over the matched keywords.
        if matched_keywords > 0:
            intensity_score /= matched_keywords
        # Punctuation-based adjustment.
        # NOTE(review): exclamations always push the score positive, even for
        # angry texts — confirm this is intended.
        intensity_score += min(0.2, exclamation_count * 0.05)
        intensity_score += min(0.1, repeat_patterns * 0.03)
        # Clamp to [-1, 1].
        intensity_score = max(-1.0, min(1.0, intensity_score))
        return intensity_score

    def detect_implicit_dissatisfaction(self, text: str) -> Tuple[bool, float]:
        """Detect indirectly voiced dissatisfaction; returns (flag, confidence)."""
        if pd.isna(text):
            return False, 0.0
        text_lower = str(text).lower()
        # Indicator phrase -> weight.
        implicit_indicators = {
            # hedged phrasing
            'wish': 0.3, 'hope': 0.3, 'would be nice': 0.4,
            'could have been': 0.4, 'might want to': 0.3,
            # contrastive phrasing
            'but': 0.4, 'however': 0.5, 'although': 0.4,
            # questioning phrasing
            'why': 0.3, 'how come': 0.4, 'is this normal': 0.5,
            # sarcasm markers
            'interesting': 0.6, 'surprising': 0.5, 'unexpected': 0.4
        }
        # Accumulate indicator weights.
        indicator_score = 0.0
        for indicator, weight in implicit_indicators.items():
            if indicator in text_lower:
                indicator_score += weight
        # Weak explicit sentiment + strong indicators => implicit dissatisfaction.
        sentiment_intensity = self.analyze_sentiment_intensity(text)
        is_implicit = False
        confidence = 0.0
        if indicator_score > 0.5 and abs(sentiment_intensity) < 0.3:
            is_implicit = True
            confidence = min(1.0, indicator_score * 0.8)
        elif indicator_score > 0.3 and sentiment_intensity < -0.1:
            is_implicit = True
            confidence = min(1.0, indicator_score * 0.6)
        return is_implicit, confidence

    def build_experience_map(self, df: pd.DataFrame) -> ServiceExperienceMap:
        """Build a problem graph: one node per negative reason, weighted edges."""
        problems = df['negativereason'].dropna().unique()
        nodes = []
        edges = []
        # One node per distinct negative reason.
        for i, problem in enumerate(problems):
            problem_df = df[df['negativereason'] == problem]
            sentiment_avg = problem_df['airline_sentiment_confidence'].mean()
            nodes.append({
                'id': problem,
                'label': problem,
                'size': len(problem_df),
                'sentiment': sentiment_avg
            })
        # Edges weighted by combined occurrence counts.
        # NOTE(review): the weight is the UNION of the two reasons' row counts,
        # not a true co-occurrence — confirm whether intersection was intended.
        for i, problem1 in enumerate(problems):
            for j, problem2 in enumerate(problems):
                if i < j:
                    co_occurrence = len(df[
                        (df['negativereason'] == problem1) |
                        (df['negativereason'] == problem2)
                    ])
                    if co_occurrence > 10:  # edge threshold
                        edges.append({
                            'source': problem1,
                            'target': problem2,
                            'weight': co_occurrence
                        })
        # Aggregate corpus insights.
        insights = self._generate_insights(df)
        # Fixed: the 边 keyword was garbled (missing identifier) in the
        # reviewed copy; restored to match ServiceExperienceMap's edge field.
        return ServiceExperienceMap(
            节点=nodes,
            边=edges,
            洞察=insights
        )

    def _generate_insights(self, df: pd.DataFrame) -> List[ServiceExperienceInsight]:
        """Summarise the five most frequent negative reasons into insights."""
        insights = []
        # Most common negative reasons.
        top_problems = df['negativereason'].value_counts().head(5)
        for problem, count in top_problems.items():
            problem_df = df[df['negativereason'] == problem]
            # Average explicit sentiment intensity for this reason.
            sentiment_scores = problem_df['text'].apply(self.analyze_sentiment_intensity)
            avg_sentiment = sentiment_scores.mean()
            # Average implicit-dissatisfaction confidence.
            implicit_scores = problem_df['text'].apply(
                lambda x: self.detect_implicit_dissatisfaction(x)[1]
            )
            avg_implicit = implicit_scores.mean()
            # Representative tweets.
            typical_tweets = problem_df['text'].head(3).tolist()
            insight = ServiceExperienceInsight(
                主题名称=problem,
                关键词=[problem],
                情感强度=avg_sentiment,
                问题类型=problem,
                隐性不满指标=avg_implicit,
                关联问题=[],
                典型推文=typical_tweets
            )
            insights.append(insight)
        return insights

    def decode_experience_dna(self, tweet_text: str) -> DNAResult:
        """Full pipeline for one tweet: topics, sentiment, implicit signals, map."""
        if self.lda_model is None:
            self.train_lda_model()
        # Clean the tweet.
        processed_text = self.preprocess_text(tweet_text)
        # Topic distribution of this tweet under the trained LDA model.
        if self.vectorizer is not None:
            X = self.vectorizer.transform([processed_text])
            topic_distribution = self.lda_model.transform(X)[0]
            topic_dist_dict = {}
            for topic_idx, prob in enumerate(topic_distribution):
                if topic_idx in self.topics:
                    topic_name = self.topics[topic_idx]['name']
                    topic_dist_dict[topic_name] = float(prob)
        else:
            topic_dist_dict = {}
        # Sentiment intensity.
        sentiment_intensity = self.analyze_sentiment_intensity(tweet_text)
        # Implicit dissatisfaction.
        is_implicit, implicit_confidence = self.detect_implicit_dissatisfaction(tweet_text)
        # Per-tweet DNA profile.
        experience_dna = ExperienceDNA(
            推文ID="current_tweet",
            主题分布=topic_dist_dict,
            情感强度=sentiment_intensity,
            隐性不满=is_implicit,
            问题关联=list(topic_dist_dict.keys()),
            关键洞察=self._generate_tweet_insight(tweet_text, sentiment_intensity, is_implicit)
        )
        # Corpus-level insights and experience map.
        df = self.load_data()
        overall_insights = self._generate_insights(df)
        experience_map = self.build_experience_map(df)
        # Actionable recommendations.
        recommendations = self._generate_recommendations(experience_dna)
        return DNAResult(
            推文分析=experience_dna,
            整体洞察=overall_insights,
            体验图谱=experience_map,
            处置建议=recommendations
        )

    def _generate_tweet_insight(self, text: str, sentiment: float, is_implicit: bool) -> str:
        """Compose a short Chinese summary of the tweet's notable signals."""
        insights = []
        if sentiment < -0.5:
            insights.append("客户表现出强烈不满情绪")
        elif sentiment < -0.1:
            insights.append("客户存在轻微不满")
        if is_implicit:
            insights.append("存在隐性不满迹象")
        if 'delay' in text.lower():
            insights.append("涉及延误问题")
        if 'food' in text.lower():
            insights.append("涉及餐饮服务")
        if 'baggage' in text.lower():
            insights.append("涉及行李问题")
        return "; ".join(insights) if insights else "无明显问题特征"

    def _generate_recommendations(self, dna: ExperienceDNA) -> List[str]:
        """Map DNA signals to concrete follow-up actions."""
        recommendations = []
        if dna.情感强度 < -0.5:
            recommendations.append("立即联系客户,表达歉意并了解具体情况")
        elif dna.情感强度 < -0.1:
            recommendations.append("主动跟进客户反馈,提供解决方案")
        if dna.隐性不满:
            recommendations.append("注意客户可能未直接表达的深层不满")
        if '延误问题' in dna.问题关联:
            recommendations.append("提供延误补偿方案和后续航班安排")
        if '行李问题' in dna.问题关联:
            recommendations.append("启动行李追踪和赔偿流程")
        if '餐饮服务' in dna.问题关联:
            recommendations.append("收集具体反馈改进餐饮质量")
        return recommendations
def create_dna_decoder() -> DNADecoder:
    """Factory helper: build a DNADecoder with its default configuration."""
    decoder = DNADecoder()
    return decoder

View File

@ -0,0 +1,921 @@
"""智能客服优化系统 - 基于意图识别和实时NLP"""
import os
import asyncio
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from pydantic import BaseModel, Field
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import re
from datetime import datetime
from collections import defaultdict
from pydantic_ai import Agent
# Load environment variables (e.g. the DeepSeek API key) from a .env file.
load_dotenv()
class IntentAnalysis(BaseModel):
    """Result of intent analysis for one customer message."""
    主要意图: str = Field(description="主要意图分类")
    紧急程度: str = Field(description="紧急程度(高/中/低)")
    问题类型: str = Field(description="问题类型分类")
    情感强度: float = Field(description="情感强度得分")
    关键词: List[str] = Field(description="关键识别词")
    置信度: float = Field(description="分析置信度")
    是否为易流失客户: str = Field(description="是否为易流失客户群体(高/中/低风险)")
    流失风险评分: float = Field(description="流失风险评分(0-100)")
    流失风险原因: List[str] = Field(description="流失风险原因分析")
class RoutingDecision(BaseModel):
    """Routing outcome: channel, priority, ETA and target department."""
    处理方式: str = Field(description="处理方式(自动回复/人工介入/升级处理)")
    优先级: str = Field(description="处理优先级")
    预计响应时间: str = Field(description="预计响应时间")
    推荐部门: str = Field(description="推荐处理部门")
    路由理由: str = Field(description="路由决策理由")
class ResponseSuggestion(BaseModel):
    """Building blocks of a suggested reply to the customer."""
    开场白: str = Field(description="对话开场白")
    核心回复: str = Field(description="核心问题回复")
    安抚语句: str = Field(description="安抚客户语句")
    解决方案: List[str] = Field(description="具体解决方案")
    结束语: str = Field(description="对话结束语")
    个性化建议: str = Field(description="个性化建议")
class ConversationPattern(BaseModel):
    """A reusable dialogue pattern distilled from successful cases."""
    模式类型: str = Field(description="模式类型")
    成功案例: List[str] = Field(description="成功案例")
    关键话术: List[str] = Field(description="关键话术")
    适用场景: List[str] = Field(description="适用场景")
    效果评估: float = Field(description="效果评估得分")
class SmartCustomerService:
    """Keyword/ML-based customer-service triage: intent, routing and replies."""

    def __init__(self, data_path: str = "data/Tweets.csv"):
        self.data_path = data_path
        # TF-IDF features for the optional RandomForest intent classifier.
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.intent_classifier = None
        self.routing_model = None
        self.conversation_patterns = {}
        self.success_cases = self._load_success_cases()
        # Supported intent labels.
        self.intent_categories = [
            "投诉", "咨询", "建议", "表扬", "紧急求助", "一般问题"
        ]
        # Problem-type taxonomy.
        self.problem_types = [
            "航班延误", "行李问题", "服务态度", "座位问题", "餐饮质量",
            "登机流程", "取消航班", "超售问题", "预订问题", "其他"
        ]
        # Default routing rule per intent.
        # NOTE(review): several "优先级" values below are empty strings in this
        # copy — they look like dropped CJK characters (高/中/低); confirm
        # against the original file.
        self.routing_rules = {
            "紧急求助": {"处理方式": "升级处理", "优先级": "最高", "响应时间": "立即"},
            "投诉": {"处理方式": "人工介入", "优先级": "", "响应时间": "30分钟内"},
            "咨询": {"处理方式": "自动回复", "优先级": "", "响应时间": "1小时内"},
            "建议": {"处理方式": "自动回复", "优先级": "", "响应时间": "24小时内"},
            "表扬": {"处理方式": "自动回复", "优先级": "", "响应时间": "24小时内"}
        }
def _load_success_cases(self) -> List[Dict]:
    """Return hard-coded mock success cases used for pattern learning."""
    # Simulated success-case data (stand-in for a real case database).
    return [
        {
            "text": "航班延误了3小时很失望",
            "response": "非常抱歉给您带来不便。我们已为您安排优先登机和里程补偿。",
            "satisfaction": 0.8,
            "pattern": "延误安抚+补偿"
        },
        {
            "text": "行李被损坏了,怎么办?",
            "response": "很遗憾听到这个消息。请立即联系行李服务台,我们将为您处理赔偿。",
            "satisfaction": 0.9,
            "pattern": "问题解决+赔偿"
        }
    ]
def train_intent_classifier(self) -> None:
    """Fit the TF-IDF + RandomForest intent classifier on a built-in sample set.

    The training data is a small hand-written bilingual (zh/en) corpus; the
    fitted model is stored on self.intent_classifier.
    """
    # Hand-crafted training samples (Chinese and English).
    training_data = [
        # complaints — extra samples
        ("航班延误了3小时没有任何解释", "投诉"),
        ("我的行李被损坏了,非常生气", "投诉"),
        ("客服态度很差,很不满意", "投诉"),
        ("服务太差了,再也不坐了", "投诉"),
        ("飞机晚点4小时没有任何补偿", "投诉"),
        ("座位不舒服,飞行体验很糟糕", "投诉"),
        ("餐饮质量很差,难以下咽", "投诉"),
        ("delay flight 3 hours no explanation", "投诉"),
        ("my luggage damaged very angry", "投诉"),
        # urgent help — extra samples
        ("紧急求助,航班取消了怎么办", "紧急求助"),
        ("救命,行李丢失了", "紧急求助"),
        ("紧急情况,护照丢失了", "紧急求助"),
        ("urgent help flight cancelled what to do", "紧急求助"),
        ("help lost luggage emergency", "紧急求助"),
        # inquiries — extra samples
        ("请问航班什么时候起飞", "咨询"),
        ("行李托运有什么规定", "咨询"),
        ("如何办理登机手续", "咨询"),
        ("航班改签需要什么手续", "咨询"),
        ("what time does the flight depart", "咨询"),
        ("baggage allowance rules", "咨询"),
        # suggestions
        ("建议改进登机流程", "建议"),
        ("希望增加航班频次", "建议"),
        ("suggest improve boarding process", "建议"),
        ("hope increase flight frequency", "建议"),
        # praise
        ("服务很好,非常满意", "表扬"),
        ("空乘人员态度很好", "表扬"),
        ("service excellent very satisfied", "表扬"),
        ("flight attendant very friendly", "表扬"),
    ]
    texts = [data[0] for data in training_data]
    intent_labels = [data[1] for data in training_data]
    print(f"训练数据数量: {len(texts)}")
    print(f"意图类别分布: {dict(zip(*np.unique(intent_labels, return_counts=True)))}")
    # Vectorize the texts.
    X = self.vectorizer.fit_transform(texts)
    print(f"特征维度: {X.shape}")
    # Fit the classifier.
    self.intent_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
    self.intent_classifier.fit(X, intent_labels)
    # Report classifier details.
    print(f"分类器类别: {self.intent_classifier.classes_}")
    print("分类器训练完成!")
async def analyze_intent_deepseek(self, text: str) -> IntentAnalysis:
    """Deep intent analysis fully delegated to DeepSeek.

    Calls the LLM helper, unwraps its result object, and re-wraps the payload
    as the local IntentAnalysis schema.
    """
    # NOTE(review): analyze_deep_intent is not defined in this part of the
    # file — presumably defined/imported elsewhere in the module; verify.
    deep_analysis = await analyze_deep_intent(text)
    # Unwrap whichever attribute this pydantic-ai version uses for the payload.
    if hasattr(deep_analysis, 'data'):
        deep_analysis = deep_analysis.data
    elif hasattr(deep_analysis, 'result'):
        deep_analysis = deep_analysis.result
    elif hasattr(deep_analysis, 'output'):
        deep_analysis = deep_analysis.output
    elif hasattr(deep_analysis, 'value'):
        deep_analysis = deep_analysis.value
    # Convert to the local IntentAnalysis model.
    return IntentAnalysis(
        主要意图=deep_analysis.主要意图,
        紧急程度=deep_analysis.紧急程度,
        问题类型=deep_analysis.问题类型,
        情感强度=deep_analysis.情感强度,
        关键词=deep_analysis.关键词,
        置信度=deep_analysis.置信度,
        是否为易流失客户=deep_analysis.是否为易流失客户,
        流失风险评分=deep_analysis.流失风险评分,
        流失风险原因=deep_analysis.流失风险原因
    )
def analyze_intent(self, text: str) -> IntentAnalysis:
    """Keyword-based intent analysis (reliable offline fallback to the LLM path).

    Scores each intent by counting keyword hits, breaks ties by a fixed intent
    priority, then derives urgency, problem type and churn risk via helper
    methods. Prints a debug summary before returning.
    """
    text_lower = text.lower()
    # Keyword table per intent. Matching is plain substring containment, which
    # works for Chinese without word segmentation.
    # NOTE(review): several entries repeat across/within lists (e.g. "不耐烦"),
    # so one occurrence can add more than one point to a score — confirm this
    # weighting is intended.
    intent_keywords = {
        "投诉": [
            # flight-delay related
            "延误", "晚点", "延迟", "误点", "推迟", "取消", "改签", "航班取消", "航班延误",
            "起飞晚点", "到达晚点", "长时间延误", "延误赔偿", "延误补偿",
            # service-attitude related
            "态度差", "服务差", "态度恶劣", "服务态度", "客服态度", "空乘态度", "不耐烦",
            "冷漠", "不专业", "不负责任", "推诿", "扯皮", "敷衍", "应付", "白眼", "白了一眼",
            "白了我一眼", "翻白眼", "瞪眼", "瞪了一眼", "瞪了我一眼", "不耐烦", "爱理不理",
            "不理不睬", "冷眼相待", "态度冷淡", "态度傲慢", "态度恶劣", "态度蛮横", "态度粗暴",
            "服务不周", "服务不到位", "服务态度差", "服务态度恶劣", "服务态度不好",
            "空乘态度差", "空乘态度恶劣", "空乘服务差", "空乘不耐烦", "空乘冷漠",
            "空乘不专业", "空乘不负责任", "空乘推诿", "空乘敷衍", "空乘应付",
            "空乘白眼", "空乘翻白眼", "空乘瞪眼", "空乘爱理不理", "空乘不理不睬",
            "空乘冷眼相待", "空乘态度冷淡", "空乘态度傲慢", "空乘态度蛮横",
            "空乘态度粗暴", "空乘服务不周", "空乘服务不到位", "空乘服务态度差",
            "空乘服务态度恶劣", "空乘服务态度不好", "乘务员态度差", "乘务员态度恶劣",
            "乘务员服务差", "乘务员不耐烦", "乘务员冷漠", "乘务员不专业",
            "乘务员不负责任", "乘务员推诿", "乘务员敷衍", "乘务员应付",
            "乘务员白眼", "乘务员翻白眼", "乘务员瞪眼", "乘务员爱理不理",
            "乘务员不理不睬", "乘务员冷眼相待", "乘务员态度冷淡", "乘务员态度傲慢",
            "乘务员态度蛮横", "乘务员态度粗暴", "乘务员服务不周", "乘务员服务不到位",
            "乘务员服务态度差", "乘务员服务态度恶劣", "乘务员服务态度不好",
            "rude", "impolite", "unfriendly", "bad attitude", "poor service", "unprofessional",
            "disrespectful", "ignored", "snubbed", "looked down upon", "treated badly",
            # food-quality related
            "太难吃", "难吃", "不好吃", "味道差", "餐饮质量", "飞机餐", "餐食", "饮料",
            "食物", "饭菜", "难以下咽", "吃不下去", "口味差", "质量差", "不新鲜",
            # baggage related
            "行李损坏", "行李丢失", "行李延误", "托运问题", "行李破损", "行李超重",
            "行李超规", "行李赔偿", "行李找不到", "行李被撬", "行李被开",
            # seat-comfort related
            "座位不舒服", "座位窄", "空间小", "腿部空间", "座椅", "靠背", "座位间距",
            "拥挤", "不舒服", "难受", "坐得不舒服", "座位问题",
            # boarding-process related
            "登机慢", "排队时间长", "安检慢", "值机慢", "登机流程", "候机时间长",
            "登机口变更", "登机混乱", "秩序差", "管理混乱",
            # overall-experience related
            "体验差", "体验糟糕", "很差", "太差", "糟糕透顶", "失望", "不满意", "生气",
            "愤怒", "恼火", "无语", "崩溃", "绝望", "后悔", "再也不坐", "换航空公司",
            "投诉到底", "非常失望", "极度不满", "糟糕体验", "差评", "差劲",
            # English keywords
            "delay", "cancel", "damage", "lost", "angry", "terrible", "awful", "disgusting",
            "bad service", "poor experience", "terrible food", "uncomfortable", "crowded",
            "slow boarding", "long queue", "bad attitude", "rude staff", "horrible"
        ],
        "紧急求助": [
            "紧急", "求助", "救命", "怎么办", "急事", "突发事件", "紧急情况", "急需帮助",
            "urgent", "help", "emergency", "what to do", "need help", "urgent help"
        ],
        "咨询": [
            "请问", "什么", "如何", "多少", "什么时候", "哪里", "能否", "是否可以",
            "when", "what", "how", "where", "which", "can", "could", "would"
        ],
        "建议": [
            "建议", "希望", "改进", "建议改进", "提个建议", "希望改进", "建议增加",
            "suggest", "hope", "improve", "recommend", "advice"
        ],
        "表扬": [
            "很好", "满意", "优秀", "感谢", "很棒", "非常好", "很满意", "服务很好",
            "态度很好", "体验很好", "值得表扬", "点赞", "好评", "推荐", "下次还坐",
            "great", "excellent", "satisfied", "thank", "wonderful", "fantastic", "amazing"
        ]
    }
    # Score each intent by keyword hits.
    intent_scores = {}
    for intent, keywords in intent_keywords.items():
        score = 0
        for keyword in keywords:
            # Substring containment handles Chinese, where split() would not
            # segment words correctly.
            if keyword in text_lower:
                score += 1
        intent_scores[intent] = score
    # Sentiment intensity is computed first because the fallback below uses it.
    sentiment_intensity = self._analyze_sentiment_intensity(text)
    # Pick the top-scoring intent, with a fixed priority tie-break.
    max_score = max(intent_scores.values())
    if max_score > 0:
        # All intents that share the highest score.
        best_intents = [intent for intent, score in intent_scores.items() if score == max_score]
        # Tie-break by severity priority.
        intent_priority = ["紧急求助", "投诉", "咨询", "建议", "表扬"]
        # Sort the tied intents by that priority.
        best_intents_sorted = sorted(best_intents,
            key=lambda x: intent_priority.index(x) if x in intent_priority else len(intent_priority))
        intent = best_intents_sorted[0]  # highest-priority intent wins
        confidence = min(0.99, max_score / len(intent_keywords[intent]))
    else:
        # No keyword hit: fall back to sentiment-based classification.
        if sentiment_intensity < -0.3:
            intent = "投诉"  # negative sentiment defaults to a complaint
            confidence = 0.6
        else:
            intent = "咨询"
            confidence = 0.3
    # Extract display keywords.
    keywords = self._extract_keywords(text)
    # Classify the problem type.
    problem_type = self._classify_problem_type(text)
    # Assess urgency.
    urgency_level = self._assess_urgency(text, intent, sentiment_intensity)
    # Churn-risk analysis.
    churn_risk_result = self._analyze_churn_risk(text, intent, sentiment_intensity, urgency_level)
    # Debug output.
    print(f"=== 意图分析结果 ===")
    print(f"输入文本: {text}")
    print(f"意图得分: {intent_scores}")
    print(f"最终意图: {intent}")
    print(f"置信度: {confidence:.2f}")
    print(f"情感强度: {sentiment_intensity:.2f}")
    print(f"流失风险: {churn_risk_result['风险等级']} (评分: {churn_risk_result['风险评分']})")
    print(f"风险原因: {churn_risk_result['风险原因']}")
    return IntentAnalysis(
        主要意图=intent,
        紧急程度=urgency_level,
        问题类型=problem_type,
        情感强度=sentiment_intensity,
        关键词=keywords,
        置信度=confidence,
        是否为易流失客户=churn_risk_result['风险等级'],
        流失风险评分=churn_risk_result['风险评分'],
        流失风险原因=churn_risk_result['风险原因']
    )
def make_routing_decision(self, intent_analysis: IntentAnalysis) -> RoutingDecision:
    """Turn an IntentAnalysis into a routing decision (channel, priority, ETA).

    Fix: `self.routing_rules.get(...)` returns the stored rule dict itself,
    and the urgency adjustments below previously mutated that shared entry in
    place — so the first urgent request permanently escalated the default rule
    for its intent. A shallow copy keeps self.routing_rules stable across calls.
    """
    # NOTE(review): the urgency literals compared against "" below look like
    # dropped CJK characters (高/中) in this copy — confirm with the original.
    base_decision = dict(self.routing_rules.get(intent_analysis.主要意图, {
        "处理方式": "人工介入", "优先级": "", "响应时间": "2小时内"
    }))
    # Escalate according to urgency.
    if intent_analysis.紧急程度 == "":
        base_decision["处理方式"] = "升级处理"
        base_decision["优先级"] = "最高"
        base_decision["响应时间"] = "立即"
    elif intent_analysis.紧急程度 == "":
        if base_decision["处理方式"] == "自动回复":
            base_decision["处理方式"] = "人工介入"
    # Pick a department for the problem type.
    department = self._recommend_department(intent_analysis.问题类型)
    # Human-readable justification of the decision.
    routing_reason = self._generate_routing_reason(intent_analysis, base_decision)
    return RoutingDecision(
        处理方式=base_decision["处理方式"],
        优先级=base_decision["优先级"],
        预计响应时间=base_decision["响应时间"],
        推荐部门=department,
        路由理由=routing_reason
    )
def generate_response_suggestions(self, text: str, intent_analysis: IntentAnalysis,
                                  routing_decision: RoutingDecision) -> ResponseSuggestion:
    """Compose a reply suggestion from intent-specific building blocks.

    Each section (opening, core reply, comforting phrase, solutions, closing,
    personalization) is produced by its dedicated helper, in that order.
    """
    return ResponseSuggestion(
        开场白=self._generate_opening(intent_analysis),
        核心回复=self._generate_core_response(text, intent_analysis),
        安抚语句=self._generate_comforting_phrase(intent_analysis),
        解决方案=self._generate_solutions(intent_analysis),
        结束语=self._generate_closing(intent_analysis),
        个性化建议=self._generate_personalization(intent_analysis)
    )
def learn_conversation_patterns(self) -> Dict[str, ConversationPattern]:
    """Distill reusable conversation patterns from the recorded success cases.

    The first case of each pattern creates the entry; subsequent cases append
    their response/scenario to it.
    """
    patterns: Dict[str, ConversationPattern] = {}
    for case in self.success_cases:
        key = case["pattern"]
        existing = patterns.get(key)
        if existing is None:
            patterns[key] = ConversationPattern(
                模式类型=key,
                成功案例=[case["response"]],
                关键话术=self._extract_key_phrases(case["response"]),
                适用场景=[case["text"]],
                效果评估=case["satisfaction"]
            )
        else:
            existing.成功案例.append(case["response"])
            existing.适用场景.append(case["text"])
    return patterns
def _preprocess_text(self, text: str) -> str:
"""文本预处理(支持中英文)"""
if pd.isna(text):
return ""
text = str(text).lower()
# 只移除英文标点,保留中文字符
text = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
text = re.sub(r'\s+', ' ', text) # 移除多余空格
return text.strip()
def _extract_keywords(self, text: str) -> List[str]:
"""提取关键词(支持中英文)"""
keywords = []
# 问题相关关键词(中英文)
problem_keywords = [
'delay', 'cancel', 'baggage', 'service', 'seat', 'food',
'延误', '取消', '行李', '服务', '座位', '餐饮', '客服', '态度'
]
for keyword in problem_keywords:
if keyword in text.lower():
keywords.append(keyword)
# 情感相关关键词(中英文)
sentiment_keywords = [
'terrible', 'awful', 'disappointing', 'great', 'excellent',
'生气', '失望', '满意', '很好', '优秀'
]
for keyword in sentiment_keywords:
if keyword in text.lower():
keywords.append(keyword)
return keywords[:5] # 返回前5个关键词
def _analyze_sentiment_intensity(self, text: str) -> float:
"""分析情感强度(支持中英文)"""
text_lower = text.lower()
# 强烈负面情感词(中英文)
strong_negative = ['terrible', 'awful', 'horrible', 'disgusting', 'hate',
'非常生气', '极度不满', '糟糕透顶', '恶心', '', '愤怒']
# 一般负面情感词
negative = ['bad', 'poor', 'disappointing', 'frustrated',
'不满意', '失望', '生气', '糟糕', '差劲']
# 正面情感词
positive = ['great', 'excellent', 'wonderful', 'satisfied',
'很好', '优秀', '满意', '不错', '喜欢']
intensity = 0.0
# 检查强烈负面词
for word in strong_negative:
if word in text_lower:
intensity -= 0.8
break # 找到强烈负面词就停止
# 检查一般负面词
for word in negative:
if word in text_lower:
intensity -= 0.4
break
# 检查正面词
for word in positive:
if word in text_lower:
intensity += 0.6
break
# 考虑感叹号和重复字符(中文和英文)
exclamation_count = text.count('!') + text.count('')
if exclamation_count > 0:
intensity -= min(0.3, exclamation_count * 0.1)
# 考虑负面程度副词
negative_modifiers = ['非常', '特别', '极其', '十分', '']
for modifier in negative_modifiers:
if modifier in text:
intensity -= 0.2
break
# 考虑正面程度副词
positive_modifiers = ['非常', '特别', '极其', '十分', '']
for modifier in positive_modifiers:
if modifier in text and intensity > 0:
intensity += 0.2
break
# 如果没有检测到情感词,根据内容判断
if intensity == 0.0:
# 检查是否包含问题描述
problem_words = ['延误', '取消', '损坏', '丢失', '态度差', '问题']
if any(word in text for word in problem_words):
intensity = -0.3 # 默认轻微负面
else:
intensity = 0.1 # 中性或轻微正面
return max(-1.0, min(1.0, intensity))
def _classify_problem_type(self, text: str) -> str:
"""分类问题类型(支持中英文,支持复合问题识别)"""
text_lower = text.lower()
# 扩展问题类型映射,包含更多关键词
problem_mapping = {
"航班延误": ['delay', 'late', 'waiting', '延误', '晚点', '延迟', '误点', '推迟', '取消', '改签'],
"行李问题": ['baggage', 'luggage', 'suitcase', '行李', '托运', '箱子', '损坏', '丢失', '延误'],
"服务态度": ['service', 'attitude', 'rude', '服务', '态度', '客服', '空乘', '乘务员',
'白眼', '翻白眼', '瞪眼', '不耐烦', '冷漠', '不专业', '不负责任', '敷衍', '应付'],
"座位问题": ['seat', 'comfortable', 'space', '座位', '舒适', '空间', '', '拥挤', '不舒服', '', '异味'],
"餐饮质量": ['food', 'meal', 'drink', '餐饮', '食物', '饮料', '难吃', '味道差', '质量差', '不新鲜']
}
# 检测所有匹配的问题类型
detected_problems = []
for problem_type, keywords in problem_mapping.items():
if any(keyword in text_lower for keyword in keywords):
detected_problems.append(problem_type)
# 根据匹配数量返回结果
if len(detected_problems) == 0:
return "其他"
elif len(detected_problems) == 1:
return detected_problems[0]
else:
# 返回复合问题描述
return "复合问题:" + "".join(detected_problems)
def _assess_urgency(self, text: str, intent: str, sentiment: float) -> str:
"""评估紧急程度"""
# 基于意图和情感强度评估紧急程度
if intent == "紧急求助":
return ""
elif sentiment < -0.5:
return ""
elif sentiment < -0.2:
return ""
else:
return ""
def _analyze_churn_risk(self, text: str, intent: str, sentiment: float, urgency: str) -> Dict:
"""分析流失客户风险"""
text_lower = text.lower()
# 流失风险评分0-100分
risk_score = 0
risk_reasons = []
# 1. 情感强度因素权重40%
if sentiment < -0.7:
risk_score += 40
risk_reasons.append("情感强度极高(强烈不满)")
elif sentiment < -0.3:
risk_score += 25
risk_reasons.append("情感强度较高(明显不满)")
elif sentiment < 0:
risk_score += 10
risk_reasons.append("情感强度偏低(轻微不满)")
# 2. 意图类型因素权重30%
if intent == "投诉":
risk_score += 30
risk_reasons.append("客户提出正式投诉")
elif intent == "紧急求助":
risk_score += 20
risk_reasons.append("客户需要紧急帮助")
elif intent == "建议":
risk_score += 5
risk_reasons.append("客户提出改进建议")
# 3. 紧急程度因素权重20%
if urgency == "":
risk_score += 20
risk_reasons.append("问题紧急程度高")
elif urgency == "":
risk_score += 10
risk_reasons.append("问题紧急程度中等")
# 4. 关键词因素权重10%
churn_keywords = [
"再也不坐", "取消会员", "换航空公司", "投诉到底", "非常失望",
"never again", "cancel membership", "switch airline", "file complaint"
]
for keyword in churn_keywords:
if keyword in text_lower:
risk_score += 10
risk_reasons.append(f"包含流失关键词:{keyword}")
break
# 5. 问题重复性因素(额外加分)
problem_count = len([word for word in ['延误', '取消', '损坏', '丢失', '态度差']
if word in text])
if problem_count > 1:
risk_score += min(15, problem_count * 5)
risk_reasons.append(f"涉及多个问题点({problem_count}个)")
# 确定风险等级
if risk_score >= 70:
risk_level = "高风险"
elif risk_score >= 40:
risk_level = "中风险"
else:
risk_level = "低风险"
# 如果没有检测到风险因素,添加默认说明
if not risk_reasons:
risk_reasons.append("客户情绪稳定,无明显流失风险")
return {
"风险等级": risk_level,
"风险评分": min(100, risk_score),
"风险原因": risk_reasons
}
def _recommend_department(self, problem_type: str) -> str:
"""推荐处理部门"""
department_mapping = {
"航班延误": "运营部门",
"行李问题": "行李服务部",
"服务态度": "客户服务部",
"座位问题": "客舱服务部",
"餐饮质量": "餐饮服务部",
"其他": "综合服务部"
}
return department_mapping.get(problem_type, "综合服务部")
def _generate_routing_reason(self, intent_analysis: IntentAnalysis, decision: Dict) -> str:
    """Summarize why the ticket was routed as it was.

    ``decision`` is accepted for interface compatibility but not consulted.

    BUG FIX (review): the urgency comparison originally read
    ``紧急程度 == ""`` — the character "高" was stripped in transit and is
    restored to match _assess_urgency's return values.
    """
    reasons = []
    if intent_analysis.紧急程度 == "高":
        reasons.append("问题紧急程度高")
    if intent_analysis.主要意图 == "投诉":
        reasons.append("客户投诉需要专业处理")
    if intent_analysis.情感强度 < -0.5:
        reasons.append("客户情感强烈,需要安抚")
    # Fall back to the standard-process label when nothing special applies.
    return "; ".join(reasons) if reasons else "标准处理流程"
def _generate_opening(self, intent_analysis: IntentAnalysis) -> str:
    """Pick an opening line appropriate to the detected intent."""
    opening_by_intent = {
        "投诉": "非常抱歉给您带来不便",
        "紧急求助": "我们立即为您处理",
        "咨询": "感谢您的咨询",
        "建议": "感谢您的宝贵建议",
        "表扬": "感谢您的认可和鼓励",
    }
    # Unknown intents get a neutral greeting.
    return opening_by_intent.get(intent_analysis.主要意图, "感谢您联系我们")
def _generate_core_response(self, text: str, intent_analysis: IntentAnalysis) -> str:
    """Pick a topic-specific core reply; fall back to a generic acknowledgement.

    ``text`` is accepted for interface compatibility but not consulted.
    """
    reply_by_problem = {
        "航班延误": "我们正在密切关注航班状态,将及时为您提供最新信息",
        "行李问题": "我们已记录您的行李问题,将安排专人跟进处理",
        "服务态度": "我们非常重视服务品质,将加强相关人员培训",
        "座位问题": "我们将优化座位分配流程,提升乘坐舒适度",
    }
    return reply_by_problem.get(intent_analysis.问题类型, "我们已收到您的反馈,将尽快处理")
def _generate_comforting_phrase(self, intent_analysis: IntentAnalysis) -> str:
    """Use stronger comfort wording when the customer is clearly unhappy."""
    clearly_unhappy = intent_analysis.情感强度 < -0.3
    return ("我们理解您的心情,将尽力解决您的问题" if clearly_unhappy
            else "感谢您的耐心和理解")
def _generate_solutions(self, intent_analysis: IntentAnalysis) -> List[str]:
    """List concrete remediation steps for the detected problem type."""
    steps_by_problem = {
        "航班延误": ["提供实时航班信息", "安排优先登机", "提供餐饮补偿"],
        "行李问题": ["启动行李追踪", "安排赔偿流程", "提供临时用品"],
        "服务态度": ["安排专人回访", "提供服务改进", "给予适当补偿"],
    }
    # Problem types without a dedicated playbook get generic follow-up steps.
    return steps_by_problem.get(intent_analysis.问题类型, ["记录问题并跟进", "提供解决方案"])
def _generate_closing(self, intent_analysis: IntentAnalysis) -> str:
    """Apologetic closing for complaints, appreciative closing otherwise."""
    is_complaint = intent_analysis.主要意图 == "投诉"
    if is_complaint:
        return "再次为给您带来的不便表示歉意,我们将持续改进服务"
    return "感谢您选择我们,期待再次为您服务"
def _generate_personalization(self, intent_analysis: IntentAnalysis) -> str:
    """Advise the human agent on tone and priority for this ticket.

    BUG FIX (review): the urgency comparison originally read
    ``紧急程度 == ""`` — the character "高" was stripped in transit and is
    restored to match _assess_urgency's return values.
    """
    if intent_analysis.情感强度 < -0.5:
        return "建议客服人员使用更温和的语气,重点安抚客户情绪"
    elif intent_analysis.紧急程度 == "高":
        return "建议立即响应,避免问题升级"
    else:
        return "标准处理流程,保持专业服务态度"
def _extract_key_phrases(self, text: str) -> List[str]:
"""提取关键话术"""
# 简单提取重要短语
phrases = []
important_words = ['抱歉', '感谢', '立即', '解决', '补偿', '改进']
for word in important_words:
if word in text:
# 提取包含关键词的短语
start = max(0, text.find(word) - 10)
end = min(len(text), text.find(word) + 20)
phrases.append(text[start:end].strip())
return phrases[:3] # 返回前3个关键话术
def load_data(self) -> pd.DataFrame:
    """Read the raw tweets CSV located at self.data_path into a DataFrame."""
    return pd.read_csv(self.data_path)
# Result schema for the deep-intent-analysis agent defined below.
class DeepIntentAnalysis(BaseModel):
    """Structured result of the LLM-based deep intent analysis.

    Chinese field names are intentional — they are referenced verbatim in
    the prompt templates; the Field descriptions steer the LLM toward the
    expected structured output.
    """
    主要意图: str = Field(description="主要意图分类")
    紧急程度: str = Field(description="紧急程度(高/中/低)")
    问题类型: str = Field(description="问题类型分类")
    情感强度: float = Field(description="情感强度得分")
    关键词: List[str] = Field(description="关键识别词")
    置信度: float = Field(description="分析置信度")
    是否为易流失客户: str = Field(description="是否为易流失客户群体(高/中/低风险)")
    流失风险评分: float = Field(description="流失风险评分(0-100)")
    流失风险原因: List[str] = Field(description="流失风险原因分析")
    深度分析说明: str = Field(description="深度分析说明和推理过程")
# Result schema for the smart-service strategy agent defined below.
class SmartServiceAnalysis(BaseModel):
    """Structured result of the smart customer-service strategy analysis."""
    意图深度分析: str = Field(description="意图深度分析")
    最佳处理策略: str = Field(description="最佳处理策略")
    话术优化建议: str = Field(description="话术优化建议")
    预期效果: str = Field(description="预期效果评估")
    改进机会: str = Field(description="服务改进机会")
# Agent performing LLM-based deep intent analysis; its structured output is
# validated against DeepIntentAnalysis via output_type.
# NOTE(review): the 'deepseek:deepseek-chat' model id requires a DeepSeek API
# key in the environment (see .env.example) — not verifiable from this file.
deep_intent_agent = Agent(
    'deepseek:deepseek-chat',
    system_prompt="""
你是专业的客户意图分析专家专门分析航空服务领域的客户反馈
你的任务是基于客户反馈进行深度意图分析包括
1. 准确识别客户的主要意图和真实需求
2. 评估问题的紧急程度和情感强度
3. 分析客户流失风险和潜在问题
4. 提供深度分析和推理过程
分析要点
- 理解客户的情感表达和隐含需求
- 考虑航空服务的特殊性航班延误服务态度座位舒适度等
- 基于上下文进行综合判断
- 提供详细的分析说明和推理过程
输出必须是结构化的JSON格式
""",
    output_type=DeepIntentAnalysis
)
# Agent that derives a handling strategy from the customer feedback plus the
# prior intent analysis; output validated against SmartServiceAnalysis.
smart_service_agent = Agent(
    'deepseek:deepseek-chat',
    system_prompt="""
你是智能客服专家基于NLP和机器学习技术优化客户服务流程
你的任务
1. 深度分析客户意图和情感状态
2. 制定最优处理策略和话术
3. 预测服务效果和改进机会
4. 提供基于数据的最佳实践建议
分析要点
- 关注客户真实需求和情感表达
- 考虑服务效率和客户满意度平衡
- 基于历史成功案例优化策略
- 提供可操作的具体建议
输出必须是结构化的JSON格式
""",
    output_type=SmartServiceAnalysis
)
async def analyze_deep_intent(customer_text: str) -> DeepIntentAnalysis:
    """Run the deep-intent agent on a single piece of customer feedback.

    NOTE(review): pydantic-ai's ``Agent.run`` returns a run-result wrapper;
    depending on the installed library version the structured
    DeepIntentAnalysis may need to be read from ``result.output`` (or
    ``result.data``) rather than returned directly as done here — confirm
    against the pinned pydantic-ai version.
    """
    result = await deep_intent_agent.run(f"""
请对以下航空服务客户反馈进行深度意图分析
客户反馈: "{customer_text}"
请进行深度分析
1. 主要意图识别投诉咨询建议表扬紧急求助
2. 紧急程度评估
3. 问题类型分类航班延误服务态度座位问题餐饮质量行李问题等
4. 情感强度分析-1.0极度负面到1.0极度正面
5. 关键词提取从客户反馈中提取关键识别词
6. 流失风险评估分析客户流失风险和原因
7. 深度分析说明详细说明分析过程和推理
请基于深度思考不要简单依赖关键词匹配
""")
    return result
# Result schema for the reply-script generation agent defined below.
class CustomerReply(BaseModel):
    """Ready-to-send customer reply, broken into copy-paste-ready sections."""
    开场白: str = Field(description="开场白和问候语")
    问题确认: str = Field(description="问题确认和共情表达")
    解决方案: str = Field(description="具体解决方案和措施")
    补偿措施: str = Field(description="补偿或安抚措施")
    结束语: str = Field(description="结束语和后续跟进")
    个性化调整: str = Field(description="个性化调整建议")
# Agent that turns the analysis results into a directly usable customer
# reply; output validated against CustomerReply.
reply_agent = Agent(
    'deepseek:deepseek-chat',
    system_prompt="""
你是专业的客服话术专家专门为航空服务设计直接可用的客户回复话术
你的任务是基于智能分析结果生成可以直接复制粘贴使用的客户回复话术
1. 开场白和问候语 - 专业友好的开场
2. 问题确认和共情 - 理解客户感受
3. 具体解决方案 - 明确可行的解决措施
4. 补偿措施 - 适当的补偿或安抚
5. 结束语 - 专业的结束和后续安排
6. 个性化调整 - 根据具体情况微调
话术要求
- 专业友好共情
- 具体可行可操作
- 符合航空服务行业标准
- 可以直接复制使用
输出必须是结构化的JSON格式
""",
    output_type=CustomerReply
)
async def generate_customer_reply(customer_text: str, intent_analysis: IntentAnalysis,
                                  smart_analysis: SmartServiceAnalysis) -> CustomerReply:
    """Generate a copy-paste-ready customer reply from the prior analyses.

    NOTE(review): as in analyze_deep_intent, the raw ``Agent.run`` result is
    returned directly; the structured CustomerReply may live on
    ``result.output`` depending on the pydantic-ai version — confirm.
    """
    result = await reply_agent.run(f"""
请基于以下智能分析结果生成可以直接复制使用的客户回复话术
客户反馈: "{customer_text}"
意图分析结果:
- 主要意图: {intent_analysis.主要意图}
- 紧急程度: {intent_analysis.紧急程度}
- 问题类型: {intent_analysis.问题类型}
- 情感强度: {intent_analysis.情感强度:.2f}
- 流失风险: {intent_analysis.是否为易流失客户}
深度智能分析:
- 意图深度分析: {smart_analysis.意图深度分析}
- 最佳处理策略: {smart_analysis.最佳处理策略}
- 话术优化建议: {smart_analysis.话术优化建议}
- 预期效果: {smart_analysis.预期效果}
请生成可以直接复制粘贴使用的完整回复话术
""")
    return result
async def analyze_smart_service_strategy(customer_text: str, intent_analysis: IntentAnalysis) -> SmartServiceAnalysis:
    """Ask the smart-service agent for a handling strategy given the intent analysis.

    NOTE(review): the raw ``Agent.run`` result is returned directly; the
    structured SmartServiceAnalysis may need ``result.output`` depending on
    the pydantic-ai version — confirm.
    """
    result = await smart_service_agent.run(f"""
请基于以下客户反馈和意图分析提供智能服务策略
客户反馈: "{customer_text}"
意图分析结果:
- 主要意图: {intent_analysis.主要意图}
- 紧急程度: {intent_analysis.紧急程度}
- 问题类型: {intent_analysis.问题类型}
- 情感强度: {intent_analysis.情感强度:.2f}
- 关键词: {', '.join(intent_analysis.关键词)}
请提供
1. 意图深度分析和客户真实需求识别
2. 最佳处理策略和优先级安排
3. 话术优化建议和个性化调整
4. 预期服务效果评估
5. 服务改进机会和最佳实践
""")
    return result

1243
bigwork/src/streamlit_app.py Normal file

File diff suppressed because it is too large Load Diff

179
bigwork/src/train.py Normal file
View File

@ -0,0 +1,179 @@
"""航空公司情感分析模型训练模块"""
from pathlib import Path
import joblib
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, f1_score, recall_score
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.neural_network import MLPClassifier
from src.data import DataProcessor, get_feature_columns
MODELS_DIR = Path("models")
MODEL_PATH = MODELS_DIR / "sentiment_model.pkl"
def get_pipeline(model_type: str = "rf") -> Pipeline:
    """Assemble the preprocessing + classifier sklearn Pipeline.

    Numeric columns: mean imputation then standardization.
    Categorical columns: mode imputation then one-hot encoding.
    Text column 'cleaned_text': TF-IDF (1-2 grams, 5000 features max).
    Classifier: selected by ``model_type`` (lr / rf / svm / xgb / mlp);
    any other value falls back to a default RandomForest.
    """
    numeric_features, categorical_features = get_feature_columns()
    preprocessor = ColumnTransformer(
        transformers=[
            ("num",
             Pipeline(steps=[("imputer", SimpleImputer(strategy="mean")),
                             ("scaler", StandardScaler())]),
             numeric_features),
            ("cat",
             Pipeline(steps=[("imputer", SimpleImputer(strategy="most_frequent")),
                             ("onehot", OneHotEncoder(handle_unknown="ignore"))]),
             categorical_features),
            ("text",
             TfidfVectorizer(max_features=5000, ngram_range=(1, 2), stop_words='english'),
             "cleaned_text"),
        ]
    )
    # Lazily instantiate only the requested classifier.
    classifier_factories = {
        "lr": lambda: LogisticRegression(random_state=42, max_iter=1000),
        "rf": lambda: RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        "svm": lambda: SVC(kernel='linear', probability=True, random_state=42),
        "xgb": lambda: XGBClassifier(random_state=42, eval_metric='mlogloss'),
        "mlp": lambda: MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=1000, random_state=42),
    }
    make_classifier = classifier_factories.get(
        model_type, lambda: RandomForestClassifier(random_state=42))
    return Pipeline(steps=[("preprocessor", preprocessor), ("classifier", make_classifier())])
def evaluate_model(model, X_test, y_test, model_name: str):
    """Print and return (macro-F1, negative-class recall) for a fitted model."""
    predictions = model.predict(X_test)
    macro_f1 = f1_score(y_test, predictions, average='macro')
    # Recall restricted to the 'negative' label — missed complaints are costly.
    recall_negative = recall_score(y_test, predictions, labels=['negative'],
                                   average='macro', zero_division=0)
    print(f"[{model_name}]")
    print(f" Macro-F1: {macro_f1:.4f}")
    print(f" 负面情感召回率: {recall_negative:.4f}")
    return macro_f1, recall_negative
def train() -> None:
    """Run the full training flow: load, temporal split, fit, evaluate, save.

    BUG FIX (review): the original computed the 80/20 split index on a
    time-sorted copy (``df_sorted``) but sliced ``X``/``y`` built from the
    *unsorted* frame, so the "temporal" split actually mixed time periods.
    Here the frame is sorted first and X/y are sliced from the sorted data,
    guaranteeing the test set is strictly later than the training set.
    Also hoists the triple ``get_feature_columns()`` call.
    """
    print(">>> 1. 数据准备与时序划分")
    processor = DataProcessor("data/Tweets.csv")
    df = processor.load_data()
    # Feature engineering (text cleaning + derived columns).
    df_processed = processor.extract_features(df)
    numeric_features, categorical_features = get_feature_columns()
    # Temporal split: sort by creation time, then take the first 80% to train.
    df_sorted = df_processed.sort_values('tweet_created')
    X = df_sorted[['cleaned_text'] + numeric_features + categorical_features]
    y = df_sorted['airline_sentiment']
    split_idx = int(0.8 * len(df_sorted))
    X_train = X.iloc[:split_idx]
    X_test = X.iloc[split_idx:]
    y_train = y.iloc[:split_idx]
    y_test = y.iloc[split_idx:]
    print(f"训练集大小: {X_train.shape}, 测试集大小: {X_test.shape}")
    print(f"时间范围: {df_sorted['tweet_created'].min()}{df_sorted['tweet_created'].max()}")
    print("\n>>> 2. 训练最佳模型 (LogisticRegression)")
    # Train the selected best model directly.
    pipe = get_pipeline("lr")
    pipe.fit(X_train, y_train)
    macro_f1, recall_negative = evaluate_model(pipe, X_test, y_test, "LogisticRegression")
    print("\n>>> 3. 模型详细评估")
    y_pred = pipe.predict(X_test)
    print("\n分类报告:")
    print(classification_report(y_test, y_pred))
    print("\n>>> 4. 误差分析")
    # Overall mistakes plus false negatives (missed negative sentiment).
    error_mask = y_pred != y_test
    error_count = error_mask.sum()
    fn_mask = (y_test == 'negative') & (y_pred != 'negative')
    fn_count = fn_mask.sum()
    print(f"总计错误样本数: {error_count}")
    print(f"假阴性错误(漏判负面): {fn_count}")
    print("\n>>> 5. 保存模型")
    MODELS_DIR.mkdir(exist_ok=True)
    joblib.dump(pipe, MODEL_PATH)
    # Persist the feature layout so prediction code can align its columns.
    feature_names = {
        'numeric_features': numeric_features,
        'categorical_features': categorical_features,
        'text_feature': 'cleaned_text'
    }
    joblib.dump(feature_names, MODELS_DIR / "feature_names.pkl")
    print(f"最佳模型 (LogisticRegression) 已保存至 {MODEL_PATH}")
    print(f"最终性能: Macro-F1 = {macro_f1:.4f}")
    print(f"负面情感召回率 = {recall_negative:.4f}")
    if macro_f1 >= 0.80:
        print("✅ 达到目标: Macro-F1 ≥ 0.80")
    else:
        print("❌ 未达到目标: Macro-F1 < 0.80")
# Allow running this module directly: python src/train.py
if __name__ == "__main__":
    train()

12
bigwork/start_app.bat Normal file
View File

@ -0,0 +1,12 @@
@echo off
REM Launcher for the Streamlit app; first attempts to repair two known
REM corruptions in the PATH environment variable.
echo 正在修复 PATH 环境变量...
REM Replace the corrupted "C:;Users" prefix with "C:\Users".
set PATH=%PATH:C:;Users=C:\Users%
REM NOTE(review): the next substitution looks malformed -- cmd.exe cannot
REM match a literal '%' inside a %VAR:search=replace% expansion, so the
REM intended ";%S;stemRoot%" -> ";%SystemRoot%" repair will not happen.
REM Compare with the working PowerShell equivalent in start_app.ps1.
set PATH=%PATH:;%S;stemRoot%=;%SystemRoot%%
echo 切换到项目目录...
REM Hard-coded project directory; /d also switches the drive letter.
cd /d "d:\HuaweiMoveData\Users\马艺洁\Desktop\MLwork\bigwork"
echo 使用 UV 启动 Streamlit 应用...
uv run streamlit run src/streamlit_app.py
REM Keep the console window open so any error output stays visible.
pause

11
bigwork/start_app.ps1 Normal file
View File

@ -0,0 +1,11 @@
# Repair two known PATH corruptions, then launch the Streamlit app via uv.

# Fix the mangled "C:;Users" prefix and the ";%S;stemRoot%" fragment.
Write-Host "正在修复 PATH 环境变量..." -ForegroundColor Green
$repairedPath = $env:PATH.Replace('C:;Users', 'C:\Users')
$repairedPath = $repairedPath.Replace(';%S;stemRoot%', ';%SystemRoot%')
$env:PATH = $repairedPath

# Move into the (hard-coded) project directory.
Write-Host "切换到项目目录..." -ForegroundColor Green
Set-Location "d:\HuaweiMoveData\Users\马艺洁\Desktop\MLwork\bigwork"

# Launch the app through uv so the project's environment is used.
Write-Host "使用 UV 启动 Streamlit 应用..." -ForegroundColor Green
uv run streamlit run src/streamlit_app.py

View File

@ -0,0 +1,85 @@
"""数据模块测试"""
import pytest
import pandas as pd
from pathlib import Path
import sys
# 添加项目根目录到Python路径
sys.path.append(str(Path(__file__).parent.parent))
from src.data import DataProcessor, TweetSchema
class TestDataProcessor:
    """Unit tests for DataProcessor: abbreviations, cleaning, schema, features."""

    def setup_method(self):
        # Fresh processor per test; the CSV path is only stored, not read here.
        self.processor = DataProcessor("data/Tweets.csv")

    def test_abbreviation_dict(self):
        """The abbreviation dictionary maps common shorthand to full words."""
        abb = self.processor.abb_dict
        assert 'pls' in abb
        assert abb['pls'] == 'please'

    def test_preprocess_text(self):
        """Cleaning keeps mentions/hashtags/URLs and expands abbreviations."""
        cleaned = self.processor.preprocess_text(
            "Hello @user This is a #test http://example.com")
        for token in ("@user", "#test", "http://example.com"):
            assert token in cleaned
        expanded = self.processor.preprocess_text("pls help thx")
        assert "please" in expanded
        assert "thanks" in expanded

    def test_schema_validation(self):
        """A well-formed frame passes TweetSchema validation unchanged in size."""
        frame = pd.DataFrame({
            'tweet_id': [1, 2, 3],
            'airline_sentiment': ['negative', 'neutral', 'positive'],
            'airline_sentiment_confidence': [0.9, 0.8, 0.95],
            'negativereason': ['Late Flight', None, 'Bad Service'],
            'airline': ['united', 'delta', 'american'],
            'text': ['test tweet 1', 'test tweet 2', 'test tweet 3'],
            'tweet_created': ['2023-01-01', '2023-01-02', '2023-01-03'],
        })
        validated = TweetSchema.validate(frame)
        assert len(validated) == 3

    def test_feature_extraction(self):
        """extract_features adds cleaned text plus binary presence flags."""
        frame = pd.DataFrame({
            'text': ['@user test #hashtag http://example.com'],
            'airline': ['united'],
            'airline_sentiment_confidence': [0.8],
            'tweet_created': ['2023-01-01 10:00:00'],
        })
        enriched = self.processor.extract_features(frame)
        for col in ('cleaned_text', 'text_length', 'has_mention'):
            assert col in enriched.columns
        assert enriched['has_mention'].iloc[0] == 1
        assert enriched['has_hashtag'].iloc[0] == 1
        assert enriched['has_url'].iloc[0] == 1
# Allow running the test module directly without invoking the pytest CLI.
if __name__ == "__main__":
    pytest.main([__file__])

4081
bigwork/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff

58
bigwork/uv_start.ps1 Normal file
View File

@ -0,0 +1,58 @@
# UV smart launcher — repairs PATH automatically, then starts the app.
Write-Host "🚀 UV 智能启动脚本" -ForegroundColor Cyan
Write-Host "========================================" -ForegroundColor Cyan
# Automatic PATH repair (function defined below).
function Fix-Path-Auto {
    # Detect and repair three known PATH problems:
    #   1. the Python Scripts directory missing from PATH
    #   2. the corrupted "C:;Users" prefix
    #   3. the corrupted ";%S;stemRoot%" fragment
    Write-Host "🔧 检查并修复 PATH 环境变量..." -ForegroundColor Yellow
    $pythonScriptsPath = "C:\Users\马艺洁\AppData\Local\Programs\Python\Python312\Scripts"
    $patched = $false
    if (-not ($env:PATH -like "*$pythonScriptsPath*")) {
        Write-Host "✅ 添加正确的 Python Scripts 路径" -ForegroundColor Green
        $env:PATH = "$pythonScriptsPath;" + $env:PATH
        $patched = $true
    }
    if ($env:PATH -like "*C:;Users*") {
        Write-Host "✅ 修复 C:;Users 格式错误" -ForegroundColor Green
        $env:PATH = $env:PATH.Replace('C:;Users', 'C:\Users')
        $patched = $true
    }
    if ($env:PATH -like "*;%S;stemRoot%*") {
        Write-Host "✅ 修复 ;%S;stemRoot% 格式错误" -ForegroundColor Green
        $env:PATH = $env:PATH.Replace(';%S;stemRoot%', ';%SystemRoot%')
        $patched = $true
    }
    if (-not $patched) {
        Write-Host "✅ PATH 环境变量正常" -ForegroundColor Green
    }
}
# Run the PATH repair defined above.
Fix-Path-Auto
# Switch to the (hard-coded) project directory.
Set-Location "d:\HuaweiMoveData\Users\马艺洁\Desktop\MLwork\bigwork"
# Verify the `uv` command is reachable before using it.
Write-Host "🔍 验证 UV 命令可用性..." -ForegroundColor Cyan
try {
    $uvVersion = uv --version 2>$null
    if ($LASTEXITCODE -eq 0) {
        Write-Host "✅ UV 命令可用: $uvVersion" -ForegroundColor Green
        Write-Host "🚀 使用 UV 启动 Streamlit 应用..." -ForegroundColor Green
        uv run streamlit run src/streamlit_app.py
    } else {
        throw "UV command failed"
    }
} catch {
    # Fallback: invoke uv through the Python launcher instead.
    # NOTE(review): if `uv` is missing entirely, PowerShell raises
    # CommandNotFoundException for the native command; whether this catch
    # intercepts it depends on the ErrorActionPreference in effect —
    # verify the fallback actually fires on a machine without uv on PATH.
    Write-Host "⚠️ UV 命令不可用,尝试使用 python -m uv..." -ForegroundColor Yellow
    python -m uv run streamlit run src/streamlit_app.py
}