第3周 - pandas 数据清洗与处理实战手册

pandas 数据清洗与处理实战手册(求职导向)

本手册聚焦数据分析师(侧重数据清洗与处理)岗位的实战需求,围绕真实业务场景(电商、用户行为、CRM等),系统讲解 pandas 核心应用,所有内容均服务于企业实际工作任务,可作为求职前技能强化资料。

第一章 数据读取与初步探查(实战第一步)

1.1 业务背景

拿到一份原始数据(如电商订单CSV、CRM客户Excel),首先需快速了解数据结构、字段含义、数据规模、是否存在明显问题(如缺失值、异常格式),为后续清洗做准备。

1.2 核心方法与参数详解

  • pd.read_csv():读取文本格式数据,核心参数包括filepath_or_buffer(文件路径)、sep(分隔符,默认逗号)、header(表头行,默认0)、names(自定义列名)、dtype(指定列数据类型)、na_values(指定缺失值标识)、parse_dates(自动解析日期列)。
  • df.info():查看数据基本信息,包括行数、列数、各列数据类型、非空值数量,快速定位缺失值和异常类型。
  • df.describe():统计数值型列的描述性统计量(计数、均值、标准差、最值、分位数),初步识别异常值。
  • df.value_counts():统计单个/多个列的唯一值频次,常用于分类字段分布分析(如用户性别占比、商品品类分布)。

1.3 带注释代码示例


import pandas as pd
import numpy as np

# 1. 读取电商订单数据(模拟真实场景,含缺失值标识、日期列)
# 业务场景:读取电商平台每日订单数据,原始数据中"无"表示缺失,日期列需解析
orders_df = pd.read_csv(
    "order_data.csv",  # 实际工作中替换为文件路径
    sep=",",
    header=0,
    names=["order_id", "user_id", "product_id", "order_amount", "order_time", "pay_status", "address"],  # 自定义列名(若原始数据无表头)
    dtype={"order_id": str, "user_id": str},  # 订单ID、用户ID设为字符串,避免丢失前导零
    na_values=["无", "NaN"],  # 将"无"识别为缺失值
    parse_dates=["order_time"]  # 自动解析订单时间列为datetime类型
)

# 2. 初步探查数据
print("数据基本信息:")
orders_df.info()  # 查看数据类型、非空值数量,快速发现address列缺失较多

print("\n数值型字段统计:")
print(orders_df.describe())  # 分析order_amount是否有异常(如负值、极大值)

print("\n支付状态分布:")
print(orders_df["pay_status"].value_counts(dropna=False))  # dropna=False显示缺失值频次,了解未支付/缺失情况

print("\n前5行数据预览:")
print(orders_df.head())

1.4 常见陷阱与最佳实践

  • 常见陷阱:① 订单ID、手机号等字段被自动识别为数值型,导致前导零丢失;② 日期列未解析为datetime类型,无法进行时间维度分析;③ 自定义缺失值标识(如"无"、"未知")未被识别,统计失真。
  • 最佳实践:① 读取时用dtype明确指定字符串类型字段(ID、手机号、地址);② 优先用parse_dates解析日期列,避免后续转换麻烦;③ 结合业务场景定义na_values,确保缺失值被正确识别;④ 读取大文件时可指定usecols筛选必要列,减少内存占用。

第二章 缺失值处理策略(业务导向)

2.1 业务背景

真实数据中普遍存在缺失值(如用户表中"年龄"缺失、订单表中"收货地址"缺失、CRM表中"客户行业"缺失),处理方式需结合业务逻辑,不能盲目删除或填充。

2.2 核心方法与参数详解

  • df.isnull()/df.notnull():检测缺失值,返回布尔矩阵。
  • df.isnull().sum():统计每列缺失值数量及占比,评估缺失影响。
  • df.dropna():删除缺失值,核心参数axis(0=行,1=列)、how("any"=存在缺失即删除,"all"=全为缺失才删除)、subset(指定判断缺失的列)。
  • df.fillna():填充缺失值,核心参数 value(固定值)、method("ffill"前向填充、"bfill"后向填充)、inplace(是否原地修改)。
  • 自定义填充:结合业务逻辑(如按品类填充商品单价、按用户等级填充消费金额)。

2.3 带注释代码示例


import pandas as pd
import numpy as np

# 模拟电商订单数据(含缺失值)
data = {
    "order_id": ["O001", "O002", "O003", "O004", "O005", "O006"],
    "user_id": ["U001", "U002", "U003", np.nan, "U002", "U001"],
    "product_id": ["P001", "P002", np.nan, "P003", "P002", "P001"],
    "order_amount": [199.0, 299.0, 399.0, np.nan, 299.0, 199.0],
    "order_time": pd.date_range("2026-01-01", periods=6, freq="D"),
    "pay_status": ["已支付", "已支付", np.nan, "未支付", "已支付", np.nan]
}
orders_df = pd.DataFrame(data)

# 1. 检测缺失值
print("各列缺失值数量及占比:")
missing_info = orders_df.isnull().sum()
missing_ratio = (missing_info / len(orders_df)) * 100
missing_df = pd.DataFrame({"缺失数量": missing_info, "缺失占比(%)": missing_ratio.round(2)})
print(missing_df)

# 2. 缺失值处理(分场景)
# 场景1:user_id缺失(无法关联用户信息,订单无效)→ 删除行
df_clean1 = orders_df.dropna(subset=["user_id"], how="any")
print("\n删除user_id缺失行后的数据:")
print(df_clean1)

# 场景2:order_amount缺失(假设同商品金额一致,按product_id分组填充均值)
df_clean2 = df_clean1.copy()
df_clean2["order_amount"] = df_clean2.groupby("product_id")["order_amount"].transform(
    lambda x: x.fillna(x.mean())  # 按商品分组,用组内均值填充缺失值
)
print("\n按商品分组填充金额后的数据:")
print(df_clean2[["order_id", "product_id", "order_amount"]])

# 场景3:pay_status缺失(无法确定支付状态,标记为"待确认")
df_clean3 = df_clean2.copy()
df_clean3["pay_status"] = df_clean3["pay_status"].fillna("待确认")
print("\n标记支付状态缺失后的数据:")
print(df_clean3[["order_id", "pay_status"]])

# 场景4:product_id缺失(前向填充,假设连续订单为同一商品)
df_clean4 = df_clean3.copy()
df_clean4["product_id"] = df_clean4["product_id"].fillna(method="ffill")
print("\n前向填充商品ID后的数据:")
print(df_clean4[["order_id", "product_id"]])

2.4 常见陷阱与最佳实践

  • 常见陷阱:① 盲目用均值填充所有缺失值(如对分类字段填充均值、对偏态分布的金额字段填充均值);② 用inplace=True原地修改数据,导致原始数据丢失,无法回滚;③ 链式赋值导致缺失值处理失效(如df[df["col"].isnull()]["col"] = 0)。
  • 最佳实践:① 先分析缺失原因(随机缺失/业务缺失),再决定处理方式:缺失占比>50%且无业务意义可删除列,关键字段缺失可删除行,有规律缺失按业务逻辑填充;② 优先用loc显式赋值避免链式警告(如df.loc[df["col"].isnull(), "col"] = 0);③ 填充前保留原始数据副本,便于验证;④ 分类字段用众数填充,时间序列数据用前向/后向填充,数值型字段根据分布用均值(正态)或中位数(偏态)填充。

第三章 重复值与异常值检测与处理

3.1 业务背景

重复值(如订单重复录入、用户重复注册)会导致统计结果虚高;异常值(如订单金额为负、单价远超正常范围、年龄大于100岁)会干扰分析结论,需精准识别并处理。

3.2 核心方法与参数详解

3.2.1 重复值处理

  • df.duplicated():检测重复行,核心参数subset(指定判断重复的列,如订单ID)、keep("first"保留第一行、"last"保留最后一行、False标记所有重复行)。
  • df.drop_duplicates():删除重复行,参数同duplicated()inplace控制是否原地修改。

