"""文本数据清洗模块 针对 Tweets.csv 航空情感分析数据集的文本清洗。 遵循「克制」原则,仅进行必要的预处理,保留文本语义信息。 清洗策略: 1. 文本标准化:统一小写(不进行词形还原/词干提取,保留原始语义) 2. 去除噪声:移除用户提及(@username)、URL链接、多余空格 3. 保留语义:保留表情符号、标点符号(它们对情感分析有价值) 4. 最小化处理:不进行停用词删除(否定词如"not"、"don't"对情感很重要) """ import re from pathlib import Path import pandera.polars as pa import polars as pl # --- Pandera Schema 定义 --- class RawTweetSchema(pa.DataFrameModel): """原始推文数据 Schema(清洗前校验) 允许缺失值存在,用于验证数据读取后的基本结构。 """ tweet_id: int = pa.Field(nullable=False) airline_sentiment: str = pa.Field(nullable=True) airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=True) negativereason: str = pa.Field(nullable=True) negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True) airline: str = pa.Field(nullable=True) text: str = pa.Field(nullable=True) tweet_coord: str = pa.Field(nullable=True) tweet_created: str = pa.Field(nullable=True) tweet_location: str = pa.Field(nullable=True) user_timezone: str = pa.Field(nullable=True) class Config: strict = False coerce = True class CleanTweetSchema(pa.DataFrameModel): """清洗后推文数据 Schema(严格模式) 不允许缺失值,强制约束检查。 """ tweet_id: int = pa.Field(nullable=False) airline_sentiment: str = pa.Field(isin=["positive", "neutral", "negative"], nullable=False) airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=False) negativereason: str = pa.Field(nullable=True) negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True) airline: str = pa.Field(isin=["Virgin America", "United", "Southwest", "Delta", "US Airways", "American"], nullable=False) text_cleaned: str = pa.Field(nullable=False) text_original: str = pa.Field(nullable=False) class Config: strict = True coerce = True # --- 文本清洗函数 --- def clean_text(text: str) -> str: """文本清洗函数(克制策略) 清洗原则: - 移除:用户提及(@username)、URL链接、多余空格 - 保留:表情符号、标点符号、否定词、原始大小写(后续统一小写) - 不做:词形还原、词干提取、停用词删除 Args: text: 原始文本 Returns: str: 清洗后的文本 """ if not text or not isinstance(text, str): return "" # 1. 移除用户提及 (@username) text = re.sub(r'@\w+', '', text) # 2. 移除 URL 链接 text = re.sub(r'http\S+|www\S+', '', text) # 3. 移除多余空格和换行 text = re.sub(r'\s+', ' ', text).strip() return text def normalize_text(text: str) -> str: """文本标准化 统一小写,但不进行词形还原或词干提取。 Args: text: 清洗后的文本 Returns: str: 标准化后的文本 """ if not text or not isinstance(text, str): return "" # 仅统一小写 return text.lower() # --- 数据加载与预处理 --- def load_tweets(file_path: str | Path = "Tweets.csv") -> pl.DataFrame: """加载原始推文数据 Args: file_path: CSV 文件路径 Returns: pl.DataFrame: 原始推文数据 """ df = pl.read_csv(file_path) return df def validate_raw_tweets(df: pl.DataFrame) -> pl.DataFrame: """验证原始推文数据结构(清洗前) Args: df: 原始 Polars DataFrame Returns: pl.DataFrame: 验证通过的 DataFrame Raises: pa.errors.SchemaError: 验证失败 """ return RawTweetSchema.validate(df) def validate_clean_tweets(df: pl.DataFrame) -> pl.DataFrame: """验证清洗后推文数据(严格模式) Args: df: 清洗后的 Polars DataFrame Returns: pl.DataFrame: 验证通过的 DataFrame Raises: pa.errors.SchemaError: 验证失败 """ return CleanTweetSchema.validate(df) def preprocess_tweets( df: pl.DataFrame, validate: bool = True, min_confidence: float = 0.5 ) -> pl.DataFrame: """推文数据预处理流水线 处理步骤: 1. 筛选:仅保留情感置信度 >= min_confidence 的样本 2. 文本清洗:应用 clean_text 和 normalize_text 3. 删除缺失值:删除 text 为空的样本 4. 删除重复行:基于 tweet_id 去重 5. 可选:进行 Schema 校验 Args: df: 原始 Polars DataFrame validate: 是否进行清洗后 Schema 校验 min_confidence: 最低情感置信度阈值 Returns: pl.DataFrame: 清洗后的 DataFrame """ # 1. 筛选高置信度样本 df_filtered = df.filter( pl.col("airline_sentiment_confidence") >= min_confidence ) # 2. 文本清洗和标准化 df_clean = df_filtered.with_columns([ pl.col("text").map_elements(clean_text, return_dtype=pl.String).alias("text_cleaned"), pl.col("text").alias("text_original"), ]) df_clean = df_clean.with_columns([ pl.col("text_cleaned").map_elements(normalize_text, return_dtype=pl.String).alias("text_cleaned"), ]) # 3. 删除缺失值(text_cleaned 为空或 airline_sentiment 为空) df_clean = df_clean.filter( (pl.col("text_cleaned").is_not_null()) & (pl.col("text_cleaned") != "") & (pl.col("airline_sentiment").is_not_null()) ) # 4. 删除重复行(基于 tweet_id) df_clean = df_clean.unique(subset=["tweet_id"], keep="first") # 5. 选择需要的列 df_clean = df_clean.select([ "tweet_id", "airline_sentiment", "airline_sentiment_confidence", "negativereason", "negativereason_confidence", "airline", "text_cleaned", "text_original", ]) # 6. 可选校验 if validate: df_clean = validate_clean_tweets(df_clean) return df_clean def save_cleaned_tweets(df: pl.DataFrame, output_path: str | Path = "data/Tweets_cleaned.csv") -> None: """保存清洗后的数据 Args: df: 清洗后的 Polars DataFrame output_path: 输出文件路径 """ output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) df.write_csv(output_path) print(f"清洗后的数据已保存至 {output_path}") def load_cleaned_tweets(file_path: str | Path = "data/Tweets_cleaned.csv") -> pl.DataFrame: """加载清洗后的推文数据 Args: file_path: 清洗后的 CSV 文件路径 Returns: pl.DataFrame: 清洗后的推文数据 """ df = pl.read_csv(file_path) return df # --- 数据统计与分析 --- def print_data_summary(df: pl.DataFrame, title: str = "数据统计") -> None: """打印数据摘要信息 Args: df: Polars DataFrame title: 标题 """ print(f"\n{'='*60}") print(f"{title}") print(f"{'='*60}") print(f"样本总数: {len(df)}") print(f"\n情感分布:") print(df.group_by("airline_sentiment").agg( pl.len().alias("count"), (pl.len() / len(df) * 100).alias("percentage") ).sort("count", descending=True)) print(f"\n航空公司分布:") print(df.group_by("airline").agg( pl.len().alias("count"), (pl.len() / len(df) * 100).alias("percentage") ).sort("count", descending=True)) print(f"\n文本长度统计:") df_with_length = df.with_columns([ pl.col("text_cleaned").str.len_chars().alias("text_length") ]) print(df_with_length.select([ pl.col("text_length").min().alias("最小长度"), pl.col("text_length").max().alias("最大长度"), pl.col("text_length").mean().alias("平均长度"), pl.col("text_length").median().alias("中位数长度"), ])) if __name__ == "__main__": print(">>> 1. 加载原始数据") df_raw = load_tweets("Tweets.csv") print(f"原始数据样本数: {len(df_raw)}") print(df_raw.head(3)) print("\n>>> 2. 验证原始数据") df_validated = validate_raw_tweets(df_raw) print("✅ 原始数据验证通过") print("\n>>> 3. 清洗数据") df_clean = preprocess_tweets(df_validated, validate=True, min_confidence=0.5) print(f"清洗后样本数: {len(df_clean)} (原始: {len(df_raw)})") print("✅ 清洗后数据验证通过") print("\n>>> 4. 保存清洗后的数据") save_cleaned_tweets(df_clean, "data/Tweets_cleaned.csv") print("\n>>> 5. 数据统计") print_data_summary(df_clean, "清洗后数据统计") print("\n>>> 6. 清洗示例对比") print("\n原始文本:") print(df_clean.select("text_original").head(3).to_pandas()["text_original"].to_string(index=False)) print("\n清洗后文本:") print(df_clean.select("text_cleaned").head(3).to_pandas()["text_cleaned"].to_string(index=False))