08_17-AirCARE/bigwork/src/compensation_recommender.py
Your Name bd5d8d108c feat: 添加航空公司情感分析与智能客服系统初始代码
- 实现数据预处理模块(data.py)和模型训练模块(train.py)
- 添加智能客服Agent应用(agent_app.py)和DNA解码系统(dna_decoder.py)
- 包含补偿推荐系统(compensation_recommender.py)和可视化支持
- 添加项目配置文件(pyproject.toml)和README文档
- 提供多种启动脚本(start_app.*, fix_path_and_run.bat等)
2026-01-13 00:43:15 +08:00

380 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""个性化服务恢复与补偿推荐系统"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from pydantic import BaseModel, Field
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import random
from datetime import datetime, timedelta
class CompensationEffect(BaseModel):
"""补偿效果分析"""
补偿类型: str = Field(description="补偿类型")
问题类型: str = Field(description="问题类型")
情感强度: float = Field(description="投诉情感强度")
预期满意度提升: float = Field(description="预期满意度提升")
成本效益比: float = Field(description="成本效益比")
推荐指数: float = Field(description="推荐指数(0-1)")
历史成功率: float = Field(description="历史成功率")
class CompensationRecommendation(BaseModel):
"""补偿推荐结果"""
最优补偿方案: str = Field(description="最优补偿方案")
备选方案: List[str] = Field(description="备选补偿方案")
预期效果: CompensationEffect = Field(description="预期效果分析")
推荐理由: str = Field(description="推荐理由")
实施建议: str = Field(description="实施建议")
A_B测试建议: str = Field(description="A/B测试建议")
class ABTestResult(BaseModel):
"""A/B测试结果"""
测试组: str = Field(description="测试组")
对照组: str = Field(description="对照组")
满意度提升差异: float = Field(description="满意度提升差异")
统计显著性: float = Field(description="统计显著性(p值)")
推荐结论: str = Field(description="推荐结论")
class CompensationRecommender:
"""个性化服务恢复与补偿推荐系统"""
def __init__(self, data_path: str = "data/Tweets.csv"):
self.data_path = data_path
self.compensation_types = [
"里程积分补偿", "代金券补偿", "升舱服务", "优先登机",
"免费行李额度", "贵宾室体验", "快速安检", "延误保险理赔",
"餐饮补偿", "酒店住宿", "交通补贴", "下次旅行折扣"
]
self.problem_types = [
"航班延误", "行李丢失", "服务态度", "座位问题",
"餐饮质量", "登机流程", "取消航班", "超售问题"
]
self.model = None
self.effect_data = self._generate_historical_effect_data()
def _generate_historical_effect_data(self) -> pd.DataFrame:
"""生成历史补偿效果数据(模拟数据)"""
np.random.seed(42)
data = []
for problem in self.problem_types:
for compensation in self.compensation_types:
for _ in range(50): # 每个组合50条历史数据
sentiment_intensity = np.random.uniform(-1, 0) # 负面情感
# 基于问题类型和补偿类型的预期效果
base_effect = self._calculate_base_effect(problem, compensation)
# 考虑情感强度的影响
sentiment_multiplier = 1 + abs(sentiment_intensity) * 0.5
# 随机因素
random_factor = np.random.normal(1, 0.1)
satisfaction_improvement = base_effect * sentiment_multiplier * random_factor
cost_benefit_ratio = self._calculate_cost_benefit(compensation, satisfaction_improvement)
success_rate = np.random.uniform(0.6, 0.95)
data.append({
'problem_type': problem,
'compensation_type': compensation,
'sentiment_intensity': sentiment_intensity,
'satisfaction_improvement': max(0, min(1, satisfaction_improvement)),
'cost_benefit_ratio': cost_benefit_ratio,
'success_rate': success_rate
})
return pd.DataFrame(data)
def _calculate_base_effect(self, problem: str, compensation: str) -> float:
"""计算基础补偿效果"""
# 问题-补偿匹配度矩阵
effect_matrix = {
"航班延误": {"里程积分补偿": 0.8, "代金券补偿": 0.7, "延误保险理赔": 0.9},
"行李丢失": {"里程积分补偿": 0.6, "代金券补偿": 0.8, "快速安检": 0.4},
"服务态度": {"升舱服务": 0.7, "贵宾室体验": 0.6, "下次旅行折扣": 0.8},
"餐饮质量": {"餐饮补偿": 0.9, "代金券补偿": 0.7, "里程积分补偿": 0.6}
}
# 默认效果
default_effect = 0.5
return effect_matrix.get(problem, {}).get(compensation, default_effect)
def _calculate_cost_benefit(self, compensation: str, improvement: float) -> float:
"""计算成本效益比"""
# 补偿成本估算
cost_estimates = {
"里程积分补偿": 50, "代金券补偿": 100, "升舱服务": 200,
"优先登机": 30, "免费行李额度": 40, "贵宾室体验": 60,
"快速安检": 20, "延误保险理赔": 150, "餐饮补偿": 25,
"酒店住宿": 120, "交通补贴": 80, "下次旅行折扣": 90
}
cost = cost_estimates.get(compensation, 50)
benefit = improvement * 200 # 假设满意度提升的价值
return benefit / cost if cost > 0 else 0
def train_recommendation_model(self) -> None:
"""训练补偿推荐模型"""
# 准备特征数据
X = pd.get_dummies(self.effect_data[['problem_type', 'compensation_type']])
X['sentiment_intensity'] = self.effect_data['sentiment_intensity']
y = self.effect_data['satisfaction_improvement']
# 训练模型
self.model = RandomForestRegressor(n_estimators=100, random_state=42)
self.model.fit(X, y)
def recommend_compensation(self, problem_type: str, sentiment_intensity: float,
user_history: Optional[Dict] = None) -> CompensationRecommendation:
"""推荐补偿方案"""
if self.model is None:
self.train_recommendation_model()
# 评估所有补偿方案
compensation_effects = []
for compensation in self.compensation_types:
# 准备特征
features = self._prepare_features(problem_type, compensation, sentiment_intensity)
# 预测满意度提升
predicted_improvement = self.model.predict([features])[0]
# 计算成本效益比
cost_benefit = self._calculate_cost_benefit(compensation, predicted_improvement)
# 获取历史成功率
historical_data = self.effect_data[
(self.effect_data['problem_type'] == problem_type) &
(self.effect_data['compensation_type'] == compensation)
]
success_rate = historical_data['success_rate'].mean() if not historical_data.empty else 0.7
# 计算推荐指数
recommendation_score = self._calculate_recommendation_score(
predicted_improvement, cost_benefit, success_rate
)
compensation_effects.append(CompensationEffect(
补偿类型=compensation,
问题类型=problem_type,
情感强度=sentiment_intensity,
预期满意度提升=predicted_improvement,
成本效益比=cost_benefit,
推荐指数=recommendation_score,
历史成功率=success_rate
))
# 排序并选择最优方案
compensation_effects.sort(key=lambda x: x.推荐指数, reverse=True)
best_compensation = compensation_effects[0]
# 生成推荐理由
recommendation_reason = self._generate_recommendation_reason(best_compensation)
# 生成实施建议
implementation_advice = self._generate_implementation_advice(best_compensation.补偿类型)
# 生成A/B测试建议
ab_test_suggestion = self._generate_ab_test_suggestion(best_compensation, compensation_effects[1:3])
return CompensationRecommendation(
最优补偿方案=best_compensation.补偿类型,
备选方案=[ce.补偿类型 for ce in compensation_effects[1:3]],
预期效果=best_compensation,
推荐理由=recommendation_reason,
实施建议=implementation_advice,
A_B测试建议=ab_test_suggestion
)
def _prepare_features(self, problem_type: str, compensation: str, sentiment_intensity: float) -> List[float]:
"""准备特征数据"""
# 创建问题类型和补偿类型的独热编码
problem_features = [1 if problem_type == pt else 0 for pt in self.problem_types]
compensation_features = [1 if compensation == ct else 0 for ct in self.compensation_types]
return problem_features + compensation_features + [sentiment_intensity]
def _calculate_recommendation_score(self, improvement: float, cost_benefit: float, success_rate: float) -> float:
"""计算推荐指数"""
# 加权综合评分
improvement_weight = 0.4
cost_benefit_weight = 0.4
success_rate_weight = 0.2
return (improvement * improvement_weight +
min(cost_benefit, 5) * 0.2 * cost_benefit_weight + # 限制成本效益比影响
success_rate * success_rate_weight)
def _generate_recommendation_reason(self, effect: CompensationEffect) -> str:
"""生成推荐理由"""
reasons = []
if effect.预期满意度提升 > 0.7:
reasons.append("预期满意度提升效果显著")
elif effect.预期满意度提升 > 0.5:
reasons.append("预期满意度提升效果良好")
if effect.成本效益比 > 3:
reasons.append("成本效益比优秀")
elif effect.成本效益比 > 2:
reasons.append("成本效益比良好")
if effect.历史成功率 > 0.8:
reasons.append("历史成功率较高")
return "; ".join(reasons) if reasons else "基于综合评估推荐"
def _generate_implementation_advice(self, compensation_type: str) -> str:
"""生成实施建议"""
advice_map = {
"里程积分补偿": "建议立即发放积分,并发送确认邮件",
"代金券补偿": "生成电子代金券有效期建议30天",
"升舱服务": "为下次旅行提供升舱机会,需提前确认",
"优先登机": "自动添加到乘客档案,下次生效",
"延误保险理赔": "启动保险理赔流程,需收集相关证明"
}
return advice_map.get(compensation_type, "按照标准流程实施补偿")
def _generate_ab_test_suggestion(self, best_compensation: CompensationEffect,
alternatives: List[CompensationEffect]) -> str:
"""生成A/B测试建议"""
if not alternatives:
return "暂无合适的A/B测试方案"
alternative = alternatives[0]
return (
f"建议进行A/B测试{best_compensation.补偿类型}{alternative.补偿类型}进行对比,"
f"预期效果差异为{abs(best_compensation.预期满意度提升 - alternative.预期满意度提升):.2f}"
)
def run_ab_test(self, problem_type: str, sentiment_intensity: float,
test_groups: List[str], sample_size: int = 100) -> ABTestResult:
"""运行A/B测试"""
if len(test_groups) < 2:
raise ValueError("A/B测试需要至少2个测试组")
# 模拟测试结果
group_a = test_groups[0]
group_b = test_groups[1]
# 模拟测试数据
group_a_improvement = np.random.normal(0.6, 0.1, sample_size)
group_b_improvement = np.random.normal(0.65, 0.1, sample_size)
# 计算差异
mean_diff = group_b_improvement.mean() - group_a_improvement.mean()
# 简单显著性检验(模拟)
p_value = 0.03 if abs(mean_diff) > 0.05 else 0.15
conclusion = "B组效果显著优于A组" if p_value < 0.05 else "两组效果无显著差异"
return ABTestResult(
测试组=group_b,
对照组=group_a,
满意度提升差异=mean_diff,
统计显著性=p_value,
推荐结论=conclusion
)
# 创建补偿推荐Agent
from pydantic_ai import Agent
class CompensationAnalysis(BaseModel):
"""补偿分析结果"""
问题诊断: str = Field(description="问题诊断")
补偿策略: str = Field(description="补偿策略")
预期ROI: float = Field(description="预期投资回报率")
风险分析: str = Field(description="风险分析")
长期影响: str = Field(description="长期影响评估")
compensation_agent = Agent(
'deepseek:deepseek-chat',
system_prompt="""
你是航空公司补偿策略专家,基于行为经济学和因果推断原理分析补偿方案。
你的任务:
1. 分析客户问题的严重程度和影响范围
2. 评估不同补偿方案的有效性
3. 预测补偿方案的长期影响
4. 提供风险分析和优化建议
分析要点:
- 考虑客户的情感强度和问题类型
- 评估补偿方案的成本效益比
- 预测客户满意度的恢复程度
- 分析长期客户关系影响
输出必须是结构化的JSON格式。
""",
output_type=CompensationAnalysis
)
async def analyze_compensation_strategy(problem_description: str, sentiment_intensity: float) -> CompensationAnalysis:
"""分析补偿策略"""
result = await compensation_agent.run(f"""
请分析以下客户问题的补偿策略:
问题描述: {problem_description}
情感强度: {sentiment_intensity:.2f}
请提供:
1. 问题诊断和严重程度评估
2. 推荐的补偿策略
3. 预期投资回报率分析
4. 风险分析和优化建议
5. 长期客户关系影响评估
""")
# 获取实际的CompensationAnalysis对象
# pydantic-ai不同版本中result可能有不同结构
compensation_analysis = None
# 尝试多种可能的属性访问方式
if hasattr(result, 'data'):
compensation_analysis = result.data
elif hasattr(result, 'result'):
compensation_analysis = result.result
elif hasattr(result, 'output'):
compensation_analysis = result.output
elif hasattr(result, 'value'):
compensation_analysis = result.value
else:
compensation_analysis = result
# 如果compensation_analysis仍然是AgentRunResult类型尝试直接访问其属性
if hasattr(compensation_analysis, '问题诊断'):
# 已经是正确的CompensationAnalysis对象
pass
else:
# 尝试从其他属性获取
if hasattr(compensation_analysis, 'data'):
compensation_analysis = compensation_analysis.data
elif hasattr(compensation_analysis, 'result'):
compensation_analysis = compensation_analysis.result
elif hasattr(compensation_analysis, 'output'):
compensation_analysis = compensation_analysis.output
elif hasattr(compensation_analysis, 'value'):
compensation_analysis = compensation_analysis.value
# 最终检查是否获取到了正确的对象
if not hasattr(compensation_analysis, '问题诊断'):
raise AttributeError(f"无法访问CompensationAnalysis对象的问题诊断属性对象类型: {type(compensation_analysis)}")
return compensation_analysis