上传文件至 src/__pycache__

This commit is contained in:
林嘉烨 2026-01-16 19:36:56 +08:00
parent 13c2d66aa0
commit 736628bb2c
4 changed files with 518 additions and 0 deletions

View File

@ -0,0 +1,184 @@
from pydantic import BaseModel, Field
import joblib
import json
import pandas as pd
import numpy as np
import logging
import os
import random
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==========================================
# 1. 定义 Pydantic 数据模型
# ==========================================
class CustomerFeatures(BaseModel):
"""客户特征输入模型"""
age: int = Field(ge=18, le=120, description="客户年龄")
job: str = Field(description="职业")
marital: str = Field(description="婚姻状况")
education: str = Field(description="教育程度")
default: str = Field(pattern="^(yes|no)$", description="是否有违约记录")
balance: int = Field(description="账户余额")
housing: str = Field(pattern="^(yes|no)$", description="是否有住房贷款")
loan: str = Field(pattern="^(yes|no)$", description="是否有个人贷款")
contact: str = Field(description="联系方式")
day: int = Field(ge=1, le=31, description="最后联系日")
month: str = Field(description="最后联系月份")
campaign: int = Field(ge=1, description="本次活动联系次数")
pdays: int = Field(description="距离上次联系天数 (-1代表未联系)")
previous: int = Field(ge=0, description="活动前联系次数")
poutcome: str = Field(description="上次活动结果")
# 注意:我们不包含 duration因为它是事后变量
class Decision(BaseModel):
"""Agent 输出的结构化决策"""
risk_score: float = Field(ge=0, le=1, description="预测购买概率 (0-1)")
customer_segment: str = Field(description="客户分群 (如: 高价值/潜在/沉睡)")
decision: str = Field(description="建议策略 (如: 立即致电/邮件触达/放弃)")
actions: list[str] = Field(description="可执行动作清单")
rationale: str = Field(description="决策依据 (结合模型预测与业务规则)")
# ==========================================
# 2. 定义 Agent 类
# ==========================================
class MarketingAgent:
def __init__(self, model_path="models/model_artifacts.pkl"):
self.model_path = model_path
self.artifacts = None
self._load_model()
def _load_model(self):
if os.path.exists(self.model_path):
self.artifacts = joblib.load(self.model_path)
logger.info(f"Agent 已加载模型: {self.model_path}")
else:
logger.warning(f"模型文件不存在: {self.model_path}Agent 将无法进行预测")
def predict_risk(self, features: CustomerFeatures) -> dict:
"""
Tool 1: 调用 ML 模型预测购买概率
"""
if not self.artifacts:
return {"score": 0.0, "reason": "Model not loaded"}
# 转换输入为 DataFrame
data = features.model_dump()
df = pd.DataFrame([data])
# 预处理 (使用训练时保存的 encoder)
# 注意:这里需要严格复现训练时的预处理逻辑
# 训练时我们做了 Label Encoding
for col, le in self.artifacts['encoders'].items():
if col in df.columns:
# 处理未知类别
try:
df[col] = le.transform(df[col].astype(str))
except:
# 遇到未知类别,这里简单处理为 0 (或者 mode)
logger.warning(f"Unknown category in {col}")
df[col] = 0
# 确保列顺序一致
# 我们训练时用了 X (df.drop(target))
# 这里需要筛选出 numeric_cols + categorical_cols
# 简单起见,我们假设 feature names 保存了顺序
feature_names = self.artifacts['features']
# 补齐可能缺失的列
for col in feature_names:
if col not in df.columns:
df[col] = 0
X_input = df[feature_names]
# 预测
model = self.artifacts['lgb_model'] # 优先使用 LightGBM
prob = model.predict_proba(X_input)[0][1]
return {
"score": float(prob),
"top_features": ["balance", "poutcome"] # 这里简化,实际可用 SHAP
}
def get_strategy(self, score: float) -> dict:
"""
Tool 2: 规则引擎/检索工具
"""
if score > 0.6:
return {
"segment": "高意向 VIP",
"action_type": "人工介入",
"templates": ["尊贵的客户,鉴于您...", "专属理财经理一对一服务"]
}
elif score > 0.3:
return {
"segment": "潜在客户",
"action_type": "自动化营销",
"templates": ["你好,近期理财活动...", "点击领取加息券"]
}
else:
return {
"segment": "低意向群体",
"action_type": "静默/邮件",
"templates": ["月度财经摘要"]
}
def run(self, features: CustomerFeatures) -> Decision:
"""
Agent 主流程
"""
logger.info(f"Agent 正在处理客户: {features.job}, {features.age}")
# 1. 感知 (调用 ML 工具)
pred_result = self.predict_risk(features)
score = pred_result["score"]
# 2. 规划 (调用 策略工具)
strategy = self.get_strategy(score)
# 3. 决策 (模拟 LLM 整合)
# 在真实场景中,这里构建 Prompt 发送给 DeepSeek
# 这里我们用 Python 逻辑模拟 LLM 的结构化输出能力
decision = Decision(
risk_score=round(score, 4),
customer_segment=strategy["segment"],
decision=f"建议采取 {strategy['action_type']}",
actions=[f"使用话术: {t}" for t in strategy["templates"]],
rationale=f"模型预测概率为 {score:.1%},属于{strategy['segment']}。该群体对{strategy['action_type']}转化率较高。"
)
return decision
if __name__ == "__main__":
# 测试 Agent
agent = MarketingAgent()
# 构造一个测试用例
test_customer = CustomerFeatures(
age=35,
job="management",
marital="married",
education="tertiary",
default="no",
balance=2000,
housing="yes",
loan="no",
contact="cellular",
day=15,
month="may",
campaign=1,
pdays=-1,
previous=0,
poutcome="unknown"
)
result = agent.run(test_customer)
print("\n=== Agent Decision ===")
print(result.model_dump_json(indent=2))

