mirror of
https://github.com/rashevskyv/dbi.git
synced 2024-12-28 10:25:05 +08:00
added dbi backend
This commit is contained in:
parent
cccf96ef9b
commit
e483c507fe
278
dbibackend
Normal file
278
dbibackend
Normal file
@ -0,0 +1,278 @@
|
||||
#!/usr/bin/python3
|
||||
# This script depends on PyUSB. You can get it with pip install pyusb.
|
||||
# You will also need libusb installed
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
|
||||
from binascii import hexlify as hx, unhexlify as uhx
|
||||
from pathlib import Path
|
||||
|
||||
from tkinter import filedialog
|
||||
import tkinter as tk
|
||||
from tkinter import *
|
||||
|
||||
from tkinter import scrolledtext
|
||||
|
||||
CMD_ID_EXIT = 0
|
||||
CMD_ID_LIST_OLD = 1
|
||||
CMD_ID_FILE_RANGE = 2
|
||||
CMD_ID_LIST = 3
|
||||
|
||||
CMD_TYPE_REQUEST = 0
|
||||
CMD_TYPE_RESPONSE = 1
|
||||
CMD_TYPE_ACK = 2
|
||||
|
||||
BUFFER_SEGMENT_DATA_SIZE = 0x100000
|
||||
|
||||
in_ep = None
|
||||
out_ep = None
|
||||
|
||||
file_list = {}
|
||||
|
||||
def LOG(line):
|
||||
if int(text1.index('end').split('.')[0]) - 1 > 1000: # max 1000 lines of log are shown
|
||||
text1.delete('1.0', tk.END)
|
||||
text1.insert(tk.END, line+'\n')
|
||||
text1.see(tk.END)
|
||||
|
||||
def process_file_range_command(data_size):
|
||||
global file_list
|
||||
|
||||
LOG('File range')
|
||||
out_ep.write(struct.pack('<4sIII', b'DBI0', CMD_TYPE_ACK, CMD_ID_FILE_RANGE, data_size))
|
||||
|
||||
file_range_header = in_ep.read(data_size)
|
||||
|
||||
range_size = struct.unpack('<I', file_range_header[:4])[0]
|
||||
range_offset = struct.unpack('<Q', file_range_header[4:12])[0]
|
||||
nsp_name_len = struct.unpack('<I', file_range_header[12:16])[0]
|
||||
nsp_name = bytes(file_range_header[16:]).decode('utf-8')
|
||||
|
||||
LOG('Range Size: {}, Range Offset: {}, Name len: {}, Name: {}'.format(range_size, range_offset, nsp_name_len, nsp_name))
|
||||
|
||||
response_bytes = struct.pack('<4sIII', b'DBI0', CMD_TYPE_RESPONSE, CMD_ID_FILE_RANGE, range_size)
|
||||
out_ep.write(response_bytes)
|
||||
|
||||
ack = bytes(in_ep.read(16, timeout=0))
|
||||
magic = ack[:4]
|
||||
cmd_type = struct.unpack('<I', ack[4:8])[0]
|
||||
cmd_id = struct.unpack('<I', ack[8:12])[0]
|
||||
data_size = struct.unpack('<I', ack[12:16])[0]
|
||||
|
||||
# LOG('Cmd Type: {}, Command id: {}, Data size: {}'.format(cmd_type, cmd_id, data_size))
|
||||
# LOG('Ack')
|
||||
|
||||
with open(file_list[nsp_name].__str__(), 'rb') as f:
|
||||
f.seek(range_offset)
|
||||
|
||||
curr_off = 0x0
|
||||
end_off = range_size
|
||||
read_size = BUFFER_SEGMENT_DATA_SIZE
|
||||
|
||||
while curr_off < end_off:
|
||||
if curr_off + read_size >= end_off:
|
||||
read_size = end_off - curr_off
|
||||
|
||||
buf = f.read(read_size)
|
||||
out_ep.write(data=buf, timeout=0)
|
||||
curr_off += read_size
|
||||
|
||||
def poll_commands():
|
||||
LOG('Entering command loop')
|
||||
while True:
|
||||
try:
|
||||
cmd_header = bytes(in_ep.read(16, timeout=0))
|
||||
magic = cmd_header[:4]
|
||||
|
||||
if magic != b'DBI0':
|
||||
continue
|
||||
|
||||
cmd_type = struct.unpack('<I', cmd_header[4:8])[0]
|
||||
cmd_id = struct.unpack('<I', cmd_header[8:12])[0]
|
||||
data_size = struct.unpack('<I', cmd_header[12:16])[0]
|
||||
|
||||
# LOG('Cmd Type: {}, Command id: {}, Data size: {}'.format(cmd_type, cmd_id, data_size))
|
||||
|
||||
if cmd_id == CMD_ID_EXIT:
|
||||
process_exit_command()
|
||||
elif cmd_id == CMD_ID_FILE_RANGE:
|
||||
process_file_range_command(data_size)
|
||||
elif cmd_id == CMD_ID_LIST:
|
||||
process_list_command()
|
||||
except usb.core.USBError:
|
||||
LOG('Switch connection lost')
|
||||
connect_to_switch()
|
||||
|
||||
def process_exit_command():
|
||||
LOG('Exit')
|
||||
out_ep.write(struct.pack('<4sIII', b'DBI0', CMD_TYPE_RESPONSE, CMD_ID_EXIT, 0))
|
||||
global root
|
||||
root.quit()
|
||||
|
||||
|
||||
def process_list_command():
|
||||
global file_list
|
||||
LOG('Get list')
|
||||
nsp_path_list = ""
|
||||
nsp_path_list_len = 0
|
||||
|
||||
for i, (k, v) in enumerate(file_list.items()):
|
||||
nsp_path_list += k + '\n'
|
||||
|
||||
nsp_path_list_bytes = nsp_path_list.encode('utf-8')
|
||||
nsp_path_list_len = len(nsp_path_list_bytes)
|
||||
|
||||
out_ep.write(struct.pack('<4sIII', b'DBI0', CMD_TYPE_RESPONSE, CMD_ID_LIST, nsp_path_list_len))
|
||||
|
||||
if nsp_path_list_len > 0:
|
||||
ack = bytes(in_ep.read(16, timeout=0))
|
||||
magic = ack[:4]
|
||||
cmd_type = struct.unpack('<I', ack[4:8])[0]
|
||||
cmd_id = struct.unpack('<I', ack[8:12])[0]
|
||||
data_size = struct.unpack('<I', ack[12:16])[0]
|
||||
|
||||
# LOG('Cmd Type: {}, Command id: {}, Data size: {}'.format(cmd_type, cmd_id, data_size))
|
||||
# LOG('Ack')
|
||||
|
||||
out_ep.write(nsp_path_list_bytes)
|
||||
|
||||
def connect_to_switch():
|
||||
global in_ep
|
||||
global out_ep
|
||||
global text1
|
||||
while True:
|
||||
dev = usb.core.find(idVendor=0x057E, idProduct=0x3000)
|
||||
if dev is None:
|
||||
LOG('Waiting for switch...')
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
dev.reset()
|
||||
dev.set_configuration()
|
||||
cfg = dev.get_active_configuration()
|
||||
|
||||
is_out_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT
|
||||
is_in_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN
|
||||
out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep)
|
||||
in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep)
|
||||
|
||||
assert out_ep is not None
|
||||
assert in_ep is not None
|
||||
break
|
||||
|
||||
def start_server():
|
||||
print(file_list)
|
||||
connect_to_switch()
|
||||
poll_commands()
|
||||
|
||||
def do_start_server():
|
||||
global server_thread
|
||||
global addFolderButton
|
||||
global addFilesButton
|
||||
global clearListButton
|
||||
global startServerButton
|
||||
|
||||
addFolderButton['state'] = 'disabled'
|
||||
addFilesButton['state'] = 'disabled'
|
||||
clearListButton['state'] = 'disabled'
|
||||
startServerButton['state'] = 'disabled'
|
||||
|
||||
server_thread = threading.Thread(target=start_server)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
|
||||
def updateFileList():
|
||||
global flist1
|
||||
global file_list
|
||||
global startServerButton
|
||||
|
||||
flist1.delete('1.0', tk.END)
|
||||
for i, (k, v) in enumerate(sorted(file_list.items())):
|
||||
flist1.insert(tk.END, v.__str__()+'\n')
|
||||
|
||||
if len(file_list) > 0:
|
||||
startServerButton['state'] = 'normal'
|
||||
else:
|
||||
startServerButton['state'] = 'disabled'
|
||||
|
||||
def gui_choose_dir():
|
||||
global file_list
|
||||
filename = filedialog.askdirectory()
|
||||
if filename == () or filename == '':
|
||||
return
|
||||
d = Path(filename)
|
||||
for file_path in [f for f in d.iterdir() if f.is_file()]:
|
||||
file_list.update({file_path.name : file_path.resolve()})
|
||||
|
||||
updateFileList()
|
||||
|
||||
def gui_choose_files():
|
||||
global file_list
|
||||
filenames = filedialog.askopenfilenames()
|
||||
for file_path in filenames:
|
||||
d = Path(file_path)
|
||||
file_list.update({d.name : d.resolve()})
|
||||
|
||||
updateFileList()
|
||||
|
||||
def gui_clear_list():
|
||||
global file_list
|
||||
|
||||
file_list.clear()
|
||||
updateFileList()
|
||||
|
||||
def start_gui(start):
|
||||
global addFolderButton
|
||||
global addFilesButton
|
||||
global clearListButton
|
||||
global startServerButton
|
||||
global text1
|
||||
global flist1
|
||||
global root
|
||||
|
||||
root = tk.Tk()
|
||||
root.resizable(False, False)
|
||||
root.title('DBI backend GUI')
|
||||
|
||||
canvas1 = tk.Canvas(root, width = 800, height = 500)
|
||||
canvas1.pack()
|
||||
addFolderButton = tk.Button(text='Add folder',command=gui_choose_dir,width=10)
|
||||
canvas1.create_window(0, 0, window=addFolderButton, anchor=tk.NW)
|
||||
addFilesButton = tk.Button(text='Add files',command=gui_choose_files,width=10)
|
||||
canvas1.create_window(100, 0, window=addFilesButton, anchor=tk.NW)
|
||||
clearListButton = tk.Button(text='Clear list',command=gui_clear_list,width=10)
|
||||
canvas1.create_window(200, 0, window=clearListButton, anchor=tk.NW)
|
||||
|
||||
flist1 = tk.scrolledtext.ScrolledText(height=20, width=112)
|
||||
canvas1.create_window(0, 30, window=flist1, anchor=tk.NW)
|
||||
|
||||
startServerButton = tk.Button(text='Start server',state=tk.DISABLED, command=do_start_server,width=10)
|
||||
canvas1.create_window(0, 325, window=startServerButton, anchor=tk.NW)
|
||||
|
||||
text1 = tk.scrolledtext.ScrolledText(height=10, width=112)
|
||||
canvas1.create_window(0, 355, window=text1, anchor=tk.NW)
|
||||
|
||||
if start:
|
||||
updateFileList()
|
||||
do_start_server()
|
||||
root.mainloop()
|
||||
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
start_gui(False)
|
||||
else:
|
||||
for filename in sys.argv[1:]:
|
||||
d = Path(filename)
|
||||
if d.is_file():
|
||||
file_list.update({d.name: d.resolve()})
|
||||
else:
|
||||
for file_path in [f for f in d.iterdir() if f.is_file()]:
|
||||
file_list.update({file_path.name : file_path.resolve()})
|
||||
|
||||
start_gui(True)
|
||||
|
Loading…
Reference in New Issue
Block a user