From b4d4805cfa0a2689e232cd52105902dfeb5c3104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BF=8A=E5=9D=87?= Date: Fri, 16 Jan 2026 04:18:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20src/tweet=5Fdata.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/tweet_data.py | 315 ---------------------------------------------- 1 file changed, 315 deletions(-) delete mode 100644 src/tweet_data.py diff --git a/src/tweet_data.py b/src/tweet_data.py deleted file mode 100644 index 7d9237b..0000000 --- a/src/tweet_data.py +++ /dev/null @@ -1,315 +0,0 @@ -"""文本数据清洗模块 - -针对 Tweets.csv 航空情感分析数据集的文本清洗。 -遵循「克制」原则,仅进行必要的预处理,保留文本语义信息。 - -清洗策略: -1. 文本标准化:统一小写(不进行词形还原/词干提取,保留原始语义) -2. 去除噪声:移除用户提及(@username)、URL链接、多余空格 -3. 保留语义:保留表情符号、标点符号(它们对情感分析有价值) -4. 最小化处理:不进行停用词删除(否定词如"not"、"don't"对情感很重要) -""" - -import re -from pathlib import Path - -import pandera.polars as pa -import polars as pl - - -# --- Pandera Schema 定义 --- - - -class RawTweetSchema(pa.DataFrameModel): - """原始推文数据 Schema(清洗前校验) - - 允许缺失值存在,用于验证数据读取后的基本结构。 - """ - tweet_id: int = pa.Field(nullable=False) - airline_sentiment: str = pa.Field(nullable=True) - airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=True) - negativereason: str = pa.Field(nullable=True) - negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True) - airline: str = pa.Field(nullable=True) - text: str = pa.Field(nullable=True) - tweet_coord: str = pa.Field(nullable=True) - tweet_created: str = pa.Field(nullable=True) - tweet_location: str = pa.Field(nullable=True) - user_timezone: str = pa.Field(nullable=True) - - class Config: - strict = False - coerce = True - - -class CleanTweetSchema(pa.DataFrameModel): - """清洗后推文数据 Schema(严格模式) - - 不允许缺失值,强制约束检查。 - """ - tweet_id: int = pa.Field(nullable=False) - airline_sentiment: str = pa.Field(isin=["positive", "neutral", "negative"], nullable=False) - airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=False) - negativereason: str = pa.Field(nullable=True) - negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True) - airline: str = pa.Field(isin=["Virgin America", "United", "Southwest", "Delta", "US Airways", "American"], nullable=False) - text_cleaned: str = pa.Field(nullable=False) - text_original: str = pa.Field(nullable=False) - - class Config: - strict = True - coerce = True - - -# --- 文本清洗函数 --- - - -def clean_text(text: str) -> str: - """文本清洗函数(克制策略) - - 清洗原则: - - 移除:用户提及(@username)、URL链接、多余空格 - - 保留:表情符号、标点符号、否定词、原始大小写(后续统一小写) - - 不做:词形还原、词干提取、停用词删除 - - Args: - text: 原始文本 - - Returns: - str: 清洗后的文本 - """ - if not text or not isinstance(text, str): - return "" - - # 1. 移除用户提及 (@username) - text = re.sub(r'@\w+', '', text) - - # 2. 移除 URL 链接 - text = re.sub(r'http\S+|www\S+', '', text) - - # 3. 移除多余空格和换行 - text = re.sub(r'\s+', ' ', text).strip() - - return text - - -def normalize_text(text: str) -> str: - """文本标准化 - - 统一小写,但不进行词形还原或词干提取。 - - Args: - text: 清洗后的文本 - - Returns: - str: 标准化后的文本 - """ - if not text or not isinstance(text, str): - return "" - - # 仅统一小写 - return text.lower() - - -# --- 数据加载与预处理 --- - - -def load_tweets(file_path: str | Path = "Tweets.csv") -> pl.DataFrame: - """加载原始推文数据 - - Args: - file_path: CSV 文件路径 - - Returns: - pl.DataFrame: 原始推文数据 - """ - df = pl.read_csv(file_path) - return df - - -def validate_raw_tweets(df: pl.DataFrame) -> pl.DataFrame: - """验证原始推文数据结构(清洗前) - - Args: - df: 原始 Polars DataFrame - - Returns: - pl.DataFrame: 验证通过的 DataFrame - - Raises: - pa.errors.SchemaError: 验证失败 - """ - return RawTweetSchema.validate(df) - - -def validate_clean_tweets(df: pl.DataFrame) -> pl.DataFrame: - """验证清洗后推文数据(严格模式) - - Args: - df: 清洗后的 Polars DataFrame - - Returns: - pl.DataFrame: 验证通过的 DataFrame - - Raises: - pa.errors.SchemaError: 验证失败 - """ - return CleanTweetSchema.validate(df) - - -def preprocess_tweets( - df: pl.DataFrame, - validate: bool = True, - min_confidence: float = 0.5 -) -> pl.DataFrame: - """推文数据预处理流水线 - - 处理步骤: - 1. 筛选:仅保留情感置信度 >= min_confidence 的样本 - 2. 文本清洗:应用 clean_text 和 normalize_text - 3. 删除缺失值:删除 text 为空的样本 - 4. 删除重复行:基于 tweet_id 去重 - 5. 可选:进行 Schema 校验 - - Args: - df: 原始 Polars DataFrame - validate: 是否进行清洗后 Schema 校验 - min_confidence: 最低情感置信度阈值 - - Returns: - pl.DataFrame: 清洗后的 DataFrame - """ - # 1. 筛选高置信度样本 - df_filtered = df.filter( - pl.col("airline_sentiment_confidence") >= min_confidence - ) - - # 2. 文本清洗和标准化 - df_clean = df_filtered.with_columns([ - pl.col("text").map_elements(clean_text, return_dtype=pl.String).alias("text_cleaned"), - pl.col("text").alias("text_original"), - ]) - - df_clean = df_clean.with_columns([ - pl.col("text_cleaned").map_elements(normalize_text, return_dtype=pl.String).alias("text_cleaned"), - ]) - - # 3. 删除缺失值(text_cleaned 为空或 airline_sentiment 为空) - df_clean = df_clean.filter( - (pl.col("text_cleaned").is_not_null()) & - (pl.col("text_cleaned") != "") & - (pl.col("airline_sentiment").is_not_null()) - ) - - # 4. 删除重复行(基于 tweet_id) - df_clean = df_clean.unique(subset=["tweet_id"], keep="first") - - # 5. 选择需要的列 - df_clean = df_clean.select([ - "tweet_id", - "airline_sentiment", - "airline_sentiment_confidence", - "negativereason", - "negativereason_confidence", - "airline", - "text_cleaned", - "text_original", - ]) - - # 6. 可选校验 - if validate: - df_clean = validate_clean_tweets(df_clean) - - return df_clean - - -def save_cleaned_tweets(df: pl.DataFrame, output_path: str | Path = "data/Tweets_cleaned.csv") -> None: - """保存清洗后的数据 - - Args: - df: 清洗后的 Polars DataFrame - output_path: 输出文件路径 - """ - output_path = Path(output_path) - output_path.parent.mkdir(parents=True, exist_ok=True) - df.write_csv(output_path) - print(f"清洗后的数据已保存至 {output_path}") - - -def load_cleaned_tweets(file_path: str | Path = "data/Tweets_cleaned.csv") -> pl.DataFrame: - """加载清洗后的推文数据 - - Args: - file_path: 清洗后的 CSV 文件路径 - - Returns: - pl.DataFrame: 清洗后的推文数据 - """ - df = pl.read_csv(file_path) - return df - - -# --- 数据统计与分析 --- - - -def print_data_summary(df: pl.DataFrame, title: str = "数据统计") -> None: - """打印数据摘要信息 - - Args: - df: Polars DataFrame - title: 标题 - """ - print(f"\n{'='*60}") - print(f"{title}") - print(f"{'='*60}") - print(f"样本总数: {len(df)}") - print(f"\n情感分布:") - print(df.group_by("airline_sentiment").agg( - pl.len().alias("count"), - (pl.len() / len(df) * 100).alias("percentage") - ).sort("count", descending=True)) - - print(f"\n航空公司分布:") - print(df.group_by("airline").agg( - pl.len().alias("count"), - (pl.len() / len(df) * 100).alias("percentage") - ).sort("count", descending=True)) - - print(f"\n文本长度统计:") - df_with_length = df.with_columns([ - pl.col("text_cleaned").str.len_chars().alias("text_length") - ]) - print(df_with_length.select([ - pl.col("text_length").min().alias("最小长度"), - pl.col("text_length").max().alias("最大长度"), - pl.col("text_length").mean().alias("平均长度"), - pl.col("text_length").median().alias("中位数长度"), - ])) - - -if __name__ == "__main__": - print(">>> 1. 加载原始数据") - df_raw = load_tweets("Tweets.csv") - print(f"原始数据样本数: {len(df_raw)}") - print(df_raw.head(3)) - - print("\n>>> 2. 验证原始数据") - df_validated = validate_raw_tweets(df_raw) - print("✅ 原始数据验证通过") - - print("\n>>> 3. 清洗数据") - df_clean = preprocess_tweets(df_validated, validate=True, min_confidence=0.5) - print(f"清洗后样本数: {len(df_clean)} (原始: {len(df_raw)})") - print("✅ 清洗后数据验证通过") - - print("\n>>> 4. 保存清洗后的数据") - save_cleaned_tweets(df_clean, "data/Tweets_cleaned.csv") - - print("\n>>> 5. 数据统计") - print_data_summary(df_clean, "清洗后数据统计") - - print("\n>>> 6. 清洗示例对比") - print("\n原始文本:") - print(df_clean.select("text_original").head(3).to_pandas()["text_original"].to_string(index=False)) - print("\n清洗后文本:") - print(df_clean.select("text_cleaned").head(3).to_pandas()["text_cleaned"].to_string(index=False))