""" 📊 AI 数据分析师 ================== 一个基于 DeepSeek API + Streamlit 的智能数据分析应用 功能特性: - 📁 上传 CSV 数据文件 - 📈 自动生成可视化图表 - 🤖 AI 智能分析数据特征 - 💬 自然语言问答数据 - 📝 生成分析报告 技术栈: - Streamlit: Web 界面 - Pandas: 数据处理 - Altair: 数据可视化 - DeepSeek API: AI 分析 """ import streamlit as st import pandas as pd import altair as alt import json from openai import OpenAI import os from dotenv import load_dotenv from io import StringIO # 加载环境变量 load_dotenv() # ============== 配置 ============== API_KEY = os.getenv("DEEPSEEK_API_KEY", "") BASE_URL = "https://api.deepseek.com" MODEL = "deepseek-chat" # ============== 页面配置 ============== st.set_page_config( page_title="AI 数据分析师", page_icon="📊", layout="wide", initial_sidebar_state="expanded" ) # ============== 自定义样式 ============== st.markdown(""" """, unsafe_allow_html=True) # ============== 辅助函数 ============== def get_data_summary(df: pd.DataFrame) -> str: """生成数据摘要文本供 AI 分析""" summary = [] # 基本信息 summary.append(f"数据集包含 {len(df)} 行和 {len(df.columns)} 列。") summary.append(f"\n列名:{', '.join(df.columns.tolist())}") # 数据类型 summary.append(f"\n\n各列数据类型:") for col in df.columns: dtype = str(df[col].dtype) null_count = df[col].isnull().sum() summary.append(f"- {col}: {dtype}, 缺失值: {null_count}") # 数值列统计 numeric_cols = df.select_dtypes(include=['number']).columns.tolist() if numeric_cols: summary.append(f"\n\n数值列统计摘要:") stats = df[numeric_cols].describe().to_string() summary.append(stats) # 分类列信息 cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist() if cat_cols: summary.append(f"\n\n分类列信息:") for col in cat_cols[:5]: # 最多显示5个 unique_count = df[col].nunique() top_values = df[col].value_counts().head(5).to_dict() summary.append(f"- {col}: {unique_count} 个唯一值, 前5个: {top_values}") # 样本数据 summary.append(f"\n\n前3行数据样本:") summary.append(df.head(3).to_string()) return "\n".join(summary) def analyze_with_ai(client: OpenAI, data_summary: str, analysis_type: str = "general") -> str: """使用 AI 分析数据""" prompts = { "general": """你是一个专业的数据分析师。请根据以下数据摘要,提供全面的数据分析洞察。 请从以下几个方面进行分析: 1. 📊 数据概况:数据的整体情况和质量评估 2. 🔍 关键发现:数据中的重要模式和特征 3. 📈 建议可视化:推荐适合这个数据集的图表类型 4. ⚠️ 注意事项:数据中可能存在的问题或需要注意的地方 5. 💡 进一步分析建议:可以深入探索的方向 请用清晰的结构和通俗易懂的语言回答。""", "correlation": """你是一个专业的数据分析师。请分析以下数据中各变量之间可能存在的相关性和关联。 请重点分析: 1. 哪些变量之间可能存在正相关或负相关? 2. 是否存在明显的因果关系假设? 3. 建议做哪些相关性分析? 请用通俗易懂的语言解释。""", "anomaly": """你是一个专业的数据分析师。请分析以下数据中可能存在的异常值或异常模式。 请关注: 1. 数值是否有明显的异常值? 2. 数据分布是否有异常? 3. 是否存在数据质量问题? 4. 建议如何处理这些异常? 请给出具体的分析和建议。""" } system_prompt = prompts.get(analysis_type, prompts["general"]) response = client.chat.completions.create( model=MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"数据摘要:\n\n{data_summary}"} ], max_tokens=2000, temperature=0.7 ) return response.choices[0].message.content def ask_data_question(client: OpenAI, data_summary: str, question: str, chat_history: list) -> str: """根据数据回答用户问题""" system_prompt = """你是一个专业的数据分析助手。用户会就数据集提出问题,你需要根据提供的数据摘要来回答。 回答要求: 1. 基于数据事实回答,不要编造数据中没有的信息 2. 如果数据摘要中没有足够信息回答问题,请诚实说明 3. 尽可能提供具体的数字和分析 4. 可以给出进一步分析的建议""" messages = [{"role": "system", "content": system_prompt}] messages.append({"role": "user", "content": f"数据摘要:\n{data_summary}\n\n请记住这个数据摘要,我接下来会问你关于这个数据的问题。"}) messages.append({"role": "assistant", "content": "好的,我已经了解了这个数据集的情况。请问您想了解什么?"}) # 添加对话历史 for msg in chat_history: messages.append(msg) # 添加当前问题 messages.append({"role": "user", "content": question}) response = client.chat.completions.create( model=MODEL, messages=messages, max_tokens=1500, temperature=0.7 ) return response.choices[0].message.content def generate_report(client: OpenAI, data_summary: str, analyses: dict) -> str: """生成完整的分析报告""" system_prompt = """你是一个专业的数据分析师,请根据数据摘要和已有的分析结果,生成一份完整的数据分析报告。 报告格式要求: 1. 使用 Markdown 格式 2. 包含标题、摘要、详细分析、结论和建议 3. 结构清晰,层次分明 4. 语言专业但易懂 5. 适当使用 emoji 增加可读性""" user_content = f"""数据摘要: {data_summary} 已有分析结果: {json.dumps(analyses, ensure_ascii=False, indent=2)} 请生成一份完整的数据分析报告。""" response = client.chat.completions.create( model=MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], max_tokens=3000, temperature=0.7 ) return response.choices[0].message.content def create_visualizations(df: pd.DataFrame): """根据数据自动生成可视化图表""" charts = [] numeric_cols = df.select_dtypes(include=['number']).columns.tolist() cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist() # 1. 数值分布直方图(第一个数值列) if numeric_cols: col = numeric_cols[0] hist_chart = alt.Chart(df).mark_bar( opacity=0.7, color='#667eea' ).encode( alt.X(f'{col}:Q', bin=alt.Bin(maxbins=30), title=col), alt.Y('count()', title='频数'), tooltip=[alt.Tooltip(f'{col}:Q', bin=alt.Bin(maxbins=30)), 'count()'] ).properties( title=f'📊 {col} 分布直方图', height=300 ) charts.append(('distribution', hist_chart, f'{col} 的数值分布')) # 2. 分类计数柱状图(第一个分类列) if cat_cols: col = cat_cols[0] if df[col].nunique() <= 20: # 类别不要太多 bar_chart = alt.Chart(df).mark_bar( color='#764ba2' ).encode( x=alt.X(f'{col}:N', sort='-y', title=col), y=alt.Y('count()', title='计数'), tooltip=[col, 'count()'] ).properties( title=f'📈 {col} 分类计数', height=300 ) charts.append(('category', bar_chart, f'{col} 各类别的计数')) # 3. 散点图(如果有多个数值列) if len(numeric_cols) >= 2: col1, col2 = numeric_cols[0], numeric_cols[1] color_col = cat_cols[0] if cat_cols and df[cat_cols[0]].nunique() <= 10 else None scatter_encoding = { 'x': alt.X(f'{col1}:Q', title=col1), 'y': alt.Y(f'{col2}:Q', title=col2), 'tooltip': [col1, col2] } if color_col: scatter_encoding['color'] = alt.Color(f'{color_col}:N', title=color_col) scatter_encoding['tooltip'].append(color_col) scatter_chart = alt.Chart(df).mark_circle( size=60, opacity=0.6 ).encode( **scatter_encoding ).properties( title=f'🔵 {col1} vs {col2} 散点图', height=350 ).interactive() charts.append(('scatter', scatter_chart, f'{col1} 与 {col2} 的关系')) # 4. 箱线图(数值列按分类) if numeric_cols and cat_cols: num_col = numeric_cols[0] cat_col = cat_cols[0] if df[cat_col].nunique() <= 10: box_chart = alt.Chart(df).mark_boxplot( color='#11998e' ).encode( x=alt.X(f'{cat_col}:N', title=cat_col), y=alt.Y(f'{num_col}:Q', title=num_col), color=alt.Color(f'{cat_col}:N', legend=None) ).properties( title=f'📦 {num_col} 按 {cat_col} 分组箱线图', height=300 ) charts.append(('boxplot', box_chart, f'{num_col} 在不同 {cat_col} 下的分布')) # 5. 相关性热力图(如果有多个数值列) if len(numeric_cols) >= 3: corr_df = df[numeric_cols].corr().reset_index().melt(id_vars='index') corr_df.columns = ['var1', 'var2', 'correlation'] heatmap = alt.Chart(corr_df).mark_rect().encode( x=alt.X('var1:N', title=''), y=alt.Y('var2:N', title=''), color=alt.Color('correlation:Q', scale=alt.Scale(scheme='redblue', domain=[-1, 1]), title='相关系数'), tooltip=['var1', 'var2', alt.Tooltip('correlation:Q', format='.2f')] ).properties( title='🔥 相关性热力图', height=300 ) charts.append(('heatmap', heatmap, '各数值变量间的相关性')) return charts # ============== 初始化 Session State ============== if "df" not in st.session_state: st.session_state.df = None if "data_summary" not in st.session_state: st.session_state.data_summary = None if "analyses" not in st.session_state: st.session_state.analyses = {} if "qa_history" not in st.session_state: st.session_state.qa_history = [] # ============== 侧边栏 ============== with st.sidebar: st.markdown("## 🔧 设置") # API Key 输入 api_key_input = st.text_input( "DeepSeek API Key", value=API_KEY, type="password", help="输入你的 DeepSeek API Key" ) if api_key_input: API_KEY = api_key_input st.divider() # 文件上传 st.markdown("## 📁 数据上传") uploaded_file = st.file_uploader( "上传 CSV 文件", type=['csv'], help="支持 CSV 格式的数据文件" ) # 或使用示例数据 use_sample = st.checkbox("使用示例数据", value=False) if use_sample: # 生成示例数据 import numpy as np np.random.seed(42) sample_df = pd.DataFrame({ '姓名': [f'员工{i}' for i in range(1, 101)], '部门': np.random.choice(['技术部', '市场部', '财务部', '人事部', '运营部'], 100), '年龄': np.random.randint(22, 55, 100), '工龄': np.random.randint(1, 20, 100), '月薪': np.random.randint(8000, 50000, 100), '绩效评分': np.round(np.random.uniform(60, 100, 100), 1), '满意度': np.round(np.random.uniform(1, 5, 100), 1) }) st.session_state.df = sample_df st.session_state.data_summary = get_data_summary(sample_df) st.success("✅ 已加载示例数据") elif uploaded_file is not None: try: df = pd.read_csv(uploaded_file) st.session_state.df = df st.session_state.data_summary = get_data_summary(df) st.session_state.analyses = {} # 重置分析结果 st.session_state.qa_history = [] # 重置问答历史 st.success(f"✅ 成功加载 {len(df)} 行数据") except Exception as e: st.error(f"❌ 文件加载失败: {str(e)}") # 显示数据信息 if st.session_state.df is not None: st.divider() st.markdown("### 📊 数据概览") df = st.session_state.df st.markdown(f"- **行数**: {len(df)}") st.markdown(f"- **列数**: {len(df.columns)}") st.markdown(f"- **数值列**: {len(df.select_dtypes(include=['number']).columns)}") st.markdown(f"- **分类列**: {len(df.select_dtypes(include=['object', 'category']).columns)}") # ============== 主界面 ============== st.markdown('
数据行数
数据列数
缺失值数量
数值列数量