📚 引言

在上一篇Pandas教程中,我们学习了数据处理的基础技巧。今天,我们将深入探讨Python数据分析的进阶内容,帮助你从"会用"到"精通",真正提升数据分析效率。本文涵盖了性能优化、内存管理、并行处理等高级主题。


1. 使用eval()query()提升性能

1.1 性能对比

对于大型数据集,eval()query()可以显著提升运算速度。

import pandas as pd
import numpy as np

# 创建100万行测试数据
df = pd.DataFrame({
    'A': np.random.randn(1000000),
    'B': np.random.randn(1000000),
    'C': np.random.randn(1000000),
    'D': np.random.randn(1000000)
})

# 传统方法(慢)
%timeit df['result'] = df['A'] + df['B'] * df['C'] - df['D']
# 输出: 35.7 ms ± 2.1 ms per loop

# 使用eval(快3-5倍)
%timeit df.eval('result = A + B * C - D', inplace=True)
# 输出: 8.2 ms ± 0.5 ms per loop

# 复杂条件筛选
# 传统方法
mask = (df['A'] > 0) & (df['B'] < 0) & (df['C'] > df['D'])

# query方法(更快且更易读)
result = df.query('A > 0 and B < 0 and C > D')

1.2 eval()支持的操作

# 算术运算
df.eval('E = A + B')
df.eval('F = A * B - C / D')

# 比较运算
df.eval('G = A > B')

# 使用Python函数
df.eval('H = abs(A) + sqrt(B**2)')

# 条件表达式
df.eval('I = (A > 0) ? A : 0')

2. 内存优化技巧

2.1 数据类型优化

def optimize_dtypes(df, verbose=True):
    """自动优化DataFrame的数据类型"""
    start_mem = df.memory_usage().sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type != 'object':
            c_min = df[col].min()
            c_max = df[col].max()

            if str(col_type)[:3] == 'int':
                # 整数类型优化
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
            else:
                # 浮点数优化
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
        else:
            # 分类数据优化
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() / 1024**2

    if verbose:
        print(f'内存使用: {start_mem:.2f} MB -> {end_mem:.2f} MB')
        print(f'节省: {(1 - end_mem/start_mem)*100:.1f}%')

    return df

# 使用示例
df_optimized = optimize_dtypes(df)

2.2 分块读取大文件

# 分块读取CSV
chunk_size = 10000
chunks = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # 处理每个数据块
    processed_chunk = chunk[chunk['value'] > 0]
    chunks.append(processed_chunk)

# 合并结果
result = pd.concat(chunks, ignore_index=True)

# 使用迭代器
with pd.read_csv('large_file.csv', chunksize=chunk_size) as reader:
    for chunk in reader:
        # 逐块处理
        process_chunk(chunk)

3. 向量化操作 vs 循环

3.1 性能对比

import pandas as pd
import numpy as np
from numba import jit

df = pd.DataFrame({
    'x': np.random.randn(1000000),
    'y': np.random.randn(1000000)
})

# 方法1:循环(最慢)
def loop_method(x, y):
    result = []
    for i in range(len(x)):
        if x[i] > 0:
            result.append(x[i] + y[i])
        else:
            result.append(x[i] - y[i])
    return result

%timeit loop_method(df['x'].values, df['y'].values)
# 输出: 2.5 s per loop

# 方法2:apply(较慢)
%timeit df.apply(lambda row: row['x'] + row['y'] if row['x'] > 0 else row['x'] - row['y'], axis=1)
# 输出: 3.2 s per loop

# 方法3:向量化+np.where(快)
%timeit np.where(df['x'] > 0, df['x'] + df['y'], df['x'] - df['y'])
# 输出: 15 ms per loop

# 方法4:使用numba JIT编译(最快)
@jit(nopython=True)
def numba_method(x, y):
    result = np.empty(len(x))
    for i in range(len(x)):
        if x[i] > 0:
            result[i] = x[i] + y[i]
        else:
            result[i] = x[i] - y[i]
    return result

%timeit numba_method(df['x'].values, df['y'].values)
# 输出: 3 ms per loop

3.2 使用numpy向量化函数