3.2.2 异常值处理

  • 统计法:基于describe()计算分位数,用IQR(四分位距)识别异常值(Q1-1.5IQR ~ Q3+1.5IQR)。
  • 业务法:结合业务规则(如订单金额>0、年龄18-100岁、单价在合理区间)。
  • 方法:删除异常值、修正异常值(如录入错误)、标记异常值后单独分析。

3.3 带注释代码示例


import pandas as pd
import numpy as np

# 模拟数据(含重复值、异常值)
data = {
    "order_id": ["O001", "O002", "O002", "O003", "O004", "O005", "O006"],
    "user_id": ["U001", "U002", "U002", "U003", "U004", "U005", "U006"],
    "order_amount": [199.0, 299.0, 299.0, -50.0, 99999.0, 399.0, 499.0],
    "age": [25, 30, 30, 40, 150, 35, np.nan]
}
orders_df = pd.DataFrame(data)

# 一、重复值处理
# 1. 检测重复行(按order_id判断,订单ID唯一)
duplicated_rows = orders_df.duplicated(subset=["order_id"], keep="first")
print("重复行标记:")
print(orders_df[duplicated_rows])

# 2. 删除重复行,保留第一行
df_no_dup = orders_df.drop_duplicates(subset=["order_id"], keep="first", inplace=False)
print("\n删除重复值后的数据:")
print(df_no_dup[["order_id", "user_id", "order_amount"]])

# 二、异常值处理
# 场景1:订单金额异常(负金额、极大值)→ 基于业务规则过滤
df_amount_clean = df_no_dup.copy()
# 保留金额在1-10000之间的数据(业务定义合理区间)
df_amount_clean = df_amount_clean[(df_amount_clean["order_amount"] > 0) & (df_amount_clean["order_amount"] <= 10000)]
print("\n过滤金额异常后的数据:")
print(df_amount_clean[["order_id", "order_amount"]])

# 场景2:年龄异常(>100岁)→ 用中位数修正
df_age_clean = df_amount_clean.copy()
# 计算合理年龄的中位数(排除异常值)
age_median = df_age_clean[df_age_clean["age"] <= 100]["age"].median()
# 替换异常年龄
df_age_clean.loc[df_age_clean["age"] > 100, "age"] = age_median
print("\n修正年龄异常后的数据:")
print(df_age_clean[["order_id", "age"]])

# 场景3:用IQR方法检测金额异常(适用于无明确业务区间的场景)
df_iqr = df_no_dup.copy()
Q1 = df_iqr["order_amount"].quantile(0.25)
Q3 = df_iqr["order_amount"].quantile(0.75)
IQR = Q3 - Q1
# 定义正常范围
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 筛选正常数据
df_iqr_clean = df_iqr[(df_iqr["order_amount"] >= lower_bound) & (df_iqr["order_amount"] <= upper_bound)]
print("\nIQR方法过滤异常后的数据:")
print(df_iqr_clean[["order_id", "order_amount"]])

3.4 常见陷阱与最佳实践

  • 常见陷阱:① 仅按单一字段判断重复值(如用户ID重复可能是正常复购,需结合订单ID);② 直接删除所有异常值,忽略异常值背后的业务原因(如大额订单可能是企业客户采购);③ 用固定阈值处理不同品类数据(如低价商品和高价商品共用同一金额阈值)。
  • 最佳实践:① 重复值判断需结合业务主键(如订单表用order_id,用户表用user_id);② 异常值先验证原因,再决定处理方式:录入错误可修正,真实异常(如大额订单)可单独标记分析,无意义异常可删除;③ 分品类/分群体设定异常值阈值,避免一刀切;④ 处理异常值后需验证数据分布,确保不影响整体分析。

第四章 数据类型优化(提升性能与效率)

4.1 业务背景

原始数据中常存在数据类型不合理的问题(如类别字段为object、日期为字符串、数值字段为object),导致内存占用过大、运算效率低、无法使用对应字段的分析方法(如日期加减)。

4.2 核心方法与参数详解

  • df.dtypes:查看各列数据类型。
  • df.astype():类型转换,核心参数 dtype(目标类型,如"category"、"int64"、"float64")。
  • pd.to_datetime():字符串转datetime类型,核心参数 format(日期格式,如"%Y-%m-%d %H:%M:%S")、errors("coerce"无法解析的转为缺失值)。
  • pd.to_numeric():字符串转数值类型,errors参数控制异常处理。
  • category类型:适用于低基数分类字段(如性别、支付方式),大幅减少内存占用。

4.3 带注释代码示例


import pandas as pd
import numpy as np

# 模拟数据(数据类型不合理)
data = {
    "user_id": ["U001", "U002", "U003", "U004", "U005"],
    "gender": ["男", "女", "男", "女", "男"],  # 低基数分类,object类型
    "age": ["25", "30", "35", "40", "45"],  # 数值型,字符串格式
    "register_time": ["2026-01-01", "2026-01-02", "2026-01-03", "2026-01-04", "2026-01-05"],  # 日期字符串
    "consume_amount": ["199.5", "299.0", "399.8", "499.2", "599.5"],  # 数值型,字符串格式
    "pay_method": ["支付宝", "微信", "支付宝", "微信", "支付宝"]  # 低基数分类,object类型
}
user_df = pd.DataFrame(data)

# 1. 查看原始数据类型及内存占用
print("原始数据类型:")
print(user_df.dtypes)
print(f"\n原始数据内存占用:{user_df.memory_usage(deep=True).sum() / 1024:.2f} KB")

# 2. 数据类型优化
df_optimized = user_df.copy()

# 场景1:字符串数值 → 数值类型
df_optimized["age"] = pd.to_numeric(df_optimized["age"], errors="coerce")  # 无法解析的转为NaN
df_optimized["consume_amount"] = pd.to_numeric(df_optimized["consume_amount"], errors="coerce")

# 场景2:日期字符串 → datetime类型
df_optimized["register_time"] = pd.to_datetime(df_optimized["register_time"], format="%Y-%m-%d")

# 场景3:低基数分类字段 → category类型
df_optimized["gender"] = df_optimized["gender"].astype("category")
df_optimized["pay_method"] = df_optimized["pay_method"].astype("category")

# 3. 查看优化后结果
print("\n优化后数据类型:")
print(df_optimized.dtypes)
print(f"\n优化后数据内存占用:{df_optimized.memory_usage(deep=True).sum() / 1024:.2f} KB")

# 4. 验证优化效果(日期运算、数值统计)
print("\n注册时间加7天:")
df_optimized["next_week"] = df_optimized["register_time"] + pd.Timedelta(days=7)
print(df_optimized[["user_id", "register_time", "next_week"]])

print("\n年龄均值和消费金额总和:")
print(f"平均年龄:{df_optimized['age'].mean():.1f}岁")
print(f"总消费金额:{df_optimized['consume_amount'].sum():.2f}元")

4.4 常见陷阱与最佳实践

  • 常见陷阱:① 直接用astype()转换含异常值的字符串数值(如"25岁"转int失败);② 高基数分类字段(如用户ID)转为category类型,反而增加内存占用;③ 日期转换时未指定format,导致解析效率低或解析错误。
  • 最佳实践:① 转换数值类型优先用pd.to_numeric(),并设置errors="coerce"处理异常;② category类型仅用于基数<10%总行数的分类字段;③ 日期转换时明确指定format,提升解析速度;④ 优化后验证数据完整性,避免转换过程中丢失信息;⑤ 大数据集优先优化类型,再进行后续分析,减少内存压力。

第五章 字符串清洗(正则与格式统一)

5.1 业务背景

字符串字段常存在格式混乱问题(如手机号含空格/特殊字符、姓名含错别字、地址格式不统一、商品名称冗余),需通过字符串处理实现标准化,便于后续分析和关联。

