import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
# Seed both generators so every run produces the same dataset.
np.random.seed(42)
random.seed(42)

# Dataset configuration.
N_ROWS = 100_000
N_DEVICES = 50
START_DATE = datetime(2025, 1, 1)
END_DATE = datetime(2026, 1, 1)


def _add_outliers_and_gaps(values, low_pool_args, high_pool_args):
    """Corrupt a float array in place with out-of-range samples and NaN gaps.

    0.8% of the ``N_ROWS`` positions are overwritten with values picked (with
    replacement) from a pool built from two uniform draws, then 1% of the
    positions are set to NaN. The two position sets are drawn independently,
    so a NaN may land on a position that was just given an outlier.

    Args:
        values: 1-D float array of length ``N_ROWS``; modified in place.
        low_pool_args: ``(low, high, size)`` passed to ``np.random.uniform``
            for the lower part of the outlier pool.
        high_pool_args: ``(low, high, size)`` passed to ``np.random.uniform``
            for the upper part of the outlier pool.

    Returns:
        The same ``values`` array, after corruption.
    """
    outlier_count = int(0.008 * N_ROWS)
    outlier_rows = np.random.choice(N_ROWS, size=outlier_count, replace=False)
    # The draw order (positions, low pool, high pool, pick) is deliberate:
    # it determines what the seeded global generator hands out.
    low_pool = np.random.uniform(*low_pool_args)
    high_pool = np.random.uniform(*high_pool_args)
    values[outlier_rows] = np.random.choice(
        np.concatenate([low_pool, high_pool]), size=outlier_count
    )

    gap_count = int(0.01 * N_ROWS)
    gap_rows = np.random.choice(N_ROWS, size=gap_count, replace=False)
    values[gap_rows] = np.nan
    return values


# 1. device_id: one of DEV001..DEV050 sampled uniformly for every row.
device_ids = [f"DEV{n:03d}" for n in range(1, N_DEVICES + 1)]
sampled_devices = np.random.choice(device_ids, size=N_ROWS)
df = pd.DataFrame({"device_id": sampled_devices})

# 2. timestamp: uniformly distributed between START_DATE and END_DATE.
span_seconds = (END_DATE - START_DATE).total_seconds()
random_offsets = (random.uniform(0, span_seconds) for _ in range(N_ROWS))
timestamps = [START_DATE + timedelta(seconds=s) for s in random_offsets]
df["timestamp"] = pd.to_datetime(timestamps)

# 3. temperature: normal(mean=60, std=15) plus 0.8% outliers and 1% missing.
df["temperature"] = _add_outliers_and_gaps(
    np.random.normal(loc=60, scale=15, size=N_ROWS),
    low_pool_args=(-50, 0, 500),
    high_pool_args=(150, 250, 500),
)

# 4. rpm: integers in [500, 5500) as floats, plus 0.8% outliers and 1% missing.
df["rpm"] = _add_outliers_and_gaps(
    np.random.randint(500, 5500, size=N_ROWS).astype(float),
    low_pool_args=(-500, -1, 200),
    high_pool_args=(7000, 12000, 600),
)

# 5. status: weighted choice of the three valid labels.
status_probabilities = {"normal": 0.85, "warning": 0.12, "error": 0.03}
df["status"] = np.random.choice(
    list(status_probabilities),
    size=N_ROWS,
    p=list(status_probabilities.values()),
)

# Replace 0.5% of the labels with misspellings (including an empty string).
status_noise_count = int(0.005 * N_ROWS)
typo_rows = np.random.choice(N_ROWS, size=status_noise_count, replace=False)
typos = ["norml", "warng", "err", "nomral", ""]
df.loc[typo_rows, "status"] = np.random.choice(typos, size=status_noise_count)

# Blank out another independently drawn 0.5% of the labels.
blank_rows = np.random.choice(N_ROWS, size=status_noise_count, replace=False)
df.loc[blank_rows, "status"] = np.nan

# 6. Append exact copies of 0.5% of the rows (about 500), then shuffle.
duplicate_count = int(0.005 * N_ROWS)
repeated_rows = np.random.choice(df.index, size=duplicate_count, replace=False)
df = pd.concat([df, df.loc[repeated_rows].copy()], ignore_index=True)
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

# 7. Write the result to CSV and report the row count.
output_file = "jinan_car_device_log_10w.csv"
df.to_csv(output_file, index=False)
print(f"✅ 已生成 {len(df)} 行测试数据,保存至: {output_file}")