# 三角函数向量化
angles = np.linspace(0, 2*np.pi, 1000000)
%timeit np.sin(angles)  # 快速
%timeit [math.sin(angle) for angle in angles]  # 慢100倍

# 自定义向量化函数
def custom_func(x):
    return x**2 + np.exp(x) - np.log(x + 1)

vectorized_func = np.vectorize(custom_func)
result = vectorized_func(df['x'].values)

4. 高级分组聚合技巧

4.1 使用transformapply

# 计算组内占比
df['category_pct'] = df.groupby('category')['value'].transform(
    lambda x: x / x.sum() * 100
)

# 计算组内排名
df['rank'] = df.groupby('category')['value'].rank(method='dense', ascending=False)

# 组内去重
df['unique_in_group'] = df.groupby('category')['value'].transform('nunique')

# 累积计算
df['cumsum_by_group'] = df.groupby('category')['value'].cumsum()
df['cummax_by_group'] = df.groupby('category')['value'].cummax()

4.2 多级分组与透视

# 多级分组聚合
result = df.groupby(['category', 'sub_category']).agg({
    'value': ['sum', 'mean', 'std', 'count'],
    'price': ['min', 'max']
})

# 使用pivot_table
pivot = pd.pivot_table(
    df,
    values='value',
    index='category',
    columns='date',
    aggfunc='sum',
    fill_value=0,
    margins=True,
    margins_name='Total'
)

# 交叉表(频数统计)
crosstab = pd.crosstab(
    df['category'],
    df['sub_category'],
    values=df['value'],
    aggfunc='sum',
    normalize='index'  # 行归一化
)

5. 时间序列高级处理

5.1 重采样与滚动窗口

# 设置时间索引
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)

# 不同频率重采样
daily_data = df.resample('D').sum()      # 日
weekly_data = df.resample('W').mean()     # 周
monthly_data = df.resample('M').agg({     # 月,多指标
    'value': 'sum',
    'price': 'mean'
})
quarterly_data = df.resample('Q').first()  # 季

# 滚动窗口计算
df['rolling_mean_7'] = df['value'].rolling(window=7, min_periods=1).mean()
df['rolling_std_30'] = df['value'].rolling(window=30).std()
df['exp_weighted_mean'] = df['value'].ewm(span=10, adjust=False).mean()

# 扩展窗口(从开始到当前)
df['expanding_mean'] = df['value'].expanding().mean()
df['expanding_max'] = df['value'].expanding().max()

5.2 时间序列特征工程

# 滞后特征
for lag in [1, 3, 7, 14, 30]:
    df[f'lag_{lag}'] = df['value'].shift(lag)

# 差分特征
df['diff_1'] = df['value'].diff(1)
df['diff_7'] = df['value'].diff(7)
df['pct_change'] = df['value'].pct_change()

# 日期特征提取
df['year'] = df.index.year
df['month'] = df.index.month
df['quarter'] = df.index.quarter
df['dayofweek'] = df.index.dayofweek
df['dayofyear'] = df.index.dayofyear
df['weekofyear'] = df.index.isocalendar().week
df['is_weekend'] = df.index.dayofweek >= 5
df['is_month_start'] = df.index.is_month_start
df['is_month_end'] = df.index.is_month_end

# 周期性编码
df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)

6. 缺失值高级处理

6.1 智能填充策略

from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

# 前向/后向填充(时间序列)
df['value'].fillna(method='ffill', inplace=True)
df['value'].fillna(method='bfill', inplace=True)

# 插值法
df['value'] = df['value'].interpolate(method='linear')  # 线性插值
df['value'] = df['value'].interpolate(method='polynomial', order=2)  # 多项式
df['value'] = df['value'].interpolate(method='spline', order=3)  # 样条插值

# KNN填充
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
    imputer.fit_transform(df),
    columns=df.columns,
    index=df.index
)

# 多重插补
imputer = IterativeImputer(max_iter=10, random_state=42)
df_imputed = pd.DataFrame(
    imputer.fit_transform(df),
    columns=df.columns,
    index=df.index
)

6.2 缺失值分析

