Python 采集每日一词到博客中

Python 采集每日一词到博客中

每天将词霸的每日一句采集到自己的博客中

接口地址及参数

# 获取金山每日一词作为元数据并保存到MySQL
import requests
import pymysql
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, Dict, List

# 数据库配置 - 请修改为您的实际配置
DB_CONFIG = {
    'host': '****',
    'port': 3306,
    'user': '**',
    'password': '***',
    'database': '***',
    'charset': 'utf8mb4'
}


class DailyImageCrawler:
    def __init__(self, db_config: Dict):
        self.db_config = db_config
        self.connection = None

    def connect_db(self):
        """建立数据库连接"""
        try:
            self.connection = pymysql.connect(**self.db_config)
            print("✅ 数据库连接成功")
        except Exception as e:
            print(f"❌ 数据库连接失败: {e}")
            raise

    def close_db(self):
        """关闭数据库连接"""
        if self.connection:
            self.connection.close()
            print("数据库连接已关闭")

    def parse_api_response(self, data: Dict, date_str: str) -> Optional[Dict]:
        """解析API返回数据,映射到数据库字段"""
        try:
            # 提取图片URL(优先使用高清图)
            image_url = data.get('picture3') or data.get('picture2') or data.get('picture') or ''

            # 构建完整的数据记录
            record = {
                'date': date_str,
                'image_url': image_url,
                'share_url': data.get('fenxiang_img', ''),
                'caption': data.get('caption', ''),
                'content': data.get('content', ''),
                'note': data.get('note', ''),
                'author': data.get('author', ''),
                'width': data.get('width'),
                'height': data.get('height'),
                'created': int(datetime.now().timestamp())
            }

            # 确保width和height是整数或None
            if record['width'] is not None:
                record['width'] = int(record['width'])
            if record['height'] is not None:
                record['height'] = int(record['height'])

            return record
        except Exception as e:
            print(f"解析数据失败 {date_str}: {e}")
            return None

    def save_to_db(self, record: Dict) -> bool:
        """保存单条记录到数据库"""
        sql = """
              INSERT INTO t_daily_images
              (date, image_url, share_url, caption, content, note, author, width, height, created)
              VALUES (%(date)s, %(image_url)s, %(share_url)s, %(caption)s, %(content)s,
                      %(note)s, %(author)s, %(width)s, %(height)s, %(created)s) ON DUPLICATE KEY \
              UPDATE \
                  image_url = \
              VALUES (image_url), share_url = \
              VALUES (share_url), caption = \
              VALUES (caption), content = \
              VALUES (content), note = \
              VALUES (note), author = \
              VALUES (author), width = \
              VALUES (width), height = \
              VALUES (height) \
              """

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, record)
                self.connection.commit()
                return True
        except Exception as e:
            print(f"保存失败 {record.get('date')}: {e}")
            self.connection.rollback()
            return False

    def check_existing_dates(self, date_list: List[str]) -> List[str]:
        """检查哪些日期已经存在,返回需要抓取的日期列表"""
        if not date_list:
            return []

        placeholders = ','.join(['%s'] * len(date_list))
        sql = f"SELECT date FROM t_daily_images WHERE date IN ({placeholders})"

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, date_list)
                existing = {row[0] for row in cursor.fetchall()}
                missing = [date for date in date_list if date not in existing]
                print(f"📊 已存在: {len(existing)} 条,需要抓取: {len(missing)} 条")
                return missing
        except Exception as e:
            print(f"检查已有数据失败: {e}")
            return date_list

    def fetch_one_day(self, date: str) -> Optional[Dict]:
        """抓取单日数据"""
        try:
            url = f"https://open.iciba.com/dsapi/?date={date}"
            resp = requests.get(url, timeout=10)

            if resp.status_code == 200:
                data = resp.json()
                # 检查是否有有效数据
                if data.get('picture') or data.get('content'):
                    record = self.parse_api_response(data, date)
                    if record:
                        content_preview = record.get('content', '')[:30]
                        print(f"✅ 抓取成功: {date} - {content_preview}")
                        return record
                    else:
                        print(f"⚠️ 解析失败: {date}")
                        return None
                else:
                    print(f"⚠️ 无有效数据: {date}")
                    return None
            else:
                print(f"❌ HTTP {resp.status_code}: {date}")
                return None
        except requests.Timeout:
            print(f"⏰ 请求超时: {date}")
            return None
        except Exception as e:
            print(f"❌ 抓取失败 {date}: {e}")
            return None

    def crawl_daily_words(self, days: int = 10, skip_existing: bool = True, max_workers: int = 20):
        """
        抓取每日一词数据并保存到数据库

        Args:
            days: 抓取天数(从今天往前)
            skip_existing: 是否跳过已存在的日期
            max_workers: 并发线程数
        """
        today = datetime.now()

        # 生成日期列表
        date_list = [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days)]
        print(f"📅 计划抓取 {len(date_list)} 天的数据")
        print(f"📅 日期范围: {date_list[-1]} 至 {date_list[0]}")

        # 连接数据库
        self.connect_db()

        # 过滤已存在的日期
        if skip_existing:
            date_list = self.check_existing_dates(date_list)
            if not date_list:
                print("🎉 所有日期数据已存在,无需抓取")
                self.close_db()
                return

        print(f"🚀 开始并发抓取,线程数: {max_workers}")
        print("-" * 50)

        # 并发抓取
        success_count = 0
        failed_count = 0

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_date = {executor.submit(self.fetch_one_day, date): date for date in date_list}

            for future in as_completed(future_to_date):
                date = future_to_date[future]
                try:
                    record = future.result(timeout=15)
                    if record and self.save_to_db(record):
                        success_count += 1
                    else:
                        failed_count += 1
                except Exception as e:
                    print(f"❌ 处理异常 {date}: {e}")
                    failed_count += 1

        # 输出统计信息
        print("-" * 50)
        print(f"📊 抓取完成统计:")
        print(f"   ✅ 成功: {success_count} 条")
        print(f"   ❌ 失败: {failed_count} 条")
        if success_count + failed_count > 0:
            print(f"   📈 成功率: {success_count / (success_count + failed_count) * 100:.1f}%")
        print("-" * 50)

        # 关闭数据库连接
        self.close_db()


