"""Smart customer-service system: user churn prediction and retention-plan generation."""
import streamlit as st
import pandas as pd
import numpy as np
from pathlib import Path
import sys
from typing import Dict, List, Optional, Any
import json
from datetime import datetime

# Add the project root to the Python path
sys.path.insert(0, str(Path(__file__).parent))

# Import existing project modules
from enhanced_sentiment_analyzer import EnhancedAirlineSentimentAnalyzer
from machine_learning import FeatureEngineering, MachineLearningPipeline

# DeepSeek API configuration
import os
from dotenv import load_dotenv

load_dotenv()
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
DEEPSEEK_API_BASE = "https://api.deepseek.com"

# Try to import the OpenAI-compatible client used for DeepSeek
try:
    from openai import OpenAI
    client = OpenAI(
        api_key=DEEPSEEK_API_KEY,
        base_url=DEEPSEEK_API_BASE
    )
    DEEPSEEK_AVAILABLE = True
except ImportError:
    DEEPSEEK_AVAILABLE = False
    st.warning("DeepSeek API不可用,将使用模拟响应")


# Plain data model used for structured output (no pydantic dependency)
class CustomerRetentionPlan:
    """Customer retention plan."""

    def __init__(self, customer_id: str, churn_probability: float, churn_level: str,
                 risk_factors: List[str], retention_strategy: str,
                 immediate_actions: List[str], long_term_measures: List[str],
                 expected_outcome: str):
        self.customer_id = customer_id
        self.churn_probability = churn_probability
        self.churn_level = churn_level
        self.risk_factors = risk_factors
        self.retention_strategy = retention_strategy
        self.immediate_actions = immediate_actions
        self.long_term_measures = long_term_measures
        self.expected_outcome = expected_outcome

    def to_dict(self):
        return {
            "customer_id": self.customer_id,
            "churn_probability": self.churn_probability,
            "churn_level": self.churn_level,
            "risk_factors": self.risk_factors,
            "retention_strategy": self.retention_strategy,
            "immediate_actions": self.immediate_actions,
            "long_term_measures": self.long_term_measures,
            "expected_outcome": self.expected_outcome
        }


class SmartCustomerService:
    """Smart customer-service system."""

    def __init__(self, data_path: str, random_state: int = 42):
        self.data_path = data_path
        self.random_state = random_state
        self.churn_model = None
        self.feature_engineer = None
        self.conversation_history = []
        # Churn-level bands over the predicted probability
        self.churn_levels = {
            "low": (0.0, 0.3),
            "medium": (0.3, 0.7),
            "high": (0.7, 1.0)
        }
    def prepare_churn_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepare churn-prediction data from real user behavior."""
        df_copy = df.copy()

        # Use the real user name as the user ID
        df_copy['user_id'] = df_copy['name']

        # Churn-related sentiment features
        df_copy['is_negative'] = (df_copy['airline_sentiment'] == 'negative').astype(int)
        df_copy['is_positive'] = (df_copy['airline_sentiment'] == 'positive').astype(int)
        df_copy['is_neutral'] = (df_copy['airline_sentiment'] == 'neutral').astype(int)
        df_copy['sentiment_strength'] = df_copy['airline_sentiment_confidence']

        # Text features
        df_copy['text_length'] = df_copy['text'].astype(str).apply(len)

        # Complaint-keyword counts
        complaint_keywords = ["投诉", "不满意", "问题", "糟糕", "差", "退票", "取消", "延误", "错误"]
        df_copy['complaint_keywords'] = df_copy['text'].astype(str).apply(
            lambda x: sum(1 for keyword in complaint_keywords if keyword in x)
        )

        # Urgency-keyword counts
        urgent_keywords = ["紧急", "立即", "马上", "尽快", "立刻", "急需"]
        df_copy['urgent_keywords'] = df_copy['text'].astype(str).apply(
            lambda x: sum(1 for keyword in urgent_keywords if keyword in x)
        )

        # Serious issue types
        serious_issues = ["Bad Flight", "Late Flight", "Customer Service Issue", "Lost Luggage"]
        df_copy['is_serious_issue'] = df_copy['negativereason'].isin(serious_issues).astype(int)

        # Common issue types
        common_issues = ["Flight Booking Problems", "Cancelled Flight", "Flight Attendant Complaints"]
        df_copy['is_common_issue'] = df_copy['negativereason'].isin(common_issues).astype(int)

        # User-level behavior aggregates
        user_stats = df_copy.groupby('user_id').agg({
            'is_negative': ['count', 'sum'],
            'is_positive': 'sum',
            'is_neutral': 'sum',
            'sentiment_strength': 'mean',
            'airline_sentiment_confidence': 'mean',
            'retweet_count': 'sum',
            'tweet_created': 'count',   # tweet count
            'text_length': 'mean',
            'complaint_keywords': 'sum',
            'urgent_keywords': 'sum',
            'is_serious_issue': 'sum',
            'is_common_issue': 'sum',
            'negativereason': 'nunique'  # issue diversity
        }).reset_index()
        user_stats.columns = ['user_id', 'total_tweets', 'negative_count', 'positive_count',
                              'neutral_count', 'avg_sentiment_strength', 'avg_confidence',
                              'total_retweets', 'activity_count', 'text_length_avg',
                              'complaint_keywords', 'urgent_keywords', 'serious_issues',
                              'common_issues', 'issue_diversity']

        # Key ratios
        user_stats['negative_ratio'] = user_stats['negative_count'] / user_stats['total_tweets']
        user_stats['positive_ratio'] = user_stats['positive_count'] / user_stats['total_tweets']
        user_stats['engagement_rate'] = user_stats['total_retweets'] / user_stats['total_tweets']
        user_stats['complaint_ratio'] = user_stats['complaint_keywords'] / user_stats['total_tweets']
        user_stats['serious_issue_ratio'] = user_stats['serious_issues'] / user_stats['negative_count'].replace(0, 1)

        # Time-based features
        if 'tweet_created' in df_copy.columns:
            df_copy['tweet_date'] = pd.to_datetime(df_copy['tweet_created'], errors='coerce')
            user_dates = df_copy.groupby('user_id')['tweet_date'].agg(['min', 'max']).reset_index()
            user_dates['activity_duration_days'] = (user_dates['max'] - user_dates['min']).dt.days
            user_dates['avg_tweets_per_day'] = user_stats['total_tweets'] / user_dates['activity_duration_days'].replace(0, 1)
            user_stats = user_stats.merge(user_dates[['user_id', 'activity_duration_days', 'avg_tweets_per_day']],
                                          on='user_id', how='left')
        else:
            user_stats['activity_duration_days'] = 0
            user_stats['avg_tweets_per_day'] = 0

        # Define the churn label from real business logic:
        # high negative ratio + repeated negative feedback + strong negative sentiment.
        # Drop users with only one tweet (not enough data).
        user_stats = user_stats[user_stats['total_tweets'] > 1]

        if len(user_stats) == 0:
            # Not enough data: fall back to a default churn label
            user_stats['churn_label'] = 0
            return user_stats

        # Weighted risk score over the engineered features
        user_stats['churn_risk_score'] = (
            user_stats['negative_ratio'] * 0.3 +                                  # negative ratio
            user_stats['serious_issue_ratio'] * 0.2 +                             # serious issues
            user_stats['complaint_ratio'] * 0.15 +                                # complaint keywords
            (user_stats['urgent_keywords'] / user_stats['total_tweets']) * 0.1 +  # urgency
            (user_stats['issue_diversity'] / max(user_stats['issue_diversity'].max(), 1)) * 0.1 +  # issue diversity (guard against /0)
            user_stats['avg_sentiment_strength'] * 0.15                           # sentiment strength
        )

        # Label the top 15% of risk scores as churners
        risk_threshold = user_stats['churn_risk_score'].quantile(0.85)
        user_stats['churn_label'] = (user_stats['churn_risk_score'] > risk_threshold).astype(int)

        # Attach user-level airline info
        user_airlines = df_copy.groupby('user_id')['airline'].first().reset_index()
        user_stats = pd.merge(user_stats, user_airlines, on='user_id', how='left')

        return user_stats
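    # Worked example for the risk score above (comments only; the numbers are
    # illustrative, not taken from the dataset): a user with negative_ratio=0.8,
    # serious_issue_ratio=0.5, complaint_ratio=0.4, 2 urgent keywords over
    # 5 tweets, issue_diversity 2 out of a maximum of 4, and
    # avg_sentiment_strength=0.9 scores
    #   0.8*0.3 + 0.5*0.2 + 0.4*0.15 + (2/5)*0.1 + (2/4)*0.1 + 0.9*0.15
    #   = 0.24 + 0.10 + 0.06 + 0.04 + 0.05 + 0.135 = 0.625,
    # which would typically clear the 0.85-quantile threshold and be labeled 1.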
    def train_churn_model(self):
        """Train the churn-prediction model (previously tuned XGBoost)."""
        try:
            # Search candidate data paths
            data_paths = [
                self.data_path,
                "data/Tweets.csv",
                "../data/Tweets.csv",
                "d:/HuaweiMoveData/Users/马艺洁/Desktop/MLwork/bigwork/data/Tweets.csv"
            ]

            df = None
            for path in data_paths:
                try:
                    df = pd.read_csv(path)
                    print(f"✅ 成功加载数据文件: {path}")
                    break
                except FileNotFoundError:
                    continue

            if df is None:
                print("❌ 无法找到数据文件,请检查Tweets.csv文件位置")
                return False

            # Build the churn dataset
            churn_data = self.prepare_churn_data(df)

            # Data-quality check
            if len(churn_data) == 0:
                print("❌ 流失数据准备失败,数据量不足")
                return False

            # Feature engineering (previously tuned feature set)
            feature_columns = [
                'total_tweets', 'negative_count', 'negative_ratio', 'positive_ratio',
                'avg_sentiment_strength', 'avg_confidence', 'text_length_avg',
                'complaint_keywords', 'urgent_keywords', 'complaint_ratio',
                'serious_issues', 'common_issues', 'issue_diversity', 'serious_issue_ratio',
                'total_retweets', 'activity_duration_days', 'avg_tweets_per_day', 'engagement_rate'
            ]

            # Keep only the columns that actually exist
            available_features = [col for col in feature_columns if col in churn_data.columns]
            if len(available_features) == 0:
                print("❌ 没有可用的特征列")
                return False

            features = churn_data[available_features]
            labels = churn_data['churn_label']

            # Check the label distribution
            label_counts = labels.value_counts()
            print(f"标签分布: {dict(label_counts)}")

            from xgboost import XGBClassifier
            from sklearn.model_selection import train_test_split
            from sklearn.metrics import accuracy_score, classification_report, roc_auc_score
            from sklearn.preprocessing import StandardScaler

            # Standardize the features
            scaler = StandardScaler()
            features_scaled = scaler.fit_transform(features)

            # Train/test split
            X_train, X_test, y_train, y_test = train_test_split(
                features_scaled, labels, test_size=0.2,
                random_state=self.random_state, stratify=labels
            )

            # Previously tuned XGBoost parameters
            self.churn_model = XGBClassifier(
                n_estimators=200,      # more trees
                max_depth=8,           # deeper trees
                learning_rate=0.05,    # lower learning rate
                subsample=0.8,         # row subsampling
                colsample_bytree=0.8,  # feature subsampling
                reg_alpha=0.1,         # L1 regularization
                reg_lambda=1,          # L2 regularization
                random_state=self.random_state,
                eval_metric='logloss',
                use_label_encoder=False
            )
            self.churn_model.fit(X_train, y_train)

            # Evaluate the model
            y_pred = self.churn_model.predict(X_test)
            y_pred_proba = self.churn_model.predict_proba(X_test)[:, 1]
            accuracy = accuracy_score(y_test, y_pred)
            auc_score = roc_auc_score(y_test, y_pred_proba)

            print(f"✅ 流失预测模型训练完成")
            print(f"   准确率: {accuracy:.4f}")
            print(f"   AUC得分: {auc_score:.4f}")
            print(f"   特征数量: {len(available_features)}")

            # Keep the feature names and scaler for prediction
            self.feature_names = available_features
            self.scaler = scaler
            return True

        except Exception as e:
            print(f"❌ 模型训练失败: {e}")
            return False
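    # Minimal usage sketch (comments only; the path and user name are
    # illustrative assumptions, not values guaranteed to exist):
    #   service = SmartCustomerService("data/Tweets.csv")
    #   if service.train_churn_model():
    #       user_data = service.get_user_data("some_user")
    #       result = service.predict_churn_risk(user_data)
    #       print(result["churn_probability"], result["churn_level"])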
    def predict_churn_risk(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Predict a user's churn risk (on standardized features)."""
        if self.churn_model is None:
            return {"error": "模型未训练"}

        try:
            # Assemble the features in the training-time order
            feature_values = []
            for feature_name in self.feature_names:
                feature_values.append(user_data.get(feature_name, 0))

            # Convert to a numpy array and standardize
            features = np.array([feature_values])
            features_scaled = self.scaler.transform(features)

            # Predicted churn probability
            churn_probability = self.churn_model.predict_proba(features_scaled)[0][1]

            # Adjust the probability with conversation-analysis signals.
            # Negative sentiment raises the probability
            if user_data.get('sentiment') == 'negative':
                churn_probability = min(0.999, churn_probability + 0.20)

            # High urgency raises it further
            if user_data.get('urgency_level') == 'high':
                churn_probability = min(0.999, churn_probability + 0.25)

            # High complaint intensity
            if 'complaint_intensity' in user_data and user_data['complaint_intensity'] > 0.5:
                churn_probability = min(0.999, churn_probability + 0.20)

            # Multiple extracted keywords
            if 'keywords' in user_data and len(user_data['keywords']) > 2:
                churn_probability = min(0.999, churn_probability + 0.10)

            # Flight-delay related keywords
            if 'content' in user_data:
                content = user_data['content'].lower()
                delay_keywords = ["延误", "晚点", "取消", "改签", "退票", "行李丢失", "耽误事", "耽误时间",
                                  "错过", "延误航班", "航班延误", "好几个小时", "长时间延误", "严重延误"]
                if any(keyword in content for keyword in delay_keywords):
                    churn_probability = min(0.999, churn_probability + 0.15)

            # Strong exit phrases such as "再也不" signal a firm intent to leave
            strong_exit_phrases = ["再也不", "再也不想", "再也不会", "再也不坐", "再也不用",
                                   "永远不", "永远不会", "永远不想"]
            if 'content' in user_data and any(phrase in user_data['content'] for phrase in strong_exit_phrases):
                churn_probability = min(0.999, churn_probability + 0.25)

            # Clamp the probability to a sensible range (avoid a 0.00% display)
            churn_probability = max(0.01, min(0.999, churn_probability))

            # Map the probability onto a churn level
            churn_level = "low"
            if churn_probability >= 0.7:
                churn_level = "high"
            elif churn_probability >= 0.3:
                churn_level = "medium"

            # Identify risk factors (with detailed evidence)
            risk_factors = self.identify_risk_factors_with_details(user_data, churn_probability)

            # Prediction confidence (feature completeness and model behavior)
            prediction_confidence = self.calculate_prediction_confidence(user_data, churn_probability)

            return {
                "churn_probability": churn_probability,
                "churn_level": churn_level,
                "risk_factors": risk_factors,
                "prediction_confidence": prediction_confidence,
                "feature_details": self.get_feature_details(user_data),
                "model_info": {
                    "model_type": "XGBoost",
                    "feature_count": len(self.feature_names),
                    "training_method": "基于用户行为特征的监督学习"
                }
            }

        except Exception as e:
            return {"error": f"预测失败: {e}"}

    def identify_risk_factors(self, user_data: Dict[str, Any], churn_prob: float) -> List[str]:
        """Identify risk factors (plain labels)."""
        risk_factors = []

        # Feedback-based factors
        if user_data.get('negative_ratio', 0) > 0.6:
            risk_factors.append("高负面反馈频率")
        if user_data.get('serious_issue_ratio', 0) > 0.5:
            risk_factors.append("严重问题投诉比例高")
        if user_data.get('complaint_ratio', 0) > 0.3:
            risk_factors.append("投诉关键词频繁出现")
        if user_data.get('urgent_keywords', 0) > 0:
            risk_factors.append("存在紧急诉求")

        # Sentiment-strength factor
        if user_data.get('avg_sentiment_strength', 0) > 0.8:
            risk_factors.append("强烈负面情绪")

        # Activity factors
        if user_data.get('total_tweets', 0) < 3:
            risk_factors.append("低活跃度")
        if user_data.get('avg_tweets_per_day', 0) < 0.1:
            risk_factors.append("极低发帖频率")

        # Issue-diversity factor
        if user_data.get('issue_diversity', 0) > 2:
            risk_factors.append("多类型问题投诉")

        # Model-prediction factor
        if churn_prob > 0.7:
            risk_factors.append("高流失概率")
        elif churn_prob > 0.5:
            risk_factors.append("中等流失概率")

        return risk_factors if risk_factors else ["无明显风险因素"]

    def identify_risk_factors_with_details(self, user_data: Dict[str, Any], churn_prob: float) -> List[Dict[str, Any]]:
        """Identify risk factors with detailed supporting evidence."""
        risk_factors = []

        # Feedback-based factors
        if user_data.get('negative_ratio', 0) > 0.6:
            risk_factors.append({
                "factor": "高负面反馈频率",
                "value": f"{user_data.get('negative_ratio', 0):.1%}",
                "threshold": "60%",
                "impact": "高",
                "description": "用户负面反馈比例过高,表明对服务严重不满"
            })
        if user_data.get('serious_issue_ratio', 0) > 0.5:
            risk_factors.append({
                "factor": "严重问题投诉比例高",
                "value": f"{user_data.get('serious_issue_ratio', 0):.1%}",
                "threshold": "50%",
                "impact": "高",
                "description": "涉及航班延误、服务问题等严重投诉比例较高"
            })
        if user_data.get('complaint_ratio', 0) > 0.3:
            risk_factors.append({
                "factor": "投诉关键词频繁出现",
                "value": f"{user_data.get('complaint_ratio', 0):.1%}",
                "threshold": "30%",
                "impact": "中",
                "description": "推文中频繁出现投诉、不满意等负面词汇"
            })

        # Activity-based factors
        if user_data.get('total_tweets', 0) < 3:
            risk_factors.append({
                "factor": "低活跃度",
                "value": f"{user_data.get('total_tweets', 0)}条推文",
                "threshold": "3条",
                "impact": "中",
                "description": "用户活跃度较低,可能缺乏互动意愿"
            })
        if user_data.get('avg_tweets_per_day', 0) < 0.1:
            risk_factors.append({
                "factor": "极低发帖频率",
                "value": f"{user_data.get('avg_tweets_per_day', 0):.2f}条/天",
                "threshold": "0.1条/天",
                "impact": "低",
                "description": "用户发帖频率极低,可能已转向其他平台"
            })

        # Model-prediction factors
        if churn_prob > 0.7:
            risk_factors.append({
                "factor": "高流失概率",
                "value": f"{churn_prob:.1%}",
                "threshold": "70%",
                "impact": "极高",
                "description": "XGBoost模型预测流失概率超过70%"
            })
        elif churn_prob > 0.5:
            risk_factors.append({
                "factor": "中等流失概率",
                "value": f"{churn_prob:.1%}",
                "threshold": "50%",
                "impact": "中",
                "description": "XGBoost模型预测流失概率超过50%"
            })

        # Sentiment factor (when conversation-analysis data is present)
        if 'sentiment' in user_data and user_data['sentiment'] == 'negative':
            risk_factors.append({
                "factor": "负面情感倾向",
                "value": "负面",
                "threshold": "中性",
                "impact": "中",
                "description": "用户表达了负面情感,可能存在流失风险"
            })

        # Urgency factor
        if 'urgency_level' in user_data and user_data['urgency_level'] == 'high':
            risk_factors.append({
                "factor": "高紧急度请求",
                "value": "高",
                "threshold": "中",
                "impact": "高",
                "description": "用户提出了高紧急度的请求,需要立即处理"
            })

        # Complaint-intensity factor (lowered threshold)
        if 'complaint_intensity' in user_data and user_data['complaint_intensity'] > 0.3:
            risk_factors.append({
                "factor": "高投诉强度",
                "value": f"{user_data['complaint_intensity']:.1%}",
                "threshold": "30%",
                "impact": "中",
                "description": "用户投诉强度较高,需要重点关注"
            })

        # Keyword factor (lowered threshold)
        if 'keywords' in user_data and len(user_data['keywords']) > 2:
            risk_factors.append({
                "factor": "多维度问题反馈",
                "value": f"{len(user_data['keywords'])}个关键词",
                "threshold": "2个",
                "impact": "中",
                "description": "用户反馈涉及多个维度的问题,需要全面处理"
            })

        # Service-attitude factor
        if any(phrase in user_data.get('content', '').lower() for phrase in
               ["不搭理", "不理", "不管", "不顾", "不服务", "不帮忙", "不解决",
                "不回应", "不回复", "不处理", "不作为"]):
            risk_factors.append({
                "factor": "服务态度问题",
                "value": "存在",
                "threshold": "无",
                "impact": "高",
                "description": "用户反映服务人员态度问题,可能导致流失"
            })

        return risk_factors

    def calculate_prediction_confidence(self, user_data: Dict[str, Any], churn_prob: float) -> float:
        """Compute the prediction confidence."""
        confidence = 0.8  # base confidence

        # Adjust for data completeness
        total_tweets = user_data.get('total_tweets', 0)
        if total_tweets >= 5:
            confidence += 0.1
        elif total_tweets >= 3:
            confidence += 0.05
        else:
            confidence -= 0.1

        # Adjust for the number of populated features
        feature_count = len([v for v in user_data.values() if isinstance(v, (int, float)) and v > 0])
        if feature_count >= 10:
            confidence += 0.05

        # Adjust for the probability value
        if 0.2 <= churn_prob <= 0.8:
            confidence += 0.05

        return min(0.95, max(0.6, confidence))

    def get_feature_details(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return the feature details grouped by category."""
        return {
            "behavior_features": {
                "total_tweets": user_data.get('total_tweets', 0),
                "negative_ratio": user_data.get('negative_ratio', 0),
                "positive_ratio": user_data.get('positive_ratio', 0),
                "avg_sentiment_strength": user_data.get('avg_sentiment_strength', 0)
            },
            "content_features": {
                "complaint_keywords": user_data.get('complaint_keywords', 0),
                "urgent_keywords": user_data.get('urgent_keywords', 0),
                "text_length_avg": user_data.get('text_length_avg', 0)
            },
            "issue_features": {
                "serious_issues": user_data.get('serious_issues', 0),
                "common_issues": user_data.get('common_issues', 0),
                "issue_diversity": user_data.get('issue_diversity', 0)
            },
            "activity_features": {
                "total_retweets": user_data.get('total_retweets', 0),
                "avg_tweets_per_day": user_data.get('avg_tweets_per_day', 0),
                "engagement_rate": user_data.get('engagement_rate', 0)
            }
        }

    def generate_retention_plan(self, user_id: str, churn_prediction: Dict[str, Any],
                                query: str = "") -> CustomerRetentionPlan:
        """Generate a retention plan."""
        # Use DeepSeek for retention measures when available
        if DEEPSEEK_AVAILABLE:
            retention_text = self._call_deepseek_api(user_id, churn_prediction, query)
        else:
            retention_text = self._generate_mock_retention_plan(user_id, churn_prediction, query)

        # Parse the response into a structured plan
        return self._parse_retention_plan(user_id, churn_prediction, retention_text)
    def _call_deepseek_api(self, user_id: str, churn_prediction: Dict[str, Any], query: str = "") -> str:
        """Call the DeepSeek API to generate retention measures."""
        try:
            # Build the prompt (including the user's query)
            prompt = self._build_retention_prompt(user_id, churn_prediction, query)

            # Call DeepSeek through the openai-compatible client
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[
                    {"role": "system", "content": "你是一个专业的客户关系管理专家,擅长制定客户挽留策略。请基于客户的具体问题和流失风险分析,提供切实可行的个性化挽留措施。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=1000
            )
            return response.choices[0].message.content.strip()

        except Exception as e:
            st.warning(f"DeepSeek API调用失败: {e},将使用备用方案")
            return self._generate_mock_retention_plan(user_id, churn_prediction, query)

    def _build_retention_prompt(self, user_id: str, churn_prediction: Dict[str, Any], query: str = "") -> str:
        """Build the prompt, including the user's query."""
        churn_prob = churn_prediction.get('churn_probability', 0)
        risk_factors = churn_prediction.get('risk_factors', [])

        # Risk factors may be a list of dicts or a list of strings
        risk_factors_text = ""
        if risk_factors:
            if isinstance(risk_factors[0], dict):
                # Dict entries: extract the factor field
                risk_factors_text = ', '.join([factor.get('factor', '') for factor in risk_factors])
            else:
                # String entries: join directly
                risk_factors_text = ', '.join(risk_factors)
        else:
            risk_factors_text = "无明显风险因素"

        # Classify the query's sentiment for prompt context
        query_context = ""
        if query:
            if any(keyword in query for keyword in ["投诉", "不满意", "问题", "糟糕", "差", "退票", "取消"]):
                query_context = f"客户表达了不满:'{query}'"
            elif any(keyword in query for keyword in ["感谢", "满意", "好", "优秀", "推荐"]):
                query_context = f"客户表达了满意:'{query}'"
            else:
                query_context = f"客户咨询:'{query}'"

        prompt = f"""
请为航空公司客户制定个性化的客户服务响应和挽留策略:

客户信息:
- 客户ID: {user_id}
- 客户反馈: {query}
- 风险因素: {risk_factors_text}
{f"- 当前问题: {query_context}" if query_context else ""}

请基于以上信息,特别是客户的具体问题,制定一个针对性的客户服务方案,遵循以下3步决策流程:
1. 评估:分析客户的具体问题和需求
2. 解释:说明我们的理解和解决方案
3. 行动计划:提供具体的措施和改进方案

请用中文回复,内容要具体、可执行,符合航空服务业的特点,并直接回应客户的问题。
注意:
- 不要在回复中提及"流失风险等级"、"流失概率"等专业术语
- 直接针对客户的问题提供解决方案
- 使用友好、专业的语气
"""
        return prompt
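    # Illustrative rendering (comments only; the user ID and feedback text are
    # assumed values): for user_id="user_001", query="航班延误了三个小时" and one
    # dict-style risk factor, the template above renders roughly as
    #   客户信息:
    #   - 客户ID: user_001
    #   - 客户反馈: 航班延误了三个小时
    #   - 风险因素: 高流失概率
    #   - 当前问题: 客户咨询:'航班延误了三个小时'
    # followed by the three-step 评估/解释/行动计划 instructions.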
    def _generate_mock_retention_plan(self, user_id: str, churn_prediction: Dict[str, Any], query: str = "") -> str:
        """Generate a mock service plan (includes the query content)."""
        # Tailor the response to the query
        query_response = ""
        if query:
            if any(keyword in query for keyword in ["投诉", "不满意", "问题", "糟糕", "差"]):
                query_response = "针对您提到的问题,我们将立即调查并解决。"
            elif "退票" in query or "取消" in query:
                query_response = "关于退票/取消事宜,我们将为您提供灵活的解决方案。"
            elif any(keyword in query for keyword in ["感谢", "满意", "好", "优秀"]):
                query_response = "感谢您的积极反馈!我们将继续努力提供优质服务。"
            else:
                query_response = "关于您的咨询,我们将为您提供详细解答。"

        # Build the reply from the query alone; risk levels are never mentioned
        return f"""
尊敬的客户,

{query_response}

【评估】
我们已经详细了解了您的情况和需求。

【解释】
根据您提供的信息,我们理解您的关注点,并将采取相应措施解决您的问题。

【行动计划】
1. 立即执行措施:
   - 安排专属客服专员与您联系
   - 针对您的具体问题提供详细解决方案
   - 确保您的需求得到及时响应

2. 长期改进计划:
   - 持续优化我们的服务流程
   - 加强客户反馈收集和处理机制
   - 定期对服务质量进行评估和改进

我们致力于为您提供优质的服务体验,感谢您的理解与支持。
"""

    def _parse_retention_plan(self, user_id: str, churn_prediction: Dict[str, Any],
                              retention_text: str) -> CustomerRetentionPlan:
        """Parse the service-plan text into structured data."""
        # Extract structured information from the response
        strategy, immediate, long_term, outcome = self._extract_from_ai_response(retention_text, "low")

        # Build the retention-plan object
        retention_plan = CustomerRetentionPlan(
            customer_id=user_id,
            churn_probability=churn_prediction.get('churn_probability', 0),
            churn_level="low",  # default; risk levels are no longer surfaced
            risk_factors=churn_prediction.get('risk_factors', []),
            retention_strategy=strategy,
            immediate_actions=immediate,
            long_term_measures=long_term,
            expected_outcome=outcome
        )
        return retention_plan

    def _extract_from_ai_response(self, response_text: str, churn_level: str) -> tuple:
        """Extract structured information from the AI response."""
        # Base strategy mapping
        strategy_map = {
            "low": "常规维护",
            "medium": "主动干预",
            "high": "紧急挽留"
        }

        # Base measure mappings
        immediate_map = {
            "low": ["发送满意度调查", "提供个性化推荐"],
            "medium": ["电话回访", "提供优惠券", "问题解决方案"],
            "high": ["高级经理介入", "实质性补偿", "紧急服务改进"]
        }
        long_term_map = {
            "low": ["定期客户关怀", "服务优化跟进"],
            "medium": ["服务质量改进", "客户满意度提升"],
            "high": ["长期跟踪机制", "深度服务优化"]
        }
        outcome_map = {
            "low": "维持良好客户关系",
            "medium": "降低流失风险",
            "high": "防止客户流失"
        }

        # Refine the strategy when the response contains certain keywords
        if "个性化" in response_text and "定制" in response_text:
            strategy_map[churn_level] += "(个性化定制)"
        if "紧急" in response_text or "立即" in response_text:
            immediate_map[churn_level].insert(0, "紧急响应机制")
        if "长期" in response_text and "跟踪" in response_text:
            long_term_map[churn_level].append("持续改进计划")

        return (
            strategy_map[churn_level],
            immediate_map[churn_level],
            long_term_map[churn_level],
            outcome_map[churn_level]
        )
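    # Worked example (comments only): for churn_level="high" and an AI response
    # containing "紧急", the extractor above yields
    #   strategy  -> "紧急挽留"
    #   immediate -> ["紧急响应机制", "高级经理介入", "实质性补偿", "紧急服务改进"]
    #   long_term -> ["长期跟踪机制", "深度服务优化"]
    #   outcome   -> "防止客户流失"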
    def process_customer_query(self, user_id: str, query: str,
                               pre_analysis=None, pre_prediction=None) -> Dict[str, Any]:
        """Handle a customer query and respond based on the churn analysis."""
        # 1. Classify: analyze the query's type and sentiment
        query_analysis = pre_analysis if pre_analysis else self.analyze_query(query)

        # 2. Fetch user data and predict churn risk
        user_data = self.get_user_data(user_id)
        churn_prediction = pre_prediction if pre_prediction else self.predict_churn_risk(user_data)

        # 3. Generate the retention plan (includes the query content)
        retention_plan = self.generate_retention_plan(user_id, churn_prediction, query)

        # 4. Generate the smart response (via the DeepSeek API)
        explanation = self._generate_smart_response(
            query=query,
            query_analysis=query_analysis,
            churn_prediction=churn_prediction,
            retention_plan=retention_plan,
            pre_analysis=pre_analysis
        )

        # Record the conversation
        self.conversation_history.append({
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "query": query,
            "analysis": query_analysis,
            "churn_prediction": churn_prediction,
            "response": explanation
        })

        return {
            "query_analysis": query_analysis,
            "churn_prediction": churn_prediction,
            "retention_plan": retention_plan.to_dict(),
            "response": explanation
        }

    def analyze_query(self, query: str) -> Dict[str, Any]:
        """Analyze a customer query."""
        # Smart sentiment analysis
        sentiment = self._analyze_sentiment_smart(query)
        # Urgency analysis
        urgency = "high" if "紧急" in query or "立即" in query else "normal"
        return {
            "sentiment": sentiment,
            "urgency": urgency
        }

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """Fetch user data from the real dataset."""
        try:
            # Search candidate data paths
            data_paths = [
                "../data/Tweets.csv",
                "data/Tweets.csv",
                "d:/HuaweiMoveData/Users/马艺洁/Desktop/MLwork/bigwork/data/Tweets.csv",
                self.data_path
            ]
            df = None
            for path in data_paths:
                try:
                    df = pd.read_csv(path)
                    break
                except FileNotFoundError:
                    continue
            if df is None:
                st.warning("无法找到数据文件,使用默认数据")
                return self._get_default_user_data(user_id)

            # For synthetic IDs, sample a real user from the dataset
            if user_id.startswith("user_") or user_id == "test_user":
                real_users = df['name'].dropna().unique()
                if len(real_users) > 0:
                    selected_user = np.random.choice(real_users)
                    user_id = selected_user

            # Compute the user's real features
            user_tweets = df[df['name'] == user_id]
            if len(user_tweets) == 0:
                # Unknown user: return defaults
                return self._get_default_user_data(user_id)

            # Base counts
            total_tweets = len(user_tweets)
            negative_count = len(user_tweets[user_tweets['airline_sentiment'] == 'negative'])
            positive_count = len(user_tweets[user_tweets['airline_sentiment'] == 'positive'])
            neutral_count = len(user_tweets[user_tweets['airline_sentiment'] == 'neutral'])
            negative_ratio = negative_count / total_tweets if total_tweets > 0 else 0
            positive_ratio = positive_count / total_tweets if total_tweets > 0 else 0

            # Sentiment strength (confidence used as the strength proxy)
            avg_sentiment_strength = user_tweets['airline_sentiment_confidence'].mean()
            avg_confidence = user_tweets['airline_sentiment_confidence'].mean()

            # Text features from tweet content
            text_features = self._extract_text_features(user_tweets)
            # Negative-reason features
            negative_reason_features = self._extract_negative_reason_features(user_tweets)
            # Activity features
            activity_features = self._extract_activity_features(user_tweets)

            # Merge all features
            user_features = {
                "user_id": user_id,
                "total_tweets": total_tweets,
                "negative_count": negative_count,
                "positive_count": positive_count,
                "neutral_count": neutral_count,
                "negative_ratio": negative_ratio,
                "positive_ratio": positive_ratio,
                "avg_sentiment_strength": avg_sentiment_strength,
                "avg_confidence": avg_confidence,
                "airline": user_tweets['airline'].iloc[0] if 'airline' in user_tweets.columns else 'Unknown',
                "recent_tweet": user_tweets['text'].iloc[-1] if len(user_tweets) > 0 else "",
                **text_features,
                **negative_reason_features,
                **activity_features
            }
            return user_features

        except Exception as e:
            st.warning(f"获取用户数据失败: {e},使用默认数据")
            return self._get_default_user_data(user_id)

    def _extract_text_features(self, user_tweets: pd.DataFrame) -> Dict[str, Any]:
        """Extract features from the tweet texts."""
        try:
            texts = user_tweets['text'].dropna().tolist()
            if not texts:
                return {"text_length_avg": 0, "complaint_keywords": 0,
                        "urgent_keywords": 0, "complaint_ratio": 0}

            # Average text length
            avg_text_length = np.mean([len(str(text)) for text in texts])

            # Complaint keywords
            complaint_keywords = ["投诉", "不满意", "问题", "糟糕", "差", "退票", "取消", "延误", "错误"]
            complaint_count = sum(1 for text in texts
                                  if any(keyword in str(text) for keyword in complaint_keywords))

            # Urgency keywords
            urgent_keywords = ["紧急", "立即", "马上", "尽快", "立刻", "急需"]
            urgent_count = sum(1 for text in texts
                               if any(keyword in str(text) for keyword in urgent_keywords))

            return {
                "text_length_avg": avg_text_length,
                "complaint_keywords": complaint_count,
                "urgent_keywords": urgent_count,
                "complaint_ratio": complaint_count / len(texts) if texts else 0
            }
        except Exception:
            return {"text_length_avg": 0, "complaint_keywords": 0,
                    "urgent_keywords": 0, "complaint_ratio": 0}

    def _extract_negative_reason_features(self, user_tweets: pd.DataFrame) -> Dict[str, Any]:
        """Extract features related to the negative reasons."""
        try:
            negative_tweets = user_tweets[user_tweets['airline_sentiment'] == 'negative']
            if len(negative_tweets) == 0:
                return {"serious_issues": 0, "common_issues": 0,
                        "issue_diversity": 0, "serious_issue_ratio": 0}

            # Serious issue types
            serious_issues = ["Bad Flight", "Late Flight", "Customer Service Issue", "Lost Luggage"]
            serious_count = len(negative_tweets[negative_tweets['negativereason'].isin(serious_issues)])

            # Common issue types
            common_issues = ["Flight Booking Problems", "Cancelled Flight", "Flight Attendant Complaints"]
            common_count = len(negative_tweets[negative_tweets['negativereason'].isin(common_issues)])

            # Issue diversity
            unique_issues = negative_tweets['negativereason'].nunique()

            return {
                "serious_issues": serious_count,
                "common_issues": common_count,
                "issue_diversity": unique_issues,
                "serious_issue_ratio": serious_count / len(negative_tweets) if len(negative_tweets) > 0 else 0
            }
        except Exception:
            return {"serious_issues": 0, "common_issues": 0,
                    "issue_diversity": 0, "serious_issue_ratio": 0}

    def _extract_activity_features(self, user_tweets: pd.DataFrame) -> Dict[str, Any]:
        """Extract user-activity features."""
        try:
            # Retweet count
            total_retweets = user_tweets['retweet_count'].sum()

            # Temporal features (when timestamps are available)
            if 'tweet_created' in user_tweets.columns:
                tweet_dates = pd.to_datetime(user_tweets['tweet_created'], errors='coerce')
                activity_duration_days = (tweet_dates.max() - tweet_dates.min()).days if not tweet_dates.isna().all() else 0
                avg_tweets_per_day = len(user_tweets) / max(1, activity_duration_days)
            else:
                activity_duration_days = 0
                avg_tweets_per_day = 0

            # Engagement metric
            engagement_rate = total_retweets / max(1, len(user_tweets))

            return {
                "total_retweets": total_retweets,
                "activity_duration_days": activity_duration_days,
                "avg_tweets_per_day": avg_tweets_per_day,
                "engagement_rate": engagement_rate
            }
        except Exception:
            return {"total_retweets": 0, "activity_duration_days": 0,
                    "avg_tweets_per_day": 0, "engagement_rate": 0}

    def _get_default_user_data(self, user_id: str) -> Dict[str, Any]:
        """Default user data (when the real dataset is unavailable)."""
        return {
            "user_id": user_id,
            "total_tweets": 5,
            "negative_count": 1,
            "positive_count": 2,
            "neutral_count": 2,
            "negative_ratio": 0.2,
            "positive_ratio": 0.4,
            "avg_sentiment_strength": 0.7,
            "avg_confidence": 0.8,
            "airline": "Unknown",
            "recent_tweet": "This is a default tweet",
            "text_length_avg": 100,
            "complaint_keywords": 0,
            "urgent_keywords": 0,
            "complaint_ratio": 0,
            "serious_issues": 0,
            "common_issues": 0,
            "issue_diversity": 0,
            "serious_issue_ratio": 0,
            "total_retweets": 1,
            "activity_duration_days": 7,
            "avg_tweets_per_day": 0.7,
            "engagement_rate": 0.2
        }
    def generate_explanation(self, query_analysis: Dict[str, Any], churn_prediction: Dict[str, Any],
                             retention_plan: CustomerRetentionPlan) -> str:
        """Generate an explanatory response."""
        # Build the response from structured data (avoid raw DeepSeek text so
        # risk-level wording never leaks to the customer)
        base_response = "感谢您的反馈。"
        if query_analysis["sentiment"] == "negative":
            base_response += "我们非常重视您提到的问题。"
        elif query_analysis["sentiment"] == "positive":
            base_response += "很高兴听到您的积极反馈!"

        # Focus on problem resolution; no churn-risk wording
        base_response += "我们将认真处理您的问题:"

        if retention_plan.immediate_actions:
            base_response += "\n\n立即措施:"
            for action in retention_plan.immediate_actions:
                base_response += f"\n• {action}"

        if retention_plan.long_term_measures:
            base_response += "\n\n长期改进:"
            for measure in retention_plan.long_term_measures:
                base_response += f"\n• {measure}"

        base_response += f"\n\n我们的目标:{retention_plan.expected_outcome}"
        return base_response
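    # Illustrative output (comments only): with negative sentiment and the
    # "medium"-level plan defaults, generate_explanation yields
    #   感谢您的反馈。我们非常重视您提到的问题。我们将认真处理您的问题:
    #   立即措施:
    #   • 电话回访 ...
    #   长期改进:
    #   • 服务质量改进 ...
    #   我们的目标:降低流失风险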
    def _build_fallback_response(self, query: str, retention_plan: CustomerRetentionPlan) -> str:
        """Build the structured fallback response used when the API call fails."""
        fallback_response = f"""
## 尊敬的用户

### 关于您反馈的问题
- {query}
- 我们非常理解您的感受,对此给您带来的不便深表歉意

### 我们的挽留方案
"""
        for action in retention_plan.immediate_actions:
            fallback_response += f"- {action}\n"
        fallback_response += """
### 长期改进计划
"""
        for measure in retention_plan.long_term_measures:
            fallback_response += f"- {measure}\n"
        fallback_response += """
### 我们的承诺
- 我们将持续优化服务质量,确保类似问题不再发生
- 您的反馈对我们非常重要,是我们改进的动力
- 如有任何问题,随时联系我们的客服热线:400-123-4567

### 后续跟进
- 我们将在24小时内通过短信或电话跟进您的反馈处理情况
- 您可以通过我们的APP或官网查看反馈处理进度
- 预计在3个工作日内完成问题的最终处理
"""
        return fallback_response

    def _generate_smart_response(self, query: str, query_analysis: Dict[str, Any],
                                 churn_prediction: Dict[str, Any],
                                 retention_plan: CustomerRetentionPlan,
                                 pre_analysis=None) -> str:
        """Generate a structured smart response via the DeepSeek API."""
        import requests
        import json
        import os
        from dotenv import load_dotenv

        # Load environment variables
        load_dotenv()

        # DeepSeek API configuration
        api_key = os.getenv("DEEPSEEK_API_KEY")
        api_url = "https://api.deepseek.com/v1/chat/completions"

        # Bail out early if no API key is configured
        if not api_key:
            print("DeepSeek API密钥未配置,返回默认响应")
            return "感谢您的反馈。我们的客服团队将尽快处理您的问题,并与您联系。"

        # System prompt
        system_prompt = """
你是一个专业的航空公司客服助手,需要基于用户的反馈和流失分析结果生成结构化的响应。
请遵循以下要求:
1. 响应必须结构化,使用清晰的标题和分点说明
2. 结合流失分析结果,但不要直接提及"流失风险"等专业术语
3. 重点关注问题解决和用户关怀
4. 语言要专业、友好、真诚
5. 不要使用过于技术性的语言
6. 响应要详细但简洁,避免一大段文字
7. 针对用户的具体问题提供具体解决方案
8. 确保响应内容全面,覆盖用户反馈的所有问题
9. 提供具体的后续跟进方式和时间
10. 根据用户的情感倾向调整语气,对于负面反馈要更加歉意和安抚

结构化格式示例:
## 尊敬的用户
### 关于您反馈的问题
- 简要总结用户的问题
- 表达对用户感受的理解和歉意
### 我们的挽留方案
- 立即措施1(具体且可操作)
- 立即措施2(具体且可操作)
### 长期改进计划
- 长期措施1(针对根本原因)
- 长期措施2(针对根本原因)
### 我们的承诺
- 服务承诺1(具体可衡量)
- 服务承诺2(具体可衡量)
### 后续跟进
- 跟进方式1(具体渠道)
- 跟进时间(具体时间点)
"""

        # User prompt with all relevant context. Note the keys actually exposed
        # by predict_churn_risk are 'churn_level' and 'prediction_confidence'
        # (the original lookups used 'risk_level'/'confidence' and always fell
        # back to the defaults).
        risk_factors_joined = ', '.join([
            rf.get('factor', '') if isinstance(rf, dict) else str(rf)
            for rf in churn_prediction.get('risk_factors', [])
        ])
        user_prompt = f"""
用户反馈:{query}
情感分析结果:{query_analysis.get('sentiment', 'neutral')}
紧急程度:{query_analysis.get('urgency', 'normal')}

流失分析相关信息:
- 风险等级:{churn_prediction.get('churn_level', 'low')}
- 流失概率:{churn_prediction.get('churn_probability', 0.0):.2f}
- 置信度:{churn_prediction.get('prediction_confidence', 0.0):.2f}
- 风险因素:{risk_factors_joined}

挽留方案:
- 核心策略:{retention_plan.retention_strategy}
- 立即措施:{', '.join(retention_plan.immediate_actions)}
- 长期措施:{', '.join(retention_plan.long_term_measures)}
- 预期结果:{retention_plan.expected_outcome}
"""

        # Request body
        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.7,
            "max_tokens": 500,
            "top_p": 0.95
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        try:
            # Call the DeepSeek API
            st.info("正在调用DeepSeek API生成智能响应...")
            response = requests.post(api_url, json=payload, headers=headers, timeout=30)  # extended timeout
            st.info(f"API响应状态码: {response.status_code}")
            response.raise_for_status()

            # Parse the response
            result = response.json()
            st.info("API响应成功,正在处理结果...")
            return result['choices'][0]['message']['content']

        except requests.exceptions.Timeout as e:
            st.error(f"DeepSeek API调用超时,请检查网络连接: {e}")
            return self._build_fallback_response(query, retention_plan)
        except requests.exceptions.HTTPError as e:
            st.error(f"DeepSeek API返回错误状态码: {e}")
            return self._build_fallback_response(query, retention_plan)
        except Exception as e:
            st.error(f"DeepSeek API调用失败,使用备用响应生成方法: {e}")
            return self._build_fallback_response(query, retention_plan)

    def analyze_conversation_content(self, query: str, response: str) -> Dict[str, Any]:
        """Analyze the conversation: sentiment, urgency, and related features."""
        # Sentiment via the smart analyzer
        sentiment = self._analyze_sentiment_smart(query)
        # Urgency
        urgency_level = self._analyze_urgency(query)
        # Complaint intensity
        complaint_intensity = self._analyze_complaint_intensity(query)
        # Keyword extraction
        keywords = self._extract_keywords(query)

        return {
            "sentiment": sentiment,
            "urgency_level": urgency_level,
            "complaint_intensity": complaint_intensity,
            "keywords": keywords,
            "query_length": len(query),
            "response_length": len(response),
            "conversation_ratio": len(response) / max(1, len(query))
        }
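    # Illustrative output (comments only): for a short delay complaint analyzed
    # before any response exists, analyze_conversation_content returns roughly
    #   {"sentiment": "negative", "urgency_level": "high",
    #    "complaint_intensity": 0.5, "keywords": [...],
    #    "query_length": 11, "response_length": 0, "conversation_ratio": 0.0}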
    def _analyze_sentiment_smart(self, text: str) -> str:
        """Analyze sentiment via the DeepSeek API, with a rule-based fallback."""
        import requests
        import json
        import os
        from dotenv import load_dotenv

        # Load environment variables
        load_dotenv()

        # DeepSeek API configuration
        api_key = os.getenv("DEEPSEEK_API_KEY")
        api_url = "https://api.deepseek.com/v1/chat/completions"

        # Bail out early if no API key is configured
        if not api_key:
            print("DeepSeek API密钥未配置,返回默认情感分析结果")
            return "neutral"

        # System prompt
        system_prompt = """
你是一个专业的情感分析助手,需要分析用户反馈的情感倾向。
请严格按照以下要求:
1. 只返回"positive"、"negative"或"neutral"中的一个
2. 基于文本的实际情感进行判断,不要被字面意思误导
3. 考虑上下文和语义,而不仅仅是关键词匹配
4. 对于明显的负面反馈如"饭特别难吃,肉都酸了",应判定为negative
5. 对于明显的正面反馈如"服务态度非常好",应判定为positive
6. 对于中性反馈如"今天天气不错",应判定为neutral
"""
        # User prompt
        user_prompt = f"分析以下文本的情感倾向:{text}"

        # Request body
        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.1,
            "max_tokens": 10,
            "top_p": 0.95
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        try:
            # Call the DeepSeek API
            response = requests.post(api_url, json=payload, headers=headers, timeout=10)
            response.raise_for_status()

            # Parse the response
            result = response.json()
            sentiment = result['choices'][0]['message']['content'].strip().lower()

            # Validate the response format
            if sentiment in ["positive", "negative", "neutral"]:
                return sentiment
            # Malformed API output: fall back to the rule-based method
            return self._analyze_sentiment_fallback(text)

        except Exception as e:
            # API call failed: fall back to the rule-based method
            print(f"DeepSeek API调用失败,使用备用情感分析方法: {e}")
            return self._analyze_sentiment_fallback(text)

    def _analyze_sentiment_fallback(self, text: str) -> str:
        """Fallback sentiment analysis: a rule-based method."""
        # Sentiment lexicon
        sentiment_lexicon = {
            "positive": [
                "好", "优秀", "满意", "喜欢", "棒", "赞", "完美", "开心", "高兴", "愉快",
                "舒适", "方便", "快捷", "准时", "感谢", "谢谢", "礼品", "礼物",
                "不错", "可以", "还行", "一般", "正常", "OK", "okay", "fine", "good", "nice"
            ],
            "negative": [
                "难吃", "酸了", "变质", "过期", "腐烂", "食物中毒", "卫生问题", "难以下咽", "无法食用",
                "不好", "差", "糟糕", "失望", "生气", "愤怒", "不满", "抱怨", "投诉",
                "延误", "晚点", "取消", "改签", "退票", "耽误", "丢失", "损坏", "故障",
                "危险", "隐患", "事故", "受伤", "伤害", "威胁", "风险", "不安全",
                "不搭理", "不理", "不管", "不顾", "不服务", "不帮忙", "不解决", "不回应", "不回复", "不处理",
                "冷", "热", "温度不适", "空气差", "异味", "臭味", "脚臭", "脱鞋", "大声说话", "吵闹",
                "票价", "改签费", "退票费", "问题", "错误", "失误", "麻烦", "困难", "不便"
            ]
        }

        text_lower = text.lower()

        # Count positive and negative hits
        positive_count = 0
        for word in sentiment_lexicon["positive"]:
            if word in text_lower:
                positive_count += 1

        negative_count = 0
        for word in sentiment_lexicon["negative"]:
            if word in text_lower:
                negative_count += 1

        # Decide the polarity
        if negative_count > positive_count:
            return "negative"
        elif positive_count > negative_count:
            return "positive"
        else:
            return "neutral"
    def _analyze_sentiment(self, text: str) -> str:
        """Analyze sentiment: full semantic rule-based analysis."""
        # Sentiment lexicon with per-word strength
        sentiment_lexicon = {
            # Positive words by strength
            "positive": {
                # strongly positive
                "strong": ["感谢", "满意", "优秀", "推荐", "很棒", "完美", "惊喜", "感动", "专业", "热情",
                           "周到", "贴心", "耐心", "细心", "负责", "诚信", "美味", "好吃", "可口", "新鲜",
                           "舒适", "整洁", "干净", "宽敞", "明亮", "安静", "温馨", "便利", "高效", "快速",
                           "及时", "安全", "可靠", "实惠", "便宜", "划算", "合理", "超值", "礼品", "礼物", "小礼品"],
                # moderately positive
                "medium": ["好", "谢谢", "准时", "友好", "公平", "透明", "稳定", "耐用", "完整", "正常",
                           "达标", "合格", "正品", "高", "优质", "香", "甜", "适中", "合口味", "喜欢",
                           "快乐", "幸福", "满足", "值得", "分享", "期待", "希望", "祝福", "祝愿"],
                # weakly positive
                "weak": ["可以", "不错", "还行", "一般", "将就", "凑合", "尚可", "过得去"]
            },
            # Negative words by strength
            "negative": {
                # strongly negative
                "strong": ["投诉", "不满意", "糟糕", "差", "错误", "失误", "失望", "后悔", "上当", "受骗",
                           "欺骗", "虚假", "夸大", "不实", "误导", "冷漠", "不耐烦", "敷衍", "推诿", "推脱",
                           "推卸", "不负责任", "不专业", "不热情", "不友好", "恶劣", "粗鲁", "无礼", "傲慢",
                           "歧视", "区别对待", "脏", "乱", "差", "破旧", "损坏", "故障", "不干净", "异味",
                           "臭味", "拥挤", "嘈杂", "吵闹", "闷热", "寒冷", "昏暗", "通风差", "噪音", "喧闹",
                           "脱鞋", "窄", "太窄", "不能调", "调节", "狭小", "不舒服", "难受", "局促", "贵",
                           "昂贵", "离谱", "不合理", "不值", "宰客", "坑人", "欺诈", "乱收费", "隐藏费用",
                           "附加费", "延误", "延迟", "迟到", "等待", "排队", "缓慢", "拖沓", "不及时", "超时",
                           "耽误", "错过", "取消", "改期", "变更", "调整", "劣质", "假冒", "伪劣", "山寨",
                           "假货", "次品", "不合格", "不达标", "缺陷", "瑕疵", "破损", "断裂", "失灵", "难吃",
                           "馊了", "变质", "不新鲜", "恶心", "难以下咽", "发霉", "腐烂", "臭了", "难闻", "过期",
                           "生冷", "不熟", "危险", "隐患", "事故", "受伤", "伤害", "威胁", "风险", "不安全"],
                # moderately negative
                "medium": ["问题", "失误", "忽视", "无视", "不理", "不管", "伸展", "腿部空间", "太咸", "太淡",
                           "太油", "太辣", "故障", "错误", "后悔", "失望", "拥挤", "吵闹", "脱鞋", "异味",
                           "臭味", "不舒服", "难受", "局促", "贵", "不合理", "不值", "延误", "等待", "缓慢",
                           "不及时", "耽误", "取消", "改期", "变更", "调整", "缺陷", "瑕疵", "破损", "断裂",
                           "失灵", "难吃", "不新鲜", "恶心", "难以下咽", "异味", "难闻", "风险", "不安全"],
                # weakly negative
                "weak": ["一般", "普通", "尚可", "过得去", "凑合", "将就", "可以", "还行"]
            }
        }

        # Degree adverbs and their influence
        intensifiers = {
            "strong": ["非常", "特别", "极其", "很", "太", "超级", "十分", "相当", "尤其", "格外",
                       "更加", "越", "更", "最", "极", "真的", "实在"],
            "medium": ["比较", "相对", "有点", "有些", "稍微", "略微", "较为", "还算", "挺", "蛮"],
            "weak": ["一点点", "稍微", "略微", "有点", "有些"]
        }

        # Negation words
        negations = ["不", "没", "无", "非", "未", "别", "不要", "没有", "不会", "不可能", "没什么", "不太"]

        # Special sentiment expressions
        special_expressions = {
            "positive": ["太好了", "太棒了", "非常好", "特别好", "超级好", "真的好", "实在好",
                         "很满意", "非常满意", "特别满意", "超级满意", "真的满意", "实在满意",
                         "谢谢", "谢谢你", "感谢你", "非常感谢", "特别感谢", "超级感谢",
                         "很开心", "非常开心", "特别开心", "超级开心", "真的开心", "实在开心",
                         "很感动", "非常感动", "特别感动", "超级感动", "真的感动", "实在感动"],
            "negative": ["太差了", "太糟糕了", "非常差", "特别差", "超级差", "真的差", "实在差",
                         "很不满意", "非常不满意", "特别不满意", "超级不满意", "真的不满意", "实在不满意",
                         "不开心", "很不开心", "非常不开心", "特别不开心", "超级不开心", "真的不开心", "实在不开心",
                         "很失望", "非常失望", "特别失望", "超级失望", "真的失望", "实在失望",
                         "很生气", "非常生气", "特别生气", "超级生气", "真的生气", "实在生气"]
        }

        text_lower = text.lower()

        # Accumulated sentiment score
        sentiment_score = 0.0

        # 1. Special expressions
        for expr in special_expressions["positive"]:
            if expr in text_lower:
                sentiment_score += 2.0
        for expr in special_expressions["negative"]:
            if expr in text_lower:
                sentiment_score -= 2.0

        # 2. Positive words
        for strength, words in sentiment_lexicon["positive"].items():
            for word in words:
                if word in text_lower:
                    # Base score by strength
                    if strength == "strong":
                        base_score = 1.0
                    elif strength == "medium":
                        base_score = 0.6
                    else:  # weak
                        base_score = 0.3

                    # Degree-adverb adjustment
                    for int_strength, int_words in intensifiers.items():
                        for int_word in int_words:
                            if f"{int_word}{word}" in text_lower or f"{int_word} {word}" in text_lower:
                                if int_strength == "strong":
                                    base_score *= 1.5
                                elif int_strength == "medium":
                                    base_score *= 1.2
                                else:  # weak
                                    base_score *= 0.8
                                break

                    # Negation check
                    word_index = text_lower.find(word)
                    has_negation = False
                    for neg_word in negations:
                        neg_index = text_lower.rfind(neg_word, 0, word_index)
                        if neg_index != -1 and word_index - neg_index < 15:  # negation within 15 chars
                            has_negation = True
                            break

                    if has_negation:
                        sentiment_score -= base_score
                    else:
                        sentiment_score += base_score

        # 3. Negative words
        for strength, words in sentiment_lexicon["negative"].items():
            for word in words:
                if word in text_lower:
                    # Base score by strength
                    if strength == "strong":
                        base_score = 1.0
                    elif strength == "medium":
                        base_score = 0.6
                    else:  # weak
                        base_score = 0.3

                    # Degree-adverb adjustment
                    for int_strength, int_words in intensifiers.items():
                        for int_word in int_words:
                            if f"{int_word}{word}" in text_lower or f"{int_word} {word}" in text_lower:
                                if int_strength == "strong":
                                    base_score *= 1.5
                                elif int_strength == "medium":
                                    base_score *= 1.2
                                else:  # weak
                                    base_score *= 0.8
                                break

                    # Negation check
                    word_index = text_lower.find(word)
                    has_negation = False
                    for neg_word in negations:
                        neg_index = text_lower.rfind(neg_word, 0, word_index)
                        if neg_index != -1 and word_index - neg_index < 15:  # negation within 15 chars
                            has_negation = True
                            break

                    if has_negation:
                        sentiment_score += base_score
                    else:
                        sentiment_score -= base_score

        # 4. Context analysis: transition/contrast words (the original code
        #    shadowed the list with the loop variable; renamed for clarity)
        transition_words = ["但是", "但", "然而", "可是", "不过", "只是", "却"]
        for transition_word in transition_words:
            if transition_word in text_lower:
                # The clause after the transition word carries more weight
                parts = text_lower.split(transition_word)
                if len(parts) > 1:
                    # Score the clause after the transition word
                    second_half = parts[1]
                    second_half_score = 0.0
                    # Positive hits in the second half
                    for words in sentiment_lexicon["positive"].values():
                        for word in words:
                            if word in second_half:
                                second_half_score += 1.0
                    # Negative hits in the second half
                    for words in sentiment_lexicon["negative"].values():
                        for word in words:
                            if word in second_half:
                                second_half_score -= 1.0
                    # The post-transition clause dominates the overall score
                    if abs(second_half_score) > 0:
                        sentiment_score = second_half_score * 1.5
                break

        # 5. Decide the final polarity.
        # Asymmetric thresholds so clearly negative feedback is caught
        if sentiment_score > 0.5:
            return "positive"
        elif sentiment_score < -0.1:
            return "negative"
        else:
            return "neutral"

    def _analyze_urgency(self, text: str) -> str:
        """Analyze urgency: rule-based classification."""
        # Urgency classes (common-sense rules and impact scope)
        urgency_levels = {
            # High urgency: severe consequences or problems needing immediate action
            "high": {
                "categories": {
                    "食品安全": ["馊了", "变质", "过期", "腐烂", "食物中毒", "卫生问题", "难吃", "难以下咽", "无法食用"],
                    "航班延误": ["延误", "晚点", "取消", "改签", "退票", "耽误事", "耽误时间", "错过", "延误航班", "航班延误"],
                    "行李问题": ["行李丢失", "行李损坏", "行李超重", "行李托运"],
                    "设施故障": ["空调故障", "温度调节", "座位故障", "洗手间故障", "娱乐系统故障"],
                    "安全问题": ["危险", "隐患", "事故", "受伤", "伤害", "威胁", "风险", "不安全"],
                    "紧急请求": ["紧急", "立即", "马上", "尽快", "立刻", "急需", "现在", "刻不容缓", "迫不及待"]
                },
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            # Medium urgency: should be handled promptly but not safety-critical
            "medium": {
                "categories": {
                    "服务态度": ["不搭理", "不理", "不管", "不顾", "不服务", "不帮忙", "不解决", "不回应", "不回复", "不处理"],
                    "环境问题": ["太冷", "太热", "温度不适", "空气差", "异味", "臭味", "脚臭", "脱鞋", "大声说话", "吵闹"],
                    "票务问题": ["票价", "改签费", "退票费", "优惠", "折扣", "性价比"],
                    "一般请求": ["尽快", "马上", "立即", "希望尽快", "希望马上", "希望立即"]
                },
                "intensifiers": ["比较", "相对", "有点", "有些", "稍微", "略微", "较为", "还算"]
            },
            # Low urgency: minor impact, can be handled later
            "low": {
                "categories": {
                    "建议反馈": ["建议", "反馈", "意见", "希望", "期待", "改进"],
                    "一般咨询": ["咨询", "询问", "了解", "想知道", "请问", "能否告知"],
                    "轻微不满": ["有点", "稍微", "不太", "不太满意", "有点失望", "不太舒服"]
                },
                "intensifiers": ["一点点", "稍微", "略微", "有点", "有些"]
            }
        }

        text_lower = text.lower()
        urgency_scores = {"high": 0, "medium": 0, "low": 0}

        # Score each urgency class
        for level, config in urgency_levels.items():
            for category, keywords in config["categories"].items():
                for keyword in keywords:
                    if keyword in text_lower:
                        # Base score
                        urgency_scores[level] += 1
                        # Degree-intensifier bonus
                        for intensifier in config["intensifiers"]:
                            if f"{intensifier}{keyword}" in text_lower or f"{intensifier} {keyword}" in text_lower:
                                urgency_scores[level] += 0.5
                                break

        # Decide the final urgency level
        max_score = max(urgency_scores.values())
        if max_score == 0:
            return "low"

        # Decision logic: high trumps medium trumps low
        if urgency_scores["high"] >= 1:
            return "high"
        elif urgency_scores["medium"] >= 1:
            return "medium"
        else:
            return "low"

    def _analyze_complaint_intensity(self, text: str) -> float:
        """Analyze complaint intensity."""
        # Base severity per issue type (common-sense rules)
        issue_severity = {
            # High severity
            "食品安全": {
                "keywords": ["馊了", "变质", "过期", "腐烂", "食物中毒", "卫生问题", "难吃", "难以下咽", "无法食用"],
                "base_severity": 0.6,  # base severity
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]  # degree words
            },
            "航班延误": {
                "keywords": ["延误", "晚点", "取消", "改签", "退票", "耽误事", "耽误时间", "错过", "延误航班", "航班延误"],
                "base_severity": 0.5,
                "intensifiers": ["好几个小时", "长时间", "严重", "非常", "特别", "极其", "很", "太", "超级", "真的", "实在"]
            },
            "行李问题": {
                "keywords": ["行李丢失", "行李损坏", "行李超重", "行李托运"],
                "base_severity": 0.5,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            "安全问题": {
                "keywords": ["危险", "隐患", "事故", "受伤", "伤害", "威胁", "风险", "不安全"],
                "base_severity": 0.7,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            # Medium severity
            "服务态度": {
                "keywords": ["不搭理", "不理", "不管", "不顾", "不服务", "不帮忙", "不解决", "不回应", "不回复", "不处理", "不作为"],
                "base_severity": 0.4,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            "环境问题": {
                "keywords": ["太冷", "太热", "温度不适", "空气差", "异味", "臭味", "脚臭", "脱鞋", "大声说话", "吵闹", "影响休息", "打扰他人"],
                "base_severity": 0.3,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            "设施问题": {
                "keywords": ["空调故障", "温度调节", "座位故障", "洗手间故障", "娱乐系统故障", "充电插座坏了"],
                "base_severity": 0.4,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            # Low severity
            "票务问题": {
                "keywords": ["票价", "改签费", "退票费", "优惠", "折扣", "性价比"],
                "base_severity": 0.2,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            },
            "一般问题": {
                "keywords": ["问题", "错误", "失误", "麻烦", "困难", "不便"],
                "base_severity": 0.2,
                "intensifiers": ["非常", "特别", "极其", "很", "太", "超级", "严重", "真的", "实在"]
            }
        }

        text_lower = text.lower()
        total_severity = 0.0
        issue_count = 0

        # Check each issue type
        for issue_type, config in issue_severity.items():
            for keyword in config["keywords"]:
                if keyword in text_lower:
                    # Base severity
                    severity = config["base_severity"]

                    # Degree-intensifier bonus
                    for intensifier in config["intensifiers"]:
                        if intensifier in text_lower:
                            severity = min(1.0, severity + 0.2)
                            break

                    # Longer complaints tend to be more intense
                    if len(text) > 20:
                        severity = min(1.0, severity + 0.1)

                    total_severity += severity
                    issue_count += 1
                    break  # count each issue type only once

        # Average severity
        if issue_count > 0:
            # Keep a reasonable floor
            avg_severity = max(0.2, total_severity / issue_count)
            # And never be absolute (cap at 95%)
            avg_severity = min(0.95, avg_severity)
        else:
            # No recognized issue type: minimum value
            avg_severity = 0.1

        return avg_severity

    def _extract_keywords(self, text: str) -> List[Dict[str, Any]]:
        """Extract keywords, grouped by category."""
        keywords = []

        # Category keyword lists (flight-related vocabulary heavily expanded).
        # Note: the in-flight crew/service terms are kept under a distinct
        # "机上服务" key; a duplicate "服务" key would silently overwrite the
        # general service category.
        categories = {
            "投诉": ["投诉", "不满意", "问题", "糟糕", "差", "错误", "失误", "失望", "后悔", "上当",
                     "受骗", "欺骗", "虚假", "夸大", "不实", "误导"],
            "服务": ["服务", "态度", "质量", "体验", "满意", "感谢", "专业", "热情", "周到", "耐心",
                     "细心", "负责", "不搭理", "不理", "不管", "不顾", "不服务", "不帮忙", "不解决",
                     "不回应", "不回复", "不处理", "不作为", "不负责任", "不专业", "不热情", "不友好",
                     "不耐烦", "不礼貌", "不尊重", "视而不见", "充耳不闻", "爱答不理", "敷衍了事",
                     "态度冷漠", "响应慢", "处理不及时", "解决问题不力"],
            "产品": ["产品", "质量", "功能", "设计", "外观", "性能", "可靠性", "安全性"],
            "价格": ["价格", "费用", "收费", "优惠", "折扣", "性价比", "贵", "便宜", "实惠"],
            "时间": ["时间", "等待", "延误", "迟到", "准时", "及时", "快速", "缓慢"],
            "环境": ["环境", "卫生", "整洁", "舒适", "安静", "拥挤", "嘈杂", "干净", "太冷", "太热",
                     "温度不适", "空气差", "异味", "臭味", "脚臭", "脱鞋", "大声说话", "打电话", "哭闹",
                     "小孩吵闹", "影响休息", "打扰他人", "破坏安静"],
            "设施": ["设施", "设备", "座位", "空间", "温度", "照明", "网络", "充电", "座位不舒服",
                     "座位间距小", "座位脏", "洗手间脏", "娱乐系统故障", "充电插座坏了", "空调问题",
                     "温度调节", "腿部空间", "座椅舒适度", "舱内温度", "舱内噪音", "通风"],
            "食品": ["食品", "餐饮", "饭菜", "味道", "质量", "卫生", "价格", "种类"],
            "交通": ["交通", "出行", "路线", "拥堵", "准时", "延误", "便捷", "不便"],
            "安全": ["安全", "保障", "风险", "隐患", "事故", "防护", "措施", "意识"],
            # Flight-related vocabulary
            "航班": ["航班", "飞机", "飞行", "航空", "机场", "登机", "起飞", "降落", "航班号", "航线",
                     "航程", "飞行时间", "航班延误", "航班取消", "航班改签"],
            "登机": ["登机", "安检", "值机", "行李托运", "登机牌", "候机厅", "登机口", "安检排队",
                     "值机柜台", "行李超重", "行李丢失"],
            "机舱": ["机舱", "客舱", "经济舱", "商务舱", "头等舱", "座位", "座椅", "空间", "行李架",
                     "扶手", "小桌板", "娱乐设备", "空调", "温度", "噪音", "安静"],
            "机上服务": ["空姐", "空乘", "乘务员", "服务", "餐饮服务", "饮料", "餐食", "毛毯", "枕头",
                         "耳机", "娱乐设施", "WiFi", "充电插座"],
            "延误": ["延误", "晚点", "推迟", "延误原因", "天气原因", "机械故障", "航空管制", "延误赔偿",
                     "延误住宿", "延误餐食"],
            "行李": ["行李", "托运行李", "手提行李", "行李额", "行李超重", "行李丢失", "行李损坏",
                     "行李领取", "行李传送带"],
            "机场": ["机场", "航站楼", "出发厅", "到达厅", "机场大巴", "机场地铁", "机场出租车",
                     "机场餐厅", "机场商店", "机场WiFi", "机场充电"],
            "票务": ["机票", "票价", "改签", "退票", "退票费", "改签费", "特价票", "折扣票", "往返票",
                     "单程票", "联程票"],
            "飞行": ["飞行", "起飞", "降落", "颠簸", "气流", "飞行时间", "航线", "高度", "视野",
                     "窗外景色", "飞行安全"],
            "乘客行为": ["乘客", "旅客", "客人", "同行", "邻座", "旁边人", "霸座", "插队", "酗酒",
                         "吸烟", "骚扰", "不良行为", "影响他人"]
        }

        # Deduplicate so the same keyword is extracted only once
        extracted_keywords = set()
        for category, category_keywords in categories.items():
            for keyword in category_keywords:
                if keyword in text and keyword not in extracted_keywords:
                    # Count the keyword's occurrences
                    count = text.count(keyword)
                    keywords.append({
                        "word": keyword,
                        "type": category,
                        "count": count
                    })
                    extracted_keywords.add(keyword)

        # If no keywords matched, fall back to simple frequent-word extraction
        if not keywords and len(text) > 5:
            # Simple tokenization by alphabetic runs (length >= 2)
            words = []
            current_word = ""
            for char in text:
                if char.isalpha():
                    current_word += char
                else:
                    if len(current_word) >= 2:
                        words.append(current_word)
                    current_word = ""
            if len(current_word) >= 2:
                words.append(current_word)

            # Word frequencies
            word_counts = {}
            for word in words:
                word_counts[word] = word_counts.get(word, 0) + 1

            # Take the top-3 most frequent words
            sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
            for word, count in sorted_words[:3]:
                if len(word) >= 2:
                    keywords.append({
                        "word": word,
                        "type": "其他",
                        "count": count
                    })

        return keywords
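# Worked examples for the rule-based analyzers above (comments only; the sample
# texts are assumptions for illustration):
# - _analyze_sentiment_fallback("服务态度差,还延误了") matches the negative words
#   "差" and "延误" and no positive words, so it returns "negative".
# - _analyze_urgency("航班延误了,需要立即处理") hits the high-urgency categories
#   航班延误 (via "延误") and 紧急请求 (via "立即"), so it returns "high".
# - _analyze_complaint_intensity("航班严重延误好几个小时") matches 航班延误
#   (base 0.5) plus the intensifier "严重" (+0.2); the length bonus applies only
#   past 20 characters, so the result is 0.7.
# - _extract_keywords("航班延误了") tags the first category hit, e.g.
#   {"word": "延误", "type": "时间", "count": 1}.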
def show_chat_interface(customer_service: SmartCustomerService):
    """Render the chat tab; responses build on the churn-analysis results."""
    st.markdown("### 💬 智能客服对话", unsafe_allow_html=True)

    # Require a completed churn analysis first
    if 'current_analysis' not in st.session_state:
        st.warning("⚠️ 请先在【流失分析】板块完成用户分析")
        st.markdown("### 使用流程")
        st.write("1. 前往【流失分析】标签页")
        st.write("2. 训练XGBoost模型(如果尚未训练)")
        st.write("3. 输入用户ID和内容进行流失分析")
        st.write("4. 返回此处生成客服响应")
        return

    current_analysis = st.session_state.current_analysis
    user_id = current_analysis["user_id"]
    user_content = current_analysis["content"]
    churn_prediction = current_analysis["churn_prediction"]
    conversation_analysis = current_analysis["analysis"]

    # Summary of the current analysis
    st.info(f"当前分析用户: {user_id}")
    st.markdown("### 📊 流失分析摘要")

    # Key metrics
    col1, col2, col3 = st.columns(3)

    # Sentiment
    with col1:
        sentiment = conversation_analysis.get("sentiment", "neutral")
        sentiment_emoji = {"positive": "😊", "negative": "😠", "neutral": "😐"}
        st.metric(f"{sentiment_emoji[sentiment]} 情感倾向", sentiment.upper())

    # Urgency
    with col2:
        urgency = conversation_analysis.get("urgency_level", "low")
        urgency_emoji = {"high": "🚨", "medium": "⚠️", "low": "🟢"}
        st.metric(f"{urgency_emoji[urgency]} 紧急程度", urgency.upper())

    # Churn risk
    with col3:
        probability = churn_prediction.get("churn_probability", 0)
        if probability < 0.3:
            st.metric("🟢 流失风险", "低")
        elif probability < 0.7:
            st.metric("🟡 流失风险", "中")
        else:
            st.metric("🔴 流失风险", "高")

    # Conversation history (last 5 entries)
    if customer_service.conversation_history:
        st.markdown("### 📋 对话历史")
        for i, conversation in enumerate(customer_service.conversation_history[-5:]):
            with st.expander(f"对话 {i+1} - {conversation['timestamp'][:19]}"):
                st.write(f"**用户**: {conversation['query']}")
                st.write(f"**客服**: {conversation['response']}")

    # User input
    st.markdown("### 💭 请输入您的查询")
    user_query = st.text_area("您的消息", placeholder="请输入您的问题或反馈...",
                              value=user_content, key="user_query")

    if st.button("发送", type="primary") and user_query:
        with st.spinner("正在分析您的查询并生成响应..."):
            # Process the query, reusing the stored analysis and prediction
            result = customer_service.process_customer_query(
                user_id, user_query,
                pre_analysis=conversation_analysis,
                pre_prediction=churn_prediction
            )

            # Show the result
            st.markdown("### 🤖 客服响应")
            # st.markdown renders the structured Markdown response correctly
            st.markdown(result["response"])
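# Illustrative session contract (comments only): show_churn_analysis below
# stores
#   st.session_state.current_analysis = {
#       "user_id": ..., "content": ...,
#       "analysis": <analyze_conversation_content output>,
#       "churn_prediction": <predict_churn_risk output>,
#   }
# and show_chat_interface above refuses to render until that key exists.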
def show_churn_analysis(customer_service: SmartCustomerService):
    """Render the churn-analysis tab (manual user-ID and content input)."""
    st.markdown("### 📊 用户流失分析", unsafe_allow_html=True)

    # Model-training status
    st.markdown("### 🤖 模型状态")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🔄 训练/更新模型", type="primary"):
            with st.spinner("正在使用XGBoost算法训练流失预测模型..."):
                success = customer_service.train_churn_model()
                if success:
                    st.success("✅ 模型训练完成!使用您优化过的XGBoost参数")
                else:
                    st.error("❌ 模型训练失败,请检查数据文件")
    with col2:
        if customer_service.churn_model is not None:
            st.success("✅ 模型已训练")
            st.caption(f"模型类型: XGBoost | 特征数量: {len(getattr(customer_service, 'feature_names', []))}")
        else:
            st.warning("⚠️ 模型未训练")

    st.markdown("---")

    # Manual-input area
    st.markdown("### 👤 手动输入分析")
    user_id = st.text_input("用户ID", placeholder="请输入用户ID,例如:user_001")
    user_content = st.text_area("用户反馈内容", placeholder="请输入用户反馈内容...", key="user_feedback")

    if st.button("🔍 进行流失分析", type="secondary") and user_id and user_content:
        with st.spinner("正在基于输入内容分析用户流失风险..."):
            # Analyze the current conversation content
            conversation_analysis = customer_service.analyze_conversation_content(user_content, "")

            # Fetch the user's historical data
            user_data = customer_service.get_user_data(user_id)

            # Attach the conversation signals for risk-factor analysis
            user_data['sentiment'] = conversation_analysis['sentiment']
            user_data['urgency_level'] = conversation_analysis['urgency_level']
            user_data['complaint_intensity'] = conversation_analysis['complaint_intensity']
            user_data['keywords'] = conversation_analysis['keywords']
            user_data['content'] = user_content  # raw input, used for service-attitude checks

            # Predict churn risk
            churn_prediction = customer_service.predict_churn_risk(user_data)

            if "error" in churn_prediction:
                st.error(f"❌ 分析失败: {churn_prediction['error']}")
            else:
                # Conversation-content analysis
                st.markdown("### 💬 对话内容分析")
                col1, col2, col3 = st.columns(3)
                with col1:
                    sentiment = conversation_analysis.get("sentiment", "neutral")
                    sentiment_emoji = {"positive": "😊", "negative": "😠", "neutral": "😐"}
                    st.metric(f"{sentiment_emoji[sentiment]} 情感倾向", sentiment.upper())
                with col2:
                    urgency = conversation_analysis.get("urgency_level", "low")
                    urgency_emoji = {"high": "🚨", "medium": "⚠️", "low": "🟢"}
                    st.metric(f"{urgency_emoji[urgency]} 紧急程度", urgency.upper())
                with col3:
                    complaint_intensity = conversation_analysis.get("complaint_intensity", 0)
                    if complaint_intensity > 0.7:
                        st.metric("😠 投诉强度", f"{complaint_intensity:.1%}", delta="高")
                    elif complaint_intensity > 0.3:
                        st.metric("😠 投诉强度", f"{complaint_intensity:.1%}", delta="中")
                    else:
                        st.metric("😠 投诉强度", f"{complaint_intensity:.1%}", delta="低")

                # Keyword analysis
                st.markdown("#### 🔍 关键词分析")
                keywords = conversation_analysis.get("keywords", [])
                if keywords:
                    # Group the keywords by type
                    keyword_types = {}
                    for keyword in keywords:
                        if keyword['type'] not in keyword_types:
                            keyword_types[keyword['type']] = []
                        keyword_types[keyword['type']].append(keyword)
                    # Render each type
                    for k_type, k_words in keyword_types.items():
                        with st.expander(f"{k_type} 关键词 ({len(k_words)}个)"):
                            cols = st.columns(3)
                            for i, keyword in enumerate(k_words[:6]):  # at most 6 per type
                                with cols[i % 3]:
                                    st.info(f"**{keyword['word']}** (出现{keyword['count']}次)")
                else:
                    st.info("未检测到关键词")

                st.markdown("---")

                # Core churn indicators
                st.markdown("### 📈 流失风险指标")
                col1, col2, col3, col4 = st.columns(4)
                probability = churn_prediction["churn_probability"]
                churn_level = churn_prediction["churn_level"]
                confidence = churn_prediction["prediction_confidence"]

                # Churn probability (color-coded)
                with col1:
                    if probability < 0.3:
                        st.metric("🟢 流失概率", f"{probability:.2%}", delta="低风险")
                    elif probability < 0.7:
                        st.metric("🟡 流失概率", f"{probability:.2%}", delta="中风险")
                    else:
                        st.metric("🔴 流失概率", f"{probability:.2%}", delta="高风险")

                # Risk level
                with col2:
                    level_colors = {"low": "🟢", "medium": "🟡", "high": "🔴"}
                    st.metric(f"{level_colors[churn_level]} 风险等级", churn_level.upper())

                # Prediction confidence
                with col3:
                    if confidence > 0.85:
                        st.metric("🎯 预测置信度", f"{confidence:.1%}", delta="高")
                    elif confidence > 0.7:
                        st.metric("🎯 预测置信度", f"{confidence:.1%}", delta="中")
                    else:
                        st.metric("🎯 预测置信度", f"{confidence:.1%}", delta="低")

                # Number of risk factors
                with col4:
                    risk_count = len(churn_prediction["risk_factors"])
                    if risk_count == 0:
                        st.metric("✅ 风险因素", risk_count, delta="无风险")
                    elif risk_count <= 2:
                        st.metric("⚠️ 风险因素", risk_count, delta="低风险")
                    else:
                        st.metric("🚨 风险因素", risk_count, delta="高风险")

                st.markdown("---")

                # Detailed risk analysis based on the conversation
                st.markdown("### ⚠️ 基于对话的风险分析")
                risk_factors = churn_prediction["risk_factors"]
                if risk_factors:
                    if isinstance(risk_factors[0], dict):
                        # Sort by impact level
                        impact_order = {"极高": 4, "高": 3, "中": 2, "低": 1}
                        sorted_factors = sorted(risk_factors,
                                                key=lambda x: impact_order.get(x.get("impact", "低"), 0),
                                                reverse=True)
                        for i, factor in enumerate(sorted_factors, 1):
                            with st.expander(f"{i}. {factor['factor']} ({factor['impact']}影响)", expanded=i <= 3):
                                col1, col2 = st.columns([1, 3])
                                with col1:
                                    st.write(f"**当前值**: {factor['value']}")
                                    st.write(f"**阈值**: {factor['threshold']}")
                                with col2:
                                    st.write(f"**说明**: {factor['description']}")
                    else:
                        for factor in risk_factors:
                            st.write(f"• {factor}")
                else:
                    # Friendly message when no risk factors were found
                    st.info("无明显风险因素 (低影响)")
                    col1, col2 = st.columns([1, 3])
                    with col1:
                        st.write("**当前值**: -")
                        st.write("**阈值**: -")
                    with col2:
                        st.write("**说明**: 当前用户行为未表现出明显的流失风险")

                # Store the results in session state for the chat tab
                st.session_state.current_analysis = {
                    "user_id": user_id,
                    "content": user_content,
                    "analysis": conversation_analysis,
                    "churn_prediction": churn_prediction
                }

                # Completion notice
                st.success("✅ 基于对话内容的流失分析完成!")
                st.info("💡 分析结果已存储,请返回【对话客服】查看客服响应")
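# Illustrative history record (comments only; the timestamp is an assumed
# value): process_customer_query appends entries shaped like
#   {"timestamp": "2024-01-01T12:00:00", "user_id": ..., "query": ...,
#    "analysis": {...}, "churn_prediction": {...}, "response": ...}
# to customer_service.conversation_history; show_system_settings below reports
# its length and can clear it.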
def show_system_settings(customer_service: SmartCustomerService):
    """Render the system-settings tab."""
    st.markdown("### ⚙️ 系统设置", unsafe_allow_html=True)

    st.markdown("### 🔧 模型配置")
    st.info("当前使用优化后的XGBoost模型进行流失预测")

    st.markdown("### 📈 系统状态")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("对话记录数", len(customer_service.conversation_history))
    with col2:
        model_status = "已训练" if customer_service.churn_model else "未训练"
        st.metric("模型状态", model_status)

    st.markdown("### 🔄 数据管理")
    if st.button("清空对话历史"):
        customer_service.conversation_history = []
        st.success("对话历史已清空")

    if st.button("清除流失分析结果"):
        if 'current_analysis' in st.session_state:
            del st.session_state.current_analysis
            st.success("流失分析结果已清除")
        else:
            st.info("当前没有流失分析结果")


def show_smart_customer_service():
    """Render the smart customer-service UI."""
    import streamlit as st

    # Airline-themed styles
    st.markdown("""
    """, unsafe_allow_html=True)

    st.title("💬 智能客服系统")

    # Initialize the customer-service backend once per session
    if 'customer_service' not in st.session_state:
        # Search candidate data paths
        import os
        data_paths = [
            "d:/HuaweiMoveData/Users/马艺洁/Desktop/MLwork/bigwork/data/Tweets.csv",
            "data/Tweets.csv",
            "../data/Tweets.csv"
        ]
        data_path = None
        for path in data_paths:
            if os.path.exists(path):
                data_path = path
                break
        if data_path is None:
            st.error("无法找到数据文件,请检查Tweets.csv文件位置")
            return
        st.session_state.customer_service = SmartCustomerService(data_path)

    customer_service = st.session_state.customer_service

    # Tabs (churn analysis intentionally precedes the chat tab)
    tab1, tab2, tab3 = st.tabs(["📊 流失分析", "💬 对话客服", "⚙️ 系统设置"])
    with tab1:
        show_churn_analysis(customer_service)
    with tab2:
        show_chat_interface(customer_service)
    with tab3:
        show_system_settings(customer_service)


if __name__ == "__main__":
    show_smart_customer_service()