Compare commits
No commits in common. "44cebafb78c4fa8de8c1756f746f41a6c3f67d10" and "4a0eb7264a316137c9d25bfe02b9dc01959f1ce2" have entirely different histories.
44cebafb78
...
4a0eb7264a
240
Pixiv.py
240
Pixiv.py
@ -1,6 +1,6 @@
|
|||||||
"""
|
"""
|
||||||
P站小爬虫 爬每日排行榜
|
P站小爬虫 爬每日排行榜
|
||||||
环境需求:Python3.8+ / Redis
|
环境需求:Python3.6+ / Redis
|
||||||
项目地址:https://github.com/nyaasuki/PixivSpider
|
项目地址:https://github.com/nyaasuki/PixivSpider
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -24,165 +24,67 @@ error_list = []
|
|||||||
|
|
||||||
class PixivSpider(object):
|
class PixivSpider(object):
|
||||||
|
|
||||||
def __init__(self, db=0):
|
def __init__(self):
|
||||||
self.ajax_url = 'https://www.pixiv.net/ajax/illust/{}/pages' # id
|
self.ajax_url = 'https://www.pixiv.net/ajax/illust/{}/pages' # id
|
||||||
self.top_url = 'https://www.pixiv.net/ranking.php'
|
self.top_url = 'https://www.pixiv.net/ranking.php'
|
||||||
self.r = redis.Redis(host='localhost', port=6379, db=db, decode_responses=True)
|
self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
|
||||||
|
|
||||||
def get_list(self, pid):
|
def get_list(self, pid):
|
||||||
"""
|
"""
|
||||||
获取作品所有页面的URL
|
:param pid: 插画ID
|
||||||
:param pid: 作品ID
|
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
# 检查Redis中是否已记录该作品已完全下载
|
|
||||||
if self.r.get(f'downloaded:{pid}') == 'complete':
|
|
||||||
print(f'作品ID:{pid}已在Redis中标记为完全下载,跳过')
|
|
||||||
return None
|
|
||||||
|
|
||||||
# 发送请求获取作品的所有图片信息
|
|
||||||
response = requests.get(self.ajax_url.format(pid), headers=self.headers, verify=False)
|
response = requests.get(self.ajax_url.format(pid), headers=self.headers, verify=False)
|
||||||
json_data = response.json()
|
json_data = response.json()
|
||||||
|
list_temp = json_data['body']
|
||||||
# 检查API返回是否有错误
|
for l in list_temp:
|
||||||
if json_data.get('error'):
|
url_tamp = l['urls']['original']
|
||||||
print(f'获取作品ID:{pid}失败:{json_data.get("message")}')
|
n = self.r.get(pid)
|
||||||
return pid
|
if not n:
|
||||||
|
why_not_do = self.get_img(url_tamp)
|
||||||
# 从返回数据中获取图片列表
|
# 判断是否返回异常 如果有异常则取消这个页面的爬取 等待下次
|
||||||
images = json_data.get('body', [])
|
|
||||||
if not images:
|
|
||||||
print(f'作品ID:{pid}没有图片')
|
|
||||||
return pid
|
|
||||||
|
|
||||||
# 获取Redis中已下载的页面记录
|
|
||||||
downloaded_redis = set()
|
|
||||||
for i in range(len(images)):
|
|
||||||
if self.r.get(f'downloaded:{pid}_p{i}') == 'true':
|
|
||||||
downloaded_redis.add(i)
|
|
||||||
|
|
||||||
# 检查本地已下载的文件并更新Redis记录
|
|
||||||
if os.path.exists('./img'):
|
|
||||||
for f in os.listdir('./img'):
|
|
||||||
if f.startswith(f'{pid}_p'):
|
|
||||||
page = int(re.search(r'_p(\d+)\.', f).group(1))
|
|
||||||
if self.r.get(f'downloaded:{pid}_p{page}') != 'true':
|
|
||||||
self.r.set(f'downloaded:{pid}_p{page}', 'true')
|
|
||||||
print(f'发现本地文件并更新Redis记录:{f}')
|
|
||||||
|
|
||||||
# 使用Redis记录作为唯一来源
|
|
||||||
downloaded = downloaded_redis
|
|
||||||
|
|
||||||
# 遍历所有图片进行下载
|
|
||||||
for image in images:
|
|
||||||
# 检查图片数据格式是否正确
|
|
||||||
if 'urls' not in image or 'original' not in image['urls']:
|
|
||||||
print(f'作品ID:{pid}的图片数据格式错误')
|
|
||||||
continue
|
|
||||||
|
|
||||||
# 获取原图URL和页码信息
|
|
||||||
original_url = image['urls']['original']
|
|
||||||
page_num = int(re.search(r'_p(\d+)\.', original_url).group(1))
|
|
||||||
|
|
||||||
# 检查是否已下载过该页面(优先使用Redis记录)
|
|
||||||
if page_num in downloaded:
|
|
||||||
print(f'作品ID:{pid} 第{page_num}页在Redis中已标记为下载,跳过')
|
|
||||||
continue
|
|
||||||
|
|
||||||
# 下载图片,如果下载失败返回作品ID以便后续处理
|
|
||||||
why_not_do = self.get_img(original_url)
|
|
||||||
if why_not_do == 1:
|
if why_not_do == 1:
|
||||||
return pid
|
return pid
|
||||||
|
else:
|
||||||
|
print(f'插画ID:{pid}已存在!')
|
||||||
|
break
|
||||||
|
|
||||||
except requests.exceptions.RequestException as e:
|
# with open('pixiv.json', 'a', encoding='utf-8') as f:
|
||||||
print(f'获取作品ID:{pid}时发生网络错误:{str(e)}')
|
# f.write(url_tamp + '\n')
|
||||||
return pid
|
# 导出
|
||||||
except Exception as e:
|
|
||||||
print(f'处理作品ID:{pid}时发生错误:{str(e)}')
|
|
||||||
return pid
|
|
||||||
|
|
||||||
def get_img(self, url):
|
def get_img(self, url):
|
||||||
"""
|
"""
|
||||||
下载单个图片
|
|
||||||
:param url: 原图URL,格式如:https://i.pximg.net/img-original/img/2024/12/14/20/00/36/125183562_p0.jpg
|
:param url: 作品页URL
|
||||||
:return: 0表示下载成功,1表示下载失败
|
:return:
|
||||||
"""
|
"""
|
||||||
# 确保下载目录存在
|
|
||||||
if not os.path.isdir('./img'):
|
if not os.path.isdir('./img'):
|
||||||
os.makedirs('./img')
|
os.makedirs('./img')
|
||||||
|
file_name = re.findall('/\d+/\d+/\d+/\d+/\d+/\d+/(.*)', url)[0]
|
||||||
# 从URL提取作品ID、页码和文件扩展名
|
|
||||||
match = re.search(r'/(\d+)_p(\d+)\.([a-z]+)$', url)
|
|
||||||
if not match:
|
|
||||||
print(f'无效的URL格式: {url}')
|
|
||||||
return 1
|
|
||||||
|
|
||||||
# 解析URL信息并构建文件名
|
|
||||||
illust_id, page_num, extension = match.groups()
|
|
||||||
file_name = f"{illust_id}_p{page_num}.{extension}"
|
|
||||||
|
|
||||||
# 检查Redis中是否已记录为下载
|
|
||||||
if self.r.get(f'downloaded:{illust_id}_p{page_num}') == 'true':
|
|
||||||
print(f'Redis记录:{file_name}已下载,跳过')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# 作为备份检查,验证文件是否存在
|
|
||||||
if os.path.isfile(f'./img/{file_name}'):
|
if os.path.isfile(f'./img/{file_name}'):
|
||||||
# 如果文件存在但Redis没有记录,更新Redis记录
|
print(f'文件:{file_name}已存在,跳过')
|
||||||
self.r.set(f'downloaded:{illust_id}_p{page_num}', 'true')
|
# 单个文件存在并不能判断是否爬取过
|
||||||
print(f'文件已存在但Redis未记录,已更新Redis:{file_name}')
|
|
||||||
return 0
|
return 0
|
||||||
|
print(f'开始下载:{file_name}')
|
||||||
# 开始下载流程
|
t = 0
|
||||||
print(f'开始下载:{file_name} (第{int(page_num)+1}张)')
|
|
||||||
t = 0 # 重试计数器
|
|
||||||
# 最多重试3次
|
|
||||||
while t < 3:
|
while t < 3:
|
||||||
try:
|
try:
|
||||||
# 下载图片,设置15秒超时
|
|
||||||
img_temp = requests.get(url, headers=self.headers, timeout=15, verify=False)
|
img_temp = requests.get(url, headers=self.headers, timeout=15, verify=False)
|
||||||
if img_temp.status_code == 200:
|
|
||||||
break
|
break
|
||||||
print(f'下载失败,状态码:{img_temp.status_code}')
|
except requests.exceptions.RequestException:
|
||||||
|
print('连接异常!正在重试!')
|
||||||
t += 1
|
t += 1
|
||||||
except requests.exceptions.RequestException as e:
|
|
||||||
print(f'连接异常:{str(e)}')
|
|
||||||
t += 1
|
|
||||||
|
|
||||||
# 如果重试3次都失败,放弃下载
|
|
||||||
if t == 3:
|
if t == 3:
|
||||||
print(f'下载失败次数过多,跳过该图片')
|
# 返回异常 取消此次爬取 等待下次
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
# 将图片内容写入文件
|
|
||||||
with open(f'./img/{file_name}', 'wb') as fp:
|
with open(f'./img/{file_name}', 'wb') as fp:
|
||||||
fp.write(img_temp.content)
|
fp.write(img_temp.content)
|
||||||
|
|
||||||
# 下载成功后在Redis中记录
|
|
||||||
self.r.set(f'downloaded:{illust_id}_p{page_num}', 'true')
|
|
||||||
# 获取作品总页数并检查是否已下载所有页面
|
|
||||||
page_count = self.r.get(f'total_pages:{illust_id}')
|
|
||||||
if not page_count:
|
|
||||||
# 当前页号+1可能是总页数(保守估计)
|
|
||||||
self.r.set(f'total_pages:{illust_id}', str(int(page_num) + 1))
|
|
||||||
elif int(page_num) + 1 == int(page_count):
|
|
||||||
# 如果当前是最后一页,检查是否所有页面都已下载
|
|
||||||
all_downloaded = all(
|
|
||||||
self.r.get(f'downloaded:{illust_id}_p{i}') == 'true'
|
|
||||||
for i in range(int(page_count))
|
|
||||||
)
|
|
||||||
if all_downloaded:
|
|
||||||
self.r.set(f'downloaded:{illust_id}', 'complete')
|
|
||||||
print(f'作品ID:{illust_id}已完全下载')
|
|
||||||
|
|
||||||
print(f'下载完成并已记录到Redis:{file_name}')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def get_top_url(self, num):
|
def get_top_url(self, num):
|
||||||
"""
|
"""
|
||||||
获取每日排行榜的特定页码数据
|
|
||||||
:param num: 页码数(1-10)
|
:param num: 页码
|
||||||
:return: None
|
:return:
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
'mode': 'daily',
|
'mode': 'daily',
|
||||||
@ -195,61 +97,22 @@ class PixivSpider(object):
|
|||||||
self.pixiv_spider_go(json_data['contents'])
|
self.pixiv_spider_go(json_data['contents'])
|
||||||
|
|
||||||
def get_top_pic(self):
|
def get_top_pic(self):
|
||||||
"""
|
|
||||||
从排行榜数据中提取作品ID和用户ID
|
|
||||||
并将用户ID存入Redis数据库中
|
|
||||||
:return: 生成器,返回作品ID
|
|
||||||
"""
|
|
||||||
for url in self.data:
|
for url in self.data:
|
||||||
illust_id = url['illust_id'] # 获取作品ID
|
illust_id = url['illust_id']
|
||||||
illust_user = url['user_id'] # 获取用户ID
|
illust_user = url['user_id']
|
||||||
yield illust_id # 生成作品ID
|
yield illust_id # 生成PID
|
||||||
self.r.set(illust_id, illust_user) # 将PID保存到Redis中
|
self.r.set(illust_id, illust_user)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def pixiv_spider_go(cls, data):
|
def pixiv_spider_go(cls, data):
|
||||||
"""
|
|
||||||
存储排行榜数据供后续处理
|
|
||||||
:param data: 排行榜JSON数据中的contents部分
|
|
||||||
"""
|
|
||||||
cls.data = data
|
cls.data = data
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def pixiv_main(cls):
|
def pixiv_main(cls):
|
||||||
"""
|
|
||||||
爬虫主函数
|
|
||||||
1. 选择Redis数据库
|
|
||||||
2. 获取或设置Cookie
|
|
||||||
3. 配置请求头
|
|
||||||
4. 遍历排行榜页面(1-10页)
|
|
||||||
5. 下载每个作品的所有图片
|
|
||||||
6. 处理下载失败的作品
|
|
||||||
"""
|
|
||||||
# 选择Redis数据库
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print("\n可用的Redis数据库:")
|
|
||||||
for i in range(6):
|
|
||||||
print(f"{i}.DB{i}")
|
|
||||||
db_choice = input("\n请选择Redis数据库 (0-5): ")
|
|
||||||
db_num = int(db_choice)
|
|
||||||
if 0 <= db_num <= 5:
|
|
||||||
break
|
|
||||||
print("错误:请输入0到5之间的数字")
|
|
||||||
except ValueError:
|
|
||||||
print("错误:请输入有效的数字")
|
|
||||||
|
|
||||||
global pixiv
|
|
||||||
pixiv = PixivSpider(db_num)
|
|
||||||
print(f"\n已选择 DB{db_num}")
|
|
||||||
|
|
||||||
# 从Redis获取Cookie,如果没有则要求用户输入
|
|
||||||
cookie = pixiv.r.get('cookie')
|
cookie = pixiv.r.get('cookie')
|
||||||
if not cookie:
|
if not cookie:
|
||||||
cookie = input('请输入一个cookie:')
|
cookie = input('请输入一个cookie:')
|
||||||
pixiv.r.set('cookie', cookie)
|
pixiv.r.set('cookie', cookie)
|
||||||
|
|
||||||
# 配置请求头,包含必要的HTTP头部信息
|
|
||||||
cls.headers = {
|
cls.headers = {
|
||||||
'accept': 'application/json',
|
'accept': 'application/json',
|
||||||
'accept-language': 'zh-CN,zh;q=0.9,zh-TW;q=0.8,en-US;q=0.7,en;q=0.6',
|
'accept-language': 'zh-CN,zh;q=0.9,zh-TW;q=0.8,en-US;q=0.7,en;q=0.6',
|
||||||
@ -260,34 +123,19 @@ class PixivSpider(object):
|
|||||||
'sec-fetch-site': 'same-origin',
|
'sec-fetch-site': 'same-origin',
|
||||||
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36'
|
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36'
|
||||||
}
|
}
|
||||||
|
|
||||||
print('开始抓取...')
|
print('开始抓取...')
|
||||||
# 遍历排行榜前10页
|
for i in range(1, 11, 1): # p站每日排行榜最多为500个
|
||||||
for i in range(1, 11, 1): # p站每日排行榜最多为500个(50个/页 x 10页)
|
pixiv.get_top_url(i)
|
||||||
pixiv.get_top_url(i) # 获取当前页的排行榜数据
|
for j in pixiv.get_top_pic():
|
||||||
for j in pixiv.get_top_pic(): # 遍历当前页的所有作品
|
k = pixiv.get_list(j) # 接口暂时不想写了 先这样凑合一下吧
|
||||||
k = pixiv.get_list(j) # 下载作品的所有图片
|
if k:
|
||||||
if k: # 如果下载失败,将作品ID添加到错误列表
|
|
||||||
error_list.append(k)
|
error_list.append(k)
|
||||||
|
|
||||||
# 清理下载失败的作品记录
|
|
||||||
for k in error_list:
|
for k in error_list:
|
||||||
pixiv.r.delete(k)
|
pixiv.r.delete(k)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
try:
|
pixiv = PixivSpider()
|
||||||
print('正在启动Pixiv爬虫...')
|
pixiv.pixiv_main()
|
||||||
print('确保已安装并启动Redis服务')
|
# for id_url in pixiv.get_list():
|
||||||
print('确保已准备好有效的Pixiv Cookie')
|
# pixiv.get_img(id_url)
|
||||||
|
|
||||||
# 运行主程序
|
|
||||||
PixivSpider.pixiv_main()
|
|
||||||
|
|
||||||
print('爬虫运行完成')
|
|
||||||
except redis.exceptions.ConnectionError:
|
|
||||||
print('错误:无法连接到Redis服务,请确保Redis服务正在运行')
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print('\n用户中断运行')
|
|
||||||
except Exception as e:
|
|
||||||
print(f'发生错误:{str(e)}')
|
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
## 环境需求
|
## 环境需求
|
||||||
|
|
||||||
Python:3.8+ / Redis
|
Python:3.6+ / Redis
|
||||||
|
|
||||||
## 食用方法
|
## 食用方法
|
||||||
|
|
||||||
@ -44,12 +44,6 @@ ERROR: No matching distribution found for resquests`
|
|||||||
|
|
||||||
此项储存在本地redis中
|
此项储存在本地redis中
|
||||||
|
|
||||||
3.错误:无法连接到Redis服务,请确保Redis服务正在运行
|
|
||||||
项目使用redis查重 需要安装redis
|
|
||||||
官方安装教程:https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/
|
|
||||||
|
|
||||||
同时新增了一个redis快速管理小工具 能自动识别写入的数据库 提供查和删功能
|
|
||||||
使用方法同上 运行 redis_monitor.py 即可
|
|
||||||
## 特别提醒
|
## 特别提醒
|
||||||
|
|
||||||
正常来说,当没有出现上方问题时,程序出现问题大多为你的上网方式不够科学
|
正常来说,当没有出现上方问题时,程序出现问题大多为你的上网方式不够科学
|
||||||
|
160
redis_monitor.py
160
redis_monitor.py
@ -1,160 +0,0 @@
|
|||||||
import redis
|
|
||||||
|
|
||||||
def show_db_status(r, db_index):
|
|
||||||
"""显示指定数据库的状态信息"""
|
|
||||||
try:
|
|
||||||
# 切换到指定数据库
|
|
||||||
r.select(db_index)
|
|
||||||
|
|
||||||
# 检查并显示Cookie状态
|
|
||||||
cookie = r.get('cookie')
|
|
||||||
if cookie:
|
|
||||||
print(f"Cookie值: {cookie}")
|
|
||||||
else:
|
|
||||||
print("Cookie状态: 匿名")
|
|
||||||
|
|
||||||
# 获取所有键
|
|
||||||
all_keys = r.keys('*')
|
|
||||||
|
|
||||||
# 统计图片ID数量
|
|
||||||
pid_count = len([key for key in all_keys if key.startswith('downloaded:') and '_p0' in key])
|
|
||||||
print(f"当前存储的图片作品数量: {pid_count}\n")
|
|
||||||
except redis.RedisError as e:
|
|
||||||
print(f"错误:{str(e)}")
|
|
||||||
|
|
||||||
def check_redis_status():
|
|
||||||
"""检查Redis状态并显示详细信息"""
|
|
||||||
try:
|
|
||||||
# 连接到Redis
|
|
||||||
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
|
|
||||||
|
|
||||||
# 检查连接
|
|
||||||
r.ping()
|
|
||||||
|
|
||||||
# 获取活跃数据库信息(仅0-5)
|
|
||||||
info = r.info()
|
|
||||||
keyspace_info = {k: v for k, v in info.items() if k.startswith('db')}
|
|
||||||
|
|
||||||
# 过滤0-5范围内的数据库
|
|
||||||
valid_indices = set(range(6)) # 0-5
|
|
||||||
db_indices = [int(k.replace('db', '')) for k in keyspace_info.keys() if int(k.replace('db', '')) in valid_indices]
|
|
||||||
db_indices.sort()
|
|
||||||
|
|
||||||
if not db_indices:
|
|
||||||
print("\n当前没有活跃的数据库")
|
|
||||||
return
|
|
||||||
db_list = ', '.join([f"db{i}" for i in db_indices])
|
|
||||||
print(f"\n活跃的Redis数据库: {db_list}")
|
|
||||||
|
|
||||||
if len(db_indices) == 1:
|
|
||||||
# 只有一个数据库,直接显示其信息
|
|
||||||
print(f"\n数据库 db{db_indices[0]} 的信息:")
|
|
||||||
show_db_status(r, db_indices[0])
|
|
||||||
else:
|
|
||||||
# 多个数据库,让用户选择
|
|
||||||
while True:
|
|
||||||
choice = input("\n请选择要查看的数据库编号 (例如: 0 表示db0): ")
|
|
||||||
try:
|
|
||||||
db_index = int(choice)
|
|
||||||
if db_index in db_indices:
|
|
||||||
print(f"\n数据库 db{db_index} 的信息:")
|
|
||||||
show_db_status(r, db_index)
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("无效的数据库编号,请重试")
|
|
||||||
except ValueError:
|
|
||||||
print("请输入有效的数字")
|
|
||||||
|
|
||||||
except redis.ConnectionError:
|
|
||||||
print("错误:无法连接到Redis服务器,请确保Redis服务正在运行")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"错误:{str(e)}")
|
|
||||||
|
|
||||||
def clear_redis_db():
|
|
||||||
"""清空Redis数据库"""
|
|
||||||
try:
|
|
||||||
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
|
|
||||||
# 获取当前数据库信息(仅0-5)
|
|
||||||
info = r.info()
|
|
||||||
keyspace_info = {k: v for k, v in info.items() if k.startswith('db')}
|
|
||||||
|
|
||||||
# 过滤0-5范围内的数据库
|
|
||||||
valid_indices = set(range(6)) # 0-5
|
|
||||||
db_indices = [int(k.replace('db', '')) for k in keyspace_info.keys() if int(k.replace('db', '')) in valid_indices]
|
|
||||||
db_indices.sort()
|
|
||||||
|
|
||||||
if not db_indices:
|
|
||||||
print("\n当前没有活跃的数据库")
|
|
||||||
return
|
|
||||||
db_list = ', '.join([f"db{i}" for i in db_indices])
|
|
||||||
print(f"\n活跃的Redis数据库: {db_list}")
|
|
||||||
print("\n清空选项:")
|
|
||||||
print("1. 清空指定数据库")
|
|
||||||
print("2. 清空所有数据库")
|
|
||||||
print("3. 取消操作")
|
|
||||||
|
|
||||||
choice = input("请选择操作 (1-3): ")
|
|
||||||
|
|
||||||
if choice == '1':
|
|
||||||
if len(db_indices) == 1:
|
|
||||||
db_index = db_indices[0]
|
|
||||||
confirm = input(f"确定要清空数据库 db{db_index} 吗?(y/n): ")
|
|
||||||
if confirm.lower() == 'y':
|
|
||||||
r.select(db_index)
|
|
||||||
r.flushdb()
|
|
||||||
print(f"数据库 db{db_index} 已清空\n")
|
|
||||||
else:
|
|
||||||
while True:
|
|
||||||
choice = input("\n请选择要清空的数据库编号 (例如: 0 表示db0): ")
|
|
||||||
try:
|
|
||||||
db_index = int(choice)
|
|
||||||
if 0 <= db_index <= 5 and db_index in db_indices:
|
|
||||||
confirm = input(f"确定要清空数据库 db{db_index} 吗?(y/n): ")
|
|
||||||
if confirm.lower() == 'y':
|
|
||||||
r.select(db_index)
|
|
||||||
r.flushdb()
|
|
||||||
print(f"数据库 db{db_index} 已清空\n")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("无效的数据库编号,请重试")
|
|
||||||
except ValueError:
|
|
||||||
print("请输入有效的数字")
|
|
||||||
|
|
||||||
elif choice == '2':
|
|
||||||
confirm = input("确定要清空所有数据库吗?(y/n): ")
|
|
||||||
if confirm.lower() == 'y':
|
|
||||||
for db_index in range(6): # 0-5
|
|
||||||
r.select(db_index)
|
|
||||||
r.flushdb()
|
|
||||||
print("所有的数据库已清空\n")
|
|
||||||
elif choice == '3':
|
|
||||||
print("已取消操作\n")
|
|
||||||
else:
|
|
||||||
print("无效的选择\n")
|
|
||||||
|
|
||||||
except redis.ConnectionError:
|
|
||||||
print("错误:无法连接到Redis服务器,请确保Redis服务正在运行")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"错误:{str(e)}")
|
|
||||||
|
|
||||||
def show_menu():
|
|
||||||
"""显示交互菜单"""
|
|
||||||
while True:
|
|
||||||
print("=== Redis管理工具 ===")
|
|
||||||
print("1. 显示状态")
|
|
||||||
print("2. 清空数据库")
|
|
||||||
print("3. 退出")
|
|
||||||
choice = input("请选择操作 (1-3): ")
|
|
||||||
|
|
||||||
if choice == '1':
|
|
||||||
check_redis_status()
|
|
||||||
elif choice == '2':
|
|
||||||
clear_redis_db()
|
|
||||||
elif choice == '3':
|
|
||||||
print("退出程序")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("无效的选择,请重试\n")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
show_menu()
|
|
@ -1,2 +1,2 @@
|
|||||||
redis==5.2.1
|
redis==3.5.3
|
||||||
requests==2.32.3
|
requests==2.22.0
|
Loading…
x
Reference in New Issue
Block a user