import pandas as pd
import numpy as np
import time
def _repair_out_of_range(df, column, lower, upper):
    """Replace invalid readings of *column* in place with a mean of valid ones.

    A reading is invalid when it is missing, non-numeric, or outside the
    inclusive range [lower, upper]. Each invalid reading is replaced by the
    mean of the valid readings of the same ``device_id``; devices with no
    valid reading at all fall back to the mean over every valid reading.
    """
    # Cast to float first: the replacement means are floats, and writing them
    # into an integer column is deprecated/rejected by recent pandas versions.
    df[column] = pd.to_numeric(df[column], errors="coerce").astype("float64")

    # ``between`` evaluates to False for NaN, so missing values are repaired
    # here as well and no separate fill-NaN pass is required afterwards.
    valid = df[column].between(lower, upper)
    valid_rows = df.loc[valid]
    device_means = valid_rows.groupby("device_id")[column].mean()
    global_mean = valid_rows[column].mean()

    df.loc[~valid, column] = (
        df.loc[~valid, "device_id"].map(device_means).fillna(global_mean)
    )


def clean_device_log(
    input_path: str,
    output_path: str,
    *,
    device_prefix: str = "DEV",
) -> None:
    """Clean a device-log CSV and write the result to another CSV.

    Steps:
      1. Read ``input_path`` (columns used: ``timestamp``, ``device_id``,
         ``status``, ``temperature``, ``rpm``).
      2. Drop rows whose ``device_id`` does not start with ``device_prefix``
         or whose ``timestamp`` is empty.
      3. Replace missing or out-of-range ``temperature`` (20-100) and ``rpm``
         (0-6000) readings with the per-device mean of valid readings,
         falling back to the global mean of valid readings.
      4. Add the boolean columns ``temp_exceed`` (> 80), ``rpm_exceed``
         (> 4500) and ``fault_flag`` (either of the two).
      5. Write the result to ``output_path`` as UTF-8 CSV without the index
         and print a short summary.

    Args:
        input_path: Path of the raw CSV file.
        output_path: Path the cleaned CSV is written to.
        device_prefix: Prefix a ``device_id`` must have to be kept.
            NOTE(review): the original comment mentioned "JN-" while the
            code filtered on "DEV"; the code's value is kept as the default.

    Raises:
        KeyError: If a required column is absent from the input.
    """
    temp_min, temp_max = 20, 100
    rpm_min, rpm_max = 0, 6000
    temp_fault_threshold = 80
    rpm_fault_threshold = 4500

    start_time = time.time()
    print(f"开始清洗设备日志:{input_path}")

    # 1. Load the data; ids and status stay strings, timestamps are parsed.
    df = pd.read_csv(
        input_path,
        parse_dates=["timestamp"],
        dtype={"device_id": str, "status": str},
    )

    # 2. Keep only rows with an accepted device id and a non-empty timestamp.
    #    ``na=False`` already rejects missing ids.
    keep = (
        df["device_id"].str.startswith(device_prefix, na=False)
        & df["timestamp"].notna()
    )
    df = df[keep].copy()
    df.reset_index(drop=True, inplace=True)

    # 3. Repair out-of-range and missing sensor readings.
    _repair_out_of_range(df, "temperature", temp_min, temp_max)
    _repair_out_of_range(df, "rpm", rpm_min, rpm_max)

    # 4. Fault flags.
    df["temp_exceed"] = df["temperature"] > temp_fault_threshold
    df["rpm_exceed"] = df["rpm"] > rpm_fault_threshold
    df["fault_flag"] = df["temp_exceed"] | df["rpm_exceed"]

    # 5. Persist and report.
    df.to_csv(output_path, index=False, encoding="utf-8")
    end_time = time.time()
    print(f"清洗完成,耗时:{end_time - start_time:.2f} 秒")
    print(f"清洗后数据行数:{df.shape[0]},列数:{df.shape[1]}")
    print(f"故障数据行数:{df['fault_flag'].sum()}")
    print(f"清洗结果已保存到:{output_path}")
# Script entry point: clean the default raw log when run directly.
# The guard must test the module attribute ``__name__``; the bare ``name``
# used previously is undefined and raised NameError.
if __name__ == "__main__":
    input_large_csv = "jinan_car_device_log_10w.csv"
    output_clean_csv = "jinan_car_device_log_clean1.csv"
    clean_device_log(input_large_csv, output_clean_csv)