5.2 核心方法与参数详解

  • 基础清洗:str.strip()(去首尾空格)、str.replace()(替换字符)、str.lower()/str.upper()(大小写转换)。
  • 正则处理:str.extract()(提取匹配内容)、str.match()(判断是否匹配)、str.findall()(查找所有匹配项),正则表达式(如手机号:r'1[3-9]\d{9}')。
  • 格式统一:str.pad()(补全长度)、str.zfill()(前补零)。

5.3 带注释代码示例


import pandas as pd
import re

# 模拟数据(字符串格式混乱)
data = {
    "user_id": ["U001", "U002", "U003", "U004", "U005"],
    "phone": ["138 1234 5678", "139-9876-5432", "18612345678", "137.6543.2109", "136 7890 12345"],  # 手机号格式混乱
    "name": [" 张三 ", "李四_", "王老五", "王芳?", "王 小 明"],  # 姓名含空格、特殊字符
    "address": ["北京市海淀区", "上海 市浦东新区", "广东省深圳市南山区", "浙江省杭州市西湖区", "江苏省 南京市玄武区"],  # 地址含空格
    "product_name": ["iPhone 15 Pro", "华为Mate 60", "小米14 标准版", "OPPO Find X7", "vivo X100 Pro+"]  # 商品名称需提取品牌
}
user_df = pd.DataFrame(data)

# 1. 手机号清洗(统一为无分隔符纯数字,过滤无效号码)
df_clean = user_df.copy()
# 用正则提取纯数字
df_clean["phone_clean"] = df_clean["phone"].str.extract(r'(\d+)')[0]
# 过滤长度为11位的有效手机号
df_clean = df_clean[df_clean["phone_clean"].str.len() == 11]
print("手机号清洗结果:")
print(df_clean[["user_id", "phone", "phone_clean"]])

# 2. 姓名清洗(去除空格、特殊字符,统一格式)
df_clean["name_clean"] = df_clean["name"].str.strip()  # 去首尾空格
df_clean["name_clean"] = df_clean["name_clean"].str.replace(r'[_?\s]', '', regex=True)  # 去除特殊字符和中间空格
print("\n姓名清洗结果:")
print(df_clean[["user_id", "name", "name_clean"]])

# 3. 地址清洗(去除多余空格,统一格式)
df_clean["address_clean"] = df_clean["address"].str.replace(r'\s+', '', regex=True)  # 去除所有空格
print("\n地址清洗结果:")
print(df_clean[["user_id", "address", "address_clean"]])

# 4. 从商品名称提取品牌(正则匹配)
def extract_brand(name):
    brands = ["iPhone", "华为", "小米", "OPPO", "vivo"]
    for brand in brands:
        if re.search(brand, name):
            return brand
    return "其他"

df_clean["brand"] = df_clean["product_name"].apply(extract_brand)
print("\n商品品牌提取结果:")
print(df_clean[["product_name", "brand"]])

# 5. 统一品牌名称格式(小写转大写,统一表述)
df_clean["brand"] = df_clean["brand"].str.replace("iPhone", "苹果")
df_clean["brand"] = df_clean["brand"].str.upper()
print("\n品牌名称统一结果:")
print(df_clean[["product_name", "brand"]])

5.4 常见陷阱与最佳实践

  • 常见陷阱:① 正则表达式编写错误,导致提取内容失真(如手机号提取漏匹配);② 盲目替换特殊字符,误删有效信息(如姓名中的复姓"欧阳"被拆分);③ 未考虑边界情况(如无效手机号、未知品牌),导致数据缺失。
  • 最佳实践:① 编写正则后先小范围测试,验证提取准确性;② 特殊字符替换需结合业务场景,保留有效信息;③ 对边界情况(如未知品牌、无效手机号)标记为"其他"或过滤,避免数据丢失;④ 复杂字符串处理用自定义函数封装,提升代码可复用性;⑤ 清洗后对比原始数据,验证清洗效果。

第六章 日期时间处理(解析与时间维度分析)

6.1 业务背景

时间维度分析是数据分析师核心工作(如日/周/月销售额、用户留存率、订单高峰时段),需对日期时间字段进行解析、提取、重采样等处理,转化为可分析的时间特征。

6.2 核心方法与参数详解

  • 解析:pd.to_datetime()(字符串转datetime)。
  • 提取特征:dt.year/dt.month/dt.day(年月日)、dt.hour(小时)、dt.weekday(星期,0=周一)、dt.is_month_end(是否月末)。
  • 时间计算:pd.Timedelta()(时间差)、日期加减。
  • 重采样:df.resample()(按时间频率聚合,如日、周、月),需先将datetime列设为索引。
  • 时区处理:dt.tz_localize()(设置时区)、dt.tz_convert()(转换时区)。

6.3 带注释代码示例


import pandas as pd
import numpy as np

# 模拟电商订单时间数据
data = {
    "order_id": ["O001", "O002", "O003", "O004", "O005", "O006", "O007", "O008"],
    "order_time": ["2026-01-01 09:30:00", "2026-01-01 14:15:00", "2026-01-02 10:00:00", "2026-01-03 16:45:00",
                   "2026-01-05 08:20:00", "2026-01-07 20:10:00", "2026-01-10 11:50:00", "2026-01-15 13:25:00"],
    "order_amount": [199.0, 299.0, 399.0, 499.0, 599.0, 699.0, 799.0, 899.0]
}
orders_df = pd.DataFrame(data)

# 1. 解析日期时间字段
orders_df["order_time"] = pd.to_datetime(orders_df["order_time"], format="%Y-%m-%d %H:%M:%S")
print("解析后的日期时间数据:")
print(orders_df[["order_id", "order_time", "order_amount"]])

# 2. 提取时间特征
orders_df["year"] = orders_df["order_time"].dt.year
orders_df["month"] = orders_df["order_time"].dt.month
orders_df["day"] = orders_df["order_time"].dt.day
orders_df["hour"] = orders_df["order_time"].dt.hour
orders_df["weekday"] = orders_df["order_time"].dt.weekday  # 0=周一,6=周日
orders_df["is_weekend"] = orders_df["weekday"].apply(lambda x: 1 if x >=5 else 0)  # 是否周末
print("\n提取时间特征后的数据:")
print(orders_df[["order_id", "order_time", "hour", "weekday", "is_weekend"]])

# 3. 时间计算(计算订单间隔、预估送达时间)
# 按用户排序后计算订单间隔(假设同一用户订单按时间排序)
orders_df_sorted = orders_df.sort_values("order_time").reset_index(drop=True)
orders_df_sorted["next_order_time"] = orders_df_sorted["order_time"].shift(-1)  # 下一笔订单时间
orders_df_sorted["order_interval"] = (orders_df_sorted["next_order_time"] - orders_df_sorted["order_time"]).dt.total_seconds() / 3600  # 间隔(小时)
print("\n订单间隔计算结果:")
print(orders_df_sorted[["order_id", "order_time", "next_order_time", "order_interval"]])

# 预估送达时间(下单后2小时)
orders_df["estimated_delivery"] = orders_df["order_time"] + pd.Timedelta(hours=2)
print("\n预估送达时间:")
print(orders_df[["order_id", "order_time", "estimated_delivery"]])

# 4. 时间重采样(按日、周聚合销售额)
# 将order_time设为索引
df_resample = orders_df.set_index("order_time")
# 按日聚合销售额
daily_sales = df_resample["order_amount"].resample("D").sum()  # "D"=日,"W"=周,"M"=月
# 按周聚合销售额
weekly_sales = df_resample["order_amount"].resample("W").sum()
print("\n日销售额:")
print(daily_sales)
print("\n周销售额:")
print(weekly_sales)

6.4 常见陷阱与最佳实践

  • 常见陷阱:① 日期格式不统一导致解析失败,未设置errors="coerce";② 重采样时未将datetime列设为索引,导致聚合错误;③ 忽略时区问题,不同时区数据直接合并分析;④ 时间间隔计算时未处理NaN值(如最后一笔订单无下一笔订单)。
  • 最佳实践:① 日期解析时明确格式,设置errors="coerce"处理无效日期;② 重采样前务必将datetime列设为索引,确保聚合准确性;③ 跨时区数据统一转换为同一时区后再分析;④ 时间间隔计算后用fillna()处理NaN值;⑤ 提取时间特征时结合业务需求(如电商关注周末、高峰时段),避免冗余。

