CourseDesignTemplate/示例项目-AI数据分析师/app.py

736 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
📊 AI 数据分析师
==================
一个基于 DeepSeek API + Streamlit 的智能数据分析应用
功能特性:
- 📁 上传 CSV 数据文件
- 📈 自动生成可视化图表
- 🤖 AI 智能分析数据特征
- 💬 自然语言问答数据
- 📝 生成分析报告
技术栈:
- Streamlit: Web 界面
- Pandas: 数据处理
- Altair: 数据可视化
- DeepSeek API: AI 分析
"""
import streamlit as st
import pandas as pd
import altair as alt
import json
from openai import OpenAI
import os
from dotenv import load_dotenv
from io import StringIO
# 加载环境变量
load_dotenv()
# ============== 配置 ==============
API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
BASE_URL = "https://api.deepseek.com"
MODEL = "deepseek-chat"
# ============== 页面配置 ==============
st.set_page_config(
page_title="AI 数据分析师",
page_icon="📊",
layout="wide",
initial_sidebar_state="expanded"
)
# ============== 自定义样式 ==============
st.markdown("""
<style>
/* 渐变标题 */
.main-title {
background: linear-gradient(120deg, #667eea 0%, #764ba2 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-size: 2.5rem;
font-weight: 800;
margin-bottom: 0;
}
/* 统计卡片 */
.stat-card {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
padding: 1.5rem;
color: white;
text-align: center;
box-shadow: 0 10px 30px rgba(102, 126, 234, 0.3);
}
.stat-card h2 {
font-size: 2rem;
margin: 0;
font-weight: 700;
}
.stat-card p {
margin: 0.5rem 0 0 0;
opacity: 0.9;
font-size: 0.9rem;
}
/* 洞察卡片 */
.insight-card {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
border-radius: 15px;
padding: 1.5rem;
color: white;
margin: 1rem 0;
box-shadow: 0 10px 30px rgba(245, 87, 108, 0.3);
}
/* 分析报告样式 */
.report-section {
background: rgba(102, 126, 234, 0.1);
border-left: 4px solid #667eea;
padding: 1rem 1.5rem;
margin: 1rem 0;
border-radius: 0 10px 10px 0;
}
/* 问答区域 */
.qa-section {
background: linear-gradient(135deg, #11998e 0%, #38ef7d 100%);
border-radius: 15px;
padding: 1.5rem;
color: white;
margin: 1rem 0;
}
/* 上传区域美化 */
.uploadedFile {
border: 2px dashed #667eea !important;
border-radius: 15px !important;
}
/* Tab 样式 */
.stTabs [data-baseweb="tab-list"] {
gap: 8px;
}
.stTabs [data-baseweb="tab"] {
border-radius: 10px;
padding: 10px 20px;
}
</style>
""", unsafe_allow_html=True)
# ============== 辅助函数 ==============
def get_data_summary(df: pd.DataFrame) -> str:
"""生成数据摘要文本供 AI 分析"""
summary = []
# 基本信息
summary.append(f"数据集包含 {len(df)} 行和 {len(df.columns)} 列。")
summary.append(f"\n列名:{', '.join(df.columns.tolist())}")
# 数据类型
summary.append(f"\n\n各列数据类型:")
for col in df.columns:
dtype = str(df[col].dtype)
null_count = df[col].isnull().sum()
summary.append(f"- {col}: {dtype}, 缺失值: {null_count}")
# 数值列统计
numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
if numeric_cols:
summary.append(f"\n\n数值列统计摘要:")
stats = df[numeric_cols].describe().to_string()
summary.append(stats)
# 分类列信息
cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
if cat_cols:
summary.append(f"\n\n分类列信息:")
for col in cat_cols[:5]: # 最多显示5个
unique_count = df[col].nunique()
top_values = df[col].value_counts().head(5).to_dict()
summary.append(f"- {col}: {unique_count} 个唯一值, 前5个: {top_values}")
# 样本数据
summary.append(f"\n\n前3行数据样本")
summary.append(df.head(3).to_string())
return "\n".join(summary)
def analyze_with_ai(client: OpenAI, data_summary: str, analysis_type: str = "general") -> str:
"""使用 AI 分析数据"""
prompts = {
"general": """你是一个专业的数据分析师。请根据以下数据摘要,提供全面的数据分析洞察。
请从以下几个方面进行分析:
1. 📊 数据概况:数据的整体情况和质量评估
2. 🔍 关键发现:数据中的重要模式和特征
3. 📈 建议可视化:推荐适合这个数据集的图表类型
4. ⚠️ 注意事项:数据中可能存在的问题或需要注意的地方
5. 💡 进一步分析建议:可以深入探索的方向
请用清晰的结构和通俗易懂的语言回答。""",
"correlation": """你是一个专业的数据分析师。请分析以下数据中各变量之间可能存在的相关性和关联。
请重点分析:
1. 哪些变量之间可能存在正相关或负相关?
2. 是否存在明显的因果关系假设?
3. 建议做哪些相关性分析?
请用通俗易懂的语言解释。""",
"anomaly": """你是一个专业的数据分析师。请分析以下数据中可能存在的异常值或异常模式。
请关注:
1. 数值是否有明显的异常值?
2. 数据分布是否有异常?
3. 是否存在数据质量问题?
4. 建议如何处理这些异常?
请给出具体的分析和建议。"""
}
system_prompt = prompts.get(analysis_type, prompts["general"])
response = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"数据摘要:\n\n{data_summary}"}
],
max_tokens=2000,
temperature=0.7
)
return response.choices[0].message.content
def ask_data_question(client: OpenAI, data_summary: str, question: str, chat_history: list) -> str:
"""根据数据回答用户问题"""
system_prompt = """你是一个专业的数据分析助手。用户会就数据集提出问题,你需要根据提供的数据摘要来回答。
回答要求:
1. 基于数据事实回答,不要编造数据中没有的信息
2. 如果数据摘要中没有足够信息回答问题,请诚实说明
3. 尽可能提供具体的数字和分析
4. 可以给出进一步分析的建议"""
messages = [{"role": "system", "content": system_prompt}]
messages.append({"role": "user", "content": f"数据摘要:\n{data_summary}\n\n请记住这个数据摘要,我接下来会问你关于这个数据的问题。"})
messages.append({"role": "assistant", "content": "好的,我已经了解了这个数据集的情况。请问您想了解什么?"})
# 添加对话历史
for msg in chat_history:
messages.append(msg)
# 添加当前问题
messages.append({"role": "user", "content": question})
response = client.chat.completions.create(
model=MODEL,
messages=messages,
max_tokens=1500,
temperature=0.7
)
return response.choices[0].message.content
def generate_report(client: OpenAI, data_summary: str, analyses: dict) -> str:
"""生成完整的分析报告"""
system_prompt = """你是一个专业的数据分析师,请根据数据摘要和已有的分析结果,生成一份完整的数据分析报告。
报告格式要求:
1. 使用 Markdown 格式
2. 包含标题、摘要、详细分析、结论和建议
3. 结构清晰,层次分明
4. 语言专业但易懂
5. 适当使用 emoji 增加可读性"""
user_content = f"""数据摘要:
{data_summary}
已有分析结果:
{json.dumps(analyses, ensure_ascii=False, indent=2)}
请生成一份完整的数据分析报告。"""
response = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content}
],
max_tokens=3000,
temperature=0.7
)
return response.choices[0].message.content
def create_visualizations(df: pd.DataFrame):
"""根据数据自动生成可视化图表"""
charts = []
numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
# 1. 数值分布直方图(第一个数值列)
if numeric_cols:
col = numeric_cols[0]
hist_chart = alt.Chart(df).mark_bar(
opacity=0.7,
color='#667eea'
).encode(
alt.X(f'{col}:Q', bin=alt.Bin(maxbins=30), title=col),
alt.Y('count()', title='频数'),
tooltip=[alt.Tooltip(f'{col}:Q', bin=alt.Bin(maxbins=30)), 'count()']
).properties(
title=f'📊 {col} 分布直方图',
height=300
)
charts.append(('distribution', hist_chart, f'{col} 的数值分布'))
# 2. 分类计数柱状图(第一个分类列)
if cat_cols:
col = cat_cols[0]
if df[col].nunique() <= 20: # 类别不要太多
bar_chart = alt.Chart(df).mark_bar(
color='#764ba2'
).encode(
x=alt.X(f'{col}:N', sort='-y', title=col),
y=alt.Y('count()', title='计数'),
tooltip=[col, 'count()']
).properties(
title=f'📈 {col} 分类计数',
height=300
)
charts.append(('category', bar_chart, f'{col} 各类别的计数'))
# 3. 散点图(如果有多个数值列)
if len(numeric_cols) >= 2:
col1, col2 = numeric_cols[0], numeric_cols[1]
color_col = cat_cols[0] if cat_cols and df[cat_cols[0]].nunique() <= 10 else None
scatter_encoding = {
'x': alt.X(f'{col1}:Q', title=col1),
'y': alt.Y(f'{col2}:Q', title=col2),
'tooltip': [col1, col2]
}
if color_col:
scatter_encoding['color'] = alt.Color(f'{color_col}:N', title=color_col)
scatter_encoding['tooltip'].append(color_col)
scatter_chart = alt.Chart(df).mark_circle(
size=60,
opacity=0.6
).encode(
**scatter_encoding
).properties(
title=f'🔵 {col1} vs {col2} 散点图',
height=350
).interactive()
charts.append(('scatter', scatter_chart, f'{col1}{col2} 的关系'))
# 4. 箱线图(数值列按分类)
if numeric_cols and cat_cols:
num_col = numeric_cols[0]
cat_col = cat_cols[0]
if df[cat_col].nunique() <= 10:
box_chart = alt.Chart(df).mark_boxplot(
color='#11998e'
).encode(
x=alt.X(f'{cat_col}:N', title=cat_col),
y=alt.Y(f'{num_col}:Q', title=num_col),
color=alt.Color(f'{cat_col}:N', legend=None)
).properties(
title=f'📦 {num_col}{cat_col} 分组箱线图',
height=300
)
charts.append(('boxplot', box_chart, f'{num_col} 在不同 {cat_col} 下的分布'))
# 5. 相关性热力图(如果有多个数值列)
if len(numeric_cols) >= 3:
corr_df = df[numeric_cols].corr().reset_index().melt(id_vars='index')
corr_df.columns = ['var1', 'var2', 'correlation']
heatmap = alt.Chart(corr_df).mark_rect().encode(
x=alt.X('var1:N', title=''),
y=alt.Y('var2:N', title=''),
color=alt.Color('correlation:Q',
scale=alt.Scale(scheme='redblue', domain=[-1, 1]),
title='相关系数'),
tooltip=['var1', 'var2', alt.Tooltip('correlation:Q', format='.2f')]
).properties(
title='🔥 相关性热力图',
height=300
)
charts.append(('heatmap', heatmap, '各数值变量间的相关性'))
return charts
# ============== 初始化 Session State ==============
if "df" not in st.session_state:
st.session_state.df = None
if "data_summary" not in st.session_state:
st.session_state.data_summary = None
if "analyses" not in st.session_state:
st.session_state.analyses = {}
if "qa_history" not in st.session_state:
st.session_state.qa_history = []
# ============== 侧边栏 ==============
with st.sidebar:
st.markdown("## 🔧 设置")
# API Key 输入
api_key_input = st.text_input(
"DeepSeek API Key",
value=API_KEY,
type="password",
help="输入你的 DeepSeek API Key"
)
if api_key_input:
API_KEY = api_key_input
st.divider()
# 文件上传
st.markdown("## 📁 数据上传")
uploaded_file = st.file_uploader(
"上传 CSV 文件",
type=['csv'],
help="支持 CSV 格式的数据文件"
)
# 或使用示例数据
use_sample = st.checkbox("使用示例数据", value=False)
if use_sample:
# 生成示例数据
import numpy as np
np.random.seed(42)
sample_df = pd.DataFrame({
'姓名': [f'员工{i}' for i in range(1, 101)],
'部门': np.random.choice(['技术部', '市场部', '财务部', '人事部', '运营部'], 100),
'年龄': np.random.randint(22, 55, 100),
'工龄': np.random.randint(1, 20, 100),
'月薪': np.random.randint(8000, 50000, 100),
'绩效评分': np.round(np.random.uniform(60, 100, 100), 1),
'满意度': np.round(np.random.uniform(1, 5, 100), 1)
})
st.session_state.df = sample_df
st.session_state.data_summary = get_data_summary(sample_df)
st.success("✅ 已加载示例数据")
elif uploaded_file is not None:
try:
df = pd.read_csv(uploaded_file)
st.session_state.df = df
st.session_state.data_summary = get_data_summary(df)
st.session_state.analyses = {} # 重置分析结果
st.session_state.qa_history = [] # 重置问答历史
st.success(f"✅ 成功加载 {len(df)} 行数据")
except Exception as e:
st.error(f"❌ 文件加载失败: {str(e)}")
# 显示数据信息
if st.session_state.df is not None:
st.divider()
st.markdown("### 📊 数据概览")
df = st.session_state.df
st.markdown(f"- **行数**: {len(df)}")
st.markdown(f"- **列数**: {len(df.columns)}")
st.markdown(f"- **数值列**: {len(df.select_dtypes(include=['number']).columns)}")
st.markdown(f"- **分类列**: {len(df.select_dtypes(include=['object', 'category']).columns)}")
# ============== 主界面 ==============
st.markdown('<h1 class="main-title">📊 AI 数据分析师</h1>', unsafe_allow_html=True)
st.markdown("*上传数据,让 AI 帮你发现数据中的故事*")
# 检查 API Key
if not API_KEY:
st.warning("⚠️ 请在侧边栏输入你的 DeepSeek API Key")
st.stop()
# 检查数据
if st.session_state.df is None:
st.info("👈 请在侧边栏上传 CSV 文件或使用示例数据")
# 显示使用说明
with st.expander("📖 使用说明", expanded=True):
st.markdown("""
### 欢迎使用 AI 数据分析师!
**功能介绍:**
1. **📈 数据预览** - 查看数据基本信息和统计摘要
2. **📊 智能可视化** - 自动生成适合数据的图表
3. **🤖 AI 分析** - AI 深度分析数据特征和洞察
4. **💬 数据问答** - 用自然语言询问关于数据的问题
5. **📝 生成报告** - 一键生成完整的分析报告
**开始使用:**
1. 在左侧边栏输入你的 DeepSeek API Key
2. 上传 CSV 文件或勾选"使用示例数据"
3. 切换不同标签页探索各项功能
""")
st.stop()
# 创建 OpenAI 客户端
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
df = st.session_state.df
# 创建标签页
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"📈 数据预览",
"📊 智能可视化",
"🤖 AI 分析",
"💬 数据问答",
"📝 生成报告"
])
# ============== Tab 1: 数据预览 ==============
with tab1:
st.header("📈 数据预览")
# 统计卡片
col1, col2, col3, col4 = st.columns(4)
with col1:
st.markdown(f"""
<div class="stat-card">
<h2>{len(df)}</h2>
<p>数据行数</p>
</div>
""", unsafe_allow_html=True)
with col2:
st.markdown(f"""
<div class="stat-card">
<h2>{len(df.columns)}</h2>
<p>数据列数</p>
</div>
""", unsafe_allow_html=True)
with col3:
missing = df.isnull().sum().sum()
st.markdown(f"""
<div class="stat-card">
<h2>{missing}</h2>
<p>缺失值数量</p>
</div>
""", unsafe_allow_html=True)
with col4:
numeric_cols = len(df.select_dtypes(include=['number']).columns)
st.markdown(f"""
<div class="stat-card">
<h2>{numeric_cols}</h2>
<p>数值列数量</p>
</div>
""", unsafe_allow_html=True)
st.markdown("---")
# 数据表格
st.subheader("📋 数据表格")
st.dataframe(df, use_container_width=True, height=400)
# 统计摘要
col_left, col_right = st.columns(2)
with col_left:
st.subheader("📊 数值列统计")
numeric_df = df.select_dtypes(include=['number'])
if not numeric_df.empty:
st.dataframe(numeric_df.describe(), use_container_width=True)
else:
st.info("没有数值列")
with col_right:
st.subheader("📝 列信息")
info_df = pd.DataFrame({
'列名': df.columns,
'数据类型': df.dtypes.astype(str).values,
'非空值数': df.count().values,
'唯一值数': [df[col].nunique() for col in df.columns]
})
st.dataframe(info_df, use_container_width=True, hide_index=True)
# ============== Tab 2: 智能可视化 ==============
with tab2:
st.header("📊 智能可视化")
st.markdown("*根据数据特征自动生成适合的图表*")
charts = create_visualizations(df)
if not charts:
st.warning("数据列类型不足以生成可视化图表")
else:
for i, (chart_type, chart, description) in enumerate(charts):
st.markdown(f"**{description}**")
st.altair_chart(chart, use_container_width=True)
if i < len(charts) - 1:
st.divider()
# ============== Tab 3: AI 分析 ==============
with tab3:
st.header("🤖 AI 智能分析")
analysis_type = st.selectbox(
"选择分析类型",
options=[
("general", "🔍 综合分析 - 全面了解数据特征"),
("correlation", "🔗 相关性分析 - 探索变量间关系"),
("anomaly", "⚠️ 异常检测 - 发现数据问题")
],
format_func=lambda x: x[1]
)
if st.button("🚀 开始 AI 分析", type="primary", use_container_width=True):
with st.spinner("AI 正在分析数据,请稍候..."):
try:
result = analyze_with_ai(
client,
st.session_state.data_summary,
analysis_type[0]
)
st.session_state.analyses[analysis_type[0]] = result
st.success("✅ 分析完成!")
except Exception as e:
st.error(f"❌ 分析出错: {str(e)}")
# 显示分析结果
if analysis_type[0] in st.session_state.analyses:
st.markdown("### 📋 分析结果")
st.markdown(st.session_state.analyses[analysis_type[0]])
# 显示历史分析
other_analyses = {k: v for k, v in st.session_state.analyses.items() if k != analysis_type[0]}
if other_analyses:
st.divider()
with st.expander("📚 查看其他分析结果"):
for key, value in other_analyses.items():
type_names = {"general": "综合分析", "correlation": "相关性分析", "anomaly": "异常检测"}
st.markdown(f"#### {type_names.get(key, key)}")
st.markdown(value)
st.divider()
# ============== Tab 4: 数据问答 ==============
with tab4:
st.header("💬 数据问答")
st.markdown("*用自然语言询问关于数据的任何问题*")
# 示例问题
with st.expander("💡 示例问题"):
st.markdown("""
- 这个数据集的主要特征是什么?
- 哪个部门的平均薪资最高?
- 年龄和工资之间有什么关系?
- 数据中有哪些异常值?
- 如何提高员工满意度?
""")
# 显示对话历史
for msg in st.session_state.qa_history:
avatar = "👤" if msg["role"] == "user" else "🤖"
with st.chat_message(msg["role"], avatar=avatar):
st.markdown(msg["content"])
# 问题输入
if question := st.chat_input("输入你关于数据的问题..."):
# 显示用户问题
with st.chat_message("user", avatar="👤"):
st.markdown(question)
# AI 回答
with st.chat_message("assistant", avatar="🤖"):
with st.spinner("思考中..."):
try:
answer = ask_data_question(
client,
st.session_state.data_summary,
question,
st.session_state.qa_history
)
st.markdown(answer)
# 保存对话历史
st.session_state.qa_history.append({"role": "user", "content": question})
st.session_state.qa_history.append({"role": "assistant", "content": answer})
except Exception as e:
st.error(f"❌ 回答出错: {str(e)}")
# 清空对话按钮
if st.session_state.qa_history:
if st.button("🗑️ 清空对话历史"):
st.session_state.qa_history = []
st.rerun()
# ============== Tab 5: 生成报告 ==============
with tab5:
st.header("📝 数据分析报告")
if not st.session_state.analyses:
st.info("💡 请先在「AI 分析」标签页进行分析,然后再生成报告")
else:
st.markdown("*基于已有分析结果生成完整的数据分析报告*")
if st.button("📄 生成分析报告", type="primary", use_container_width=True):
with st.spinner("正在生成报告,请稍候..."):
try:
report = generate_report(
client,
st.session_state.data_summary,
st.session_state.analyses
)
st.session_state.report = report
st.success("✅ 报告生成完成!")
except Exception as e:
st.error(f"❌ 生成报告出错: {str(e)}")
# 显示报告
if "report" in st.session_state:
st.markdown("---")
st.markdown(st.session_state.report)
# 下载按钮
st.download_button(
label="📥 下载报告 (Markdown)",
data=st.session_state.report,
file_name="数据分析报告.md",
mime="text/markdown"
)
# ============== 页脚 ==============
st.divider()
st.caption("Made with ❤️ using DeepSeek API + Streamlit | Python 程序设计课程设计示例")