# PixivSpider/pixiv_download.py

"""Pixiv下载组件"""
import os
import re
from typing import Optional, Union
import requests
from rich.progress import Progress
from config import PIXIV_CONFIG
from redis_client import RedisClient
class PixivDownloader:
    """Download Pixiv images to ``./img`` and record progress in Redis.

    Every page that is written to disk is recorded through ``RedisClient``
    so that later runs can skip pages and whole works that are already done.
    """

    # Directory (relative to the working directory) that receives the files.
    IMAGE_DIR = './img'
    # Seconds to wait on a single HTTP request before giving up.
    REQUEST_TIMEOUT = 15
    # Number of attempts made for one image before reporting failure.
    MAX_ATTEMPTS = 3
    # Trailing "<illust_id>_p<page>.<ext>" component of an image URL.
    _IMAGE_URL_RE = re.compile(r'/(\d+)_p(\d+)\.([a-z]+)$')

    def __init__(self, headers: dict, progress: Progress):
        """
        Initialise the downloader.

        Args:
            headers: Request headers (including the cookie) sent with every
                HTTP request.
            progress: Rich ``Progress`` instance used to display per-work
                sub-tasks for multi-page works.
        """
        self.headers = headers
        self.progress = progress
        self.redis = RedisClient()

    def download_image(self, url: str) -> bool:
        """
        Download a single image.

        Args:
            url: Image URL; it must end in ``/<illust_id>_p<page>.<ext>``.

        Returns:
            True if the image was saved or was already recorded as
            downloaded; False if the URL does not match the expected
            pattern or every attempt failed.
        """
        match = self._IMAGE_URL_RE.search(url)
        if not match:
            return False
        illust_id, page_num, extension = match.groups()
        file_name = f"{illust_id}_p{page_num}.{extension}"

        # Nothing to do when this page is already recorded as downloaded.
        if self.redis.is_image_downloaded(illust_id, page_num):
            return True

        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(self.IMAGE_DIR, exist_ok=True)

        for _ in range(self.MAX_ATTEMPTS):
            try:
                # NOTE(review): verify=False disables TLS certificate
                # validation. Kept for compatibility with existing setups;
                # confirm it is really required.
                response = requests.get(
                    url,
                    headers=self.headers,
                    timeout=self.REQUEST_TIMEOUT,
                    verify=False,
                )
            except requests.RequestException:
                continue  # network-level failure: try again
            if response.status_code != 200:
                continue  # unexpected status: try again

            with open(f'{self.IMAGE_DIR}/{file_name}', 'wb') as fp:
                fp.write(response.content)
            self._record_page(illust_id, page_num)
            return True
        return False

    def _record_page(self, illust_id: str, page_num: str) -> None:
        """Record one saved page in Redis and mark the work complete if done."""
        self.redis.mark_image_downloaded(illust_id, page_num)

        page_count = int(page_num) + 1
        total_pages = self.redis.get_total_pages(illust_id)
        if not total_pages:
            # No total is known (download_image called directly): fall back
            # to the highest page seen so far, as the original code did.
            self.redis.set_total_pages(illust_id, page_count)
        elif page_count == total_pages:
            # Last page just arrived; the work is complete only when every
            # earlier page is recorded as well.
            if all(
                self.redis.is_image_downloaded(illust_id, page)
                for page in range(total_pages)
            ):
                self.redis.mark_work_complete(illust_id)

    @staticmethod
    def _original_url(image) -> Optional[str]:
        """Return ``image['urls']['original']`` or None when it is absent."""
        urls = image.get('urls') if isinstance(image, dict) else None
        if isinstance(urls, dict):
            return urls.get('original')
        return None

    def _download_pages(self, work_id: str, images: list) -> bool:
        """Download every page of a multi-page work behind a progress task."""
        subtask_id = self.progress.add_task(
            f"[yellow]PID:{work_id}",
            total=len(images)
        )
        success = True
        try:
            for image in images:
                url = self._original_url(image)
                if not url or not self.download_image(url):
                    success = False
                # Advance for every entry, including ones without a URL, so
                # the bar always reaches its total.
                self.progress.update(subtask_id, advance=1)
        finally:
            # Always remove the sub-task, even if a download raised, so it
            # does not linger in the display.
            self.progress.remove_task(subtask_id)
        return success

    def download_work(self, work_id: str) -> bool:
        """
        Download every image of a work.

        The page list is fetched from the URL built from
        ``PIXIV_CONFIG.ajax_url``; each entry's ``urls.original`` is then
        passed to ``download_image``.

        Args:
            work_id: Pixiv work ID.

        Returns:
            True if the work was already complete or every page was
            downloaded; False otherwise (request or JSON failure, error
            response, empty page list, or any page failing).
        """
        # Skip works that are already recorded as complete.
        if self.redis.is_work_complete(work_id):
            return True

        try:
            # A timeout is required here: without one a stalled connection
            # blocks the caller indefinitely.
            response = requests.get(
                PIXIV_CONFIG.ajax_url.format(work_id),
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
                verify=False,  # NOTE(review): see download_image
            )
            data = response.json()
            if data.get('error'):
                return False
            images = data.get('body', [])
            if not isinstance(images, list) or not images:
                return False

            # Record the real page count before downloading. Otherwise the
            # per-page bookkeeping guesses the total from the first page it
            # sees and the work can never be recognised as complete.
            self.redis.set_total_pages(work_id, len(images))

            if len(images) > 1:
                # Multi-page work: show a progress sub-task.
                success = self._download_pages(work_id, images)
            else:
                # Single-page work.
                url = self._original_url(images[0])
                success = bool(url) and self.download_image(url)

            if success:
                # Every page is on disk (freshly downloaded or already
                # recorded), so later runs can skip this work entirely.
                self.redis.mark_work_complete(work_id)
            return success
        except (requests.RequestException, KeyError, ValueError):
            return False