92
src/__pycache__/data.py Normal file
View File

@ -0,0 +1,92 @@
import polars as pl
import pandera as pa
from pandera import Column, Check, DataFrameSchema
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==========================================
# 1. 定义 Pandera Schema (数据契约)
# ==========================================
# 原始数据 Schema
raw_schema = DataFrameSchema({
"age": Column(int, checks=Check.ge(18)),
"job": Column(str),
"marital": Column(str),
"education": Column(str),
"default": Column(str, checks=Check.isin(["yes", "no"])),
"balance": Column(int),
"housing": Column(str, checks=Check.isin(["yes", "no"])),
"loan": Column(str, checks=Check.isin(["yes", "no"])),
"contact": Column(str),
"day": Column(int, checks=[Check.ge(1), Check.le(31)]),
"month": Column(str),
"duration": Column(int, checks=Check.ge(0)),
"campaign": Column(int, checks=Check.ge(1)),
"pdays": Column(int),
"previous": Column(int, checks=Check.ge(0)),
"poutcome": Column(str),
"deposit": Column(str, checks=Check.isin(["yes", "no"])),
})
# 清洗后 Schema
processed_schema = DataFrameSchema({
"age": Column(int),
"balance": Column(int),
"deposit": Column(int, checks=Check.isin([0, 1])),
# 其他数值化或保留的特征...
})
def load_and_clean_data(file_path: str):
"""
使用 Polars 加载并清洗数据
"""
logger.info(f"正在加载数据: {file_path}")
# 1. Lazy Load
lf = pl.scan_csv(file_path)
# 2. 初步清洗计划
# - 移除 duration (避免数据泄露)
# - 将 deposit (yes/no) 转换为 (1/0)
# - 简单的分类变量编码 (为了 LightGBM我们可以保留分类类型或做 Label Encoding)
# LightGBM 原生支持 Category但 sklearn 需要数值。
# 为了通用性,这里做 Label Encoding 或者 One-Hot。
# 但 Polars 的 Label Encoding 比较手动。
# 我们这里先只做核心转换。
processed_lf = (
lf.drop(["duration"]) # 移除泄露特征
.with_columns([
pl.col("deposit").replace({"yes": 1, "no": 0}).cast(pl.Int64).alias("target"),
# 简单的特征工程示例:将 pdays -1 处理为 999 或单独一类 (这里保持原样,树模型能处理)
])
.drop("deposit") # 移除原始标签列,保留 target
)
# 3. 执行计算 (Collect)
df = processed_lf.collect()
logger.info(f"数据加载完成,形状: {df.shape}")
# 4. Pandera 验证 (转换回 Pandas 验证,因为 Pandera 对 Polars 支持尚在实验阶段或部分支持)
# 这里我们验证关键字段
try:
# 简单验证一下 target 是否只有 0 和 1
assert df["target"].n_unique() <= 2
logger.info("基础数据验证通过")
except Exception as e:
logger.error(f"数据验证失败: {e}")
raise e
return df
if __name__ == "__main__":
# 测试代码
try:
df = load_and_clean_data("data/bank.csv")
print(df.head())
except Exception as e:
print(f"Error: {e}")

