Code - Python 生成器/迭代器

核心知识点

  • 迭代器:实现__iter__和__next__方法的对象,支持逐个获取元素(如列表、字符串)
  • 生成器:简化版迭代器,通过yield关键字创建,暂停执行并返回值,再次调用从暂停处继续
  • 核心优势:惰性计算,无需一次性加载所有数据到内存,适合处理大文件/海量数据
  • 企业场景:读取超大CSV/日志文件、处理海量用户数据、生成大批量测试数据

实战案例:生成器处理10万行销售大文件(济南超市销售场景)

案例需求

济南某超市有一个10万行的销售数据CSV文件(sales_data.csv),字段包含:日期、商品ID、商品名称、销售额、销量。需统计每日总销售额,要求内存占用≤100MB(避免一次性加载文件导致内存溢出)。

实现步骤(可跟随操作)

1. 生成模拟大文件(无需手动创建,用代码生成10万行测试数据)
2. 编写生成器函数,逐行读取CSV文件(不一次性加载)
3. 遍历生成器,按日期累加销售额
4. 输出每日总销售额,验证内存占用

完整代码


    import csv
    import random
    import time
    import psutil
    from sympy import product
    
    
    def create_csv_file(file_path: str, num_rows: int = 100000):
        """
        创建一个包含随机数据的CSV文件。
        :param file_path: 要创建的CSV文件路径。
        :param num_rows: 要生成的随机数据行数,默认值为100000。
        """
        products = [
            ("P001", "章丘大葱"),
            ("P002", "济南把子肉"),
            ("P003", "油旋"),
            ("P004", "甜沫"),
            ("P005", "阿胶糕"),
        ]
        # 模拟日期(近30天)
        start_date = time.strptime("2026-01-01", "%Y-%m-%d")
        # end_date = time.strptime("2026-01-30", "%Y-%m-%d")
    
        # dates = [time.strftime("%Y-%m-%d", time.localtime(start_date + i)) for i in range((end_date - start_date).days + 1)]
        dates = [
            time.strftime("%Y-%m-%d", time.localtime(time.mktime(start_date) + i * 86400))
            for i in range(30)
        ]
        with open(file_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            # 写入表头
            writer.writerow(["日期", "产品ID", "产品名称", "销售量"])
            for _ in range(num_rows):
                date = random.choice(dates)
                product_id, product_name = random.choice(products)
                sales = random.randint(100, 1000)
                writer.writerow([date, product_id, product_name, sales])
        print(f"已创建文件:{file_path},包含{num_rows}行随机数据")
    
    
    def read_csv_file(file_path: str):
        """
        读取CSV文件并返回其内容。
        :param file_path: 要读取的CSV文件路径。
        :return: 包含文件内容的列表,每个元素为一行数据(以列表形式表示)。
        """
        with open(file_path, "r", newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            next(reader)
            for row in reader:
                yield row
    
    
    def process_csv_data(file_path: str):
        """统计每日总销售额"""
        daily_sales = {}
        # 遍历生成器,逐行处理数据
        for row in read_csv_file(file_path):
            date = row[0]
            sales = float(row[3])
            # 按日期累加销售额
            if date in daily_sales:
                daily_sales[date] += sales
            else:
                daily_sales[date] = sales
    
        # 格式化输出结果(按日期排序)
        sorted_dates = sorted(daily_sales.keys())
        print("\n每日总销售额统计(济南超市):")
        for date in sorted_dates:
            print(f"{date}: ¥{round(daily_sales[date], 2)}")
    
        process = psutil.Process()
        memory_usage = process.memory_info().rss
        print(f"\n当前程序内存占用:{round(memory_usage, 2)} MB")
    
    
    if __name__ == "__main__":
        # 1. 生成10万行模拟销售数据(首次运行时执行,后续可注释)
        # create_csv_file("sales_data.csv", num_rows=100000)
        # 2. 统计每日销售额并监控内存
        process_csv_data("sales_data.csv")

验证方法

1. 安装依赖:执行 pip install psutil(用于监控内存)

2. 运行代码,先生成10万行CSV文件,再统计销售额

3. 查看输出的内存占用,应≤50MB(远低于100MB的要求)

4. 打开生成的sales_data.csv文件,随机抽查某日期的销售额,手动计算是否与程序统计结果一致

最后

Python生成器处理大文件,等价于PHP中用fgets()逐行读取文件(而非file_get_contents()一次性加载),但生成器更简洁,无需手动管理指针位置,通过yield自动实现“读取-暂停-继续”的逻辑。

添加新评论