import polars as pl import pandera as pa from pandera import Column, Check, DataFrameSchema import logging # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ========================================== # 1. 定义 Pandera Schema (数据契约) # ========================================== # 原始数据 Schema raw_schema = DataFrameSchema({ "age": Column(int, checks=Check.ge(18)), "job": Column(str), "marital": Column(str), "education": Column(str), "default": Column(str, checks=Check.isin(["yes", "no"])), "balance": Column(int), "housing": Column(str, checks=Check.isin(["yes", "no"])), "loan": Column(str, checks=Check.isin(["yes", "no"])), "contact": Column(str), "day": Column(int, checks=[Check.ge(1), Check.le(31)]), "month": Column(str), "duration": Column(int, checks=Check.ge(0)), "campaign": Column(int, checks=Check.ge(1)), "pdays": Column(int), "previous": Column(int, checks=Check.ge(0)), "poutcome": Column(str), "deposit": Column(str, checks=Check.isin(["yes", "no"])), }) # 清洗后 Schema processed_schema = DataFrameSchema({ "age": Column(int), "balance": Column(int), "deposit": Column(int, checks=Check.isin([0, 1])), # 其他数值化或保留的特征... }) def load_and_clean_data(file_path: str): """ 使用 Polars 加载并清洗数据 """ logger.info(f"正在加载数据: {file_path}") # 1. Lazy Load lf = pl.scan_csv(file_path) # 2. 初步清洗计划 # - 移除 duration (避免数据泄露) # - 将 deposit (yes/no) 转换为 (1/0) # - 简单的分类变量编码 (为了 LightGBM,我们可以保留分类类型或做 Label Encoding) # LightGBM 原生支持 Category,但 sklearn 需要数值。 # 为了通用性,这里做 Label Encoding 或者 One-Hot。 # 但 Polars 的 Label Encoding 比较手动。 # 我们这里先只做核心转换。 processed_lf = ( lf.drop(["duration"]) # 移除泄露特征 .with_columns([ pl.col("deposit").replace({"yes": 1, "no": 0}).cast(pl.Int64).alias("target"), # 简单的特征工程示例:将 pdays -1 处理为 999 或单独一类 (这里保持原样,树模型能处理) ]) .drop("deposit") # 移除原始标签列,保留 target ) # 3. 执行计算 (Collect) df = processed_lf.collect() logger.info(f"数据加载完成,形状: {df.shape}") # 4. Pandera 验证 (转换回 Pandas 验证,因为 Pandera 对 Polars 支持尚在实验阶段或部分支持) # 这里我们验证关键字段 try: # 简单验证一下 target 是否只有 0 和 1 assert df["target"].n_unique() <= 2 logger.info("基础数据验证通过") except Exception as e: logger.error(f"数据验证失败: {e}") raise e return df if __name__ == "__main__": # 测试代码 try: df = load_and_clean_data("data/bank.csv") print(df.head()) except Exception as e: print(f"Error: {e}")