View File

@ -0,0 +1,152 @@
import streamlit as st
import pandas as pd
import joblib
import os
import sys
# 添加项目根目录到 Path 以便导入 src
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.agent_app import MarketingAgent, CustomerFeatures
# 配置页面
st.set_page_config(
page_title="智能银行营销助手",
page_icon="🤖",
layout="wide"
)
# 侧边栏:项目信息
with st.sidebar:
st.title("🏦 智能营销系统")
st.markdown("---")
st.info("**Day 5 演示版**")
st.markdown("""
**核心能力**
1. 📊 **LightGBM** 客户购买预测
2. 🧠 **Agent** 策略生成
3. 📝 **Pydantic** 结构化输出
""")
st.markdown("---")
st.caption("由第 X 组开发")
# 主界面
st.title("🤖 客户意向预测与决策系统")
# 1. 模拟客户输入
st.header("1. 录入客户信息")
col1, col2, col3 = st.columns(3)
# 映射字典
job_map = {
"management": "管理人员", "technician": "技术人员", "entrepreneur": "企业家",
"blue-collar": "蓝领", "unknown": "未知", "retired": "退休人员",
"admin.": "行政人员", "services": "服务业", "self-employed": "自雇人士",
"unemployed": "失业", "maid": "家政", "student": "学生"
}
education_map = {"tertiary": "高等教育", "secondary": "中等教育", "primary": "初等教育", "unknown": "未知"}
marital_map = {"married": "已婚", "single": "单身", "divorced": "离异"}
binary_map = {"yes": "", "no": ""}
contact_map = {"cellular": "手机", "telephone": "座机", "unknown": "未知"}
month_map = {
"jan": "1月", "feb": "2月", "mar": "3月", "apr": "4月", "may": "5月", "jun": "6月",
"jul": "7月", "aug": "8月", "sep": "9月", "oct": "10月", "nov": "11月", "dec": "12月"
}
poutcome_map = {"unknown": "未知", "failure": "失败", "other": "其他", "success": "成功"}
# 辅助函数:反向查找 key
def get_key(val, my_dict):
for key, value in my_dict.items():
if val == value: return key
return val
with col1:
age = st.number_input("年龄", 18, 100, 30)
job_display = st.selectbox("职业", list(job_map.values()))
job = get_key(job_display, job_map)
education_display = st.selectbox("教育", list(education_map.values()))
education = get_key(education_display, education_map)
balance = st.number_input("账户余额 (欧元)", -1000, 100000, 1500)
with col2:
marital_display = st.selectbox("婚姻", list(marital_map.values()))
marital = get_key(marital_display, marital_map)
housing_display = st.selectbox("是否有房贷", list(binary_map.values()))
housing = get_key(housing_display, binary_map)
loan_display = st.selectbox("是否有个人贷", list(binary_map.values()))
loan = get_key(loan_display, binary_map)
default_display = st.selectbox("是否有违约记录", list(binary_map.values()))
default = get_key(default_display, binary_map)
with col3:
contact_display = st.selectbox("联系方式", list(contact_map.values()))
contact = get_key(contact_display, contact_map)
month_display = st.selectbox("最后联系月份", list(month_map.values()))
month = get_key(month_display, month_map)
day = st.slider("最后联系日", 1, 31, 15)
poutcome_display = st.selectbox("上次活动结果", list(poutcome_map.values()))
poutcome = get_key(poutcome_display, poutcome_map)
# 隐藏的高级特征
with st.expander("高级营销特征 (可选)"):
campaign = st.number_input("本次活动联系次数", 1, 50, 1)
pdays = st.number_input("距离上次联系天数 (-1代表无)", -1, 999, -1)
previous = st.number_input("活动前联系次数", 0, 100, 0)
# 2. 触发 Agent
if st.button("🚀 开始分析与决策", type="primary"):
try:
# 构造 Pydantic 对象
customer = CustomerFeatures(
age=age, job=job, marital=marital, education=education,
default=default, balance=balance, housing=housing, loan=loan,
contact=contact, day=day, month=month,
campaign=campaign, pdays=pdays, previous=previous, poutcome=poutcome
)
# 初始化 Agent
with st.spinner("Agent 正在加载模型并思考..."):
agent = MarketingAgent()
decision = agent.run(customer)
# 3. 展示结果
st.divider()
st.header("2. 智能分析报告")
# 结果看板
res_col1, res_col2 = st.columns([1, 2])
with res_col1:
st.metric("预测购买概率", f"{decision.risk_score:.1%}")
if decision.risk_score > 0.6:
st.success(f"分群:{decision.customer_segment}")
elif decision.risk_score > 0.3:
st.warning(f"分群:{decision.customer_segment}")
else:
st.error(f"分群:{decision.customer_segment}")
with res_col2:
st.subheader("💡 决策建议")
st.info(decision.decision)
st.markdown(f"**决策依据:** {decision.rationale}")
# 行动清单
st.subheader("📝 执行清单")
for i, action in enumerate(decision.actions, 1):
st.write(f"{i}. {action}")
# JSON 视图
with st.expander("查看原始 JSON 输出 (Traceable)"):
st.json(decision.model_dump())
except Exception as e:
st.error(f"发生错误: {str(e)}")