第七章 条件筛选与布尔索引(复杂业务规则实现)

7.1 业务背景

实际工作中需根据复杂业务规则筛选数据(如筛选高价值用户、有效订单、特定时间段的交易),布尔索引是pandas中高效实现筛选的核心方式。

7.2 核心方法与参数详解

  • 单条件筛选:df[df["col"] == value]df[df["col"] > value]
  • 多条件筛选:逻辑与(&)、逻辑或(|)、逻辑非(~),每个条件需用括号包裹。
  • 范围筛选:df[df["col"].between(left, right)](闭区间)、df[df["col"].isin(list)](属于列表)。
  • 显式索引筛选:df.loc[条件, 列名](推荐,避免链式赋值警告)。

7.3 带注释代码示例


import pandas as pd
import numpy as np

# 模拟用户订单数据
data = {
    "user_id": ["U001", "U002", "U003", "U004", "U005", "U006", "U007", "U008"],
    "order_id": ["O001", "O002", "O003", "O004", "O005", "O006", "O007", "O008"],
    "order_amount": [199.0, 2990.0, 399.0, 4990.0, 599.0, 699.0, 7990.0, 899.0],
    "pay_status": ["已支付", "已支付", "未支付", "已支付", "已支付", "未支付", "已支付", "已支付"],
    "order_time": pd.date_range("2026-01-01", periods=8, freq="D"),
    "user_level": ["普通", "VIP", "普通", "VIP", "普通", "VIP", "钻石", "普通"]
}
orders_df = pd.DataFrame(data)

# 1. 单条件筛选
# 筛选已支付订单
paid_orders = orders_df[orders_df["pay_status"] == "已支付"]
print("已支付订单:")
print(paid_orders[["order_id", "pay_status", "order_amount"]])

# 筛选金额大于1000元的订单
high_amount_orders = orders_df[orders_df["order_amount"] > 1000]
print("\n金额>1000元的订单:")
print(high_amount_orders[["order_id", "order_amount"]])

# 2. 多条件筛选(逻辑与/或)
# 筛选已支付且金额>1000元的VIP用户订单(逻辑与)
vip_high_paid = orders_df[(orders_df["pay_status"] == "已支付") & 
                          (orders_df["order_amount"] > 1000) & 
                          (orders_df["user_level"] == "VIP")]
print("\n已支付、金额>1000元的VIP订单:")
print(vip_high_paid[["user_id", "order_id", "user_level", "order_amount"]])

# 筛选钻石用户订单或金额>5000元的订单(逻辑或)
diamond_or_high = orders_df[(orders_df["user_level"] == "钻石") | 
                            (orders_df["order_amount"] > 5000)]
print("\n钻石用户或金额>5000元的订单:")
print(diamond_or_high[["user_id", "user_level", "order_amount"]])

# 3. 范围与列表筛选
# 筛选金额在500-5000元之间的订单
amount_range = orders_df[orders_df["order_amount"].between(500, 5000)]
print("\n金额500-5000元的订单:")
print(amount_range[["order_id", "order_amount"]])

# 筛选VIP和钻石等级用户的订单
high_level = orders_df[orders_df["user_level"].isin(["VIP", "钻石"])]
print("\nVIP和钻石用户订单:")
print(high_level[["user_id", "user_level"]])

# 4. 显式索引筛选(loc),避免链式赋值警告
# 筛选2026年1月3日后的已支付订单,并标记为"重点订单"
orders_df.loc[(orders_df["order_time"] >= "2026-01-03") & 
              (orders_df["pay_status"] == "已支付"), 
              "order_tag"] = "重点订单"
# 其他订单标记为"普通订单"
orders_df["order_tag"] = orders_df["order_tag"].fillna("普通订单")
print("\n标记订单类型后的数据:")
print(orders_df[["order_id", "order_time", "pay_status", "order_tag"]])

7.4 常见陷阱与最佳实践

  • 常见陷阱:① 多条件筛选未用括号包裹,导致逻辑错误;② 用链式赋值修改数据(如df[df["col"]>0]["col2"] = 1),出现警告且修改可能失效;③ 字符串条件筛选时忽略大小写(如"VIP"和"vip"视为不同值);④ 日期条件筛选时,datetime列与字符串直接比较(虽可运行,但易出错)。
  • 最佳实践:① 多条件筛选时,每个条件单独用括号包裹,明确逻辑关系;② 优先用loc显式赋值和筛选,避免链式赋值;③ 字符串筛选前统一大小写,或用str.contains(flags=re.IGNORECASE)忽略大小写;④ 日期条件筛选时,将字符串转为datetime类型后再比较;⑤ 复杂筛选逻辑用变量存储条件,提升代码可读性(如condition = (df["a"]>0) & (df["b"]=="x"),再用df[condition])。

第八章 分组聚合分析(业务指标计算核心)

8.1 业务背景

分组聚合是计算业务指标的核心手段(如按用户分组计算消费总额、按品类分组计算销量、按日期分组计算日销售额),可结合自定义函数实现复杂指标(留存率、复购率、客单价)计算。

8.2 核心方法与参数详解

  • df.groupby():分组,核心参数 by(分组列,可多个)、as_index(是否将分组列设为索引,默认True)。
  • 聚合函数:sum()mean()count()max()min(),可通过agg()指定多列不同聚合方式。
  • 自定义聚合:apply()(对每组应用自定义函数)、transform()(返回与原数据长度一致的结果,用于分组填充)。
  • 常见业务指标:客单价(消费总额/订单数)、复购率(复购用户数/总用户数)、留存率(次日留存=次日活跃用户数/当日新增用户数)。

8.3 带注释代码示例


import pandas as pd
import numpy as np

# 模拟电商订单数据(含用户、商品、金额、时间)
data = {
    "order_id": ["O001", "O002", "O003", "O004", "O005", "O006", "O007", "O008", "O009", "O010"],
    "user_id": ["U001", "U002", "U001", "U003", "U002", "U004", "U003", "U001", "U005", "U004"],
    "product_category": ["手机", "电脑", "手机", "家电", "电脑", "手机", "家电", "电脑", "手机", "家电"],
    "order_amount": [1999.0, 5999.0, 2999.0, 3999.0, 4999.0, 1599.0, 2599.0, 3599.0, 1799.0, 4599.0],
    "order_time": pd.date_range("2026-01-01", periods=10, freq="D"),
    "pay_status": ["已支付"]*10  # 简化为全已支付
}
orders_df = pd.DataFrame(data)

# 1. 基础分组聚合
# 按用户分组:计算消费总额、订单数、平均客单价
user_agg = orders_df.groupby("user_id", as_index=False).agg({
    "order_amount": ["sum", "count", "mean"],  # 金额求和、计数、均值
    "order_time": "min"  # 首次下单时间
}).round(2)
# 重命名列
user_agg.columns = ["user_id", "total_consume", "order_count", "avg_amount", "first_order_time"]
print("用户分组聚合结果:")
print(user_agg)

# 按商品品类分组:计算销售额、订单数、最高单笔金额
category_agg = orders_df.groupby("product_category", as_index=False).agg({
    "order_amount": ["sum", "count", "max"]
}).round(2)
category_agg.columns = ["category", "sales_volume", "order_count", "max_single_amount"]
print("\n品类分组聚合结果:")
print(category_agg)

