185 lines
6.9 KiB
Python
185 lines
6.9 KiB
Python
|
|
from pydantic import BaseModel, Field
|
|||
|
|
import joblib
|
|||
|
|
import json
|
|||
|
|
import pandas as pd
|
|||
|
|
import numpy as np
|
|||
|
|
import logging
|
|||
|
|
import os
|
|||
|
|
import random
|
|||
|
|
|
|||
|
|
# Logging setup: configure the root logger once at import time (INFO level,
# "timestamp - level - message" records) and create this module's logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
# ==========================================
|
|||
|
|
# 1. 定义 Pydantic 数据模型
|
|||
|
|
# ==========================================
|
|||
|
|
|
|||
|
|
class CustomerFeatures(BaseModel):
    """Input schema describing a single customer to be scored.

    Field order is significant only in that it is the order produced by
    ``model_dump()``; the constraints below are enforced by pydantic when
    an instance is constructed.
    """

    # Customer age in years, restricted to 18..120.
    age: int = Field(description="客户年龄", ge=18, le=120)
    # Occupation (free-form category string).
    job: str = Field(description="职业")
    # Marital status (free-form category string).
    marital: str = Field(description="婚姻状况")
    # Education level (free-form category string).
    education: str = Field(description="教育程度")
    # Whether the customer has a default on record: exactly "yes" or "no".
    default: str = Field(description="是否有违约记录", pattern="^(yes|no)$")
    # Account balance.
    balance: int = Field(description="账户余额")
    # Whether the customer has a housing loan: exactly "yes" or "no".
    housing: str = Field(description="是否有住房贷款", pattern="^(yes|no)$")
    # Whether the customer has a personal loan: exactly "yes" or "no".
    loan: str = Field(description="是否有个人贷款", pattern="^(yes|no)$")
    # Contact channel (free-form category string).
    contact: str = Field(description="联系方式")
    # Day of month of the last contact, restricted to 1..31.
    day: int = Field(description="最后联系日", ge=1, le=31)
    # Month of the last contact (free-form string, not validated here).
    month: str = Field(description="最后联系月份")
    # Number of contacts made during the current campaign (at least 1).
    campaign: int = Field(description="本次活动联系次数", ge=1)
    # Days since the previous contact; -1 means never contacted before.
    pdays: int = Field(description="距离上次联系天数 (-1代表未联系)")
    # Number of contacts made before the current campaign (non-negative).
    previous: int = Field(description="活动前联系次数", ge=0)
    # Outcome of the previous campaign (free-form category string).
    poutcome: str = Field(description="上次活动结果")

    # NOTE: `duration` is intentionally not part of this schema because it
    # is only known after the contact has taken place.
|
|||
|
|
|
|||
|
|
class Decision(BaseModel):
    """Structured decision emitted by the agent for one customer."""

    # Predicted purchase probability, constrained to the closed range [0, 1].
    risk_score: float = Field(description="预测购买概率 (0-1)", ge=0, le=1)
    # Customer segment label (e.g. high-value / potential / dormant).
    customer_segment: str = Field(
        description="客户分群 (如: 高价值/潜在/沉睡)",
    )
    # Recommended strategy (e.g. call immediately / reach by email / give up).
    decision: str = Field(
        description="建议策略 (如: 立即致电/邮件触达/放弃)",
    )
    # Concrete, executable action items.
    actions: list[str] = Field(description="可执行动作清单")
    # Justification combining the model prediction with business rules.
    rationale: str = Field(
        description="决策依据 (结合模型预测与业务规则)",
    )