def main():
    """主函数"""
    # 创建爬虫实例并抓取数据
    crawler = DailyImageCrawler(DB_CONFIG)

    # 抓取最近4000天的数据(约11年)
    # 注意:API可能只有部分日期有数据,无数据的日期会自动跳过
    crawler.crawl_daily_words(
        days=4000,
        skip_existing=True,  # 跳过已存在的日期
        max_workers=20  # 并发线程数,可根据网络情况调整
    )


if __name__ == "__main__":
    main()
              

表结构

CREATE TABLE `t_daily_images` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `date` date NOT NULL COMMENT '日期',
  `image_url` varchar(500) NOT NULL COMMENT '图片 URL',
  `share_url` varchar(500) DEFAULT NULL COMMENT '分享链接',
  `caption` text COMMENT '图片说明/文字',
  `content` text COMMENT '英文句子',
  `note` text COMMENT '中文翻译',
  `author` varchar(100) DEFAULT NULL COMMENT '作者',
  `width` int(10) DEFAULT NULL COMMENT '图片宽度',
  `height` int(10) DEFAULT NULL COMMENT '图片高度',
  `created` int(10) unsigned NOT NULL COMMENT '创建时间戳',
  PRIMARY KEY (`id`),
  UNIQUE KEY `date` (`date`),
  KEY `created` (`created`)
) ENGINE=InnoDB AUTO_INCREMENT=5084 DEFAULT CHARSET=utf8mb4 COMMENT='每日一图图片表';