A common rule of thumb says 80% of data-analysis time goes to data cleaning. This post assembles an efficient Pandas processing pipeline covering the usual scenarios: missing-value handling, type conversion, conditional filtering, group aggregation, and multi-table merges.
1. Data Loading and Overview

```python
import pandas as pd
import numpy as np

df = pd.read_csv('sales_data.csv', encoding='utf-8')

# First look at the data: shape, dtypes, summary stats, missing counts
print(df.shape)
print(df.dtypes)
print(df.describe())
print(df.isnull().sum())
```
2. Missing-Value Handling Strategies

Not every missing value should be filled with 0 or the mean; the right strategy depends on the business context.

```python
# Missing ratio per column
missing_ratio = df.isnull().sum() / len(df)
print(missing_ratio[missing_ratio > 0])

# Drop columns that are more than half missing
threshold = 0.5
cols_to_drop = missing_ratio[missing_ratio > threshold].index
df.drop(columns=cols_to_drop, inplace=True)

# Numeric columns: fill with the median (robust to outliers)
num_cols = df.select_dtypes(include=np.number).columns
df[num_cols] = df[num_cols].fillna(df[num_cols].median())

# Categorical columns: fill with the per-column mode
cat_cols = df.select_dtypes(include='object').columns
df[cat_cols] = df[cat_cols].fillna(df[cat_cols].mode().iloc[0])
```
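As noted above, the right fill depends on context. One common refinement is to fill a numeric column with the median of its own group rather than the global median. A minimal sketch, using an illustrative `region` column not taken from the original data:

```python
import pandas as pd
import numpy as np

# Toy frame; 'region' and 'amount' are hypothetical column names
df = pd.DataFrame({
    'region': ['North', 'North', 'South', 'South', 'South'],
    'amount': [100.0, np.nan, 10.0, 30.0, np.nan],
})

# Fill each missing amount with the median of its own region
df['amount'] = (df.groupby('region')['amount']
                  .transform(lambda s: s.fillna(s.median())))

# Fall back to the global median for groups that were entirely NaN
df['amount'] = df['amount'].fillna(df['amount'].median())
```

Here the missing North value becomes 100 (the North median) rather than the global median, which would be skewed by the other region.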
3. Data Type Optimization

Choosing the right dtypes directly affects memory footprint and compute speed.

```python
# Current memory footprint
print(df.memory_usage(deep=True).sum() / 1024**2, 'MB')

# Convert low-cardinality string columns to category
for col in cat_cols:
    if df[col].nunique() / len(df) < 0.5:
        df[col] = df[col].astype('category')

# Parse dates; unparseable values become NaT instead of raising
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')

# Downcast numeric columns to the smallest integer type that fits
df[num_cols] = df[num_cols].apply(pd.to_numeric, downcast='integer')
```
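The saving from the `category` conversion above can be verified on synthetic data. A quick sketch (the size and values are arbitrary):

```python
import pandas as pd
import numpy as np

# A low-cardinality string column: the case where 'category' pays off.
# 100k rows but only 3 distinct values.
s = pd.Series(np.random.choice(['A', 'B', 'C'], size=100_000))

before = s.memory_usage(deep=True)          # one Python string object per row
after = s.astype('category').memory_usage(deep=True)  # small codes + 3 strings
print(f'{before} -> {after} bytes ({after / before:.1%})')
```

With only a handful of distinct values, the categorical version stores integer codes plus one copy of each label, typically an order-of-magnitude reduction.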
4. Conditional Filtering and Transformation

Chained operations avoid intermediate variables and keep the code readable.

```python
result = (
    df
    .query('amount > 100 and status == "completed"')
    .assign(
        tax=lambda x: (x['amount'] * 0.13).round(2),
        total=lambda x: x['amount'] + x['tax'],
        order_month=lambda x: x['order_date'].dt.to_period('M'),
    )
    .sort_values('total', ascending=False)
)

# Bin amounts into tiers
df['level'] = pd.cut(
    df['amount'],
    bins=[0, 500, 2000, 10000, np.inf],
    labels=['small', 'medium', 'large', 'extra-large'],
)
```
5. Grouping, Aggregation, and Pivoting

```python
# Named aggregation: one row per (month, category) combination
summary = (
    df
    .groupby(['order_month', 'category'], observed=True)
    .agg(
        total_amount=('amount', 'sum'),
        avg_amount=('amount', 'mean'),
        order_count=('order_id', 'nunique'),
        customer_count=('customer_id', 'nunique'),
    )
    .reset_index()
)

# Wide-format view: months as rows, categories as columns
pivot = df.pivot_table(
    values='amount',
    index='order_month',
    columns='category',
    aggfunc='sum',
    fill_value=0,
)
```
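The relationship between the two calls above is worth keeping in mind: a pivot table is the grouped aggregate unstacked into a wide layout. A toy example (column names are illustrative, matching those used above):

```python
import pandas as pd

df = pd.DataFrame({
    'order_month': ['2024-01', '2024-01', '2024-02'],
    'category': ['books', 'toys', 'books'],
    'amount': [100, 50, 70],
})

pivot = df.pivot_table(values='amount', index='order_month',
                       columns='category', aggfunc='sum', fill_value=0)

# Same numbers, long layout: one row per (month, category) pair
grouped = df.groupby(['order_month', 'category'])['amount'].sum()
```

`fill_value=0` shows up only in the wide form, where missing (month, category) combinations become explicit cells.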
6. Merging Multiple Tables

```python
# Left join: keep every order even if the customer record is missing
merged = pd.merge(
    orders_df,
    customers_df[['customer_id', 'name', 'region']],
    on='customer_id',
    how='left',
)

# Stack monthly files vertically
all_data = pd.concat(
    [pd.read_csv(f'sales_{m}.csv') for m in months],
    ignore_index=True,
)

# Keep only the latest record per order_id
dedup = df.drop_duplicates(subset=['order_id'], keep='last')
```
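A left join silently produces NaN for unmatched keys. One defensive pattern (a sketch, not part of the original pipeline) is `merge`'s `indicator` parameter, which records where each row came from so unmatched orders can be counted:

```python
import pandas as pd

# Toy tables; 'c9' has no customer record
orders_df = pd.DataFrame({'order_id': [1, 2, 3],
                          'customer_id': ['c1', 'c2', 'c9']})
customers_df = pd.DataFrame({'customer_id': ['c1', 'c2'],
                             'name': ['Ann', 'Bob']})

merged = pd.merge(orders_df, customers_df, on='customer_id',
                  how='left', indicator=True)

# '_merge' is 'left_only' for orders whose customer_id had no match
unmatched = (merged['_merge'] == 'left_only').sum()
print(f'orders without a customer record: {unmatched}')
```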
7. Outlier Detection

```python
# IQR rule: flag values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR

outliers = df[(df['amount'] < lower) | (df['amount'] > upper)]
print(f'outlier share: {len(outliers) / len(df):.2%}')

# Winsorize instead of dropping, and keep a flag column
df['amount_clipped'] = df['amount'].clip(lower, upper)
df['is_outlier'] = (df['amount'] < lower) | (df['amount'] > upper)
```
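A common alternative to the IQR rule above (not used in the original pipeline) is the z-score method: flag points more than 3 standard deviations from the mean. A minimal sketch on synthetic data:

```python
import pandas as pd

# 50 ordinary values plus one extreme value
amount = pd.Series([10.0] * 50 + [500.0])

# Standardize, then flag |z| > 3
z = (amount - amount.mean()) / amount.std()
is_outlier = z.abs() > 3
print(f'flagged: {is_outlier.sum()} of {len(amount)}')
```

Note that z-scores are themselves distorted by the outliers they try to detect (the extreme value inflates the mean and standard deviation), so on small or heavily contaminated samples the IQR rule is usually the safer default.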
8. Output and Export

```python
# Write several sheets into one workbook
with pd.ExcelWriter('processed_data.xlsx', engine='openpyxl') as writer:
    summary.to_excel(writer, sheet_name='Summary', index=False)
    pivot.to_excel(writer, sheet_name='Pivot')
    outliers.to_excel(writer, sheet_name='Outliers', index=False)

# utf-8-sig adds a BOM so Excel opens the CSV with the correct encoding
df.to_csv('output.csv', index=False, encoding='utf-8-sig')
```
The pipeline above drops into most structured-data projects as-is and can serve as the initial data_preprocess.py template.
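To use it as a template, the cleaning steps can be collected behind a single entry point. A minimal sketch of such a data_preprocess.py (the function name and demo columns are illustrative):

```python
import pandas as pd
import numpy as np

def preprocess(df: pd.DataFrame, drop_threshold: float = 0.5) -> pd.DataFrame:
    """Chain the cleaning steps: drop sparse columns, then fill the rest."""
    df = df.copy()

    # 1. Drop columns that are mostly missing
    missing_ratio = df.isnull().sum() / len(df)
    df = df.drop(columns=missing_ratio[missing_ratio > drop_threshold].index)

    # 2. Numeric columns: fill with the median
    num_cols = df.select_dtypes(include=np.number).columns
    df[num_cols] = df[num_cols].fillna(df[num_cols].median())

    # 3. Categorical columns: fill with the per-column mode
    for col in df.select_dtypes(include='object').columns:
        if df[col].isnull().any():
            df[col] = df[col].fillna(df[col].mode().iloc[0])
    return df

# Demo: one sparse column, one numeric gap, one categorical gap
demo = pd.DataFrame({
    'amount': [1.0, None, 3.0],
    'status': ['ok', None, 'ok'],
    'mostly_nan': [None, None, 1.0],
})
clean = preprocess(demo)
```

Keeping the threshold and fill strategies as parameters makes the same entry point reusable across projects with different tolerance for missing data.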