# 2. 自定义函数聚合(计算复购率、客单价分档)
# 场景1:计算用户复购率(订单数>=2视为复购)
user_stats = orders_df.groupby("user_id", as_index=False).agg({
    "order_id": "count",  # 订单数
    "order_amount": "sum"  # 消费总额
}).rename(columns={"order_id": "order_count", "order_amount": "total_consume"})
# 标记复购用户
user_stats["is_repurchase"] = user_stats["order_count"].apply(lambda x: 1 if x >=2 else 0)
# 计算整体复购率
repurchase_rate = (user_stats["is_repurchase"].sum() / len(user_stats)) * 100
print(f"\n整体用户复购率:{repurchase_rate:.2f}%")

# 场景2:按用户订单数分档(自定义函数)
def order_level(count):
    if count == 1:
        return "一次性用户"
    elif count == 2:
        return "复购用户"
    else:
        return "高频用户"

user_stats["user_level"] = user_stats["order_count"].apply(order_level)
# 按用户等级分组统计
level_agg = user_stats.groupby("user_level", as_index=False).agg({
    "user_id": "count",
    "total_consume": "sum"
}).rename(columns={"user_id": "user_count"})
print("\n用户等级分组统计:")
print(level_agg)

# 3. 多列分组聚合(按用户+品类)
user_category_agg = orders_df.groupby(["user_id", "product_category"], as_index=False).agg({
    "order_amount": "sum",
    "order_time": "count"
}).rename(columns={"order_amount": "category_consume", "order_time": "category_order_count"})
print("\n用户-品类多列分组聚合:")
print(user_category_agg)

# 4. transform方法(分组内标准化金额)
# 按品类分组,计算每组金额的均值和标准差,标准化金额((金额-均值)/标准差)
def normalize_amount(group):
    mean = group["order_amount"].mean()
    std = group["order_amount"].std()
    group["normalized_amount"] = (group["order_amount"] - mean) / std
    return group

orders_df_normalized = orders_df.groupby("product_category").apply(normalize_amount).round(2)
print("\n分组标准化后的数据:")
print(orders_df_normalized[["user_id", "product_category", "order_amount", "normalized_amount"]])

8.4 常见陷阱与最佳实践

  • 常见陷阱:① 分组后索引混乱,未设置as_index=False导致后续合并困难;② 多列聚合后列名层级复杂,未及时重命名;③ 自定义函数效率低,处理大数据集时耗时久;④ 混淆apply()transform()用法(transform()需返回与原数据长度一致的结果)。
  • 最佳实践:① 分组时优先设置as_index=False,保持数据结构清晰;② 多列聚合后立即重命名列,避免层级列名;③ 简单聚合用内置函数(sum()mean()),复杂逻辑才用自定义函数,且尽量优化函数效率;④ 需保留原数据行数时用transform(),需聚合为分组结果时用apply();⑤ 大数据分组聚合可使用pd.Grouper()按时间频率分组(如pd.Grouper(key="order_time", freq="W"))。

第九章 多表合并(多源数据整合)

9.1 业务背景

实际分析中数据常分散在多张表中(如订单表、用户表、商品表),需通过主键关联合并,形成完整的分析宽表(如订单表关联用户表获取用户属性,关联商品表获取品类信息)。

9.2 核心方法与参数详解

  • pd.merge():基于主键合并,核心参数 left/right(左右表)、on(关联主键,需两表列名一致)、left_on/right_on(左右表不同名主键)、how(合并方式:"inner"内连接、"left"左连接、"right"右连接、"outer"外连接)、suffixes(同名列后缀)。

其他合并方式:df.concat()(纵向/横向拼接,适用于同结构数据)、df.join()(基于索引合并,简化主键关联)。

合并优化:validate(验证合并类型,如"one_to_one"避免一对多重复)、indicator(标记数据来源)。

9.3 带注释代码示例



import pandas as pd
import numpy as np

# 模拟多表数据(订单表、用户表、商品表)
# 1. 订单表(核心业务表)
orders_data = {
    "order_id": ["O001", "O002", "O003", "O004", "O005", "O006"],
    "user_id": ["U001", "U002", "U001", "U003", "U004", "U005"],
    "product_id": ["P001", "P002", "P003", "P002", "P001", "P004"],
    "order_amount": [1999.0, 5999.0, 299.0, 4999.0, 1599.0, 899.0],
    "order_time": pd.date_range("2026-01-01", periods=6, freq="D"),
    "pay_status": ["已支付"]*6
}
orders_df = pd.DataFrame(orders_data)

# 2. 用户表(用户属性)
user_data = {
    "user_id": ["U001", "U002", "U003", "U004", "U006"],  # U005无用户信息,U006无订单
    "user_name": ["张三", "李四", "王五", "赵六", "孙七"],
    "user_level": ["VIP", "普通", "VIP", "钻石", "普通"],
    "register_time": pd.date_range("2025-12-01", periods=5, freq="3D")
}
user_df = pd.DataFrame(user_data)

# 3. 商品表(商品属性)
product_data = {
    "product_id": ["P001", "P002", "P003", "P004"],
    "product_name": ["iPhone 15", "华为Mate 60", "小米手环", "AirPods Pro"],
    "product_category": ["手机", "手机", "穿戴设备", "音频设备"],
    "price": [5999.0, 6999.0, 299.0, 1799.0]
}
product_df = pd.DataFrame(product_data)

# 一、基础合并:订单表关联用户表(按user_id)
# 左连接:保留所有订单,无对应用户信息的填充NaN
order_user_df = pd.merge(
    left=orders_df,
    right=user_df,
    on="user_id",
    how="left",
    suffixes=("_order", "_user"),  # 同名列后缀(此处无同名列,仅演示)
    indicator=True  # 标记数据来源
)
print("订单表-用户表左连接结果:")
print(order_user_df[["order_id", "user_id", "user_name", "user_level", "_merge"]])

# 二、多表合并:订单表+用户表+商品表
# 先关联订单与用户,再关联商品表
order_user_product_df = pd.merge(
    left=order_user_df,
    right=product_df,
    on="product_id",
    how="left"
)
# 筛选核心字段形成宽表
analysis_df = order_user_product_df[[
    "order_id", "user_id", "user_name", "user_level",
    "product_id", "product_name", "product_category",
    "order_amount", "price", "order_time"
]]
print("\n三表合并后的分析宽表:")
print(analysis_df)

# 三、不同主键名合并(模拟场景:用户表主键为uid,订单表为user_id)
user_df_rename = user_df.rename(columns={"user_id": "uid"})  # 重命名主键模拟差异
order_user_rename_df = pd.merge(
    left=orders_df,
    right=user_df_rename,
    left_on="user_id",
    right_on="uid",
    how="inner"  # 内连接:仅保留双方都有数据的记录
)
print("\n不同主键名内连接结果:")
print(order_user_rename_df[["order_id", "user_id", "uid", "user_name"]].drop(columns=["uid"]))

# 四、concat拼接(纵向拼接同结构数据)
# 模拟新增订单数据
new_orders_data = {
    "order_id": ["O007", "O008"],
    "user_id": ["U002", "U003"],
    "product_id": ["P001", "P003"],
    "order_amount": [5999.0, 299.0],
    "order_time": pd.date_range("2026-01-07", periods=2, freq="D"),
    "pay_status": ["已支付"]*2
}
new_orders_df = pd.DataFrame(new_orders_data)
# 纵向拼接订单表
all_orders_df = pd.concat(
    [orders_df, new_orders_df],
    ignore_index=True  # 重置索引
)
print("\n纵向拼接后的完整订单表:")
print(all_orders_df[["order_id", "user_id", "order_time"]])

# 五、join合并(基于索引,适用于索引为主键的场景)
# 将用户表索引设为user_id
user_df_index = user_df.set_index("user_id")
# 订单表与用户表基于索引合并
order_user_join_df = orders_df.join(
    user_df_index[["user_name", "user_level"]],  # 仅保留需要的列
    on="user_id",  # 订单表的关联列
    how="left"
)
print("\n基于索引join合并结果:")
print(order_user_join_df[["order_id", "user_id", "user_name", "user_level"]])

