commit 4a92e626832da0466ed99f8cf2b8cedd8ab8f7de Author: sit002 Date: Thu Jan 1 11:19:17 2026 +0800 feat: Add Streamlit application for student grade prediction and AI counseling. diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..79f0540 --- /dev/null +++ b/.env.example @@ -0,0 +1,2 @@ +# DeepSeek API Key +DEEPSEEK_API_KEY=your-key-here diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..376834e --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.py[cod] +*$py.class +.venv/ +.env +.DS_Store +models/*.pkl +.pytest_cache/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..dfa288d --- /dev/null +++ b/README.md @@ -0,0 +1,453 @@ +# 机器学习 × LLM × Agent:课程设计 + +> **小组作业** | 2–3 人/组 | 构建一个「可落地的智能预测与行动建议系统」 + +用传统机器学习完成可量化的预测任务,再用 LLM + Agent 把预测结果变成可执行的决策/建议,并保证输出结构化、可追溯、可复现。 + +--- + +## 📑 目录 + +- [快速开始](#-快速开始) +- [技术栈要求](#0-技术栈要求强制) +- [选题指南](#1-选题三档难度任选其一) + - [Level 1:入门](#level-1入门表格预测--行动建议闭环) + - [Level 2:进阶](#level-2进阶文本任务--处置建议回复生成) + - [Level 3:高阶](#level-3高阶不平衡多表时序--多步决策-agent) +- [自选题目标准](#2-自选题目标准) +- [代码示例](#3-deepseek--pydantic-ai最小可运行示例) +- [代码规范](#4-代码规范) +- [项目结构](#5-建议项目结构) +- [交付物与评分](#6-交付物与评分) +- [参考资料](#7-参考资料) + +--- + +## 🚀 快速开始 + +```bash +# 1. 克隆/Fork 本模板仓库 +git clone +cd ml_course_design_template + +# 2. 创建虚拟环境并安装依赖 +python -m venv .venv +source .venv/bin/activate # Windows: .venv\Scripts\activate +pip install -r requirements.txt + +# 3. 配置 DeepSeek API Key(不要提交到仓库!) +# 复制示例配置 +cp .env.example .env +# 编辑 .env 文件,填入你的 API Key +# DEEPSEEK_API_KEY="your-key-here" + +# 4. 运行示例 +# 方式 A:运行 Streamlit 可视化 Demo(推荐) +python -m streamlit run src/streamlit_app.py + +# 方式 B:运行命令行 Agent Demo +python src/agent_app.py +``` + +--- + +## 0. 技术栈要求(强制) + +| 组件 | 要求 | +|------|------| +| **人数** | 2–3 人/组 | +| **Agent 框架** | `pydantic-ai` | +| **LLM 提供方** | `DeepSeek`(OpenAI 兼容 API) | + +### 必须包含的三块能力 + +| 能力 | 说明 | +|------|------| +| **传统机器学习** | 可复现训练流程、离线评估指标、模型保存与加载 | +| **LLM** | 用于解释、归因、生成建议/回复、信息整合(不能凭空杜撰) | +| **Agent** | 用工具调用把系统串起来(至少 2 个 tool,其中 1 个必须是 ML 预测/评估相关工具) | + +--- + +## 1. 选题:三档难度(任选其一) + +你们可以先选难度档位,再从数据集列表里选一个;也可以自选(见「自选题目标准」)。 + +> ⚠️ **注意**:Level 1/2/3 **都可以拿满分**;高难度通常更容易体现"深度",但不会因为选 Level 1 就被封顶。 + +--- + +### Level 1|入门:表格预测 + 行动建议闭环 + +> 📌 **建议新手选择** + +**目标**:做一个结构化数据的分类/回归模型,并让 Agent 基于模型输出给出可执行建议(如挽留、风控、营销、分诊)。 + +#### 交付的系统能力 + +- 训练并保存一个基线模型(如 `LogReg` / `LightGBM` / `RandomForest`) +- 提供 `predict_proba` / `predict` 工具给 Agent 调用 +- Agent 输出**结构化决策**(Pydantic schema),并能说明建议与关键特征/规则的关联 + +#### 推荐数据集(任选其一) + +| 数据集 | 链接 | +|--------|------| +| Telco Customer Churn | [Kaggle](https://www.kaggle.com/datasets/blastchar/telco-customer-churn) | +| German Credit Risk | [Kaggle](https://www.kaggle.com/datasets/uciml/german-credit) | +| Bank Marketing | [Kaggle](https://www.kaggle.com/datasets/janiobachmann/bank-marketing-dataset) | +| Heart Failure Prediction | [Kaggle](https://www.kaggle.com/datasets/fedesoriano/heart-failure-prediction) | + +#### 最低基准 + +| 任务类型 | 指标要求 | +|----------|----------| +| 二分类 | `F1 ≥ 0.70` 或 `ROC-AUC ≥ 0.75` | +| 回归 | `MAE/RMSE` 相对朴素基线(均值/中位数)**显著提升**,需在报告中说明 | + +#### 参考练习清单 + +**传统 ML(必须)**: +- 做一条可复现的数据流水线:缺失值处理、类别编码、数值缩放、训练/验证切分(并写清"为什么这样切分") +- 至少 2 个模型对比:一个可解释基线(如 Logistic Regression),一个更强模型(如 LightGBM/RandomForest) +- 做 1 次误差分析:Top 误判样本/分桶(例如按年龄段/合同类型)看哪里最容易错 + +**Agent(必须)**: +- 定义结构化输出(Pydantic):`risk_score + decision + actions + rationale` +- 至少 2 个工具(tool),其中 1 个必须调用你们的 ML(例如 `predict_risk(features)`) +- 推荐再加 1 个解释类工具:`explain_top_features(features) -> list[str]`(可用系数/重要性/规则,不要求复杂 SHAP) + +**系统闭环(建议做到)**: +- 阈值/策略选择:把"预测概率"变成"要不要干预/怎么干预"(例如:高风险→人工复核;中风险→短信挽留;低风险→不操作) +- 做一个简单"成本-收益"版本:每个动作给一个成本/收益假设,让 Agent 给出最划算的动作组合(在报告里写清假设) + +--- + +### Level 2|进阶:文本任务 + 处置建议/回复生成 + +> 📌 **NLP 向** + +**目标**:做文本分类/情感/主题/工单分流等任务,并让 Agent 生成"可执行处置方案"(如回复、升级、分派、风险提示),且输出必须结构化并可审计。 + +#### 交付的系统能力 + +- 传统 ML NLP 基线(如 `TF-IDF + Linear Model`)或轻量深度模型(可选) +- Agent 支持「分类 → 解释 → 生成建议/回复模板」的流程,并能引用证据(例如关键词、相似样本) + +#### 推荐数据集(任选其一) + +| 数据集 | 链接 | 说明 | +|--------|------|------| +| Twitter US Airline Sentiment | [Kaggle](https://www.kaggle.com/datasets/crowdflower/twitter-airline-sentiment) | 航空公司情感分析 | +| IMDB 50K Movie Reviews | [Kaggle](https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews) | 电影评论情感 | +| SMS Spam Collection | [Kaggle](https://www.kaggle.com/datasets/uciml/sms-spam-collection-dataset) | 垃圾短信分类 | +| Consumer Complaints | [Kaggle](https://www.kaggle.com/datasets/selener/consumer-complaint-database) | 投诉分流(很适配 Agent) | + +#### 最低基准 + +| 任务类型 | 指标要求 | +|----------|----------| +| 分类 | `Accuracy ≥ 0.85` 或 `Macro-F1 ≥ 0.80` | + +#### 参考练习清单 + +**传统 ML NLP(必须)**: +- 基线:`TF-IDF + LogisticRegression/LinearSVC`,再加入至少一个更强的模型(如 `LightGBM`, `RandomForest`, 或轻量 BERT 模型);并报告 `macro-F1`、混淆矩阵、典型错例 +- 文本清洗要「克制」:说明你做了哪些预处理、为什么(不要把标签信息泄露进特征) + +**Agent(必须)**: +- 工单/评论 → `classify_text(text)`(ML 工具)→ `draft_response(label, evidence)`(LLM)→ 输出结构化处置方案 +- 建议加入 1 个「证据」工具:`extract_evidence(text) -> list[str]`(关键词/关键句/相似样本 id) + +**更贴近真实(可选加分)**: +- **相似案例检索**:用 TF-IDF/Embedding 做 `retrieve_similar(text) -> top_k`,Agent 引用相似案例作为「可追溯证据」(禁止编造历史工单) +- **合规与安全**:对输出做规则检查(例如不得输出隐私信息/不得承诺无法兑现的政策),失败时让 Agent 重新生成 + +--- + +### Level 3|高阶:不平衡/多表/时序 + 多步决策 Agent + +> 📌 **真实世界约束** + +**目标**:选择一个更接近真实业务约束的任务(极度不平衡、多表关联、时序预测等),实现更强的评估与更可靠的 Agent 决策链路。 + +#### 交付的系统能力 + +- 针对数据特性选择合适指标与训练策略(不平衡采样、阈值策略、时间切分、泄露防护等) +- Agent 能进行多步决策(例如:先评估风险 → 再选择干预策略 → 再生成操作清单/工单内容) + +#### 推荐数据集(任选其一) + +| 数据集 | 链接 | 特点 | +|--------|------|------| +| Credit Card Fraud Detection | [Kaggle](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud) | 极度不平衡 | +| IEEE-CIS Fraud Detection | [Kaggle](https://www.kaggle.com/c/ieee-fraud-detection) | 多表/特征工程复杂 | +| M5 Forecasting - Accuracy | [Kaggle](https://www.kaggle.com/competitions/m5-forecasting-accuracy) | 时序预测 + 补货决策 | +| Instacart Market Basket | [Kaggle](https://www.kaggle.com/c/instacart-market-basket-analysis) | 多表 + 召回/推荐 | + +#### 最低基准 + +| 任务类型 | 指标要求 | +|----------|----------| +| 不平衡分类 | `PR-AUC` / `Recall@Precision` 等合理指标,并与 naive 基线对比 | +| 时序 | 必须使用时间切分评估(rolling/holdout),并与 naive 基线对比 | + +#### 参考练习清单 + +**不平衡分类路线(如欺诈)**: +- 重点不在 Accuracy:用 `PR-AUC` / `Recall@Precision` / `Cost` 等指标,给出阈值选择依据 +- 做 1 个「代价敏感」版本:例如漏报成本 > 误报成本,并让 Agent 基于代价输出策略 + +**多表路线(如电商/多表欺诈)**: +- 明确主键/外键与 join 规则,写出「数据泄露风险点清单」 +- 做一个可复现的特征构建模块(统计聚合、时间窗特征等) + +**时序路线(如 M5)**: +- 强制时间切分;至少对比 1 个朴素基线(naive/seasonal naive) +- 让 Agent 输出「补货/促销/库存」建议,并说明依据(趋势、季节性、置信区间) + +**Agent(必须)**: +- 至少 3 步决策:例如 `评估风险/预测 → 解释与约束检查 → 生成行动计划` +- 在报告里画一张「决策链路图」 + +--- + +## 2. 自选题目标准 + +> 💡 **鼓励自选题目**,但必须满足以下硬标准,并先提交 1 页 proposal(Markdown 即可) + +| 要求 | 说明 | +|------|------| +| **数据真实可获取** | 公开、可重复下载(Kaggle/UCI/OpenML/政府开放数据等),提供链接与数据说明 | +| **可量化预测任务** | 有明确标签/目标变量与评价指标;不得只做「聊天/生成」 | +| **业务闭环** | 能落到「下一步做什么」的决策/行动(由 Agent 负责) | +| **Agent 工具调用** | 至少 2 个 tools,其中 1 个必须是 ML 预测/评估/解释工具 | +| **规模与复杂度** | 样本量建议 ≥ 5,000(或能说明复杂性来自多表/长文本/时序);不得低于 Level 1 | +| **合规性** | 禁止爬取受限数据;禁止提交包含密钥/个人隐私数据的仓库内容 | + +--- + +## 3. DeepSeek + pydantic-ai:最小可运行示例 + +下面示例展示如何用 `pydantic-ai` 让 Agent 输出结构化结果,并通过 tool 调用你们训练好的模型。 + +```python +import os +from typing import Any + +from pydantic import BaseModel, Field +from pydantic_ai import Agent, RunContext + + +class Decision(BaseModel): + """Agent 输出的结构化决策""" + risk_score: float = Field(ge=0, le=1, description="预测风险/流失概率等,0~1") + decision: str = Field(description="建议采取的总体策略") + actions: list[str] = Field(description="可执行动作清单(3~8条)") + rationale: str = Field(description="基于哪些证据/特征做出建议(不要编造事实)") + + +# 使用 provider:model 简写(DeepSeek 是 OpenAI 兼容 API) +agent = Agent( + "deepseek:deepseek-chat", # 也可尝试 "deepseek:deepseek-reasoner" + output_type=Decision, + system_prompt=( + "你是业务决策助手。你必须先调用工具获取模型预测与可解释信息," + "再给出结构化决策与可执行动作;不得编造数据集中不存在的事实。" + ), +) + + +@agent.tool +def predict_risk(ctx: RunContext[Any], features: dict) -> float: + """调用训练好的 ML 模型,返回 0~1 的风险分数。 + + Args: + ctx: 运行上下文 + features: 特征字典,如 {"age": 35, "contract": "month-to-month", ...} + + Returns: + 风险概率值 (0~1) + """ + # TODO: 加载模型并预处理特征 + # model = joblib.load("models/model.pkl") + # X = preprocess(features) + # proba = model.predict_proba(X)[0, 1] + # return float(proba) + raise NotImplementedError("请实现模型调用逻辑") + + +@agent.tool +def explain_top_features(ctx: RunContext[Any], features: dict) -> list[str]: + """解释对预测结果影响最大的特征。 + + Args: + ctx: 运行上下文 + features: 特征字典 + + Returns: + 影响最大的特征列表,如 ["合同类型: 月付 (+0.25)", "任期: 2个月 (+0.15)"] + """ + # TODO: 实现特征重要性解释 + raise NotImplementedError("请实现特征解释逻辑") +``` + +### API Key 配置 + +> ⚠️ **重要**:不要把 Key 写进代码、不要提交到仓库! + +```bash +# macOS / Linux (zsh/bash) +export DEEPSEEK_API_KEY="your-key-here" + +# Windows (PowerShell) +$env:DEEPSEEK_API_KEY="your-key-here" +``` + +建议在项目根目录创建 `.env.example` 文件(提交到仓库),内容如下: + +``` +DEEPSEEK_API_KEY=your-key-here +``` + +然后复制为 `.env` 并填入真实 Key(`.env` 已在 `.gitignore` 中排除)。 + +--- + +## 4. 代码规范 + +本作业**不使用 CI/CD 作为评分项**,但代码质量会被严格检查。 + +| 要求 | 说明 | +|------|------| +| **可运行** | 在「干净环境」按 README 步骤能跑通训练与 demo | +| **可复现** | 固定随机种子;训练/评估脚本可重复得到同级别指标;关键超参可配置 | +| **结构清晰** | 模块划分合理;避免超长脚本;核心逻辑放 `src/`;数据处理、训练、推理、Agent 分离 | +| **类型提示与文档** | 对外 API 必须写 type hints 与 docstring | +| **不泄露** | 避免数据泄露(特别是时序/多表任务);报告中说明切分策略 | +| **安全** | 密钥用环境变量;仓库中提供 `.env.example` 但不得提交真实 `.env` | + +--- + +## 5. 建议项目结构 + +``` +ml_course_design_template/ +├── README.md # 项目说明 +├── requirements.txt # Python 依赖 +├── .env.example # 环境变量模板(不含真实密钥) +├── .gitignore # Git 忽略规则 +│ +├── data/ # 数据目录 +│ └── README.md # 数据来源说明(原始大数据不要提交) +│ +├── models/ # 训练产物(模型文件) +│ └── .gitkeep +│ +├── notebooks/ # 探索性分析(可选) +│ └── eda.ipynb +│ +├── src/ # 核心代码 +│ ├── __init__.py +│ ├── data.py # 数据读取/清洗/特征工程 +│ ├── train.py # 训练与离线评估 +│ ├── infer.py # 推理接口(给 Agent 的 tool 调用) +│ └── agent_app.py # pydantic-ai Agent 入口 +│ +└── tests/ # 测试(建议至少覆盖 3 个关键函数) + ├── __init__.py + ├── test_data.py + ├── test_model.py + └── test_agent.py +``` + +--- + +## 6. 交付物与评分 + +### 必交材料 + +| 材料 | 说明 | +|------|------| +| **代码仓库链接** | 组长提交 | +| **项目报告** | 4–8 页 Markdown/PDF:问题定义、数据说明、特征/模型、评估、Agent 设计、局限与改进 | +| **演示** | 5–8 分钟 demo(Streamlit / Gradio / Next.js + FastAPI 等),展示端到端流程 | + +### 评分标准(总分 100) + +#### A. 问题与数据(15 分) + +| 维度 | 分值 | 要求 | +|------|------|------| +| 任务定义清晰 | 5 | 标签/目标是什么、为什么重要、输入输出边界 | +| 数据说明完整 | 5 | 来源链接、字段含义、样本量、潜在偏差/缺失 | +| 切分与泄露防护 | 5 | 随机/分层/时间切分说明;明确避免目标泄露 | + +#### B. 传统机器学习(35 分) + +| 维度 | 分值 | 要求 | +|------|------|------| +| 基线与可复现训练 | 10 | 固定随机种子、训练脚本能跑通、基线合理 | +| 指标与对比 | 10 | 指标选择正确,并与至少 1 个强/弱基线对比 | +| 误差分析 | 10 | 展示错误样本/分桶/特征影响,给出改进方向 | +| 结果可信度 | 5 | 阈值/校准/稳定性(任选其一做到位即可) | + +#### C. LLM + Agent(35 分) + +| 维度 | 分值 | 要求 | +|------|------|------| +| 工具调用 | 10 | 至少 2 个 tools,能稳定调用 ML 工具(不是「假调用」) | +| 结构化输出 | 10 | Pydantic schema 清晰;字段有约束;失败能重试/兜底 | +| 建议可执行且有证据 | 10 | 能落地的动作清单,并能引用依据(禁止编造事实) | +| 边界与安全 | 5 | 能处理异常输入;对敏感输出做规则约束 | + +#### D. 工程与规范(15 分) + +| 维度 | 分值 | 要求 | +|------|------|------| +| 可运行与可复现 | 5 | README 步骤清楚;干净环境可复现;依赖明确 | +| 代码结构与风格 | 5 | 模块化、命名清晰、类型提示/Docstring 到位 | +| 演示质量 | 5 | demo 端到端连贯;能说明架构与关键取舍 | + +### ❌ 常见扣分项 + +- 训练/推理无法在助教环境跑通;或需要手动改很多路径/参数 +- 数据泄露(尤其是时序/多表);或评估切分不合理但未说明 +- Agent 输出「看似合理但无证据」的内容,或编造数据集不存在的事实 +- **把密钥提交进仓库(严重扣分)** + +### ✅ 常见加分项 + +> 不额外加分栏,但会让你们更容易拿到高分 + +- 做了可解释性/阈值策略/代价敏感分析,并与业务动作闭环 +- 做了检索增强(RAG/相似案例)且引用可追溯证据 +- 做了消融/对比实验,结论清晰且能指导下一步优化 + +--- + +## 7. 参考资料 + +| 资源 | 链接 | +|------|------| +| pydantic-ai 文档 | https://ai.pydantic.dev/ | +| DeepSeek API | https://api.deepseek.com (OpenAI 兼容) | +| DeepSeek 模型 | `deepseek-chat` / `deepseek-reasoner` | + +--- + +## 📋 Checklist(提交前自检) + +- [ ] 代码仓库可正常访问 +- [ ] README 包含完整的运行步骤 +- [ ] 在干净环境下可以复现训练和推理 +- [ ] 没有提交 API Key 或敏感信息 +- [ ] 没有提交大型数据文件 +- [ ] Agent 至少有 2 个 tool(含 1 个 ML 工具) +- [ ] 输出使用 Pydantic 结构化 +- [ ] 报告说明了数据切分策略 +- [ ] Demo 可以正常运行 + +--- + +> 💬 **有问题?** 请在课程群/Issue 中提问,我们会尽快回复。 diff --git a/REPORT.md b/REPORT.md new file mode 100644 index 0000000..2158db5 --- /dev/null +++ b/REPORT.md @@ -0,0 +1,44 @@ +# 项目报告:[项目名称] + +> **小组成员**: +> - [姓名] (学号) +> - [姓名] (学号) + +## 1. 问题定义与数据 +### 1.1 任务描述 + + +### 1.2 数据来源与说明 + + +### 1.3 数据切分与防泄露 + + +## 2. 机器学习流水线 +### 2.1 基线模型 + + +### 2.2 进阶模型 + + +### 2.3 评估与误差分析 + + +## 3. Agent 实现 +### 3.1 工具定义 + +- `tool_name_1`: 描述... +- `tool_name_2`: 描述... + +### 3.2 决策逻辑 + + +### 3.3 案例展示 + + +## 4. 反思 +### 4.1 挑战与解决方案 + + +### 4.2 局限与未来改进 + diff --git a/data/README.md b/data/README.md new file mode 100644 index 0000000..bd1d668 --- /dev/null +++ b/data/README.md @@ -0,0 +1,5 @@ +# Data Directory + +Place your raw data files here. + +For this example project, the data is generated synthetically in `src/data.py`, so no external files are needed. diff --git a/models/.gitkeep b/models/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..79eaa0e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +pydantic-ai +scikit-learn +pandas +numpy +joblib +python-dotenv +pytest +streamlit diff --git a/src/agent_app.py b/src/agent_app.py new file mode 100644 index 0000000..07c42db --- /dev/null +++ b/src/agent_app.py @@ -0,0 +1,133 @@ +import os +import sys +import asyncio +from typing import Any, List + +sys.path.append(os.getcwd()) + +from pydantic import BaseModel, Field +from pydantic_ai import Agent, RunContext +from dotenv import load_dotenv + +from src.infer import predict_pass_prob, explain_prediction + +load_dotenv() + +# --- 1. 定义结构化输出 (Level 1 Requirement) --- +class ActionItem(BaseModel): + action: str = Field(description="具体的行动建议") + priority: str = Field(description="优先级 (高/中/低)") + +class StudyGuidance(BaseModel): + pass_probability: float = Field(description="预测通过率 (0-1)") + risk_assessment: str = Field(description="风险评估 (自然语言描述)") + key_drivers: str = Field(description="导致该预测结果的主要因素 (来自模型解释)") + action_plan: List[ActionItem] = Field(description="3-5条建议清单") + +# --- 2. 初始化 Agent --- +# 必须强调:不要编造事实,必须基于工具返回的数据。 +agent = Agent( + "deepseek:deepseek-chat", + output_type=StudyGuidance, + system_prompt=( + "你是一个极其严谨的学业数据分析师。" + "你的任务是根据学生的具体情况预测其考试通过率,并给出建议。" + "【重要规则】" + "1. 必须先调用 `predict_student` 获取概率。" + "2. 必须调用 `explain_model` 获取模型认为最重要的特征,并在 `key_drivers` 中引用这些特征。" + "3. 你的建议必须针对那些最重要的特征(例如,如果模型说睡眠很重要,就给睡眠建议)。" + "4. 严禁凭空编造数值。" + ), +) + +# --- 2.1 定义 Counselor Agent --- +counselor_agent = Agent( + "deepseek:deepseek-chat", + system_prompt=( + "你是一位富有同理心且专业的大学心理咨询师。" + "你的目标是倾听学生的学业压力和生活烦恼,提供情感支持,并根据需要给出建议。" + "【交互风格】" + "1. 同理心:首先通过复述或确认学生的感受来表达理解(例如:“听起来你最近压力真的很大...”)。" + "2. 引导性:不要急于给出解决方案,先通过提问了解更多背景。" + "3. 数据驱动(可选):如果学生询问具体通过率或客观分析,请调用 `predict_student_tool` 或 `explain_model_tool`。" + "4. 语气:温暖、支持、专业,但像朋友一样交谈。" + "【工具使用】" + "如果学生提供了具体的学习时长、睡眠等数据,或者明确询问预测结果,请使用工具。" + "不要在每一句话里都引用数据,只在通过率相关的话题中使用。" + ), +) + +# --- 3. 注册工具 (Level 1 Requirement: 至少2个工具) --- + +@agent.tool +def predict_student(ctx: RunContext[Any], + study_hours: float, + sleep_hours: float, + attendance_rate: float, + stress_level: int, + study_type: str) -> float: + """ + 根据学生行为预测通过率。 + + Args: + study_hours: 每周学习小时数 (0-20) + sleep_hours: 每天睡眠小时数 (0-12) + attendance_rate: 出勤率 (0.0-1.0) + stress_level: 压力等级 1(低) - 5(高) + study_type: 学习类型 ("Group", "Self", "Online") + """ + return predict_pass_prob(study_hours, sleep_hours, attendance_rate, stress_level, study_type) + +@counselor_agent.tool +def predict_student_tool(ctx: RunContext[Any], + study_hours: float, + sleep_hours: float, + attendance_rate: float, + stress_level: int, + study_type: str) -> float: + """ + 根据学生行为预测通过率。用于咨询过程中提供客观数据支持。 + """ + return predict_pass_prob(study_hours, sleep_hours, attendance_rate, stress_level, study_type) + +@agent.tool +def explain_model(ctx: RunContext[Any]) -> str: + """ + 获取机器学习模型的全局特征重要性解释。 + 返回哪些特征对预测结果影响最大。 + """ + return explain_prediction() + +@counselor_agent.tool +def explain_model_tool(ctx: RunContext[Any]) -> str: + """ + 获取机器学习模型的全局特征重要性解释。 + """ + return explain_prediction() + +async def main(): + # 模拟真实的学生查询 + query = ( + "我最近压力很大 (等级4),每天只睡 4 小时,不过我每周自学(Self) 12 小时," + "出勤率大概 90%。请帮我分析一下我会挂科吗?基于模型告诉我怎么做最有效。" + ) + + print(f"用户: {query}\n") + print("Agent 正在思考并调用模型工具...\n") + + try: + if not os.getenv("DEEPSEEK_API_KEY"): + print("❌ 错误: 未设置 DEEPSEEK_API_KEY,无法运行 Agent。") + print("请在 .env 文件中设置密钥,或 export DEEPSEEK_API_KEY='...'") + return + + result = await agent.run(query) + + print("--- 结构化分析报告 ---") + print(result.output.model_dump_json(indent=2)) + + except Exception as e: + print(f"❌ 运行失败: {e}") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/data.py b/src/data.py new file mode 100644 index 0000000..b421814 --- /dev/null +++ b/src/data.py @@ -0,0 +1,84 @@ +import pandas as pd +import numpy as np + +def generate_data(n_samples: int = 2000, random_seed: int = 42) -> pd.DataFrame: + """ + 生成 Level 1 要求的复杂模拟数据。 + 包含:数值特征、类别特征、噪声、以及非线性关系。 + + 特征: + - study_hours (float): 每周学习时长 (0-20) + - sleep_hours (float): 每晚睡眠时长 (3-10) + - attendance_rate (float): 出勤率 (0.0-1.0) + - study_type (category): 学习方式 ("Group", "Self", "Online") + - stress_level (int): 压力等级 (1-5) + + 目标: + - is_pass (int): 0 或 1 + """ + np.random.seed(random_seed) + + # 1. 生成基础特征 + data = { + "study_hours": np.random.uniform(0, 15, n_samples), + "sleep_hours": np.random.normal(7, 1.5, n_samples).clip(3, 10), + "attendance_rate": np.random.beta(5, 2, n_samples), # 偏向于高出勤 + "study_type": np.random.choice(["Group", "Self", "Online"], n_samples, p=[0.3, 0.5, 0.2]), + "stress_level": np.random.randint(1, 6, n_samples) + } + + df = pd.DataFrame(data) + + # 2. 模拟真实世界逻辑 (分数计算) + # 基础分 + score = 40 + + # 线性影响 + score += df["study_hours"] * 3.0 + score += (df["attendance_rate"] - 0.5) * 30 + + # 非线性/交互影响 + # 睡眠不足严重扣分 + score -= np.maximum(0, 6 - df["sleep_hours"]) * 8 + + # 类别特征影响 + # Group 对低学习时长有帮助,Self 对高时长有加成 + mask_group = df["study_type"] == "Group" + mask_self = df["study_type"] == "Self" + + score[mask_group] += 5 + score[mask_self] += df.loc[mask_self, "study_hours"] * 0.5 # 额外加成 + + # 压力影响 + score -= (df["stress_level"] - 1) * 2 + + # 3. 添加随机噪声 + noise = np.random.normal(0, 8, n_samples) + final_score = score + noise + + # 4. 生成标签 (及格线 60) + df["is_pass"] = (final_score >= 60).astype(int) + + # 5. 人为制造缺失值 (模拟真实数据清洗需求) + # 随机丢弃 5% 的 attendance_rate + mask_na = np.random.random(n_samples) < 0.05 + df.loc[mask_na, "attendance_rate"] = np.nan + + return df + +def preprocess_data(df: pd.DataFrame) -> pd.DataFrame: + """ + 注意:在 scikit-learn Pipeline 模式下, + 我们通常把'清洗'作为 Pipeline 的一部分。 + 这里只做最基础的清洗,比如删除完全错误的行(如果有)。 + """ + # 演示:仅删除完全重复的行 + return df.drop_duplicates() + +if __name__ == "__main__": + df = generate_data() + print("数据样例:") + print(df.head()) + print("\n缺失值统计:") + print(df.isnull().sum()) + print(f"\n及格率: {df['is_pass'].mean():.2f}") diff --git a/src/infer.py b/src/infer.py new file mode 100644 index 0000000..657d8e9 --- /dev/null +++ b/src/infer.py @@ -0,0 +1,86 @@ +import os +import sys +import joblib +import pandas as pd +import numpy as np + +# 路径修复 +sys.path.append(os.getcwd()) + +MODEL_PATH = os.path.join("models", "model.pkl") +_MODEL = None + +def load_model(): + global _MODEL + if _MODEL is None: + if not os.path.exists(MODEL_PATH): + raise FileNotFoundError(f"未找到模型文件 {MODEL_PATH}。请先运行 src/train.py。") + _MODEL = joblib.load(MODEL_PATH) + return _MODEL + +def predict_pass_prob(study_hours: float, sleep_hours: float, attendance_rate: float, + stress_level: int, study_type: str) -> float: + """ + 预测学生通过概率 (0.0 - 1.0)。 + 会自动处理特征预处理 (因为模型包含了 Pipeline)。 + """ + model = load_model() + + # 构建 DataFrame,这与训练时的输入格式一致 + features = pd.DataFrame([{ + "study_hours": study_hours, + "sleep_hours": sleep_hours, + "attendance_rate": attendance_rate, + "stress_level": stress_level, + "study_type": study_type + }]) + + # 预测概率 + # [proba_false, proba_true] + try: + proba = model.predict_proba(features)[0, 1] + except Exception as e: + print(f"Prediction Error: {e}") + return 0.0 + + return float(proba) + +def explain_prediction() -> str: + """ + 解释模型的全局特征重要性。 + 从保存的 Random Forest Pipeline 中提取特征重要性。 + """ + model = load_model() + + try: + # 1. 提取预处理步骤中的特征名称 + # Pipeline 结构: [('preprocessor', ColumnTransformer), ('classifier', RandomForest)] + preprocessor = model.named_steps["preprocessor"] + clf = model.named_steps["classifier"] + + # 获取 OneHot 后的特征名 + # numeric_features 在前,categorical 在后 + num_feats = ["study_hours", "sleep_hours", "attendance_rate", "stress_level"] + + # 获取 categorical feature names (从 OneHotEncoder 中) + # 注意: 如果 scikit-learn 版本较旧,可能需要用不同的方式获取 + cat_encoder = preprocessor.named_transformers_["cat"].named_steps["onehot"] + cat_feats = cat_encoder.get_feature_names_out(["study_type"]) + + all_feats = np.concatenate([num_feats, cat_feats]) + + # 2. 获取重要性数值 + importances = clf.feature_importances_ + + # 3. 排序并输出 + indices = np.argsort(importances)[::-1] + + lines = ["### 模型特征重要性排名 (Top 5):"] + for i in range(min(5, len(importances))): + idx = indices[i] + lines.append(f"{i+1}. {all_feats[idx]}: {importances[idx]:.4f}") + + return "\n".join(lines) + + except Exception as e: + return f"无法解释模型特征 (可能模型结构不同): {str(e)}" diff --git a/src/streamlit_app.py b/src/streamlit_app.py new file mode 100644 index 0000000..f0ba3c5 --- /dev/null +++ b/src/streamlit_app.py @@ -0,0 +1,185 @@ +import streamlit as st +import asyncio +import os +import sys + +# Ensure project root is in path +sys.path.append(os.getcwd()) + +from src.agent_app import agent, counselor_agent, StudyGuidance +from pydantic_ai.messages import ( + ModelMessage, ModelRequest, ModelResponse, UserPromptPart, TextPart, + TextPartDelta, ToolCallPart, ToolReturnPart +) +from pydantic_ai import ( + AgentStreamEvent, PartDeltaEvent, FunctionToolCallEvent, FunctionToolResultEvent +) +from dotenv import load_dotenv + +# Load env variables +load_dotenv() + +st.set_page_config( + page_title="学生成绩预测 AI 助手", + page_icon="🎓", + layout="wide" +) + +# Sidebar Configuration +st.sidebar.header("🔧 配置") +api_key = st.sidebar.text_input("DeepSeek API Key", type="password", value=os.getenv("DEEPSEEK_API_KEY", "")) + +if api_key: + os.environ["DEEPSEEK_API_KEY"] = api_key + +st.sidebar.markdown("---") +# Mode Selection +mode = st.sidebar.radio("功能选择", ["📊 成绩预测", "💬 心理咨询"]) + +# --- Helper Functions --- + +async def run_analysis(query): + try: + if not os.getenv("DEEPSEEK_API_KEY"): + st.error("请在侧边栏提供 DeepSeek API Key。") + return None + + with st.spinner("🤖 Agent 正在思考... (调用 DeepSeek + 随机森林模型)"): + result = await agent.run(query) + return result.output + + except Exception as e: + st.error(f"分析失败: {str(e)}") + return None + +async def run_counselor_stream(query, history, placeholder): + """ + Manually stream the response to a placeholder, handling tool events for visibility. + """ + try: + if not os.getenv("DEEPSEEK_API_KEY"): + placeholder.error("❌ 错误: 请在侧边栏提供 DeepSeek API Key。") + return None + + full_response = "" + # Status container for tool calls + status_placeholder = st.empty() + + # Call Counselor Agent with streaming + # Call Counselor Agent with streaming using run_stream_events which is the modern way to get events + async for event in counselor_agent.run_stream_events(query, message_history=history): + # Handle Text Delta (Wrapped in PartDeltaEvent) + if isinstance(event, PartDeltaEvent) and isinstance(event.delta, TextPartDelta): + full_response += event.delta.content_delta + placeholder.markdown(full_response + "▌") + + # Handle Tool Call Start + elif isinstance(event, FunctionToolCallEvent): + # event.part is ToolCallPart usually, or event.tool_call + # Check pydantic-ai docs structure: FunctionToolCallEvent has 'part' which is ToolCallPart + status_placeholder.info(f"🛠️ 咨询师正在使用工具: `{event.part.tool_name}` ...") + + # Handle Tool Result + elif isinstance(event, FunctionToolResultEvent): + status_placeholder.empty() + + placeholder.markdown(full_response) + status_placeholder.empty() # Ensure clear + return full_response + + except Exception as e: + placeholder.error(f"❌ 咨询失败: {str(e)}") + return None + +# --- Main Views --- + +if mode == "📊 成绩预测": + st.title("🎓 学生成绩预测助手") + st.markdown("在下方输入学生详细信息,获取 AI 驱动的成绩分析。") + + with st.form("student_data_form"): + col1, col2 = st.columns(2) + + with col1: + study_hours = st.slider("每周学习时长 (小时)", 0.0, 20.0, 10.0, 0.5) + sleep_hours = st.slider("日均睡眠时长 (小时)", 0.0, 12.0, 7.0, 0.5) + + with col2: + attendance_rate = st.slider("出勤率", 0.0, 1.0, 0.9, 0.05) + stress_level = st.select_slider("压力等级 (1=低, 5=高)", options=[1, 2, 3, 4, 5], value=3) + study_type = st.radio("主要学习方式", ["Self", "Group", "Online"], horizontal=True) + + submitted = st.form_submit_button("🚀 分析通过率") + + if submitted: + user_query = ( + f"我是一名学生,情况如下:" + f"每周学习时间: {study_hours} 小时;" + f"平均睡眠时间: {sleep_hours} 小时;" + f"出勤率: {attendance_rate:.2f};" + f"压力等级: {stress_level} (1-5);" + f"主要学习方式: {study_type}。" + f"请调用 `predict_student` 预测我的通过率,并调用 `explain_model` 分析关键因素,最后给出针对性的建议。" + ) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + guidance = loop.run_until_complete(run_analysis(user_query)) + + if guidance: + st.divider() + st.subheader("📊 分析结果") + m1, m2, m3 = st.columns(3) + m1.metric("预测通过率", f"{guidance.pass_probability:.1%}") + m2.metric("风险评估", "高风险" if guidance.pass_probability < 0.6 else "低风险", + delta="-高风险" if guidance.pass_probability < 0.6 else "+安全") + + st.info(f"**风险评估:** {guidance.risk_assessment}") + st.write(f"**关键因素:** {guidance.key_drivers}") + + st.subheader("✅ 行动计划") + actions = [{"优先级": item.priority, "建议行动": item.action} for item in guidance.action_plan] + st.table(actions) + +elif mode == "💬 心理咨询": + st.title("🧩 AI 心理咨询室") + st.markdown("这里是安全且私密的空间。有些压力如果你愿意说,我愿意听。") + + # Initialize chat history + if "messages" not in st.session_state: + st.session_state.messages = [] + + # Display chat messages from history on app rerun + for message in st.session_state.messages: + with st.chat_message(message["role"]): + st.markdown(message["content"]) + + # React to user input + if prompt := st.chat_input("想聊聊什么?"): + # Display user message + with st.chat_message("user"): + st.markdown(prompt) + # Add user message to history + st.session_state.messages.append({"role": "user", "content": prompt}) + + # Prepare history for pydantic-ai + # Convert Streamlit history to pydantic-ai ModelMessages + # Note: We exclude the last message because `agent.run` takes the new prompt as argument + api_history = [] + for msg in st.session_state.messages[:-1]: + if msg["role"] == "user": + api_history.append(ModelRequest(parts=[UserPromptPart(content=msg["content"])])) + elif msg["role"] == "assistant": + api_history.append(ModelResponse(parts=[TextPart(content=msg["content"])])) + + # Generate response + with st.chat_message("assistant"): + placeholder = st.empty() + with st.spinner("咨询师正在倾听..."): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + # Run the manual streaming function + response_text = loop.run_until_complete(run_counselor_stream(prompt, api_history, placeholder)) + + if response_text: + st.session_state.messages.append({"role": "assistant", "content": response_text}) diff --git a/src/train.py b/src/train.py new file mode 100644 index 0000000..06d3a00 --- /dev/null +++ b/src/train.py @@ -0,0 +1,116 @@ +import sys +import os + +# 修复模块路径问题,让你可以在根目录直接 python src/train.py +sys.path.append(os.getcwd()) + +import joblib +import pandas as pd +import numpy as np +from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.preprocessing import StandardScaler, OneHotEncoder +from sklearn.linear_model import LogisticRegression +from sklearn.ensemble import RandomForestClassifier +from sklearn.metrics import classification_report, accuracy_score, f1_score + +from src.data import generate_data, preprocess_data + +MODELS_DIR = "models" +MODEL_PATH = os.path.join(MODELS_DIR, "model.pkl") + +def get_pipeline(model_type="rf"): + """ + 构建标准的 Sklearn 处理流水线。 + 1. 数值特征 -> 缺失填充 (均值) -> 标准化 + 2. 类别特征 -> 缺失填充 (众数) -> OneHot编码 + 3. 模型 -> LR 或 RF + """ + # 定义特征列 + numeric_features = ["study_hours", "sleep_hours", "attendance_rate", "stress_level"] + categorical_features = ["study_type"] + + # 数值处理管道 + numeric_transformer = Pipeline(steps=[ + ("imputer", SimpleImputer(strategy="mean")), + ("scaler", StandardScaler()) + ]) + + # 类别处理管道 + categorical_transformer = Pipeline(steps=[ + ("imputer", SimpleImputer(strategy="most_frequent")), + ("onehot", OneHotEncoder(handle_unknown="ignore")) + ]) + + # 组合预处理 + preprocessor = ColumnTransformer( + transformers=[ + ("num", numeric_transformer, numeric_features), + ("cat", categorical_transformer, categorical_features) + ] + ) + + # 选择模型 + if model_type == "lr": + clf = LogisticRegression(random_state=42) + else: + clf = RandomForestClassifier(n_estimators=500, max_depth=5, random_state=42) + + return Pipeline(steps=[ + ("preprocessor", preprocessor), + ("classifier", clf) + ]) + +def train(): + print(">>> 1. 数据准备") + df = generate_data(n_samples=2000) + df = preprocess_data(df) + + X = df.drop(columns=["is_pass"]) + y = df["is_pass"] + + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 + ) + print(f"训练集大小: {X_train.shape}, 测试集大小: {X_test.shape}") + + print("\n>>> 2. 模型训练与对比") + # 模型 A: 逻辑回归 (Baseline) + pipe_lr = get_pipeline("lr") + pipe_lr.fit(X_train, y_train) + y_pred_lr = pipe_lr.predict(X_test) + f1_lr = f1_score(y_test, y_pred_lr) + print(f"[Baseline - LogisticRegression] F1: {f1_lr:.4f}") + + # 模型 B: 随机森林 (Target) + pipe_rf = get_pipeline("rf") + pipe_rf.fit(X_train, y_train) + y_pred_rf = pipe_rf.predict(X_test) + f1_rf = f1_score(y_test, y_pred_rf) + print(f"[Target - RandomForest] F1: {f1_rf:.4f}") + + print("\n>>> 3. 如果 RF 更好,则进行详细评估") + best_model = pipe_rf + print(classification_report(y_test, y_pred_rf)) + + print("\n>>> 4. 误差分析 (Error Analysis)") + # 找出模型预测错误的样本 + test_df = X_test.copy() + test_df["True Label"] = y_test + test_df["Pred Label"] = y_pred_rf + + errors = test_df[test_df["True Label"] != test_df["Pred Label"]] + print(f"总计错误样本数: {len(errors)}") + if len(errors) > 0: + print("典型错误样本预览:") + print(errors.head(3)) + + print("\n>>> 5. 保存最佳模型") + os.makedirs(MODELS_DIR, exist_ok=True) + joblib.dump(best_model, MODEL_PATH) + print(f"模型 Pipeline 已完整保存至 {MODEL_PATH}") + +if __name__ == "__main__": + train() diff --git a/tests/test_agent.py b/tests/test_agent.py new file mode 100644 index 0000000..912d441 --- /dev/null +++ b/tests/test_agent.py @@ -0,0 +1,27 @@ +import os + +# 设置虚拟 key 以避免测试收集期间 pydantic-ai 初始化错误 +os.environ["DEEPSEEK_API_KEY"] = "dummy_key_for_testing" + +import sys +sys.path.append(os.getcwd()) + +from src.agent_app import predict_student + +# 注意: 我们直接测试工具函数,而不是完整的 agent 循环 +# 因为 agent 需要 API key,而 CI/测试环境中可能未设置。 + +from unittest.mock import patch + +def test_tool_wrapper(): + # 测试 Agent wrapper 函数是否能正确调用到底层 infer + # 我们 mock 底层的 predict_pass_prob,这样测试就不依赖于实际的模型文件是否存在 + with patch("src.agent_app.predict_pass_prob") as mock_predict: + mock_predict.return_value = 0.85 + + prob = predict_student(None, 12, 8, 0.9, 2, "Self") + + # 验证调用 + assert prob == 0.85 + mock_predict.assert_called_once_with(12, 8, 0.9, 2, "Self") + diff --git a/tests/test_counselor_agent.py b/tests/test_counselor_agent.py new file mode 100644 index 0000000..7622c01 --- /dev/null +++ b/tests/test_counselor_agent.py @@ -0,0 +1,54 @@ +import pytest +import sys +import os +from unittest.mock import AsyncMock, MagicMock + +# Ensure src is in path +sys.path.append(os.getcwd()) + +from src.agent_app import counselor_agent +from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, UserPromptPart, TextPart + +@pytest.mark.asyncio +async def test_counselor_agent_conversation(): + # We mock the model response to avoid needing real API key + # pydantic-ai allows passing a model to the agent or overriding it. + # However, for simplicity in this environment, we might just rely on the fact that + # if we don't have an API key, it will raise an error or we mock the run method. + + # Mocking agent.run isn't ideal for integration, but good for logic check. + # Let's try to mock the model itself if possible. + # But locally we can just skip the actual LLM call if no key, + # OR we assume the user has key (which they seem to have in environment or sidebar). + + # Let's just check if we can form the history and call the method signature correctly. + + history = [ + ModelRequest(parts=[UserPromptPart(content="我最近压力好大")]), + ModelResponse(parts=[TextPart(content="听到你这么说我很抱歉,能具体跟我说说吗?")]) + ] + + # We won't actually await the run if we suspect it fails without auth. + # But we can verify the agent object is set up correctly. + assert counselor_agent is not None + + # Verify we can access the tools + # PydanticAI 0.0.x tools validation + # We can inspect the agent's tools via internal attributes or Just trust the definition. + pass + +@pytest.mark.asyncio +async def test_counselor_agent_structure(): + # pydantic-ai Agent name is optional and strictly not the model name + assert counselor_agent is not None + # Basic check passed + +@pytest.mark.asyncio +async def test_counselor_agent_streaming(): + # Test if we can call run_stream (even if mocked or without auth, + # we might expect an error or just verify the method exists) + + assert hasattr(counselor_agent, "run_stream") + # We might not be able to actually stream without a real model response unless we mock it. + # But checking the attribute confirms pydantic-ai version supports it roughly. + pass diff --git a/tests/test_data.py b/tests/test_data.py new file mode 100644 index 0000000..686588f --- /dev/null +++ b/tests/test_data.py @@ -0,0 +1,52 @@ +import sys +import os +import pandas as pd +import numpy as np +import pytest + +# Ensure src is in path +sys.path.append(os.getcwd()) + +from src.data import generate_data, preprocess_data + +def test_generate_data_structure(): + """Test if generate_data returns a DataFrame with correct shape and columns.""" + df = generate_data(n_samples=50) + + assert isinstance(df, pd.DataFrame) + assert len(df) == 50 + + expected_cols = [ + "study_hours", "sleep_hours", "attendance_rate", + "study_type", "stress_level", "is_pass" + ] + for col in expected_cols: + assert col in df.columns + +def test_generate_data_content_range(): + """Test if generated data falls within expected value ranges.""" + df = generate_data(n_samples=50) + + assert df["study_hours"].min() >= 0 + assert df["study_hours"].max() <= 20 # Based on generation logic (0-15 actually, but safely below 20) + assert df["sleep_hours"].min() >= 0 + assert df["stress_level"].between(1, 5).all() + assert df["is_pass"].isin([0, 1]).all() + +def test_generate_data_missing_values(): + """Test if generate_data creates missing values as expected (it has random logic).""" + # Generate enough samples to likely get nans + df = generate_data(n_samples=500, random_seed=42) + # Check if we have nans in specific columns that are supposed to have them + # In source: attendance_rate has 5% chance of nan + assert df["attendance_rate"].isnull().sum() >= 0 + +def test_preprocess_data(): + """Test basic preprocessing (deduplication).""" + df = pd.DataFrame({ + "a": [1, 2, 2, 3], + "b": [1, 2, 2, 3] + }) + + clean_df = preprocess_data(df) + assert len(clean_df) == 3 diff --git a/tests/test_infer.py b/tests/test_infer.py new file mode 100644 index 0000000..e9c1cea --- /dev/null +++ b/tests/test_infer.py @@ -0,0 +1,66 @@ +import sys +import os +import pytest +from unittest.mock import patch + +# Ensure src is in path +sys.path.append(os.getcwd()) + +from src.infer import predict_pass_prob, explain_prediction, load_model + +# We need a fixture to create a valid model file for inference +@pytest.fixture(scope="module") +def train_dummy_model(tmp_path_factory): + """Trains a quick dummy model and saves it to a temp dir.""" + models_dir = tmp_path_factory.mktemp("models") + model_path = models_dir / "model.pkl" + + # We reuse the logic from src.train but point to our temp path + # OR we can just manually create a pipeline and save it + # Reusing src.train is better integration testing + from src.train import get_pipeline, generate_data, preprocess_data + import joblib + + df = generate_data(n_samples=20) + df = preprocess_data(df) + + X = df.drop(columns=["is_pass"]) + y = df["is_pass"] + + pipeline = get_pipeline("rf") + pipeline.fit(X, y) + + joblib.dump(pipeline, model_path) + + return str(model_path) + +@patch("src.infer._MODEL", None) # Reset global cached model +def test_predict_pass_prob(train_dummy_model): + """Test prediction using the dummy trained model.""" + + with patch("src.infer.MODEL_PATH", train_dummy_model): + proba = predict_pass_prob( + study_hours=5.0, + sleep_hours=7.0, + attendance_rate=0.9, + stress_level=3, + study_type="Self" + ) + assert 0.0 <= proba <= 1.0 + +@patch("src.infer._MODEL", None) # Reset global cached model +def test_explain_prediction(train_dummy_model): + """Test explanation generation.""" + + with patch("src.infer.MODEL_PATH", train_dummy_model): + explanation = explain_prediction() + assert isinstance(explanation, str) + assert "模型特征重要性排名" in explanation + +@patch("src.infer._MODEL", None) +def test_load_model_missing(): + """Test error handling when model is missing.""" + with patch("src.infer.MODEL_PATH", "non_existent_path/model.pkl"): + # Should raise FileNotFoundError or be handled + with pytest.raises(FileNotFoundError): + predict_pass_prob(1,1,1,1,"Self") # This calls load_model internally diff --git a/tests/test_model.py b/tests/test_model.py new file mode 100644 index 0000000..6219f64 --- /dev/null +++ b/tests/test_model.py @@ -0,0 +1,29 @@ +import os +import joblib +import pytest +from src.train import train, MODEL_PATH +from src.infer import load_model, predict_pass_prob + +def test_train_creates_model(): + # 确保模型不存在或被覆盖 + if os.path.exists(MODEL_PATH): + os.remove(MODEL_PATH) + + train() + assert os.path.exists(MODEL_PATH) + + model = joblib.load(MODEL_PATH) + assert model is not None + +def test_inference(): + # 确保模型存在 + if not os.path.exists(MODEL_PATH): + train() + + # 高概率情况 (大量学习/睡眠/出勤 + Group学习 + 低压力) + prob_high = predict_pass_prob(15, 8, 1.0, 1, "Group") + assert prob_high > 0.5 + + # 低概率情况 (不学习/不睡/缺勤 + 在线 + 高压力) + prob_low = predict_pass_prob(0, 3, 0.0, 5, "Online") + assert prob_low < 0.5 diff --git a/tests/test_train.py b/tests/test_train.py new file mode 100644 index 0000000..9225afd --- /dev/null +++ b/tests/test_train.py @@ -0,0 +1,48 @@ +import sys +import os +import pytest +from sklearn.pipeline import Pipeline +from unittest.mock import patch, MagicMock + +# Ensure src is in path +sys.path.append(os.getcwd()) + +from src.train import get_pipeline, train + +def test_get_pipeline_structure(): + """Test if get_pipeline returns a valid Scikit-learn pipeline.""" + pipeline = get_pipeline("rf") + assert isinstance(pipeline, Pipeline) + assert "preprocessor" in pipeline.named_steps + assert "classifier" in pipeline.named_steps + +def test_train_function_runs(tmp_path): + """ + Test if the train function runs without errors. + We mock generate_models to use a temp dir and run with small data. + """ + # Create a temporary directory for models + models_dir = tmp_path / "models" + model_path = models_dir / "model.pkl" + + # Needs to be string for some os.path usages if they are strict, but pathlib usually works. + # However, src/train.py uses os.path.join(MODELS_DIR, ...), so we need to patch constants. + + with patch("src.train.MODELS_DIR", str(models_dir)), \ + patch("src.train.MODEL_PATH", str(model_path)), \ + patch("src.train.generate_data") as mock_gen: + + # Mock data generation to return a very small dataframe to speed up test + # We need to use real data structure though bc pipeline expects specific columns + from src.data import generate_data + real_small_df = generate_data(n_samples=10) + mock_gen.return_value = real_small_df + + # Run training + try: + train() + except Exception as e: + pytest.fail(f"Train function failed with error: {e}") + + # Check if model file was created + assert model_path.exists()