def missing_analysis(df):
    """缺失值分析报告"""
    missing = df.isnull().sum()
    missing_pct = missing / len(df) * 100

    missing_df = pd.DataFrame({
        '缺失数量': missing,
        '缺失比例(%)': missing_pct,
        '数据类型': df.dtypes
    })

    missing_df = missing_df[missing_df['缺失数量'] > 0].sort_values('缺失比例(%)', ascending=False)

    if len(missing_df) > 0:
        print("缺失值分析:")
        print(missing_df)

        # 可视化缺失模式
        import matplotlib.pyplot as plt
        plt.figure(figsize=(12, 6))
        plt.imshow(df.isnull(), aspect='auto', cmap='viridis')
        plt.colorbar(label='缺失值')
        plt.title('缺失值模式可视化')
        plt.show()

    return missing_df

7. 并行处理

7.1 使用swifter加速

import swifter

# 自动选择最优并行方式
df['result'] = df.swifter.apply(lambda x: complex_function(x), axis=1)

# 并行apply
df['processed'] = df['text'].swifter.apply(lambda x: x.lower().strip())

7.2 使用dask处理超大数据

import dask.dataframe as dd

# 读取大数据集
ddf = dd.read_csv('very_large_file.csv')

# 惰性计算
result = ddf.groupby('category')['value'].mean().compute()

# 分区操作
ddf = ddf.repartition(npartitions=10)
result = ddf.map_partitions(lambda df: df[df['value'] > 0]).compute()

# 转换为pandas
df_small = ddf.head(100000)  # 取前10万行

7.3 使用multiprocessing

from multiprocessing import Pool, cpu_count
from functools import partial

def process_chunk(chunk, threshold):
    """处理单个数据块"""
    return chunk[chunk['value'] > threshold]

def parallel_processing(df, func, n_cores=None):
    """并行处理DataFrame"""
    if n_cores is None:
        n_cores = cpu_count()

    # 分割数据
    chunks = np.array_split(df, n_cores)

    # 并行处理
    with Pool(n_cores) as pool:
        results = pool.map(func, chunks)

    # 合并结果
    return pd.concat(results, ignore_index=True)

# 使用示例
result = parallel_processing(df, lambda chunk: chunk[chunk['value'] > 0])

8. 管道与函数式编程

8.1 创建可复用的数据处理管道

class DataPipeline:
    """数据处理管道"""

    def __init__(self):
        self.steps = []

    def add_step(self, func, *args, **kwargs):
        """添加处理步骤"""
        self.steps.append((func, args, kwargs))
        return self

    def run(self, data):
        """执行管道"""
        for func, args, kwargs in self.steps:
            data = func(data, *args, **kwargs)
        return data

# 定义处理函数
def clean_data(df):
    return df.drop_duplicates().fillna(df.median())

def filter_data(df, column, threshold):
    return df[df[column] > threshold]

def add_features(df):
    df['log_value'] = np.log1p(df['value'])
    df['value_squared'] = df['value'] ** 2
    return df

def aggregate_data(df, group_col, agg_col):
    return df.groupby(group_col)[agg_col].agg(['mean', 'std', 'count'])

# 构建管道
pipeline = DataPipeline()
pipeline.add_step(clean_data)
pipeline.add_step(filter_data, 'value', 0)
pipeline.add_step(add_features)
pipeline.add_step(aggregate_data, 'category', 'value')

# 执行管道
result = pipeline.run(df)

8.2 使用pipe方法

def remove_outliers(df, column, n_std=3):
    """去除异常值"""
    mean = df[column].mean()
    std = df[column].std()
    return df[(df[column] >= mean - n_std*std) & (df[column] <= mean + n_std*std)]

def scale_features(df, columns):
    """标准化特征"""
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    df[columns] = scaler.fit_transform(df[columns])
    return df

def encode_categorical(df, columns):
    """编码分类变量"""
    from sklearn.preprocessing import LabelEncoder
    for col in columns:
        le = LabelEncoder()
        df[col] = le.fit_transform(df[col])
    return df

# 链式调用
df_processed = (df
    .pipe(remove_outliers, 'value', 2)
    .pipe(scale_features, ['value', 'price'])
    .pipe(encode_categorical, ['category', 'region'])
)