9.4 常见陷阱与最佳实践

  • 常见陷阱:① 多表合并时未处理主键重复(如用户表存在重复user_id),导致数据行数暴增;② 盲目使用外连接,引入大量NaN值增加清洗成本;③ 同名列未指定后缀,合并后列名冲突;④ 合并大数据表时未优化,导致内存溢出或效率低下。
  • 最佳实践:① 合并前先检查主键唯一性(df["key"].duplicated().sum()),清理重复主键;② 优先使用内连接或左连接,根据业务需求选择合并方式(如订单分析保留所有订单用左连接);③ 明确指定suffixes处理同名列,合并后及时重命名;④ 大数据合并前筛选必要列(usecols),减少数据量;⑤ 用validate参数验证合并关系(如validate="one_to_many"),避免异常数据。

第十章 数据透视表(快速生成业务报表)

10.1 业务背景

数据透视表是分析师快速生成业务报表的核心工具,可灵活按行、列维度聚合数据(如按用户等级和商品品类统计销售额、按日期和支付状态统计订单数),无需复杂分组代码,适配快速复盘与汇报场景。

10.2 核心方法与参数详解

  • pd.pivot_table():核心参数 data(数据源)、index(行维度)、columns(列维度)、values(聚合字段)、aggfunc(聚合函数,可多个)、fill_value(填充NaN值)、margins(显示总计行/列)。
  • 常见场景:按双维度统计销售额、用户等级分布透视、跨时间维度订单复盘、缺失值自动填充的汇总报表。

10.3 带注释代码示例



import pandas as pd
import numpy as np

# 基于第九章合并后的分析宽表扩展数据
analysis_df = pd.DataFrame({
    "order_id": ["O001", "O002", "O003", "O004", "O005", "O006", "O007", "O008"],
    "user_id": ["U001", "U002", "U001", "U003", "U004", "U005", "U002", "U003"],
    "user_level": ["VIP", "普通", "VIP", "VIP", "钻石", "普通", "普通", "VIP"],
    "product_category": ["手机", "手机", "穿戴设备", "手机", "手机", "音频设备", "手机", "穿戴设备"],
    "order_amount": [1999.0, 5999.0, 299.0, 4999.0, 1599.0, 899.0, 5999.0, 299.0],
    "order_time": pd.date_range("2026-01-01", periods=8, freq="D"),
    "pay_status": ["已支付"]*8
})
# 提取时间维度(日期、周)
analysis_df["order_date"] = analysis_df["order_time"].dt.date
analysis_df["week"] = analysis_df["order_time"].dt.isocalendar().week

# 1. 基础透视表:按用户等级(行)和商品品类(列)统计销售额
pivot_sales = pd.pivot_table(
    data=analysis_df,
    index="user_level",  # 行维度:用户等级
    columns="product_category",  # 列维度:商品品类
    values="order_amount",  # 聚合字段:订单金额
    aggfunc="sum",  # 聚合函数:求和
    fill_value=0,  # 缺失值填充为0
    margins=True,  # 显示总计
    margins_name="总计"  # 总计列/行名称
).round(2)
print("用户等级×商品品类销售额透视表:")
print(pivot_sales)

# 2. 多聚合函数透视表:统计销售额、订单数
pivot_multi_func = pd.pivot_table(
    data=analysis_df,
    index="user_level",
    columns="product_category",
    values=["order_amount", "order_id"],
    aggfunc={
        "order_amount": "sum",  # 金额求和
        "order_id": "count"  # 订单计数
    },
    fill_value=0
).round(2)
print("\n多聚合函数透视表(销售额+订单数):")
print(pivot_multi_func)

# 3. 时间维度透视表:按周(行)和用户等级(列)统计订单数
pivot_time = pd.pivot_table(
    data=analysis_df,
    index="week",
    columns="user_level",
    values="order_id",
    aggfunc="count",
    fill_value=0,
    margins=True,
    margins_name="周总计"
)
print("\n周×用户等级订单数透视表:")
print(pivot_time)

# 4. 透视表筛选与重置索引(适配报表输出)
# 筛选VIP用户的透视数据
pivot_vip = pivot_sales.loc["VIP", :].drop("总计")  # 排除总计行
print("\nVIP用户各品类销售额:")
print(pivot_vip)

# 重置索引,将列维度转为普通列(便于后续导出)
pivot_flat = pivot_sales.reset_index()
print("\n重置索引后的透视表(扁平化):")
print(pivot_flat)

# 5. 复杂透视表:按日期、用户等级双行维度
pivot_double_index = pd.pivot_table(
    data=analysis_df,
    index=["order_date", "user_level"],  # 双行维度
    columns="product_category",
    values="order_amount",
    aggfunc="sum",
    fill_value=0
).round(2)
print("\n日期×用户等级×品类销售额透视表:")
print(pivot_double_index)

10.4 常见陷阱与最佳实践

  • 常见陷阱:① 聚合字段与聚合函数不匹配(如对分类字段用sum());② 未设置fill_value,NaN值影响报表可读性;③ 多维度透视后索引层级复杂,不便导出;④ 盲目添加总计行,导致业务指标统计失真(如部分维度无需总计)。
  • 最佳实践:① 针对数值型字段用求和/均值,分类字段用计数,确保聚合逻辑合理;② 始终设置fill_value=0或自定义填充值,优化报表展示;③ 复杂透视后用reset_index()扁平化索引,便于Excel导出;④ 根据业务需求决定是否显示总计(margins),避免冗余;⑤ 透视表结果可结合style方法美化(如高亮最大值),适配汇报场景。

第十一章 数据导出与报表自动化(落地交付)

11.1 业务背景

分析完成后需将结果导出为标准化文件(Excel、CSV、JSON),交付给业务部门或用于自动化报表。需掌握不同格式的导出方法,适配多场景交付需求,同时实现简单报表自动化,提升工作效率。

11.2 核心方法与参数详解

  • df.to_csv():导出为CSV文件,核心参数 path_or_buf(文件路径)、index(是否导出索引,默认True)、encoding(编码,推荐"utf-8-sig"避免中文乱码)、na_rep(缺失值替换)。
  • df.to_excel():导出为Excel文件,需安装openpyxl库,核心参数 excel_writer(文件路径/写入器)、sheet_name(工作表名称)、startrow/startcol(起始行列)、engine(引擎,如"openpyxl")。
  • 报表自动化:结合datetime生成动态文件名,批量导出多工作表,适配定时任务。

11.3 带注释代码示例



import pandas as pd
import numpy as np
from datetime import datetime
import os

# 基于前文分析宽表,模拟多份待导出数据
# 1. 销售额汇总表
sales_summary = pd.DataFrame({
    "user_level": ["VIP", "普通", "钻石", "总计"],
    "手机销售额": [11997.0, 11998.0, 1599.0, 25594.0],
    "穿戴设备销售额": [598.0, 0.0, 0.0, 598.0],
    "音频设备销售额": [0.0, 899.0, 0.0, 899.0],
    "总销售额": [12595.0, 12897.0, 1599.0, 27091.0]
})

# 2. 用户复购统计表
repurchase_stats = pd.DataFrame({
    "user_level": ["VIP", "普通", "钻石"],
    "用户数": [3, 2, 1],
    "复购用户数": [2, 1, 0],
    "复购率(%)": [66.67, 50.00, 0.00]
})

# 3. 每日订单明细表(前文analysis_df筛选)
daily_detail = analysis_df[["order_date", "order_id", "user_name", "product_category", "order_amount"]].head(10)

# 一、导出CSV文件(适配数据备份、简单报表)
csv_path = "销售数据汇总.csv"
sales_summary.to_csv(
    path_or_buf=csv_path,
    index=False,  # 不导出索引
    encoding="utf-8-sig",  # 中文编码
    na_rep="无",  # 缺失值显示为"无"
    float_format="%.2f"  # 浮点数保留2位小数
)
print(f"CSV文件已导出至:{os.path.abspath(csv_path)}")

# 二、导出Excel文件(多工作表、格式优化)
# 动态生成文件名(含日期)
today = datetime.now().strftime("%Y%m%d")
excel_path = f"电商销售报表_{today}.xlsx"