|
|||
|
|
|
|||
|
|
# ==========================================
|
|||
|
|
# 2. 定义 Agent 类
|
|||
|
|
# ==========================================
|
|||
|
|
|
|||
|
|
class MarketingAgent:
    """Marketing agent that combines an ML score with rule-based strategies.

    The pipeline has three steps: score the customer with the loaded model
    (``predict_risk``), map the score to a strategy (``get_strategy``) and
    assemble both into a structured ``Decision`` (``run``).

    Attributes:
        model_path: Path of the serialized artifacts file.
        artifacts: Object returned by ``joblib.load`` or ``None`` when the
            file does not exist. This class reads the keys ``'encoders'``,
            ``'features'`` and ``'lgb_model'`` from it.
    """

    def __init__(self, model_path="models/model_artifacts.pkl"):
        """Store the artifacts path and try to load the model right away.

        Args:
            model_path: Location of the artifacts file on disk.
        """
        self.model_path = model_path
        self.artifacts = None
        self._load_model()

    def _load_model(self):
        """Load the artifacts from ``self.model_path`` if the file exists.

        A missing file is not treated as an error: a warning is logged and
        ``self.artifacts`` stays ``None``, in which case ``predict_risk``
        returns its placeholder result.
        """
        if os.path.exists(self.model_path):
            # NOTE(security): joblib.load unpickles the file, and unpickling
            # can execute arbitrary code. Only point model_path at artifact
            # files from a trusted source.
            self.artifacts = joblib.load(self.model_path)
            logger.info(f"Agent 已加载模型: {self.model_path}")
        else:
            logger.warning(f"模型文件不存在: {self.model_path},Agent 将无法进行预测")

    def predict_risk(self, features: "CustomerFeatures") -> dict:
        """Tool 1: call the ML model to predict the purchase probability.

        Args:
            features: Customer record; only its ``model_dump()`` method is
                used, to obtain a flat mapping of column name to value.

        Returns:
            ``{"score": 0.0, "reason": "Model not loaded"}`` when no
            artifacts are available; otherwise ``{"score": <float>,
            "top_features": [...]}`` where ``score`` is the second column
            of ``predict_proba`` for the single input row. ``top_features``
            is a fixed placeholder list, not computed from the model.
        """
        if not self.artifacts:
            return {"score": 0.0, "reason": "Model not loaded"}

        # Turn the single customer record into a one-row DataFrame.
        df = pd.DataFrame([features.model_dump()])

        # Pre-processing must mirror training, which (per the original
        # author's note) label-encoded the categorical columns; reuse the
        # encoders that were saved alongside the model.
        for col, le in self.artifacts['encoders'].items():
            if col not in df.columns:
                continue
            try:
                df[col] = le.transform(df[col].astype(str))
            except Exception:
                # Best-effort fallback for categories unseen at training
                # time: encode as 0. `Exception` (instead of a bare
                # `except`) keeps that fallback while letting
                # KeyboardInterrupt / SystemExit propagate.
                logger.warning(f"Unknown category in {col}")
                df[col] = 0

        # The saved feature list defines which columns the model receives
        # and in which order.
        feature_names = self.artifacts['features']

        # Fill in any expected column the input did not provide.
        for col in feature_names:
            if col not in df.columns:
                df[col] = 0

        X_input = df[feature_names]

        # Predict (the LightGBM model is the preferred one).
        model = self.artifacts['lgb_model']
        prob = model.predict_proba(X_input)[0][1]

        return {
            "score": float(prob),
            # Simplified placeholder; SHAP could provide real attributions.
            "top_features": ["balance", "poutcome"],
        }

    def get_strategy(self, score: float) -> dict:
        """Tool 2: rule engine mapping a score to a marketing strategy.

        Both thresholds are strict: a score of exactly 0.6 falls into the
        middle tier and exactly 0.3 into the lowest tier.

        Args:
            score: Predicted purchase probability.

        Returns:
            A dict with the keys ``"segment"``, ``"action_type"`` and
            ``"templates"`` (a list of message templates).
        """
        if score > 0.6:
            return {
                "segment": "高意向 VIP",
                "action_type": "人工介入",
                "templates": ["尊贵的客户,鉴于您...", "专属理财经理一对一服务"],
            }
        if score > 0.3:
            return {
                "segment": "潜在客户",
                "action_type": "自动化营销",
                "templates": ["你好,近期理财活动...", "点击领取加息券"],
            }
        return {
            "segment": "低意向群体",
            "action_type": "静默/邮件",
            "templates": ["月度财经摘要"],
        }

    def run(self, features: "CustomerFeatures") -> "Decision":
        """Run the full agent pipeline for one customer.

        When no model is loaded, ``predict_risk`` yields a score of 0.0 and
        the decision is built from that placeholder value.

        Args:
            features: Customer record to evaluate.

        Returns:
            A ``Decision`` holding the rounded score, the segment, the
            recommended action type, one action per template and a
            textual rationale.
        """
        logger.info(f"Agent 正在处理客户: {features.job}, {features.age}岁")

        # 1. Perceive: call the ML tool.
        pred_result = self.predict_risk(features)
        score = pred_result["score"]

        # 2. Plan: call the strategy tool.
        strategy = self.get_strategy(score)

        # 3. Decide: in a real deployment a prompt would be sent to an LLM
        # here; plain Python stands in for its structured output.
        decision = Decision(
            risk_score=round(score, 4),
            customer_segment=strategy["segment"],
            decision=f"建议采取 {strategy['action_type']}",
            actions=[f"使用话术: {t}" for t in strategy["templates"]],
            rationale=f"模型预测概率为 {score:.1%},属于{strategy['segment']}。该群体对{strategy['action_type']}转化率较高。"
        )

        return decision
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Manual smoke test: build the agent, score one hand-written customer
    # and print the resulting decision as JSON.
    agent = MarketingAgent()

    # Sample customer record used as the test case.
    sample_payload = {
        "age": 35,
        "job": "management",
        "marital": "married",
        "education": "tertiary",
        "default": "no",
        "balance": 2000,
        "housing": "yes",
        "loan": "no",
        "contact": "cellular",
        "day": 15,
        "month": "may",
        "campaign": 1,
        "pdays": -1,
        "previous": 0,
        "poutcome": "unknown",
    }
    test_customer = CustomerFeatures(**sample_payload)

    outcome = agent.run(test_customer)
    print("\n=== Agent Decision ===")
    print(outcome.model_dump_json(indent=2))
|