Code - 济南车企10万行设备日志清洗.py

import pandas as pd
import numpy as np
import time

def clean_device_log(input_path: str, output_path: str):

"""
清洗设备日志数据
"""
start_time = time.time()
print(f"开始清洗设备日志:{input_path}")

# 1. 读取数据(修正 parse_dates)
df = pd.read_csv(
    input_path,
    parse_dates=["timestamp"],  # ✅ 假设时间列为 "timestamp"
    dtype={"device_id": str, "status": str},
)

# 1.2 过滤无效行(设备ID非JN-开头 或 时间为空)
df = df[
    df["device_id"].notna()
    & df["device_id"].str.startswith("DEV", na=False)
    & df["timestamp"].notna()
].copy()

# 重置索引
df.reset_index(drop=True, inplace=True)

# 2. 异常值修复(先于缺失值填充!)
# 温度:20-100℃
TEMP_MIN, TEMP_MAX = 20, 100
temp_mask = df["temperature"].between(TEMP_MIN, TEMP_MAX)
device_temp_mean = df[temp_mask].groupby("device_id")["temperature"].mean()
global_temp_mean = df[temp_mask]["temperature"].mean()
df.loc[~temp_mask, "temperature"] = (
    df.loc[~temp_mask, "device_id"].map(device_temp_mean).fillna(global_temp_mean)
)

# 转速:0-6000 rpm
RPM_MIN, RPM_MAX = 0, 6000
rpm_mask = df["rpm"].between(RPM_MIN, RPM_MAX)
device_rpm_mean = df[rpm_mask].groupby("device_id")["rpm"].mean()
global_rpm_mean = df[rpm_mask]["rpm"].mean()  # ✅ 修正:原来是 "temperature"
df.loc[~rpm_mask, "rpm"] = (
    df.loc[~rpm_mask, "device_id"].map(device_rpm_mean).fillna(global_rpm_mean)
)

# # 电压:假设 200-240V(按需调整)
# VOLTAGE_MIN, VOLTAGE_MAX = 200, 240
# voltage_mask = df["voltage"].between(VOLTAGE_MIN, VOLTAGE_MAX)
# device_voltage_mean = df[voltage_mask].groupby("device_id")["voltage"].mean()
# global_voltage_mean = df[voltage_mask]["voltage"].mean()
# df.loc[~voltage_mask, "voltage"] = (
#     df.loc[~voltage_mask, "device_id"].map(device_voltage_mean).fillna(global_voltage_mean)
# )

# 3. 缺失值填充(此时异常值已修复,均值更准确)
numeric_cols = ["temperature", "rpm"]
for col in numeric_cols:
    group_means = df.groupby("device_id")[col].mean()
    df[col] = df[col].fillna(df["device_id"].map(group_means))
    # 如果还有 NaN(全异常设备),用全局均值兜底
    global_mean = df[col].mean()
    df[col] = df[col].fillna(global_mean)

# 4. 新增故障标记列
df["temp_exceed"] = df["temperature"] > 80
df["rpm_exceed"] = df["rpm"] > 4500
df["fault_flag"] = df["temp_exceed"] | df["rpm_exceed"]

# 5. 保存结果
df.to_csv(output_path, index=False, encoding="utf-8")

end_time = time.time()
print(f"清洗完成,耗时:{end_time - start_time:.2f} 秒")
print(f"清洗后数据行数:{df.shape[0]},列数:{df.shape[1]}")
print(f"故障数据行数:{df['fault_flag'].sum()}")
print(f"清洗结果已保存到:{output_path}")

if name == "__main__":

input_large_csv = "jinan_car_device_log_10w.csv"
output_clean_csv = "jinan_car_device_log_clean1.csv"
clean_device_log(input_large_csv, output_clean_csv)
添加新评论