📚 引言
在上一篇Pandas教程中,我们学习了数据处理的基础技巧。今天,我们将深入探讨Python数据分析的进阶内容,帮助你从"会用"到"精通",真正提升数据分析效率。本文涵盖了性能优化、内存管理、并行处理等高级主题。
1. 使用eval()和query()提升性能
1.1 性能对比
对于大型数据集,eval()和query()可以显著提升运算速度。
import pandas as pd
import numpy as np
# 创建100万行测试数据
df = pd.DataFrame({
'A': np.random.randn(1000000),
'B': np.random.randn(1000000),
'C': np.random.randn(1000000),
'D': np.random.randn(1000000)
})
# 传统方法(慢)
%timeit df['result'] = df['A'] + df['B'] * df['C'] - df['D']
# 输出: 35.7 ms ± 2.1 ms per loop
# 使用eval(快3-5倍)
%timeit df.eval('result = A + B * C - D', inplace=True)
# 输出: 8.2 ms ± 0.5 ms per loop
# 复杂条件筛选
# 传统方法
mask = (df['A'] > 0) & (df['B'] < 0) & (df['C'] > df['D'])
# query方法(更快且更易读)
result = df.query('A > 0 and B < 0 and C > D')
1.2 eval()支持的操作
# 算术运算
df.eval('E = A + B')
df.eval('F = A * B - C / D')
# 比较运算
df.eval('G = A > B')
# 使用Python函数
df.eval('H = abs(A) + sqrt(B**2)')
# 条件表达式
df.eval('I = (A > 0) ? A : 0')
2. 内存优化技巧
2.1 数据类型优化
def optimize_dtypes(df, verbose=True):
"""自动优化DataFrame的数据类型"""
start_mem = df.memory_usage().sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != 'object':
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
# 整数类型优化
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
else:
# 浮点数优化
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
# 分类数据优化
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype('category')
end_mem = df.memory_usage().sum() / 1024**2
if verbose:
print(f'内存使用: {start_mem:.2f} MB -> {end_mem:.2f} MB')
print(f'节省: {(1 - end_mem/start_mem)*100:.1f}%')
return df
# 使用示例
df_optimized = optimize_dtypes(df)
2.2 分块读取大文件
# 分块读取CSV
chunk_size = 10000
chunks = []
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
# 处理每个数据块
processed_chunk = chunk[chunk['value'] > 0]
chunks.append(processed_chunk)
# 合并结果
result = pd.concat(chunks, ignore_index=True)
# 使用迭代器
with pd.read_csv('large_file.csv', chunksize=chunk_size) as reader:
for chunk in reader:
# 逐块处理
process_chunk(chunk)
3. 向量化操作 vs 循环
3.1 性能对比
import pandas as pd
import numpy as np
from numba import jit
df = pd.DataFrame({
'x': np.random.randn(1000000),
'y': np.random.randn(1000000)
})
# 方法1:循环(最慢)
def loop_method(x, y):
result = []
for i in range(len(x)):
if x[i] > 0:
result.append(x[i] + y[i])
else:
result.append(x[i] - y[i])
return result
%timeit loop_method(df['x'].values, df['y'].values)
# 输出: 2.5 s per loop
# 方法2:apply(较慢)
%timeit df.apply(lambda row: row['x'] + row['y'] if row['x'] > 0 else row['x'] - row['y'], axis=1)
# 输出: 3.2 s per loop
# 方法3:向量化+np.where(快)
%timeit np.where(df['x'] > 0, df['x'] + df['y'], df['x'] - df['y'])
# 输出: 15 ms per loop
# 方法4:使用numba JIT编译(最快)
@jit(nopython=True)
def numba_method(x, y):
result = np.empty(len(x))
for i in range(len(x)):
if x[i] > 0:
result[i] = x[i] + y[i]
else:
result[i] = x[i] - y[i]
return result
%timeit numba_method(df['x'].values, df['y'].values)
# 输出: 3 ms per loop
3.2 使用numpy向量化函数
# 三角函数向量化
angles = np.linspace(0, 2*np.pi, 1000000)
%timeit np.sin(angles) # 快速
%timeit [math.sin(angle) for angle in angles] # 慢100倍
# 自定义向量化函数
def custom_func(x):
return x**2 + np.exp(x) - np.log(x + 1)
vectorized_func = np.vectorize(custom_func)
result = vectorized_func(df['x'].values)
4. 高级分组聚合技巧
4.1 使用transform和apply
# 计算组内占比
df['category_pct'] = df.groupby('category')['value'].transform(
lambda x: x / x.sum() * 100
)
# 计算组内排名
df['rank'] = df.groupby('category')['value'].rank(method='dense', ascending=False)
# 组内去重
df['unique_in_group'] = df.groupby('category')['value'].transform('nunique')
# 累积计算
df['cumsum_by_group'] = df.groupby('category')['value'].cumsum()
df['cummax_by_group'] = df.groupby('category')['value'].cummax()
4.2 多级分组与透视
# 多级分组聚合
result = df.groupby(['category', 'sub_category']).agg({
'value': ['sum', 'mean', 'std', 'count'],
'price': ['min', 'max']
})
# 使用pivot_table
pivot = pd.pivot_table(
df,
values='value',
index='category',
columns='date',
aggfunc='sum',
fill_value=0,
margins=True,
margins_name='Total'
)
# 交叉表(频数统计)
crosstab = pd.crosstab(
df['category'],
df['sub_category'],
values=df['value'],
aggfunc='sum',
normalize='index' # 行归一化
)
5. 时间序列高级处理
5.1 重采样与滚动窗口
# 设置时间索引
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
# 不同频率重采样
daily_data = df.resample('D').sum() # 日
weekly_data = df.resample('W').mean() # 周
monthly_data = df.resample('M').agg({ # 月,多指标
'value': 'sum',
'price': 'mean'
})
quarterly_data = df.resample('Q').first() # 季
# 滚动窗口计算
df['rolling_mean_7'] = df['value'].rolling(window=7, min_periods=1).mean()
df['rolling_std_30'] = df['value'].rolling(window=30).std()
df['exp_weighted_mean'] = df['value'].ewm(span=10, adjust=False).mean()
# 扩展窗口(从开始到当前)
df['expanding_mean'] = df['value'].expanding().mean()
df['expanding_max'] = df['value'].expanding().max()
5.2 时间序列特征工程
# 滞后特征
for lag in [1, 3, 7, 14, 30]:
df[f'lag_{lag}'] = df['value'].shift(lag)
# 差分特征
df['diff_1'] = df['value'].diff(1)
df['diff_7'] = df['value'].diff(7)
df['pct_change'] = df['value'].pct_change()
# 日期特征提取
df['year'] = df.index.year
df['month'] = df.index.month
df['quarter'] = df.index.quarter
df['dayofweek'] = df.index.dayofweek
df['dayofyear'] = df.index.dayofyear
df['weekofyear'] = df.index.isocalendar().week
df['is_weekend'] = df.index.dayofweek >= 5
df['is_month_start'] = df.index.is_month_start
df['is_month_end'] = df.index.is_month_end
# 周期性编码
df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
6. 缺失值高级处理
6.1 智能填充策略
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
# 前向/后向填充(时间序列)
df['value'].fillna(method='ffill', inplace=True)
df['value'].fillna(method='bfill', inplace=True)
# 插值法
df['value'] = df['value'].interpolate(method='linear') # 线性插值
df['value'] = df['value'].interpolate(method='polynomial', order=2) # 多项式
df['value'] = df['value'].interpolate(method='spline', order=3) # 样条插值
# KNN填充
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
imputer.fit_transform(df),
columns=df.columns,
index=df.index
)
# 多重插补
imputer = IterativeImputer(max_iter=10, random_state=42)
df_imputed = pd.DataFrame(
imputer.fit_transform(df),
columns=df.columns,
index=df.index
)
6.2 缺失值分析
def missing_analysis(df):
"""缺失值分析报告"""
missing = df.isnull().sum()
missing_pct = missing / len(df) * 100
missing_df = pd.DataFrame({
'缺失数量': missing,
'缺失比例(%)': missing_pct,
'数据类型': df.dtypes
})
missing_df = missing_df[missing_df['缺失数量'] > 0].sort_values('缺失比例(%)', ascending=False)
if len(missing_df) > 0:
print("缺失值分析:")
print(missing_df)
# 可视化缺失模式
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 6))
plt.imshow(df.isnull(), aspect='auto', cmap='viridis')
plt.colorbar(label='缺失值')
plt.title('缺失值模式可视化')
plt.show()
return missing_df
7. 并行处理
7.1 使用swifter加速
import swifter
# 自动选择最优并行方式
df['result'] = df.swifter.apply(lambda x: complex_function(x), axis=1)
# 并行apply
df['processed'] = df['text'].swifter.apply(lambda x: x.lower().strip())
7.2 使用dask处理超大数据
import dask.dataframe as dd
# 读取大数据集
ddf = dd.read_csv('very_large_file.csv')
# 惰性计算
result = ddf.groupby('category')['value'].mean().compute()
# 分区操作
ddf = ddf.repartition(npartitions=10)
result = ddf.map_partitions(lambda df: df[df['value'] > 0]).compute()
# 转换为pandas
df_small = ddf.head(100000) # 取前10万行
7.3 使用multiprocessing
from multiprocessing import Pool, cpu_count
from functools import partial
def process_chunk(chunk, threshold):
"""处理单个数据块"""
return chunk[chunk['value'] > threshold]
def parallel_processing(df, func, n_cores=None):
"""并行处理DataFrame"""
if n_cores is None:
n_cores = cpu_count()
# 分割数据
chunks = np.array_split(df, n_cores)
# 并行处理
with Pool(n_cores) as pool:
results = pool.map(func, chunks)
# 合并结果
return pd.concat(results, ignore_index=True)
# 使用示例
result = parallel_processing(df, lambda chunk: chunk[chunk['value'] > 0])
8. 管道与函数式编程
8.1 创建可复用的数据处理管道
class DataPipeline:
"""数据处理管道"""
def __init__(self):
self.steps = []
def add_step(self, func, *args, **kwargs):
"""添加处理步骤"""
self.steps.append((func, args, kwargs))
return self
def run(self, data):
"""执行管道"""
for func, args, kwargs in self.steps:
data = func(data, *args, **kwargs)
return data
# 定义处理函数
def clean_data(df):
return df.drop_duplicates().fillna(df.median())
def filter_data(df, column, threshold):
return df[df[column] > threshold]
def add_features(df):
df['log_value'] = np.log1p(df['value'])
df['value_squared'] = df['value'] ** 2
return df
def aggregate_data(df, group_col, agg_col):
return df.groupby(group_col)[agg_col].agg(['mean', 'std', 'count'])
# 构建管道
pipeline = DataPipeline()
pipeline.add_step(clean_data)
pipeline.add_step(filter_data, 'value', 0)
pipeline.add_step(add_features)
pipeline.add_step(aggregate_data, 'category', 'value')
# 执行管道
result = pipeline.run(df)
8.2 使用pipe方法
def remove_outliers(df, column, n_std=3):
"""去除异常值"""
mean = df[column].mean()
std = df[column].std()
return df[(df[column] >= mean - n_std*std) & (df[column] <= mean + n_std*std)]
def scale_features(df, columns):
"""标准化特征"""
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
df[columns] = scaler.fit_transform(df[columns])
return df
def encode_categorical(df, columns):
"""编码分类变量"""
from sklearn.preprocessing import LabelEncoder
for col in columns:
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
return df
# 链式调用
df_processed = (df
.pipe(remove_outliers, 'value', 2)
.pipe(scale_features, ['value', 'price'])
.pipe(encode_categorical, ['category', 'region'])
)
9. 与SQL数据库高效交互
9.1 批量写入优化
from sqlalchemy import create_engine
# 创建数据库连接
engine = create_engine('mysql+pymysql://user:password@localhost/dbname')
# 批量写入(chunksize优化)
df.to_sql('table_name', engine, if_exists='append', index=False, chunksize=10000)
# 使用`method='multi'`加速
df.to_sql('table_name', engine, if_exists='replace', index=False,
method='multi', chunksize=5000)
# 分块写入大DataFrame
chunk_size = 50000
for i in range(0, len(df), chunk_size):
chunk = df.iloc[i:i+chunk_size]
chunk.to_sql('table_name', engine, if_exists='append', index=False)
print(f'已写入 {i+len(chunk)} / {len(df)} 行')
9.2 高效读取
# 使用SQL查询过滤
query = """
SELECT category, SUM(value) as total_value, COUNT(*) as cnt
FROM table_name
WHERE date >= '2024-01-01'
GROUP BY category
HAVING total_value > 1000
"""
result = pd.read_sql(query, engine)
# 分块读取
chunks = pd.read_sql('SELECT * FROM large_table', engine, chunksize=10000)
for chunk in chunks:
process_chunk(chunk)
10. 性能分析与优化
10.1 使用line_profiler分析代码
# 安装: pip install line_profiler
@profile
def slow_function(df):
result = []
for i in range(len(df)):
if df.loc[i, 'value'] > 0:
result.append(df.loc[i, 'value'] * 2)
return result
# 运行: kernprof -l -v script.py
10.2 使用memory_profiler分析内存
# 安装: pip install memory_profiler
from memory_profiler import profile
@profile
def memory_intensive(df):
df_copy = df.copy()
df_copy['new_col'] = df_copy['value'] ** 2
return df_copy
# 运行: python -m memory_profiler script.py
10.3 性能对比工具
def compare_performance(funcs, *args, **kwargs):
"""对比多个函数的性能"""
results = []
for name, func in funcs:
start = time.time()
result = func(*args, **kwargs)
end = time.time()
results.append({
'function': name,
'time': end - start,
'result_shape': result.shape if hasattr(result, 'shape') else len(result)
})
return pd.DataFrame(results).sort_values('time')
# 使用示例
funcs = [
('loop_method', loop_method),
('vectorized_method', vectorized_method),
('numba_method', numba_method)
]
comparison = compare_performance(funcs, df['x'].values, df['y'].values)
print(comparison)
性能优化总结表
| 技巧 | 性能提升 | 适用场景 |
|---|---|---|
eval()/query() |
3-5倍 | 大型数据集运算 |
| 数据类型优化 | 50-90%内存 | 所有场景 |
| 向量化操作 | 100-1000倍 | 避免循环 |
numba JIT |
100-1000倍 | 数值计算 |
swifter并行 |
2-4倍 | apply操作 |
dask |
超越内存限制 | 超大数据集 |
| 分块处理 | 内存可控 | 大文件读取 |
实战练习
# 综合练习:优化以下代码
# 原始代码(慢)
def slow_process(df):
result = []
for i in range(len(df)):
if df.loc[i, 'value'] > 0:
temp = df.loc[i, 'value'] * df.loc[i, 'price']
result.append(temp)
return pd.Series(result)
# 优化后的代码
def fast_process(df):
mask = df['value'] > 0
return df.loc[mask, 'value'] * df.loc[mask, 'price']
总结
本文介绍了10个Python数据分析的高级技巧:
- ✅
eval()/query():提升运算性能 - ✅ 内存优化:数据类型优化和分块处理
- ✅ 向量化操作:避免循环,使用numpy
- ✅ 高级分组:transform、pivot_table
- ✅ 时间序列:重采样、滚动窗口、特征工程
- ✅ 智能填充:KNN、多重插补
- ✅ 并行处理:swifter、dask、multiprocessing
- ✅ 管道编程:函数式数据处理
- ✅ 数据库优化:批量读写
- ✅ 性能分析:profiler工具
📌 进阶建议:将常用函数封装成自己的工具库,在实际项目中不断优化和积累经验。
🔗 相关文章推荐
如果这篇文章对你有帮助,欢迎点赞、收藏、转发!