9. 与SQL数据库高效交互

9.1 批量写入优化

from sqlalchemy import create_engine

# 创建数据库连接
engine = create_engine('mysql+pymysql://user:password@localhost/dbname')

# 批量写入(chunksize优化)
df.to_sql('table_name', engine, if_exists='append', index=False, chunksize=10000)

# 使用`method='multi'`加速
df.to_sql('table_name', engine, if_exists='replace', index=False, 
          method='multi', chunksize=5000)

# 分块写入大DataFrame
chunk_size = 50000
for i in range(0, len(df), chunk_size):
    chunk = df.iloc[i:i+chunk_size]
    chunk.to_sql('table_name', engine, if_exists='append', index=False)
    print(f'已写入 {i+len(chunk)} / {len(df)} 行')

9.2 高效读取

# 使用SQL查询过滤
query = """
SELECT category, SUM(value) as total_value, COUNT(*) as cnt
FROM table_name
WHERE date >= '2024-01-01'
GROUP BY category
HAVING total_value > 1000
"""

result = pd.read_sql(query, engine)

# 分块读取
chunks = pd.read_sql('SELECT * FROM large_table', engine, chunksize=10000)
for chunk in chunks:
    process_chunk(chunk)

10. 性能分析与优化

10.1 使用line_profiler分析代码

# 安装: pip install line_profiler

@profile
def slow_function(df):
    result = []
    for i in range(len(df)):
        if df.loc[i, 'value'] > 0:
            result.append(df.loc[i, 'value'] * 2)
    return result

# 运行: kernprof -l -v script.py

10.2 使用memory_profiler分析内存

# 安装: pip install memory_profiler

from memory_profiler import profile

@profile
def memory_intensive(df):
    df_copy = df.copy()
    df_copy['new_col'] = df_copy['value'] ** 2
    return df_copy

# 运行: python -m memory_profiler script.py

10.3 性能对比工具

def compare_performance(funcs, *args, **kwargs):
    """对比多个函数的性能"""
    results = []

    for name, func in funcs:
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()

        results.append({
            'function': name,
            'time': end - start,
            'result_shape': result.shape if hasattr(result, 'shape') else len(result)
        })

    return pd.DataFrame(results).sort_values('time')

# 使用示例
funcs = [
    ('loop_method', loop_method),
    ('vectorized_method', vectorized_method),
    ('numba_method', numba_method)
]

comparison = compare_performance(funcs, df['x'].values, df['y'].values)
print(comparison)

性能优化总结表

技巧 性能提升 适用场景
eval()/query() 3-5倍 大型数据集运算
数据类型优化 50-90%内存 所有场景
向量化操作 100-1000倍 避免循环
numba JIT 100-1000倍 数值计算
swifter并行 2-4倍 apply操作
dask 超越内存限制 超大数据集
分块处理 内存可控 大文件读取

实战练习

# 综合练习:优化以下代码

# 原始代码(慢)
def slow_process(df):
    result = []
    for i in range(len(df)):
        if df.loc[i, 'value'] > 0:
            temp = df.loc[i, 'value'] * df.loc[i, 'price']
            result.append(temp)
    return pd.Series(result)

# 优化后的代码
def fast_process(df):
    mask = df['value'] > 0
    return df.loc[mask, 'value'] * df.loc[mask, 'price']

总结

本文介绍了10个Python数据分析的高级技巧:

  • eval()/query():提升运算性能
  • 内存优化:数据类型优化和分块处理
  • 向量化操作:避免循环,使用numpy
  • 高级分组:transform、pivot_table
  • 时间序列:重采样、滚动窗口、特征工程
  • 智能填充:KNN、多重插补
  • 并行处理:swifter、dask、multiprocessing
  • 管道编程:函数式数据处理
  • 数据库优化:批量读写
  • 性能分析:profiler工具

📌 进阶建议:将常用函数封装成自己的工具库,在实际项目中不断优化和积累经验。


🔗 相关文章推荐


如果这篇文章对你有帮助,欢迎点赞、收藏、转发!