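PROJECT 09 (Advanced)
AI Data Quality Monitoring and Lineage Tracking
Build a quality-monitoring system that combines schema validation, custom validation functions, and metadata logging.
Schema validation · Custom validators · Distribution drift detection · Metadata logging · Data lineage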
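Project Background
An AI team's data pipeline processes terabytes of data every day, but data quality problems (schema changes, null-value spikes, distribution drift) frequently degrade model performance. The team needs an automated system for quality monitoring and lineage tracking.
Simulated Dataset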
batch_id,table_name,column_name,record_count,null_count,null_pct,mean_value,std_value,min_value,max_value,ingestion_time
B20240701,users,id,100000,0,0.0,50000.0,28867.5,1,100000,2024-07-01 02:00:00
B20240701,users,age,100000,150,0.0015,35.2,12.3,18,80,2024-07-01 02:00:00
B20240701,users,income,100000,2000,0.02,58000.0,25000.0,0,500000,2024-07-01 02:00:00
B20240702,users,id,100500,0,0.0,50250.0,29000.0,1,100500,2024-07-02 02:00:00
B20240702,users,age,100500,5000,0.0498,34.8,15.1,18,80,2024-07-02 02:00:00
B20240702,users,income,100500,15000,0.1493,45000.0,30000.0,0,500000,2024-07-02 02:00:00
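To experiment with the simulated rows without a file on disk, the sketch below loads them from an inline string and runs one basic consistency check: the reported null_pct should equal null_count / record_count, allowing for the four-decimal rounding visible in the sample. The names SAMPLE_CSV, metrics, and consistent are illustrative and not part of the exercise.

```python
import io

import numpy as np
import pandas as pd

# The six sample rows from the simulated dataset, inlined for convenience
SAMPLE_CSV = """batch_id,table_name,column_name,record_count,null_count,null_pct,mean_value,std_value,min_value,max_value,ingestion_time
B20240701,users,id,100000,0,0.0,50000.0,28867.5,1,100000,2024-07-01 02:00:00
B20240701,users,age,100000,150,0.0015,35.2,12.3,18,80,2024-07-01 02:00:00
B20240701,users,income,100000,2000,0.02,58000.0,25000.0,0,500000,2024-07-01 02:00:00
B20240702,users,id,100500,0,0.0,50250.0,29000.0,1,100500,2024-07-02 02:00:00
B20240702,users,age,100500,5000,0.0498,34.8,15.1,18,80,2024-07-02 02:00:00
B20240702,users,income,100500,15000,0.1493,45000.0,30000.0,0,500000,2024-07-02 02:00:00
"""

metrics = pd.read_csv(io.StringIO(SAMPLE_CSV), parse_dates=["ingestion_time"])

# Recompute the null rate and compare it with the reported value.
# atol=1e-4 is an assumption that absorbs the 4-decimal rounding in the sample.
recomputed = metrics["null_count"] / metrics["record_count"]
consistent = np.isclose(recomputed, metrics["null_pct"], atol=1e-4)

print(metrics.shape)
print(bool(consistent.all()))
```

A mismatch here would suggest the metrics themselves were computed or recorded incorrectly, which is worth ruling out before interpreting null-rate spikes.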
Reference Solution
reference_solution.py
import pandas as pd
import numpy as np
from datetime import datetime

# Load the per-batch quality metrics and parse the ingestion timestamps
df = pd.read_csv('quality_metrics.csv')
df['ingestion_time'] = pd.to_datetime(df['ingestion_time'])

# 1. Schema validation
expected_schema = {
    'users': {
        'id': {'dtype': 'int64', 'nullable': False, 'min': 1},
        'age': {'dtype': 'float64', 'nullable': True, 'min': 18, 'max': 100},
        'income': {'dtype': 'float64', 'nullable': True, 'min': 0}
    }
}

def validate_schema(df, schema):
    """Check a raw table against its column rules, e.g. schema=expected_schema['users']."""
    violations = []
    for col, rules in schema.items():
        if col not in df.columns:
            violations.append(f"{col}: column missing")
            continue
        if 'dtype' in rules and str(df[col].dtype) != rules['dtype']:
            violations.append(f"{col}: dtype {df[col].dtype} != {rules['dtype']}")
        if not rules['nullable'] and df[col].isnull().any():
            violations.append(f"{col}: null values found")
        if 'min' in rules and df[col].min() < rules['min']:
            violations.append(f"{col}: min value {df[col].min()} < {rules['min']}")
        if 'max' in rules and df[col].max() > rules['max']:
            violations.append(f"{col}: max value {df[col].max()} > {rules['max']}")
    return violations

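# 2. Null-rate spike detection
# Sort chronologically so pct_change compares each batch with the previous one
df = df.sort_values('ingestion_time')
df['null_pct_change'] = df.groupby(['table_name', 'column_name'])['null_pct'].pct_change()
null_spikes = df[df['null_pct_change'] > 5]  # null rate increased by more than 5x the previous value
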
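# 3. Distribution drift detection (PSI - Population Stability Index)
def calculate_psi(expected, actual, bins=10):
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    # Equal-width bin edges covering the range of both samples
    lo = min(expected.min(), actual.min())
    hi = max(expected.max(), actual.max())
    breakpoints = np.linspace(lo, hi, bins + 1)
    expected_percents = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, bins=breakpoints)[0] / len(actual)
    # Replace empty bins with a small value to avoid log(0) and division by zero
    expected_percents[expected_percents == 0] = 0.0001
    actual_percents[actual_percents == 0] = 0.0001
    psi = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents))
    return psi
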
# 4. Lineage tracking record
lineage_log = {
    'batch_id': 'B20240702',
    'source_tables': ['raw_users', 'stg_events'],
    'transformations': ['filter', 'join', 'aggregate'],
    'output_table': 'users',
    'quality_score': 0.95,
    'validated_at': datetime.now().isoformat()
}
Business Interpretation
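Data quality monitoring is foundational infrastructure for an ML pipeline. Schema validation sets up guardrails at the point of data entry, and a sudden jump in the null rate may indicate a failure in an upstream data source. PSI (Population Stability Index) is a standard method for detecting distribution drift: PSI < 0.1 indicates a stable distribution, 0.1 to 0.25 indicates minor drift, and > 0.25 indicates significant drift that calls for retraining the model.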
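The PSI thresholds above can be turned into a small classifier. The following is a minimal sketch, not a definitive implementation: the function names psi and classify_drift, the equal-width binning over the combined range of both samples, the 1e-4 floor for empty bins, and the synthetic normal samples are all assumptions made for illustration.

```python
import numpy as np

def psi(expected, actual, bins=10):
    """PSI over equal-width bins spanning both samples (a binning assumption)."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    edges = np.linspace(min(expected.min(), actual.min()),
                        max(expected.max(), actual.max()), bins + 1)
    e = np.histogram(expected, bins=edges)[0] / len(expected)
    a = np.histogram(actual, bins=edges)[0] / len(actual)
    # Floor empty bins so the logarithm stays finite
    e = np.where(e == 0, 1e-4, e)
    a = np.where(a == 0, 1e-4, a)
    return float(np.sum((a - e) * np.log(a / e)))

def classify_drift(value):
    """Map a PSI value to the three bands described in the text."""
    if value < 0.1:
        return "stable"
    if value <= 0.25:
        return "minor drift"
    return "significant drift"

# Synthetic samples (hypothetical): a baseline and a copy shifted by one standard deviation
rng = np.random.default_rng(0)
baseline = rng.normal(35, 12, 10_000)
shifted = rng.normal(47, 12, 10_000)

print(classify_drift(psi(baseline, baseline)))
print(classify_drift(psi(baseline, shifted)))
```

Comparing a sample with itself gives a PSI of exactly zero, while a shift of one standard deviation lands well above the 0.25 band. The band boundaries follow the text; how to treat a value of exactly 0.1 or 0.25 is a choice made here, not something the text specifies.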