# 多工作表导出
with pd.ExcelWriter(
    path=excel_path,
    engine="openpyxl",  # 支持.xlsx格式
    mode="w"  # 覆盖写入
) as writer:
    # 写入各工作表
    sales_summary.to_excel(writer, sheet_name="销售额汇总", index=False)
    repurchase_stats.to_excel(writer, sheet_name="用户复购统计", index=False)
    daily_detail.to_excel(writer, sheet_name="每日订单明细", index=False, startrow=1)  # 从第2行开始写入

print(f"Excel文件已导出至:{os.path.abspath(excel_path)}")

# 三、Excel格式优化(高亮表头、调整列宽,需openpyxl)
from openpyxl.styles import Font, PatternFill
from openpyxl import load_workbook

# 加载已导出的Excel文件
wb = load_workbook(excel_path)
ws = wb["销售额汇总"]  # 选中工作表

# 1. 表头样式优化(加粗、背景色)
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
for cell in ws[1]:  # 第1行为表头
    cell.font = header_font
    cell.fill = header_fill

# 2. 调整列宽
column_widths = [12, 15, 18, 18, 12]
for i, width in enumerate(column_widths, 1):
    ws.column_dimensions[chr(64+i)].width = width  # 64对应'A',依次类推

# 保存优化后的文件
wb.save(excel_path)
print("Excel格式优化完成")

# 四、报表自动化(批量生成每日报表)
def generate_daily_report(date_str):
    """
    生成指定日期的销售报表
    :param date_str: 日期字符串,格式"YYYY-MM-DD"
    """
    # 筛选指定日期数据
    date_data = analysis_df[analysis_df["order_date"] == pd.to_datetime(date_str).date()]
    if date_data.empty:
        print(f"{date_str}无订单数据,跳过报表生成")
        return
    
    # 生成报表文件名
    report_path = f"每日销售报表_{date_str.replace('-', '')}.xlsx"
    with pd.ExcelWriter(report_path, engine="openpyxl") as writer:
        date_data.to_excel(writer, sheet_name="当日订单明细", index=False)
        # 生成当日汇总
        daily_agg = date_data.groupby("product_category").agg({
            "order_id": "count",
            "order_amount": "sum"
        }).rename(columns={"order_id": "订单数", "order_amount": "销售额"}).reset_index()
        daily_agg.to_excel(writer, sheet_name="当日品类汇总", index=False)
    
    print(f"{date_str}报表已生成:{os.path.abspath(report_path)}")

# 批量生成多日报表
dates = ["2026-01-01", "2026-01-02", "2026-01-03"]
for date in dates:
    generate_daily_report(date)

11.4 常见陷阱与最佳实践

  • 常见陷阱:① 导出中文数据时编码错误(未用utf-8-sig),导致乱码;② 导出Excel时未指定引擎,兼容旧版本格式失败;③ 自动化报表未处理空数据场景,导致程序报错;④ 导出时保留索引,干扰报表可读性。
  • 最佳实践:① 中文数据导出CSV必设encoding="utf-8-sig",Excel用openpyxl引擎;② 导出前统一处理索引(index=False),仅在需要时保留;③ 自动化报表添加异常捕获(空数据、文件占用),确保程序稳定;④ 重要报表优化格式(表头样式、列宽),提升交付体验;⑤ 批量导出时用动态文件名(含日期),避免覆盖旧数据。

第十二章 综合实战案例(整合全流程技能)

12.1 实战背景

模拟电商平台月度销售分析实战,整合前文所有技能(数据读取、清洗、处理、分析、导出),完成从原始数据到最终报表的全流程落地,适配企业真实业务场景,助力求职面试场景演示。

12.2 实战需求

    1. 读取电商订单、用户、商品三张原始表,完成数据探查与清洗(缺失值、异常值、重复值);
    1. 实现多表合并,生成分析宽表,提取时间、用户、商品维度特征;
    1. 计算核心业务指标(销售额、客单价、复购率、品类占比);
    1. 生成多维度分析报表(时间趋势、用户分层、品类分析);
    1. 导出标准化Excel报表,优化格式并提交交付。

12.3 全流程代码实现(带业务注释)



import pandas as pd
import numpy as np
from datetime import datetime
from openpyxl.styles import Font, PatternFill
import openpyxl

# ---------------------- 第一步:数据读取与初步探查 ----------------------
# 读取三张原始表(模拟真实场景下的CSV/Excel文件)
orders_df = pd.read_csv(
    "原始数据/订单表.csv",
    encoding="utf-8-sig",
    parse_dates=["order_time"],
    na_values=["无", "未知"],
    dtype={"order_id": str, "user_id": str, "product_id": str}
)
user_df = pd.read_excel(
    "原始数据/用户表.xlsx",
    sheet_name="用户信息",
    dtype={"user_id": str},
    parse_dates=["register_time"]
)
product_df = pd.read_csv(
    "原始数据/商品表.csv",
    encoding="utf-8-sig",
    dtype={"product_id": str},
    na_values=["0"]  # 商品单价为0视为缺失
)

# 初步探查数据
print("=== 数据初步探查结果 ===")
print("订单表信息:")
print(orders_df.info())
print("\n用户表缺失值:")
print(user_df.isnull().sum())
print("\n商品表描述性统计:")
print(product_df.describe())

# ---------------------- 第二步:数据清洗 ----------------------
# 1. 订单表清洗(缺失值、异常值、重复值)
# 缺失值处理:user_id/product_id缺失的订单删除(无效订单)
orders_clean = orders_df.dropna(subset=["user_id", "product_id"], how="any")
# 异常值处理:订单金额>0且<10000(业务合理区间)
orders_clean = orders_clean[(orders_clean["order_amount"] > 0) & (orders_clean["order_amount"] <= 10000)]
# 重复值处理:按order_id去重
orders_clean = orders_clean.drop_duplicates(subset=["order_id"], keep="first")

# 2. 用户表清洗:填充缺失的用户等级为"普通"
user_clean = user_df.copy()
user_clean["user_level"] = user_clean["user_level"].fillna("普通")

# 3. 商品表清洗:填充缺失单价为品类均值
product_clean = product_df.copy()
product_clean["price"] = product_clean.groupby("product_category")["price"].transform(
    lambda x: x.fillna(x.mean())
)

print(f"\n=== 清洗后数据量 ===")
print(f"订单表:{len(orders_clean)} 条(原始:{len(orders_df)} 条)")
print(f"用户表:{len(user_clean)} 条(原始:{len(user_df)} 条)")
print(f"商品表:{len(product_clean)} 条(原始:{len(product_df)} 条)")

# ---------------------- 第三步:多表合并与特征提取 ----------------------
# 1. 多表合并:订单表→用户表→商品表
analysis_df = pd.merge(
    left=orders_clean,
    right=user_clean[["user_id", "user_name", "user_level", "register_time"]],
    on="user_id",
    how="left"
)
analysis_df = pd.merge(
    left=analysis_df,
    right=product_clean[["product_id", "product_name", "product_category", "price"]],
    on="product_id",
    how="left"
)

# 2. 特征提取(时间、用户、业务特征)
# 时间特征
analysis_df["order_date"] = analysis_df["order_time"].dt.date
analysis_df["month"] = analysis_df["order_time"].dt.month
analysis_df["weekday"] = analysis_df["order_time"].dt.weekday
analysis_df["is_weekend"] = analysis_df["weekday"].apply(lambda x: 1 if x >=5 else 0)
# 用户特征:用户生命周期(下单时已注册天数)
analysis_df["user_life_days"] = (analysis_df["order_time"] - analysis_df["register_time"]).dt.days
# 业务特征:是否溢价下单(订单金额>商品单价)
analysis_df["is_premium"] = analysis_df.apply(lambda x: 1 if x["order_amount"] > x["price"] else 0, axis=1)

