# PixivSpider/pixiv_spider.py

"""
Pixiv爬虫 - 每日排行榜下载
环境需求Python3.8+ / Redis
"""
from typing import Generator, List, Dict, Any
import requests
import time
from rich.console import Console
from rich.progress import (
Progress,
BarColumn,
TaskProgressColumn,
TextColumn,
SpinnerColumn
)
from rich.live import Live
from rich.layout import Layout
from rich.panel import Panel
from rich.console import Group
from config import PIXIV_CONFIG
from redis_client import RedisClient
from pixiv_download import PixivDownloader
# Silence urllib3 warnings; the ranking request issued in
# PixivSpider.get_ranking_page is sent with verify=False.
requests.packages.urllib3.disable_warnings()
class PixivSpider:
    """Crawler for the Pixiv daily illustration ranking.

    Fetches the ranking page by page, hands every work ID to a
    ``PixivDownloader`` and reports progress through a Rich layout made of
    a header, a log panel and a progress bar.
    """

    TOTAL_IMAGES = 500  # total used for the overall progress bar
    TOTAL_PAGES = 10  # ranking pages requested per run (p = 1..10)
    REQUEST_TIMEOUT = 30  # seconds; keeps a stalled ranking request from hanging the run
    MAX_LOG_MESSAGES = 100  # log lines kept in memory
    VISIBLE_LOG_MESSAGES = 10  # log lines shown in the panel

    def __init__(self, db: int = 0):
        """Connect to Redis, build the UI and reset the crawl state.

        Args:
            db: Redis database number (0-5).

        Raises:
            ValueError: If ``RedisClient.select_db`` rejects ``db``.
        """
        # Redis connection
        self.redis = RedisClient()
        if not self.redis.select_db(db):
            raise ValueError(f"无效的Redis数据库编号: {db}")

        # UI components
        self.console = Console()
        self._setup_ui()

        # Crawl state
        self.headers = None
        self.current_ranking_data = []
        self.failed_works = []
        self.log_messages = []  # newest message first

    def _setup_ui(self) -> None:
        """Create the Rich layout, the progress bar and its single task."""
        # Three stacked regions: fixed header, flexible log area, fixed footer.
        self.layout = Layout()
        self.layout.split(
            Layout(name="header", size=3),
            Layout(name="main", size=None),
            Layout(name="progress", size=3),
        )

        # Title panel
        self.layout["header"].update(
            Panel("PixivSpider", style="bold magenta", border_style="bright_blue")
        )

        # Progress bar, rendered in the bottom region by run()
        self.progress = Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}"),
            BarColumn(),
            TextColumn("{task.percentage:>3.0f}%"),
            console=self.console,
            expand=True,
            transient=False,
        )

        # One task tracking the whole run
        self.main_task_id = self.progress.add_task(
            "总体进度",
            total=self.TOTAL_IMAGES,
        )

    def _record_log(self, message: str) -> List[str]:
        """Store ``message`` and return the lines to display.

        Messages are kept newest-first. A message equal to the most recent
        one is not stored again, and the history is capped at
        ``MAX_LOG_MESSAGES`` entries.

        Args:
            message: Log line (may contain Rich markup).

        Returns:
            The newest ``VISIBLE_LOG_MESSAGES`` messages, newest first.
        """
        # The newest entry lives at index 0, so that is the one to compare
        # against when suppressing consecutive duplicates.
        if not self.log_messages or message != self.log_messages[0]:
            self.log_messages.insert(0, message)
            del self.log_messages[self.MAX_LOG_MESSAGES:]
        return self.log_messages[:self.VISIBLE_LOG_MESSAGES]

    def _update_log(self, message: str, speed: float = 0.0) -> None:
        """Add a log message and redraw the log panel.

        Args:
            message: Log line (may contain Rich markup).
            speed: Value shown in the panel subtitle as "t/s".
        """
        messages = self._record_log(message)

        log_panel = Panel(
            "\n".join(messages),
            title="下载状态",
            subtitle=f"显示最新 {len(messages)}/{len(self.log_messages)} 条消息, 速度: {speed:.2f} t/s",
            border_style="green",
            padding=(1, 2),
            expand=True,
        )

        # Update the panel before redrawing so the printed layout already
        # contains this message instead of lagging one message behind.
        self.layout["main"].update(log_panel)
        self.console.clear()
        self.console.print(self.layout)

    def _setup_session(self) -> None:
        """Build the request headers, prompting for a cookie if none is stored."""
        cookie = self.redis.get_cookie()
        if not cookie:
            # Surrounding whitespace from a paste is never part of the cookie.
            cookie = input('请输入Pixiv Cookie ').strip()
            self.redis.set_cookie(cookie)

        self.headers = PIXIV_CONFIG.headers.copy()
        self.headers['cookie'] = cookie

    def get_ranking_page(self, page: int) -> None:
        """Fetch one ranking page into ``self.current_ranking_data``.

        Args:
            page: Page number (1-10).

        Raises:
            requests.RequestException: On network errors, timeouts, non-2xx
                responses, or a response body that is not JSON or has no
                ``contents`` entry.
        """
        params = {
            'mode': 'daily',
            'content': 'illust',
            'p': str(page),
            'format': 'json',
        }
        response = requests.get(
            PIXIV_CONFIG.top_url,
            params=params,
            headers=self.headers,
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()

        try:
            self.current_ranking_data = response.json()['contents']
        except (KeyError, ValueError) as exc:
            # Surface a malformed payload as the exception type run() already
            # handles, so one bad page is logged instead of aborting the run.
            raise requests.RequestException(
                f"unexpected ranking response for page {page}"
            ) from exc

    def process_ranking_data(self) -> Generator[str, None, None]:
        """Iterate over the works of the currently loaded ranking page.

        For each entry the work-ID/user-ID pair is stored through
        ``self.redis.store_user_id`` before the work ID is yielded.

        Yields:
            str: Work ID.
        """
        for item in self.current_ranking_data:
            work_id = str(item['illust_id'])
            user_id = str(item['user_id'])
            self.redis.store_user_id(work_id, user_id)
            yield work_id

    def run(self) -> None:
        """Run the crawl: fetch every ranking page and download each work."""
        self._setup_session()
        downloader = PixivDownloader(self, self.headers, self.progress)

        with Live(self.layout, refresh_per_second=20, auto_refresh=True, console=self.console):
            self.layout["progress"].update(self.progress)
            self._update_log('[cyan]开始抓取...[/cyan]')

            # Reference point for the works-per-second figure.
            start_time = time.monotonic()
            completed_works = 0

            for page in range(1, self.TOTAL_PAGES + 1):
                try:
                    self.get_ranking_page(page)
                    for work_id in self.process_ranking_data():
                        if not downloader.download_work(work_id):
                            self.failed_works.append(work_id)
                            continue

                        completed_works += 1
                        elapsed_time = time.monotonic() - start_time
                        # Guard against a zero interval on coarse clocks.
                        speed = completed_works / elapsed_time if elapsed_time > 0 else 0.0
                        self._update_log(
                            f"[cyan]已爬取[/cyan] {completed_works} [cyan]个页面![/cyan]",
                            speed,
                        )

                        # Advance the overall progress bar.
                        self.progress.update(
                            self.main_task_id,
                            completed=completed_works,
                        )
                        # Re-attach the progress renderable to its layout slot.
                        self.layout["progress"].update(self.progress)
                except requests.RequestException as e:
                    self._update_log(f'[red]获取排行榜第{page}页时发生错误:{str(e)}[/red]')
                    continue

            # Remove the Redis records of works that failed to download.
            for work_id in self.failed_works:
                self.redis.client().delete(work_id)

            self._update_log('[green]爬虫运行完成[/green]')