""" Pixiv爬虫 - 每日排行榜下载 环境需求:Python3.8+ / Redis """ from typing import Generator, List, Dict, Any import requests import time from rich.console import Console from rich.progress import ( Progress, BarColumn, TaskProgressColumn, TextColumn, SpinnerColumn ) from rich.live import Live from rich.layout import Layout from rich.panel import Panel from rich.console import Group from config import PIXIV_CONFIG from redis_client import RedisClient from pixiv_download import PixivDownloader requests.packages.urllib3.disable_warnings() class PixivSpider: """Pixiv每日排行榜爬虫""" TOTAL_IMAGES = 500 # 每日排行榜总图片数 def __init__(self, db: int = 0): """ 初始化爬虫 参数: db: Redis数据库编号(0-5) """ # 设置Redis self.redis = RedisClient() if not self.redis.select_db(db): raise ValueError(f"无效的Redis数据库编号: {db}") # 设置界面组件 self.console = Console() self._setup_ui() # 初始化状态 self.headers = None self.current_ranking_data = [] self.failed_works = [] self.log_messages = [] def _setup_ui(self) -> None: """设置Rich界面组件""" # 创建布局 self.layout = Layout() self.layout.split( Layout(name="header", size=3), Layout(name="main", size=None), Layout(name="progress", size=3) ) # 设置标题 self.layout["header"].update( Panel("PixivSpider", style="bold magenta", border_style="bright_blue") ) # 创建进度条 - 固定在底部 self.progress = Progress( SpinnerColumn(), TextColumn("[bold blue]{task.description}"), BarColumn(), TextColumn("{task.percentage:>3.0f}%"), console=self.console, expand=True, transient=False ) # 设置进度条任务 self.main_task_id = self.progress.add_task( "总体进度", total=self.TOTAL_IMAGES ) def _update_log(self, message: str, speed: float = 0.0) -> None: """更新日志信息""" if not self.log_messages or message != self.log_messages[-1]: self.log_messages.insert(0, message) if len(self.log_messages) > 100: self.log_messages = self.log_messages[:100] messages = self.log_messages[:10] # 清空控制台 self.console.clear() # 重新渲染布局 self.console.print(self.layout) # 更新日志面板 log_content = "\n".join(messages) log_panel = Panel( log_content, title="下载状态", subtitle=f"显示最新 {len(messages)}/{len(self.log_messages)} 条消息, 速度: {speed:.2f} t/s", border_style="green", padding=(1, 2), expand=False ) self.layout["main"].update(log_panel) def _setup_session(self) -> None: """设置请求会话""" cookie = self.redis.get_cookie() if not cookie: cookie = input('请输入Pixiv Cookie :') self.redis.set_cookie(cookie) self.headers = PIXIV_CONFIG.headers.copy() self.headers['cookie'] = cookie def get_ranking_page(self, page: int) -> None: """ 获取排行榜单页数据 参数: page: 页码(1-10) """ params = { 'mode': 'daily', 'content': 'illust', 'p': str(page), 'format': 'json' } response = requests.get( PIXIV_CONFIG.top_url, params=params, headers=self.headers, verify=False ) data = response.json() self.current_ranking_data = data['contents'] def process_ranking_data(self) -> Generator[str, None, None]: """ 处理当前排行榜数据 生成: str: 作品ID """ for item in self.current_ranking_data: work_id = str(item['illust_id']) user_id = str(item['user_id']) self.redis.store_user_id(work_id, user_id) yield work_id def run(self) -> None: """运行爬虫""" self._setup_session() downloader = PixivDownloader(self, self.headers, self.progress) with Live(self.layout, refresh_per_second=20, auto_refresh=True, console=self.console): self.layout["progress"].update(self.progress) self._update_log('[cyan]开始抓取...[/cyan]') # 处理排行榜页面 # 保存开始时间用于计算速度 start_time = time.time() last_update_time = start_time completed_works = 0 for page in range(1, 11): try: self.get_ranking_page(page) for work_id in self.process_ranking_data(): if not downloader.download_work(work_id): self.failed_works.append(work_id) else: completed_works += 1 # 计算实际速度(作品/秒) current_time = time.time() elapsed_time = current_time - start_time # 每秒更新一次速度 completed_works += 1 # 计算实际速度(作品/秒) current_time = time.time() elapsed_time = current_time - start_time # 每次下载图片后更新速度 if elapsed_time > 0: # 避免除以零错误 speed = completed_works / elapsed_time self._update_log(f"当前速度: {speed:.2f} t / s", speed) # 更新进度 self.progress.update( self.main_task_id, completed=completed_works ) # 更新UI显示 self.layout["progress"].update(self.progress) except requests.RequestException as e: self._update_log(f'[red]获取排行榜第{page}页时发生错误:{str(e)}[/red]') continue # 清理失败作品的记录 for work_id in self.failed_works: self.redis.client().delete(work_id) self._update_log('[green]爬虫运行完成[/green]')