print("\n=== 分析宽表结构 ===")
print(analysis_df[["order_id", "user_id", "product_category", "order_amount", "order_date", "user_level"]].head())

# ---------------------- 第四步:核心业务指标计算 ----------------------
print("\n=== 核心业务指标计算结果 ===")
# 1. 整体指标
total_orders = len(analysis_df)  # 总订单数
total_sales = analysis_df["order_amount"].sum()  # 总销售额
total_users = analysis_df["user_id"].nunique()  # 总用户数
avg_amount = total_sales / total_orders  # 客单价
print(f"总订单数:{total_orders} 笔")
print(f"总销售额:{total_sales:.2f} 元")
print(f"下单用户数:{total_users} 人")
print(f"客单价:{avg_amount:.2f} 元")

# 2. 复购率(订单数≥2的用户/总下单用户)
user_order_count = analysis_df.groupby("user_id")["order_id"].count().reset_index(name="order_count")
repurchase_users = len(user_order_count[user_order_count["order_count"] >=2])
repurchase_rate = (repurchase_users / total_users) * 100
print(f"用户复购率:{repurchase_rate:.2f}%")

# 3. 品类指标
category_stats = analysis_df.groupby("product_category").agg({
    "order_id": "count",  # 品类订单数
    "order_amount": "sum"  # 品类销售额
}).rename(columns={"order_id": "order_count", "order_amount": "sales_volume"})
category_stats["sales_ratio"] = (category_stats["sales_volume"] / total_sales) * 100  # 品类占比
print("\n各品类销售统计:")
print(category_stats.round(2))

# 4. 用户分层指标(按用户等级)
user_level_stats = analysis_df.groupby("user_level").agg({
    "user_id": "nunique",
    "order_amount": "sum",
    "order_id": "count"
}).rename(columns={"user_id": "user_count", "order_id": "order_count"})
user_level_stats["avg_user_sales"] = user_level_stats["order_amount"] / user_level_stats["user_count"]  # 人均消费
print("\n各用户等级统计:")
print(user_level_stats.round(2))

# ---------------------- 第五步:多维度报表生成 ----------------------
# 1. 时间趋势报表(日销售额)
daily_sales = analysis_df.groupby("order_date").agg({
    "order_id": "count",
    "order_amount": "sum"
}).rename(columns={"order_id": "daily_orders", "order_amount": "daily_sales"}).reset_index()

# 2. 透视表报表(用户等级×品类销售额)
level_category_pivot = pd.pivot_table(
    data=analysis_df,
    index="user_level",
    columns="product_category",
    values="order_amount",
    aggfunc="sum",
    fill_value=0,
    margins=True,
    margins_name="总计"
).round(2)

# ---------------------- 第六步:报表导出与格式优化 ----------------------
# 生成动态文件名
today = datetime.now().strftime("%Y%m%d")
report_path = f"电商月度销售分析报表_{today}.xlsx"

# 多工作表导出
with pd.ExcelWriter(report_path, engine="openpyxl") as writer:
    # 核心指标表
    metrics_df = pd.DataFrame({
        "指标名称": ["总订单数", "总销售额(元)", "下单用户数", "客单价(元)", "用户复购率(%)"],
        "指标值": [total_orders, round(total_sales,2), total_users, round(avg_amount,2), round(repurchase_rate,2)]
    })
    metrics_df.to_excel(writer, sheet_name="核心指标", index=False)
    
    # 其他工作表
    daily_sales.to_excel(writer, sheet_name="日销售趋势", index=False)
    category_stats.reset_index().to_excel(writer, sheet_name="品类分析", index=False)
    user_level_stats.reset_index().to_excel(writer, sheet_name="用户分层分析", index=False)
    level_category_pivot.reset_index().to_excel(writer, sheet_name="用户-品类透视", index=False)
    analysis_df.to_excel(writer, sheet_name="分析宽表", index=False)

# 格式优化
wb = load_workbook(report_path)
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="2E86AB", end_color="2E86AB", fill_type="solid")

for sheet_name in wb.sheetnames:
    ws = wb[sheet_name]
    # 表头样式
    for cell in ws[1]:
        cell.font = header_font
        cell.fill = header_fill
    # 调整列宽
    for column in ws.columns:
        max_length = 0
        column_letter = column[0].column_letter
        for cell in column:
            try:
                if len(str(cell.value)) > max_length:
                    max_length = len(str(cell.value))
            except:
                pass
        adjusted_width = min(max_length + 2, 20)  # 最大宽度限制为20
        ws.column_dimensions[column_letter].width = adjusted_width

wb.save(report_path)
print(f"\n=== 报表交付完成 ===")
print(f"最终报表路径:{report_path}")
print("包含工作表:核心指标、日销售趋势、品类分析、用户分层分析、用户-品类透视、分析宽表")

12.4 实战总结与面试要点

本实战案例覆盖了企业数据分析师日常工作的全流程,核心亮点在于“业务导向”——所有数据处理动作均围绕业务指标展开,而非单纯的技术堆砌。求职面试中可重点突出以下要点:

    1. 数据清洗的业务合理性:如无效订单删除、缺失值填充结合品类特征,而非机械处理;
    1. 指标设计的逻辑性:从整体(销售额、客单价)到分层(品类、用户等级),覆盖业务核心诉求;
    1. 代码的工程化思维:包含异常处理、格式优化、动态命名,适配企业交付标准;
    1. 全流程整合能力:串联数据读取、清洗、分析、导出,展示端到端解决问题的能力。

可基于本案例扩展不同业务场景(如用户留存分析、商品库存分析),灵活适配面试中的场景题考察。

附录:求职高频考点与代码速查

A.1 高频面试考点总结

  • 基础操作:数据读取参数(dtype、parse_dates、na_values)、数据类型优化(category类型);
  • 数据清洗:缺失值/异常值处理的业务逻辑、重复值去重的主键选择;
  • 核心分析:groupby与agg的多函数用法、pivot_table与业务报表的结合、多表合并的连接方式选择;
  • 性能优化:大数据集的内存优化(category、usecols)、自定义函数的效率提升;
  • 业务理解:指标计算(复购率、客单价)、分析宽表的构建逻辑。

A.2 核心代码速查(面试应急)



# 1. 数据读取与探查
import pandas as pd
df = pd.read_csv("data.csv", dtype={"id":str}, parse_dates=["time"], na_values=["无"])
df.info()  # 数据信息
df.isnull().sum()  # 缺失值统计
df.describe()  # 数值统计

# 2. 缺失值处理
df.dropna(subset=["key"], how="any")  # 删除关键列缺失行
df["col"].fillna(df.groupby("cat")["col"].transform("mean"))  # 分组填充均值

# 3. 重复值与异常值
df.drop_duplicates(subset=["key"], keep="first")  # 去重
Q1 = df["col"].quantile(0.25)
Q3 = df["col"].quantile(0.75)
df = df[(df["col"] >= Q1-1.5*(Q3-Q1)) & (df["col"] <= Q3+1.5*(Q3-Q1))]  # IQR异常值过滤

# 4. 分组聚合
df.groupby("cat").agg({"col1":"sum", "col2":"count"}).reset_index()  # 多列聚合
df.groupby("cat")["col"].apply(lambda x: x.mean()).reset_index()  # 自定义聚合

# 5. 多表合并
pd.merge(df1, df2, on="key", how="left", suffixes=("_1", "_2"))  # 左连接

# 6. 透视表
pd.pivot_table(df, index="row", columns="col", values="val", aggfunc="sum", fill_value=0)

# 7. 数据导出
df.to_excel("report.xlsx", sheet_name="数据", index=False, engine="openpyxl")
df.to_csv("data_clean.csv", encoding="utf-8-sig", index=False)

本手册聚焦求职导向,所有内容均经过企业实战场景验证,可作为数据分析师面试前的核心复习资料。建议结合实际数据集反复演练,强化代码熟练度与业务理解能力,提升求职竞争力。

(注:文档部分内容可能由 AI 生成)

添加新评论