Python 采集每日一词到博客中
Python 采集每日一词到博客中
每天将词霸的每日一句采集到自己的博客中
接口地址及参数
# 获取金山每日一词作为元数据并保存到MySQL
import requests
import pymysql
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, Dict, List
# 数据库配置 - 请修改为您的实际配置
DB_CONFIG = {
'host': '****',
'port': 3306,
'user': '**',
'password': '***',
'database': '***',
'charset': 'utf8mb4'
}
class DailyImageCrawler:
def __init__(self, db_config: Dict):
self.db_config = db_config
self.connection = None
def connect_db(self):
"""建立数据库连接"""
try:
self.connection = pymysql.connect(**self.db_config)
print("✅ 数据库连接成功")
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
raise
def close_db(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
print("数据库连接已关闭")
def parse_api_response(self, data: Dict, date_str: str) -> Optional[Dict]:
"""解析API返回数据,映射到数据库字段"""
try:
# 提取图片URL(优先使用高清图)
image_url = data.get('picture3') or data.get('picture2') or data.get('picture') or ''
# 构建完整的数据记录
record = {
'date': date_str,
'image_url': image_url,
'share_url': data.get('fenxiang_img', ''),
'caption': data.get('caption', ''),
'content': data.get('content', ''),
'note': data.get('note', ''),
'author': data.get('author', ''),
'width': data.get('width'),
'height': data.get('height'),
'created': int(datetime.now().timestamp())
}
# 确保width和height是整数或None
if record['width'] is not None:
record['width'] = int(record['width'])
if record['height'] is not None:
record['height'] = int(record['height'])
return record
except Exception as e:
print(f"解析数据失败 {date_str}: {e}")
return None
def save_to_db(self, record: Dict) -> bool:
"""保存单条记录到数据库"""
sql = """
INSERT INTO t_daily_images
(date, image_url, share_url, caption, content, note, author, width, height, created)
VALUES (%(date)s, %(image_url)s, %(share_url)s, %(caption)s, %(content)s,
%(note)s, %(author)s, %(width)s, %(height)s, %(created)s) ON DUPLICATE KEY \
UPDATE \
image_url = \
VALUES (image_url), share_url = \
VALUES (share_url), caption = \
VALUES (caption), content = \
VALUES (content), note = \
VALUES (note), author = \
VALUES (author), width = \
VALUES (width), height = \
VALUES (height) \
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, record)
self.connection.commit()
return True
except Exception as e:
print(f"保存失败 {record.get('date')}: {e}")
self.connection.rollback()
return False
def check_existing_dates(self, date_list: List[str]) -> List[str]:
"""检查哪些日期已经存在,返回需要抓取的日期列表"""
if not date_list:
return []
placeholders = ','.join(['%s'] * len(date_list))
sql = f"SELECT date FROM t_daily_images WHERE date IN ({placeholders})"
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, date_list)
existing = {row[0] for row in cursor.fetchall()}
missing = [date for date in date_list if date not in existing]
print(f"📊 已存在: {len(existing)} 条,需要抓取: {len(missing)} 条")
return missing
except Exception as e:
print(f"检查已有数据失败: {e}")
return date_list
def fetch_one_day(self, date: str) -> Optional[Dict]:
"""抓取单日数据"""
try:
url = f"https://open.iciba.com/dsapi/?date={date}"
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
data = resp.json()
# 检查是否有有效数据
if data.get('picture') or data.get('content'):
record = self.parse_api_response(data, date)
if record:
content_preview = record.get('content', '')[:30]
print(f"✅ 抓取成功: {date} - {content_preview}")
return record
else:
print(f"⚠️ 解析失败: {date}")
return None
else:
print(f"⚠️ 无有效数据: {date}")
return None
else:
print(f"❌ HTTP {resp.status_code}: {date}")
return None
except requests.Timeout:
print(f"⏰ 请求超时: {date}")
return None
except Exception as e:
print(f"❌ 抓取失败 {date}: {e}")
return None
def crawl_daily_words(self, days: int = 10, skip_existing: bool = True, max_workers: int = 20):
"""
抓取每日一词数据并保存到数据库
Args:
days: 抓取天数(从今天往前)
skip_existing: 是否跳过已存在的日期
max_workers: 并发线程数
"""
today = datetime.now()
# 生成日期列表
date_list = [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days)]
print(f"📅 计划抓取 {len(date_list)} 天的数据")
print(f"📅 日期范围: {date_list[-1]} 至 {date_list[0]}")
# 连接数据库
self.connect_db()
# 过滤已存在的日期
if skip_existing:
date_list = self.check_existing_dates(date_list)
if not date_list:
print("🎉 所有日期数据已存在,无需抓取")
self.close_db()
return
print(f"🚀 开始并发抓取,线程数: {max_workers}")
print("-" * 50)
# 并发抓取
success_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_date = {executor.submit(self.fetch_one_day, date): date for date in date_list}
for future in as_completed(future_to_date):
date = future_to_date[future]
try:
record = future.result(timeout=15)
if record and self.save_to_db(record):
success_count += 1
else:
failed_count += 1
except Exception as e:
print(f"❌ 处理异常 {date}: {e}")
failed_count += 1
# 输出统计信息
print("-" * 50)
print(f"📊 抓取完成统计:")
print(f" ✅ 成功: {success_count} 条")
print(f" ❌ 失败: {failed_count} 条")
if success_count + failed_count > 0:
print(f" 📈 成功率: {success_count / (success_count + failed_count) * 100:.1f}%")
print("-" * 50)
# 关闭数据库连接
self.close_db()
def main():
"""主函数"""
# 创建爬虫实例并抓取数据
crawler = DailyImageCrawler(DB_CONFIG)
# 抓取最近4000天的数据(约11年)
# 注意:API可能只有部分日期有数据,无数据的日期会自动跳过
crawler.crawl_daily_words(
days=4000,
skip_existing=True, # 跳过已存在的日期
max_workers=20 # 并发线程数,可根据网络情况调整
)
if __name__ == "__main__":
main()
表结构
CREATE TABLE `t_daily_images` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`date` date NOT NULL COMMENT '日期',
`image_url` varchar(500) NOT NULL COMMENT '图片 URL',
`share_url` varchar(500) DEFAULT NULL COMMENT '分享链接',
`caption` text COMMENT '图片说明/文字',
`content` text COMMENT '英文句子',
`note` text COMMENT '中文翻译',
`author` varchar(100) DEFAULT NULL COMMENT '作者',
`width` int(10) DEFAULT NULL COMMENT '图片宽度',
`height` int(10) DEFAULT NULL COMMENT '图片高度',
`created` int(10) unsigned NOT NULL COMMENT '创建时间戳',
PRIMARY KEY (`id`),
UNIQUE KEY `date` (`date`),
KEY `created` (`created`)
) ENGINE=InnoDB AUTO_INCREMENT=5084 DEFAULT CHARSET=utf8mb4 COMMENT='每日一图图片表';