Delete src/tweet_data.py
This commit is contained in:
parent 5c519dda05
commit a5660ec9a1
@@ -1,315 +0,0 @@
"""Text data cleaning module.

Text cleaning for the Tweets.csv airline sentiment analysis dataset.
Follows a principle of restraint: only the necessary preprocessing is
applied, preserving the semantic content of the text.

Cleaning strategy:
1. Text normalization: lowercase everything (no lemmatization or stemming,
   to preserve the original semantics)
2. Noise removal: strip user mentions (@username), URLs, and extra whitespace
3. Semantics preservation: keep emoji and punctuation (they carry signal
   for sentiment analysis)
4. Minimal processing: no stop-word removal (negations such as "not" and
   "don't" matter for sentiment)
"""

import re
from pathlib import Path

import pandera.polars as pa
import polars as pl


# --- Pandera schema definitions ---


class RawTweetSchema(pa.DataFrameModel):
    """Schema for raw tweet data (validated before cleaning).

    Missing values are allowed; used to verify the basic structure
    of the data right after loading.
    """
    tweet_id: int = pa.Field(nullable=False)
    airline_sentiment: str = pa.Field(nullable=True)
    airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=True)
    negativereason: str = pa.Field(nullable=True)
    negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True)
    airline: str = pa.Field(nullable=True)
    text: str = pa.Field(nullable=True)
    tweet_coord: str = pa.Field(nullable=True)
    tweet_created: str = pa.Field(nullable=True)
    tweet_location: str = pa.Field(nullable=True)
    user_timezone: str = pa.Field(nullable=True)

    class Config:
        strict = False
        coerce = True


class CleanTweetSchema(pa.DataFrameModel):
    """Schema for cleaned tweet data (strict mode).

    No missing values allowed; constraints are enforced.
    """
    tweet_id: int = pa.Field(nullable=False)
    airline_sentiment: str = pa.Field(isin=["positive", "neutral", "negative"], nullable=False)
    airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=False)
    negativereason: str = pa.Field(nullable=True)
    negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True)
    airline: str = pa.Field(isin=["Virgin America", "United", "Southwest", "Delta", "US Airways", "American"], nullable=False)
    text_cleaned: str = pa.Field(nullable=False)
    text_original: str = pa.Field(nullable=False)

    class Config:
        strict = True
        coerce = True
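

# Illustrative sketch (not part of the original module): validating one
# made-up row against RawTweetSchema. All values below are invented for
# the example, and the function is never called at import time.
def _demo_validate_raw() -> None:
    demo = pl.DataFrame({
        "tweet_id": [570306133677760513],
        "airline_sentiment": ["neutral"],
        "airline_sentiment_confidence": [1.0],
        "negativereason": ["Late Flight"],
        "negativereason_confidence": [0.7],
        "airline": ["Virgin America"],
        "text": ["@VirginAmerica What @dhepburn said."],
        "tweet_coord": ["[0.0, 0.0]"],
        "tweet_created": ["2015-02-24 11:35:52 -0800"],
        "tweet_location": ["San Francisco CA"],
        "user_timezone": ["Pacific Time (US & Canada)"],
    })
    # Passes: all declared columns present, confidences inside [0, 1].
    RawTweetSchema.validate(demo)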


# --- Text cleaning functions ---


def clean_text(text: str) -> str:
    """Clean a single text (restrained strategy).

    Cleaning principles:
    - Remove: user mentions (@username), URLs, extra whitespace
    - Keep: emoji, punctuation, negations, original casing (lowercased later)
    - Skip: lemmatization, stemming, stop-word removal

    Args:
        text: Raw text.

    Returns:
        str: Cleaned text.
    """
    if not text or not isinstance(text, str):
        return ""

    # 1. Remove user mentions (@username)
    text = re.sub(r'@\w+', '', text)

    # 2. Remove URLs
    text = re.sub(r'http\S+|www\S+', '', text)

    # 3. Collapse extra whitespace and newlines
    text = re.sub(r'\s+', ' ', text).strip()

    return text
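

# Illustrative (not in the original file): expected behaviour of clean_text
# on a made-up tweet. The mention and URL drop out; the emoticon,
# punctuation, and negation survive:
#
#   clean_text("@united I don't like delays :( http://t.co/x1y2z3")
#   -> "I don't like delays :("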


def normalize_text(text: str) -> str:
    """Normalize text.

    Lowercases only; no lemmatization or stemming.

    Args:
        text: Cleaned text.

    Returns:
        str: Normalized text.
    """
    if not text or not isinstance(text, str):
        return ""

    # Lowercase only
    return text.lower()
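

# Illustrative: normalize_text("I DON'T like it") -> "i don't like it";
# casing is the only thing that changes.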


# --- Data loading and preprocessing ---


def load_tweets(file_path: str | Path = "Tweets.csv") -> pl.DataFrame:
    """Load the raw tweet data.

    Args:
        file_path: Path to the CSV file.

    Returns:
        pl.DataFrame: Raw tweet data.
    """
    df = pl.read_csv(file_path)
    return df


def validate_raw_tweets(df: pl.DataFrame) -> pl.DataFrame:
    """Validate the structure of the raw tweet data (before cleaning).

    Args:
        df: Raw Polars DataFrame.

    Returns:
        pl.DataFrame: The validated DataFrame.

    Raises:
        pa.errors.SchemaError: If validation fails.
    """
    return RawTweetSchema.validate(df)


def validate_clean_tweets(df: pl.DataFrame) -> pl.DataFrame:
    """Validate the cleaned tweet data (strict mode).

    Args:
        df: Cleaned Polars DataFrame.

    Returns:
        pl.DataFrame: The validated DataFrame.

    Raises:
        pa.errors.SchemaError: If validation fails.
    """
    return CleanTweetSchema.validate(df)


def preprocess_tweets(
    df: pl.DataFrame,
    validate: bool = True,
    min_confidence: float = 0.5
) -> pl.DataFrame:
    """Tweet preprocessing pipeline.

    Steps:
    1. Filter: keep only samples with sentiment confidence >= min_confidence
    2. Text cleaning: apply clean_text and normalize_text
    3. Drop missing values: remove samples with empty text
    4. Drop duplicates: deduplicate on tweet_id
    5. Optional: validate against the schema

    Args:
        df: Raw Polars DataFrame.
        validate: Whether to validate the cleaned data against the schema.
        min_confidence: Minimum sentiment confidence threshold.

    Returns:
        pl.DataFrame: The cleaned DataFrame.
    """
    # 1. Keep high-confidence samples
    df_filtered = df.filter(
        pl.col("airline_sentiment_confidence") >= min_confidence
    )

    # 2. Clean and normalize the text
    df_clean = df_filtered.with_columns([
        pl.col("text").map_elements(clean_text, return_dtype=pl.String).alias("text_cleaned"),
        pl.col("text").alias("text_original"),
    ])

    df_clean = df_clean.with_columns([
        pl.col("text_cleaned").map_elements(normalize_text, return_dtype=pl.String).alias("text_cleaned"),
    ])

    # 3. Drop missing values (empty text_cleaned or null airline_sentiment)
    df_clean = df_clean.filter(
        (pl.col("text_cleaned").is_not_null()) &
        (pl.col("text_cleaned") != "") &
        (pl.col("airline_sentiment").is_not_null())
    )

    # 4. Drop duplicate rows (by tweet_id)
    df_clean = df_clean.unique(subset=["tweet_id"], keep="first")

    # 5. Select the columns we need
    df_clean = df_clean.select([
        "tweet_id",
        "airline_sentiment",
        "airline_sentiment_confidence",
        "negativereason",
        "negativereason_confidence",
        "airline",
        "text_cleaned",
        "text_original",
    ])

    # 6. Optional validation
    if validate:
        df_clean = validate_clean_tweets(df_clean)

    return df_clean
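

# Illustrative sketch (not part of the original module): the pipeline on a
# made-up two-row frame instead of the full Tweets.csv. The low-confidence
# row is filtered out; the surviving text loses its mention and URL and is
# lowercased. Never called at import time.
def _demo_preprocess() -> None:
    raw = pl.DataFrame({
        "tweet_id": [1, 2],
        "airline_sentiment": ["negative", "positive"],
        "airline_sentiment_confidence": [0.9, 0.3],
        "negativereason": ["Late Flight", None],
        "negativereason_confidence": [0.8, None],
        "airline": ["United", "Delta"],
        "text": ["@united Worst DELAY ever! http://t.co/x", "@Delta thanks!"],
        "tweet_coord": [None, None],
        "tweet_created": ["2015-02-24", "2015-02-24"],
        "tweet_location": [None, None],
        "user_timezone": [None, None],
    })
    cleaned = preprocess_tweets(raw, validate=True, min_confidence=0.5)
    assert cleaned.height == 1
    assert cleaned["text_cleaned"][0] == "worst delay ever!"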


def save_cleaned_tweets(df: pl.DataFrame, output_path: str | Path = "data/Tweets_cleaned.csv") -> None:
    """Save the cleaned data.

    Args:
        df: Cleaned Polars DataFrame.
        output_path: Output file path.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_csv(output_path)
    print(f"Cleaned data saved to {output_path}")


def load_cleaned_tweets(file_path: str | Path = "data/Tweets_cleaned.csv") -> pl.DataFrame:
    """Load the cleaned tweet data.

    Args:
        file_path: Path to the cleaned CSV file.

    Returns:
        pl.DataFrame: Cleaned tweet data.
    """
    df = pl.read_csv(file_path)
    return df


# --- Data statistics and analysis ---


def print_data_summary(df: pl.DataFrame, title: str = "Data summary") -> None:
    """Print a summary of the data.

    Args:
        df: Polars DataFrame.
        title: Heading for the summary.
    """
    print(f"\n{'='*60}")
    print(title)
    print(f"{'='*60}")
    print(f"Total samples: {len(df)}")
    print("\nSentiment distribution:")
    print(df.group_by("airline_sentiment").agg(
        pl.len().alias("count"),
        (pl.len() / len(df) * 100).alias("percentage")
    ).sort("count", descending=True))

    print("\nAirline distribution:")
    print(df.group_by("airline").agg(
        pl.len().alias("count"),
        (pl.len() / len(df) * 100).alias("percentage")
    ).sort("count", descending=True))

    print("\nText length statistics:")
    df_with_length = df.with_columns([
        pl.col("text_cleaned").str.len_chars().alias("text_length")
    ])
    print(df_with_length.select([
        pl.col("text_length").min().alias("min_length"),
        pl.col("text_length").max().alias("max_length"),
        pl.col("text_length").mean().alias("mean_length"),
        pl.col("text_length").median().alias("median_length"),
    ]))


if __name__ == "__main__":
    print(">>> 1. Load raw data")
    df_raw = load_tweets("Tweets.csv")
    print(f"Raw sample count: {len(df_raw)}")
    print(df_raw.head(3))

    print("\n>>> 2. Validate raw data")
    df_validated = validate_raw_tweets(df_raw)
    print("✅ Raw data passed validation")

    print("\n>>> 3. Clean the data")
    df_clean = preprocess_tweets(df_validated, validate=True, min_confidence=0.5)
    print(f"Cleaned sample count: {len(df_clean)} (raw: {len(df_raw)})")
    print("✅ Cleaned data passed validation")

    print("\n>>> 4. Save the cleaned data")
    save_cleaned_tweets(df_clean, "data/Tweets_cleaned.csv")

    print("\n>>> 5. Data statistics")
    print_data_summary(df_clean, "Cleaned data summary")

    print("\n>>> 6. Before/after cleaning examples")
    print("\nOriginal text:")
    print(df_clean.select("text_original").head(3).to_pandas()["text_original"].to_string(index=False))
    print("\nCleaned text:")
    print(df_clean.select("text_cleaned").head(3).to_pandas()["text_cleaned"].to_string(index=False))