拟电商平台月度销售分析实战,整合前文所有技能(数据读取、清洗、处理、分析、导出),完成从原始数据到最终报表的全流程落地,适配企业真实业务场景,助力求职面试场景演示。
12.2 实战需求
- 1. 读取电商订单、用户、商品三张原始表,完成数据探查与清洗(缺失值、异常值、重复值);
- 2. 实现多表合并,生成分析宽表,提取时间、用户、商品维度特征;
- 3. 计算核心业务指标(销售额、客单价、复购率、品类占比);
- 4. 生成多维度分析报表(时间趋势、用户分层、品类分析);
- 5. 导出标准化Excel报表,优化格式并提交交付。
import datetime
import pandas as pd
import numpy as np
from openpyxl.styles import Font, PatternFill
import openpyxl
orders_df = pd.read_csv(
"原始数据/订单表.csv",
encoding="utf-8-sig",
parse_dates=["order_time"], #
na_values=["无", "未知"],
dtype={"order_id": str, "user_id": str, "product_id": str},)
user_df = pd.read_excel(
"原始数据/用户表.xlsx",
sheet_name="用户信息",
dtype={"user_id": str},
parse_dates=["register_time"],)
product_df = pd.read_csv(
"原始数据/商品表.csv",
encoding="utf-8-sig",
dtype={"product_id": str},
na_values=["0"], # 商品单价为0视为缺失)
初步探查数据
print("=== 数据初步探查结果 ===")
print("订单表信息:")
print(orders_df.info())
print("n用户表缺失值:")
print(user_df.isnull().sum())
print("n商品表描述性统计:")
print(product_df.describe())
---------------------- 第二步:数据清洗 ----------------------
1. 订单表清洗(缺失值、异常值、重复值)
缺失值处理:user_id/product_id缺失的订单删除(无效订单)
orders_clean = orders_df.dropna(subset=["user_id", "product_id"], how=any)
异常值处理:订单金额>0且<10000(业务合理区间)
orders_clean = orders_clean[
(orders_clean["order_amount"] > 0) & (orders_clean["order_amount"] < 10000)]
重复值处理:按order_id去重
orders_clean = orders_clean.drop_duplicates(subset=["order_id"], keep="first")
2. 用户表清洗:填充缺失的用户等级为"普通"
user_clean = user_df.copy()
user_clean["user_level"] = user_clean["user_level"].fillna("普通")
3. 商品表清洗:填充缺失单价为品类均值
product_clean = product_df.copy()
product_clean["price"] = product_clean.groupby("product_category")["price"].transform(
lambda x: x.fillna(x.mean()))
print(f"n=== 清洗后数据量 ===")
print(f"订单表:{len(orders_clean)} 条(原始:{len(orders_df)} 条)")
print(f"用户表:{len(user_clean)} 条(原始:{len(user_df)} 条)")
print(f"商品表:{len(product_clean)} 条(原始:{len(product_df)} 条)")
---------------------- 第三步:多表合并与特征提取 ----------------------
1. 多表合并:订单表→用户表→商品表
analysis_df = pd.merge(
left=orders_clean,
right=user_clean[["user_id", "user_name", "user_level", "register_time"]],
on="user_id",
how="left",)
analysis_df = pd.merge(
left=analysis_df,
right=product_clean[["product_id", "product_name", "product_category", "price"]],
on="product_id",
how="left",)
2. 特征提取(时间、用户、业务特征)
时间特征
analysis_df["order_date"] = analysis_df["order_time"].dt.date
analysis_df["month"] = analysis_df["order_time"].dt.month
analysis_df["weekday"] = analysis_df["order_time"].dt.weekday
analysis_df["is_weekend"] = analysis_df["weekday"].apply(lambda x: 1 if x >= 5 else 0)
用户特征:用户生命周期(下单时已注册天数)
analysis_df["user_life_days"] = (
analysis_df["order_time"] - analysis_df["register_time"]).dt
业务特征:是否溢价下单(订单金额>商品单价)
analysis_df["is_premium"] = analysis_df.apply(
lambda x: 1 if x["order_amount"] > x["price"] else 0, axis=1)
print("n=== 分析宽表结构 ===")
print(
analysis_df[
[
"order_id",
"user_id",
"product_category",
"order_amount",
"order_date",
"user_level",
]
].head())
---------------------- 第四步:核心业务指标计算 ----------------------
1. 整体指标
total_orders = len(analysis_df) # 总订单数
total_sales = analysis_df["order_amount"].sum() # 总销售额
total_users = analysis_df["user_id"].nunique() # 总用户数
avg_amount = total_sales / total_orders # 客单价
print(f"总订单数:{total_orders} 笔")
print(f"总销售额:{total_sales:.2f} 元")
print(f"下单用户数:{total_users} 人")
print(f"客单价:{avg_amount:.2f} 元")
2. 复购率(订单数≥2的用户/总下单用户)
user_order_count = (
analysis_df.groupby("user_id")["order_id"].count().reset_index(name="order_count"))
repurchase_users = len(user_order_count[user_order_count["order_count"] >= 2])
repurchase_rate = (repurchase_users / total_users) * 100
print(f"用户复购率:{repurchase_rate:.2f}%")
3. 品类指标
category_stats = (
analysis_df.groupby("product_category")
.agg({"order_id": "count", "order_amount": "sum"}) # 品类订单数 # 品类销售额
.rename(columns={"order_id": "order_count", "order_amount": "sales_volume"}))
category_stats["sales_ratio"] = (
category_stats["sales_volume"] / total_sales) * 100 # 品类占比
print("n各品类销售统计:")
print(category_stats.round(2))
4. 用户分层指标(按用户等级)
user_level_stats = (
analysis_df.groupby("user_level")
.agg({"user_id": "nunique", "order_amount": "sum", "order_id": "count"})
.rename(columns={"user_id": "user_count", "order_id": "order_count"}))
user_level_stats["avg_user_sales"] = (
user_level_stats["order_amount"] / user_level_stats["user_count"]) # 人均消费
print("n各用户等级统计:")
print(user_level_stats.round(2))
---------------------- 第五步:多维度报表生成 ----------------------
1. 时间趋势报表(日销售额)
daily_sales = (
analysis_df.groupby("order_date")
.agg({"order_id": "count", "order_amount": "sum"})
.rename(columns={"order_id": "daily_orders", "order_amount": "daily_sales"})
.reset_index())
2. 透视表报表(用户等级×品类销售额)
level_category_pivot = pd.pivot_table(
data=analysis_df,
index="user_level",
columns="product_category",
values="order_amount",
aggfunc="sum",
fill_value=0,
margins=True,
margins_name="总计",).round(2)
---------------------- 第六步:报表导出与格式优化 ----------------------
---------------------- 第六步:报表导出与格式优化 ----------------------
生成动态文件名
today = datetime.now().strftime("%Y%m%d")
report_path = f"电商月度销售分析报表_{today}.xlsx"
多工作表导出
with pd.ExcelWriter(report_path, engine="openpyxl") as writer:
# 核心指标表
metrics_df = pd.DataFrame(
{
"指标名称": [
"总订单数",
"总销售额(元)",
"下单用户数",
"客单价(元)",
"用户复购率(%)",
],
"指标值": [
total_orders,
round(total_sales, 2),
total_users,
round(avg_amount, 2),
round(repurchase_rate, 2),
],
}
)
metrics_df.to_excel(writer, sheet_name="核心指标", index=False)
# 其他工作表
daily_sales.to_excel(writer, sheet_name="日销售趋势", index=False)
category_stats.reset_index().to_excel(writer, sheet_name="品类分析", index=False)
user_level_stats.reset_index().to_excel(
writer, sheet_name="用户分层分析", index=False
)
level_category_pivot.reset_index().to_excel(
writer, sheet_name="用户-品类透视", index=False
)
analysis_df.to_excel(writer, sheet_name="分析宽表", index=False)
格式优化
wb = openpyxl.load_workbook(report_path)
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="2E86AB", end_color="2E86AB", fill_type="solid")
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
# 表头样式
for cell in ws[1]:
cell.font = header_font
cell.fill = header_fill
# 调整列宽
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 20) # 最大宽度限制为20
ws.column_dimensions[column_letter].width = adjusted_width
wb.save(report_path)
print(f"n=== 报表交付完成 ===")
print(f"最终报表路径:{report_path}")
print(
"包含工作表:核心指标、日销售趋势、品类分析、用户分层分析、用户-品类透视、分析宽表")