Merge pull request 'dev' (#9) from dev into master

Reviewed-on: #9
This commit is contained in:
岛风 2024-12-19 21:30:19 +08:00
commit f655d50b9b
4 changed files with 371 additions and 53 deletions

242
Pixiv.py
View File

@ -1,6 +1,6 @@
""" """
P站小爬虫 爬每日排行榜 P站小爬虫 爬每日排行榜
环境需求Python3.6+ / Redis 环境需求Python3.8+ / Redis
项目地址https://github.com/nyaasuki/PixivSpider 项目地址https://github.com/nyaasuki/PixivSpider
""" """
@ -24,67 +24,165 @@ error_list = []
class PixivSpider(object): class PixivSpider(object):
def __init__(self): def __init__(self, db=0):
self.ajax_url = 'https://www.pixiv.net/ajax/illust/{}/pages' # id self.ajax_url = 'https://www.pixiv.net/ajax/illust/{}/pages' # id
self.top_url = 'https://www.pixiv.net/ranking.php' self.top_url = 'https://www.pixiv.net/ranking.php'
self.r = redis.Redis(host='localhost', port=6379, decode_responses=True) self.r = redis.Redis(host='localhost', port=6379, db=db, decode_responses=True)
def get_list(self, pid): def get_list(self, pid):
""" """
:param pid: 插画ID 获取作品所有页面的URL
:param pid: 作品ID
""" """
try:
# 检查Redis中是否已记录该作品已完全下载
if self.r.get(f'downloaded:{pid}') == 'complete':
print(f'作品ID:{pid}已在Redis中标记为完全下载跳过')
return None
# 发送请求获取作品的所有图片信息
response = requests.get(self.ajax_url.format(pid), headers=self.headers, verify=False) response = requests.get(self.ajax_url.format(pid), headers=self.headers, verify=False)
json_data = response.json() json_data = response.json()
list_temp = json_data['body']
for l in list_temp: # 检查API返回是否有错误
url_tamp = l['urls']['original'] if json_data.get('error'):
n = self.r.get(pid) print(f'获取作品ID:{pid}失败:{json_data.get("message")}')
if not n: return pid
why_not_do = self.get_img(url_tamp)
# 判断是否返回异常 如果有异常则取消这个页面的爬取 等待下次 # 从返回数据中获取图片列表
images = json_data.get('body', [])
if not images:
print(f'作品ID:{pid}没有图片')
return pid
# 获取Redis中已下载的页面记录
downloaded_redis = set()
for i in range(len(images)):
if self.r.get(f'downloaded:{pid}_p{i}') == 'true':
downloaded_redis.add(i)
# 检查本地已下载的文件并更新Redis记录
if os.path.exists('./img'):
for f in os.listdir('./img'):
if f.startswith(f'{pid}_p'):
page = int(re.search(r'_p(\d+)\.', f).group(1))
if self.r.get(f'downloaded:{pid}_p{page}') != 'true':
self.r.set(f'downloaded:{pid}_p{page}', 'true')
print(f'发现本地文件并更新Redis记录{f}')
# 使用Redis记录作为唯一来源
downloaded = downloaded_redis
# 遍历所有图片进行下载
for image in images:
# 检查图片数据格式是否正确
if 'urls' not in image or 'original' not in image['urls']:
print(f'作品ID:{pid}的图片数据格式错误')
continue
# 获取原图URL和页码信息
original_url = image['urls']['original']
page_num = int(re.search(r'_p(\d+)\.', original_url).group(1))
# 检查是否已下载过该页面优先使用Redis记录
if page_num in downloaded:
print(f'作品ID:{pid}{page_num}页在Redis中已标记为下载跳过')
continue
# 下载图片如果下载失败返回作品ID以便后续处理
why_not_do = self.get_img(original_url)
if why_not_do == 1: if why_not_do == 1:
return pid return pid
else:
print(f'插画ID:{pid}已存在!')
break
# with open('pixiv.json', 'a', encoding='utf-8') as f: except requests.exceptions.RequestException as e:
# f.write(url_tamp + '\n') print(f'获取作品ID:{pid}时发生网络错误:{str(e)}')
# 导出 return pid
except Exception as e:
print(f'处理作品ID:{pid}时发生错误:{str(e)}')
return pid
def get_img(self, url): def get_img(self, url):
""" """
下载单个图片
:param url: 作品页URL :param url: 原图URL格式如https://i.pximg.net/img-original/img/2024/12/14/20/00/36/125183562_p0.jpg
:return: :return: 0表示下载成功1表示下载失败
""" """
# 确保下载目录存在
if not os.path.isdir('./img'): if not os.path.isdir('./img'):
os.makedirs('./img') os.makedirs('./img')
file_name = re.findall('/\d+/\d+/\d+/\d+/\d+/\d+/(.*)', url)[0]
if os.path.isfile(f'./img/{file_name}'): # 从URL提取作品ID、页码和文件扩展名
print(f'文件:{file_name}已存在,跳过') match = re.search(r'/(\d+)_p(\d+)\.([a-z]+)$', url)
# 单个文件存在并不能判断是否爬取过 if not match:
print(f'无效的URL格式: {url}')
return 1
# 解析URL信息并构建文件名
illust_id, page_num, extension = match.groups()
file_name = f"{illust_id}_p{page_num}.{extension}"
# 检查Redis中是否已记录为下载
if self.r.get(f'downloaded:{illust_id}_p{page_num}') == 'true':
print(f'Redis记录{file_name}已下载,跳过')
return 0 return 0
print(f'开始下载:{file_name}')
t = 0 # 作为备份检查,验证文件是否存在
if os.path.isfile(f'./img/{file_name}'):
# 如果文件存在但Redis没有记录更新Redis记录
self.r.set(f'downloaded:{illust_id}_p{page_num}', 'true')
print(f'文件已存在但Redis未记录已更新Redis{file_name}')
return 0
# 开始下载流程
print(f'开始下载:{file_name} (第{int(page_num)+1}张)')
t = 0 # 重试计数器
# 最多重试3次
while t < 3: while t < 3:
try: try:
# 下载图片设置15秒超时
img_temp = requests.get(url, headers=self.headers, timeout=15, verify=False) img_temp = requests.get(url, headers=self.headers, timeout=15, verify=False)
if img_temp.status_code == 200:
break break
except requests.exceptions.RequestException: print(f'下载失败,状态码:{img_temp.status_code}')
print('连接异常!正在重试!')
t += 1 t += 1
except requests.exceptions.RequestException as e:
print(f'连接异常:{str(e)}')
t += 1
# 如果重试3次都失败放弃下载
if t == 3: if t == 3:
# 返回异常 取消此次爬取 等待下次 print(f'下载失败次数过多,跳过该图片')
return 1 return 1
# 将图片内容写入文件
with open(f'./img/{file_name}', 'wb') as fp: with open(f'./img/{file_name}', 'wb') as fp:
fp.write(img_temp.content) fp.write(img_temp.content)
# 下载成功后在Redis中记录
self.r.set(f'downloaded:{illust_id}_p{page_num}', 'true')
# 获取作品总页数并检查是否已下载所有页面
page_count = self.r.get(f'total_pages:{illust_id}')
if not page_count:
# 当前页号+1可能是总页数保守估计
self.r.set(f'total_pages:{illust_id}', str(int(page_num) + 1))
elif int(page_num) + 1 == int(page_count):
# 如果当前是最后一页,检查是否所有页面都已下载
all_downloaded = all(
self.r.get(f'downloaded:{illust_id}_p{i}') == 'true'
for i in range(int(page_count))
)
if all_downloaded:
self.r.set(f'downloaded:{illust_id}', 'complete')
print(f'作品ID:{illust_id}已完全下载')
print(f'下载完成并已记录到Redis{file_name}')
return 0
def get_top_url(self, num): def get_top_url(self, num):
""" """
获取每日排行榜的特定页码数据
:param num: 页码 :param num: 页码1-10
:return: :return: None
""" """
params = { params = {
'mode': 'daily', 'mode': 'daily',
@ -97,22 +195,61 @@ class PixivSpider(object):
self.pixiv_spider_go(json_data['contents']) self.pixiv_spider_go(json_data['contents'])
def get_top_pic(self): def get_top_pic(self):
"""
从排行榜数据中提取作品ID和用户ID
并将用户ID存入Redis数据库中
:return: 生成器返回作品ID
"""
for url in self.data: for url in self.data:
illust_id = url['illust_id'] illust_id = url['illust_id'] # 获取作品ID
illust_user = url['user_id'] illust_user = url['user_id'] # 获取用户ID
yield illust_id # 生成PID yield illust_id # 生成作品ID
self.r.set(illust_id, illust_user) self.r.set(illust_id, illust_user) # 将PID保存到Redis中
@classmethod @classmethod
def pixiv_spider_go(cls, data): def pixiv_spider_go(cls, data):
"""
存储排行榜数据供后续处理
:param data: 排行榜JSON数据中的contents部分
"""
cls.data = data cls.data = data
@classmethod @classmethod
def pixiv_main(cls): def pixiv_main(cls):
"""
爬虫主函数
1. 选择Redis数据库
2. 获取或设置Cookie
3. 配置请求头
4. 遍历排行榜页面1-10
5. 下载每个作品的所有图片
6. 处理下载失败的作品
"""
# 选择Redis数据库
while True:
try:
print("\n可用的Redis数据库:")
for i in range(6):
print(f"{i}.DB{i}")
db_choice = input("\n请选择Redis数据库 (0-5): ")
db_num = int(db_choice)
if 0 <= db_num <= 5:
break
print("错误请输入0到5之间的数字")
except ValueError:
print("错误:请输入有效的数字")
global pixiv
pixiv = PixivSpider(db_num)
print(f"\n已选择 DB{db_num}")
# 从Redis获取Cookie如果没有则要求用户输入
cookie = pixiv.r.get('cookie') cookie = pixiv.r.get('cookie')
if not cookie: if not cookie:
cookie = input('请输入一个cookie') cookie = input('请输入一个cookie')
pixiv.r.set('cookie', cookie) pixiv.r.set('cookie', cookie)
# 配置请求头包含必要的HTTP头部信息
cls.headers = { cls.headers = {
'accept': 'application/json', 'accept': 'application/json',
'accept-language': 'zh-CN,zh;q=0.9,zh-TW;q=0.8,en-US;q=0.7,en;q=0.6', 'accept-language': 'zh-CN,zh;q=0.9,zh-TW;q=0.8,en-US;q=0.7,en;q=0.6',
@ -123,19 +260,34 @@ class PixivSpider(object):
'sec-fetch-site': 'same-origin', 'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36' 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36'
} }
print('开始抓取...') print('开始抓取...')
for i in range(1, 11, 1): # p站每日排行榜最多为500个 # 遍历排行榜前10页
pixiv.get_top_url(i) for i in range(1, 11, 1): # p站每日排行榜最多为500个50个/页 x 10页
for j in pixiv.get_top_pic(): pixiv.get_top_url(i) # 获取当前页的排行榜数据
k = pixiv.get_list(j) # 接口暂时不想写了 先这样凑合一下吧 for j in pixiv.get_top_pic(): # 遍历当前页的所有作品
if k: k = pixiv.get_list(j) # 下载作品的所有图片
if k: # 如果下载失败将作品ID添加到错误列表
error_list.append(k) error_list.append(k)
# 清理下载失败的作品记录
for k in error_list: for k in error_list:
pixiv.r.delete(k) pixiv.r.delete(k)
if __name__ == '__main__': if __name__ == '__main__':
pixiv = PixivSpider() try:
pixiv.pixiv_main() print('正在启动Pixiv爬虫...')
# for id_url in pixiv.get_list(): print('确保已安装并启动Redis服务')
# pixiv.get_img(id_url) print('确保已准备好有效的Pixiv Cookie')
# 运行主程序
PixivSpider.pixiv_main()
print('爬虫运行完成')
except redis.exceptions.ConnectionError:
print('错误无法连接到Redis服务请确保Redis服务正在运行')
except KeyboardInterrupt:
print('\n用户中断运行')
except Exception as e:
print(f'发生错误:{str(e)}')

View File

@ -2,7 +2,7 @@
## 环境需求 ## 环境需求
Python:3.6+ / Redis Python:3.8+ / Redis
## 食用方法 ## 食用方法
@ -40,10 +40,16 @@ ERROR: No matching distribution found for resquests`
2.请输入一个cookie 2.请输入一个cookie
目前此项留空直接回车也可以正常爬取,如果后续添加新功能可能需要 目前此项留空直接回车也可以正常爬取(匿名模式),如果后续添加新功能可能需要
此项储存在本地redis中 此项储存在本地redis中
3.错误无法连接到Redis服务请确保Redis服务正在运行
项目使用redis查重 需要安装redis
官方安装教程https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/
同时新增了一个redis快速管理小工具 能自动识别写入的数据库 提供查和删功能
使用方法同上 运行 redis_monitor.py 即可
## 特别提醒 ## 特别提醒
正常来说,当没有出现上方问题时,程序出现问题大多为你的上网方式不够科学 正常来说,当没有出现上方问题时,程序出现问题大多为你的上网方式不够科学

160
redis_monitor.py Normal file
View File

@ -0,0 +1,160 @@
import redis
def show_db_status(r, db_index):
"""显示指定数据库的状态信息"""
try:
# 切换到指定数据库
r.select(db_index)
# 检查并显示Cookie状态
cookie = r.get('cookie')
if cookie:
print(f"Cookie值: {cookie}")
else:
print("Cookie状态: 匿名")
# 获取所有键
all_keys = r.keys('*')
# 统计图片ID数量
pid_count = len([key for key in all_keys if key.startswith('downloaded:') and '_p0' in key])
print(f"当前存储的图片作品数量: {pid_count}\n")
except redis.RedisError as e:
print(f"错误:{str(e)}")
def check_redis_status():
"""检查Redis状态并显示详细信息"""
try:
# 连接到Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 检查连接
r.ping()
# 获取活跃数据库信息仅0-5
info = r.info()
keyspace_info = {k: v for k, v in info.items() if k.startswith('db')}
# 过滤0-5范围内的数据库
valid_indices = set(range(6)) # 0-5
db_indices = [int(k.replace('db', '')) for k in keyspace_info.keys() if int(k.replace('db', '')) in valid_indices]
db_indices.sort()
if not db_indices:
print("\n当前没有活跃的数据库")
return
db_list = ', '.join([f"db{i}" for i in db_indices])
print(f"\n活跃的Redis数据库: {db_list}")
if len(db_indices) == 1:
# 只有一个数据库,直接显示其信息
print(f"\n数据库 db{db_indices[0]} 的信息:")
show_db_status(r, db_indices[0])
else:
# 多个数据库,让用户选择
while True:
choice = input("\n请选择要查看的数据库编号 (例如: 0 表示db0): ")
try:
db_index = int(choice)
if db_index in db_indices:
print(f"\n数据库 db{db_index} 的信息:")
show_db_status(r, db_index)
break
else:
print("无效的数据库编号,请重试")
except ValueError:
print("请输入有效的数字")
except redis.ConnectionError:
print("错误无法连接到Redis服务器请确保Redis服务正在运行")
except Exception as e:
print(f"错误:{str(e)}")
def clear_redis_db():
"""清空Redis数据库"""
try:
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 获取当前数据库信息仅0-5
info = r.info()
keyspace_info = {k: v for k, v in info.items() if k.startswith('db')}
# 过滤0-5范围内的数据库
valid_indices = set(range(6)) # 0-5
db_indices = [int(k.replace('db', '')) for k in keyspace_info.keys() if int(k.replace('db', '')) in valid_indices]
db_indices.sort()
if not db_indices:
print("\n当前没有活跃的数据库")
return
db_list = ', '.join([f"db{i}" for i in db_indices])
print(f"\n活跃的Redis数据库: {db_list}")
print("\n清空选项:")
print("1. 清空指定数据库")
print("2. 清空所有数据库")
print("3. 取消操作")
choice = input("请选择操作 (1-3): ")
if choice == '1':
if len(db_indices) == 1:
db_index = db_indices[0]
confirm = input(f"确定要清空数据库 db{db_index} 吗?(y/n): ")
if confirm.lower() == 'y':
r.select(db_index)
r.flushdb()
print(f"数据库 db{db_index} 已清空\n")
else:
while True:
choice = input("\n请选择要清空的数据库编号 (例如: 0 表示db0): ")
try:
db_index = int(choice)
if 0 <= db_index <= 5 and db_index in db_indices:
confirm = input(f"确定要清空数据库 db{db_index} 吗?(y/n): ")
if confirm.lower() == 'y':
r.select(db_index)
r.flushdb()
print(f"数据库 db{db_index} 已清空\n")
break
else:
print("无效的数据库编号,请重试")
except ValueError:
print("请输入有效的数字")
elif choice == '2':
confirm = input("确定要清空所有数据库吗?(y/n): ")
if confirm.lower() == 'y':
for db_index in range(6): # 0-5
r.select(db_index)
r.flushdb()
print("所有的数据库已清空\n")
elif choice == '3':
print("已取消操作\n")
else:
print("无效的选择\n")
except redis.ConnectionError:
print("错误无法连接到Redis服务器请确保Redis服务正在运行")
except Exception as e:
print(f"错误:{str(e)}")
def show_menu():
"""显示交互菜单"""
while True:
print("=== Redis管理工具 ===")
print("1. 显示状态")
print("2. 清空数据库")
print("3. 退出")
choice = input("请选择操作 (1-3): ")
if choice == '1':
check_redis_status()
elif choice == '2':
clear_redis_db()
elif choice == '3':
print("退出程序")
break
else:
print("无效的选择,请重试\n")
if __name__ == '__main__':
show_menu()

View File

@ -1,2 +1,2 @@
redis==3.5.3 redis==5.2.1
requests==2.22.0 requests==2.32.3