Pandas Bootcamp
首页特训项目关于开始学习
01
电商销售数据质检审计
入门
02
基于购物车的复购行为与商品关联预测
入门
03
RFM 模型与用户价值聚类分层
进阶
04
AIGC 训练语料库去偏与去噪
进阶
05
游戏玩家行为序列与留存漏斗分析
进阶
06
金融信贷反欺诈特征工程
高级
07
IoT 传感器时序数据异常检测
高级
08
多源数据融合与主数据治理
进阶
09
AI 数据质量监控与血缘追踪
高级
10
AI 辅助数据自动化分析与故事化报告
高级
返回首页
PROJECT 09高级

AI 数据质量监控与血缘追踪

AI Data Quality Monitoring

构建数据 Schema 校验、自定义校验函数与元数据记录的质量监控体系。

Schema 校验自定义校验器分布漂移检测元数据记录数据血缘

项目背景

某 AI 团队的数据管道每天处理 TB 级数据,但数据质量问题(schema 变更、空值激增、分布漂移)频繁导致模型性能下降。需要建立自动化质量监控与血缘追踪系统。

模拟数据集

batch_id,table_name,column_name,record_count,null_count,null_pct,mean_value,std_value,min_value,max_value,ingestion_time
B20240701,users,id,100000,0,0.0,50000.0,28867.5,1,100000,2024-07-01 02:00:00
B20240701,users,age,100000,150,0.0015,35.2,12.3,18,80,2024-07-01 02:00:00
B20240701,users,income,100000,2000,0.02,58000.0,25000.0,0,500000,2024-07-01 02:00:00
B20240702,users,id,100500,0,0.0,50250.0,29000.0,1,100500,2024-07-02 02:00:00
B20240702,users,age,100500,5000,0.0498,34.8,15.1,18,80,2024-07-02 02:00:00
B20240702,users,income,100500,15000,0.1493,45000.0,30000.0,0,500000,2024-07-02 02:00:00

代码练习区

在下方编辑器中编写你的 Pandas 代码。可记录笔记、编写伪代码,参考答案在下方。

pandas_exercise.py
Loading...

参考答案

reference_solution.py
import pandas as pd
import numpy as np
from datetime import datetime

df = pd.read_csv('quality_metrics.csv')
df['ingestion_time'] = pd.to_datetime(df['ingestion_time'])

# 1. Schema 校验
expected_schema = {
    'users': {
        'id': {'dtype': 'int64', 'nullable': False, 'min': 1},
        'age': {'dtype': 'float64', 'nullable': True, 'min': 18, 'max': 100},
        'income': {'dtype': 'float64', 'nullable': True, 'min': 0}
    }
}

def validate_schema(df, schema):
    violations = []
    for col, rules in schema.items():
        if col in df.columns:
            if not rules['nullable'] and df[col].isnull().any():
                violations.append(f"{col}: 发现空值")
            if 'min' in rules and df[col].min() < rules['min']:
                violations.append(f"{col}: 最小值 {df[col].min()} < {rules['min']}")
            if 'max' in rules and df[col].max() > rules['max']:
                violations.append(f"{col}: 最大值 {df[col].max()} > {rules['max']}")
    return violations

# 2. 空值率突变检测
df['null_pct_change'] = df.groupby('column_name')['null_pct'].pct_change()
null_spikes = df[df['null_pct_change'] > 5]  # 空值率增长超过 5 倍

# 3. 分布漂移检测 (PSI - Population Stability Index)
def calculate_psi(expected, actual, bins=10):
    breakpoints = np.linspace(0, 1, bins + 1)
    expected_percents = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, bins=breakpoints)[0] / len(actual)
    expected_percents[expected_percents == 0] = 0.0001
    actual_percents[actual_percents == 0] = 0.0001
    psi = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents))
    return psi

# 4. 血缘追踪记录
lineage_log = {
    'batch_id': 'B20240702',
    'source_tables': ['raw_users', 'stg_events'],
    'transformations': ['filter', 'join', 'aggregate'],
    'output_table': 'users',
    'quality_score': 0.95,
    'validated_at': datetime.now().isoformat()
}

业务解读

数据质量监控是 ML Pipeline 的基础设施。Schema 校验在数据入口设置 '护栏',空值率突变可能意味着上游数据源故障。PSI (群体稳定性指数) 是检测分布漂移的标准方法:PSI < 0.1 表示稳定,0.1-0.25 表示轻微漂移,> 0.25 表示显著漂移需要重新训练模型。