90
src/__pycache__/train.py Normal file
View File

@ -0,0 +1,90 @@
import polars as pl
import pandas as pd
import lightgbm as lgb
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score, f1_score
from sklearn.preprocessing import LabelEncoder, StandardScaler
import joblib
import logging
import os
from src.data import load_and_clean_data
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def train_models(data_path="data/bank.csv", model_dir="models"):
# 1. 加载数据
df_pl = load_and_clean_data(data_path)
df = df_pl.to_pandas() # 转换为 Pandas 以兼容 Sklearn
# 2. 特征预处理
# 区分分类和数值特征
target_col = "target"
X = df.drop(columns=[target_col])
y = df[target_col]
cat_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
num_cols = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
# Label Encoding (为了简化LightGBM 可以直接处理 Category但 Sklearn 需要编码)
encoders = {}
for col in cat_cols:
le = LabelEncoder()
X[col] = le.fit_transform(X[col].astype(str))
encoders[col] = le
# 3. 数据切分
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 4. 训练基线模型 (Logistic Regression)
logger.info("训练基线模型 (Logistic Regression)...")
# 逻辑回归需要归一化
scaler = StandardScaler()
X_train_scaled = X_train.copy()
X_test_scaled = X_test.copy()
X_train_scaled[num_cols] = scaler.fit_transform(X_train[num_cols])
X_test_scaled[num_cols] = scaler.transform(X_test[num_cols])
lr_model = LogisticRegression(max_iter=1000, random_state=42)
lr_model.fit(X_train_scaled, y_train)
lr_pred = lr_model.predict(X_test_scaled)
lr_prob = lr_model.predict_proba(X_test_scaled)[:, 1]
logger.info(f"Baseline F1: {f1_score(y_test, lr_pred):.4f}")
logger.info(f"Baseline AUC: {roc_auc_score(y_test, lr_prob):.4f}")
# 5. 训练进阶模型 (LightGBM)
logger.info("训练进阶模型 (LightGBM)...")
lgb_model = lgb.LGBMClassifier(n_estimators=100, learning_rate=0.05, random_state=42, verbose=-1)
lgb_model.fit(X_train, y_train)
lgb_pred = lgb_model.predict(X_test)
lgb_prob = lgb_model.predict_proba(X_test)[:, 1]
logger.info(f"LightGBM F1: {f1_score(y_test, lgb_pred):.4f}")
logger.info(f"LightGBM AUC: {roc_auc_score(y_test, lgb_prob):.4f}")
# 6. 保存模型与元数据
if not os.path.exists(model_dir):
os.makedirs(model_dir)
artifacts = {
"lgb_model": lgb_model,
"lr_model": lr_model,
"scaler": scaler,
"encoders": encoders,
"features": list(X.columns),
"cat_cols": cat_cols,
"num_cols": num_cols
}
joblib.dump(artifacts, os.path.join(model_dir, "model_artifacts.pkl"))
logger.info(f"模型已保存至 {model_dir}/model_artifacts.pkl")
return artifacts
if __name__ == "__main__":
train_models()