"""Interactive console tool that reports NVIDIA GPU usage of remote servers.

Each server listed in ``serverList.json`` is reached over SSH (paramiko) and
queried with ``nvidia-smi``.  The tool offers a one-shot listing (``check``)
and a continuously refreshing view (``realtime``) that can be drawn either as
plain lines or as a box-drawing table.
"""
import argparse
import json
import os
import socket
import threading
import time

import paramiko

COLOR_GREEN = '\033[0;32m'
COLOR_RED = '\033[0;31m'
COLOR_YELLOW = '\033[0;33m'
END_COLOR = '\033[0m'

server_list_path = 'serverList.json'

# Global on/off switch read by the polling threads started by ``realtime``.
run_realtime = False
# Guards the per-server dicts shared between polling threads and the renderer.
data_list_lock = threading.Lock()

# Column widths of the table: title, name/state/temperature, memory, GPU util.
default_cell_width_list = [7, 20, 24, 15]
cell_width_list = [7, 20, 24, 15]

table_icon_1 = {
    'hline': '─',
    'vline': '│',
    'hline-d': '╌',
    'left': {
        'up': '┌',
        'middle': '├',
        'bottom': '└',
    },
    'mid': {
        'up': '┬',
        'middle': '┼',
        'bottom': '┴',
    },
    'right': {
        'up': '┐',
        'middle': '┤',
        'bottom': '┘',
    },
}

table_icon_2 = {
    'hline': '━',
    'vline': '┃',
    'hline-d': '╌',
    'left': {
        'up': '┏',
        'middle': '┣',
        'bottom': '┗',
    },
    'mid': {
        'up': '┳',
        'middle': '╋',
        'bottom': '┻',
    },
    'right': {
        'up': '┓',
        'middle': '┫',
        'bottom': '┛',
    },
}

table_icon_3 = {
    'hline': '═',
    'vline': '║',
    'hline-d': '╌',
    'left': {
        'up': '╔',
        'middle': '╠',
        'bottom': '╚',
    },
    'mid': {
        'up': '╦',
        'middle': '╬',
        'bottom': '╩',
    },
    'right': {
        'up': '╗',
        'middle': '╣',
        'bottom': '╝',
    },
}

# Glyph set currently used when drawing tables.
table_icon = table_icon_3

# Command run on every server (nvidia-smi is used to read the GPU state).
_GPU_QUERY_CMD = (
    'nvidia-smi '
    '--query-gpu=index,name,memory.total,memory.used,memory.free,'
    'utilization.gpu,utilization.memory,temperature.gpu '
    '--format=csv'
)
# Header line that precedes the data rows in the command's CSV output.
_GPU_CSV_HEADER = (
    'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], '
    'utilization.gpu [%], utilization.memory [%], temperature.gpu'
)
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _load_server_list():
    """Return the parsed content of ``server_list_path``.

    Raises:
        OSError: the file cannot be opened.
        json.JSONDecodeError: the file is not valid JSON.
    """
    # 'utf-8-sig' reads plain UTF-8 and also tolerates a leading BOM.
    with open(server_list_path, 'r', encoding='utf-8-sig') as f:
        return json.load(f)


def _clear_screen():
    """Clear the terminal with the platform's own command."""
    os.system('cls' if os.name == 'nt' else 'clear')


def _open_client(server, timeout):
    """Open an SSH connection to *server* and return the connected client.

    The client is closed again if connecting fails, so a failed attempt does
    not leave a half-open client behind.
    """
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; confirm this is acceptable for the networks this runs on.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(
            server['ip'],
            port=server.get('port', 22),
            username=server['username'],
            password=server.get('password', None),
            key_filename=server.get('key_filename', None),
            timeout=timeout,
        )
    except Exception:
        client.close()
        raise
    return client


def _parse_gpu_csv(text):
    """Turn the CSV text printed by ``_GPU_QUERY_CMD`` into a list of dicts.

    Everything up to and including the header line is ignored; when no header
    is found every non-blank line is treated as a data row.  Blank lines are
    skipped, so the last GPU is kept even without a trailing newline.

    Raises:
        ValueError, IndexError: a data row does not have the expected shape.
    """
    lines = [line.strip() for line in text.splitlines()]
    start_idx = 0
    if _GPU_CSV_HEADER in lines:
        start_idx = lines.index(_GPU_CSV_HEADER) + 1

    result = []
    for line in lines[start_idx:]:
        if not line:
            continue
        fields = line.split(', ')
        idx = int(fields[0])
        gpu_name = fields[1]
        # Numeric fields look like "24576 MiB" / "35 %": keep the number.
        total_mem = int(fields[2].split(' ')[0])
        used_mem = int(fields[3].split(' ')[0])
        free_mem = int(fields[4].split(' ')[0])
        util_gpu = int(fields[5].split(' ')[0])
        util_mem = int(fields[6].split(' ')[0])
        temperature = int(fields[7])
        # Shorten the GPU name by dropping the vendor / series prefixes.
        if gpu_name.startswith('NVIDIA '):
            gpu_name = gpu_name[7:]
        if gpu_name.startswith('GeForce '):
            gpu_name = gpu_name[8:]
        result.append({
            'idx': idx,
            'gpu_name': gpu_name,
            'total_mem': total_mem,
            'used_mem': used_mem,
            'free_mem': free_mem,
            'util_gpu': util_gpu,
            'util_mem': util_mem,
            'temperature': temperature,
        })
    return result


def _mem_ratio(used_mem, total_mem):
    """Return used/total, or 0.0 when *total_mem* is not positive."""
    if total_mem <= 0:
        return 0.0
    return used_mem / total_mem


def check_gpu_utilization(server: dict, timeout=2):
    """Query one server once and return its GPUs as a list of dicts.

    Args:
        server: entry of the server list; ``ip`` and ``username`` are
            required, ``port`` (default 22), ``password`` and
            ``key_filename`` are optional.
        timeout: seconds allowed for connecting and for the command.

    Returns:
        One dict per GPU with the keys ``idx``, ``gpu_name``, ``total_mem``,
        ``used_mem``, ``free_mem``, ``util_gpu``, ``util_mem`` and
        ``temperature``.

    Raises:
        Whatever paramiko raises while connecting or executing, and
        ValueError/IndexError when the output cannot be parsed.
    """
    client = _open_client(server, timeout)
    try:
        _stdin, stdout, _stderr = client.exec_command(_GPU_QUERY_CMD, timeout=timeout)
        return _parse_gpu_csv(stdout.read().decode())
    finally:
        # Always release the connection, also when the command or parsing fails.
        client.close()


def _format_gpu_lines(data_list):
    """Return one human-readable status line per GPU dict in *data_list*."""
    lines = []
    for data in data_list:
        idx = data['idx']
        gpu_name = data['gpu_name']
        used_mem = data['used_mem']
        total_mem = data['total_mem']
        util_gpu = data['util_gpu']
        # Right-align the used amount against the width of the total amount.
        diff_len_space = ' ' * (len(str(total_mem)) - len(str(used_mem)))
        if used_mem < 500:
            status = COLOR_GREEN + '空闲' + END_COLOR
        elif _mem_ratio(used_mem, total_mem) < 0.5:
            status = COLOR_YELLOW + '占用' + END_COLOR
        else:
            status = COLOR_RED + '占用' + END_COLOR
        lines.append(
            f'{idx}: {status} - {gpu_name} - {diff_len_space}{used_mem} / {total_mem} MiB, GPU Util: {util_gpu} %'
        )
    return lines


def print_res(data_list):
    """Print one coloured status line per GPU dict in *data_list*."""
    for line in _format_gpu_lines(data_list):
        print(line)


def clamp_str(input_str, length, fill=False, fill_type='center', len_is=None):
    """Fit *input_str* into exactly *length* characters.

    Longer text is cut and ends with an ellipsis; shorter text is padded
    with spaces.

    Args:
        input_str: text to fit.
        length: target width; a width of zero or less yields ''.
        fill: accepted for compatibility; padding is always applied.
        fill_type: where the text sits in the padded cell, one of
            'center', 'left' or 'right'.
        len_is: length to assume for *input_str* instead of ``len()``.

    Raises:
        ValueError: *fill_type* is unknown and padding is required.
    """
    if length <= 0:
        return ''
    ori_len = len_is if len_is else len(input_str)
    if ori_len > length:
        return input_str[:length - 1] + '…'

    diff = length - ori_len
    if fill_type == 'center':
        left = diff // 2
        right = diff - left
    elif fill_type == 'left':
        left, right = 0, diff
    elif fill_type == 'right':
        left, right = diff, 0
    else:
        raise ValueError(f'unknown fill_type: {fill_type!r}')
    return ' ' * left + input_str + ' ' * right


def get_bar(bar_ratio, max_len, color=False):
    """Return a progress bar *max_len* cells wide filled to *bar_ratio*.

    Args:
        bar_ratio: filled fraction; values outside [0, 1] are clamped.
        max_len: bar width in characters.
        color: when true, the cells from 40% of the width are wrapped in
            yellow and the cells from 80% in red.
    """
    bar_ratio = min(max(bar_ratio, 0), 1)
    used_len = int(bar_ratio * max_len)
    cells = '█' * used_len + ' ' * (max_len - used_len)
    if not color:
        return cells
    yellow_from = int(max_len * 0.4)
    red_from = int(max_len * 0.8)
    return ''.join([
        cells[:yellow_from],
        COLOR_YELLOW,
        cells[yellow_from:red_from],
        f'{END_COLOR}{COLOR_RED}',
        cells[red_from:],
        END_COLOR,
    ])


def _table_line(line_type, width_list, spaces=(), dash=False):
    """Return one horizontal rule of the table.

    Args:
        line_type: 'up', 'middle' or 'bottom' - selects the junction glyphs.
        width_list: width of every column.
        spaces: indices of columns that stay blank instead of being ruled.
        dash: draw the rule with the dashed glyph.
    """
    if line_type not in ('up', 'middle', 'bottom'):
        raise ValueError(f'unknown line_type: {line_type!r}')
    fill_icon = table_icon['hline-d'] if dash else table_icon['hline']
    last = len(width_list) - 1
    parts = []
    for i, cell in enumerate(width_list):
        blank = i in spaces
        # Left border.
        if i == 0:
            parts.append(table_icon['vline'] if blank else table_icon['left'][line_type])
        # Column body.
        parts.append((' ' if blank else fill_icon) * cell)
        # Right border, or the junction with the next column.
        if i == last:
            parts.append(table_icon['vline'] if blank else table_icon['right'][line_type])
        elif blank and (i + 1) in spaces:
            parts.append(table_icon['vline'])
        elif blank:
            parts.append(table_icon['left']['middle'])
        elif (i + 1) in spaces:
            parts.append(table_icon['right']['middle'])
        else:
            parts.append(table_icon['mid'][line_type])
    return ''.join(parts)


def _join_row(cells):
    """Return *cells* separated and enclosed by the vertical border glyph."""
    vline = table_icon['vline']
    return vline + vline.join(cells) + vline


def get_table_res(data_list):
    """Render the shared per-server data as a table and return it as text.

    Every entry of *data_list* is a dict with ``server_data`` (its ``title``
    is shown) and optionally ``info_list`` (GPU dicts), ``updated``,
    ``maxGPU`` and ``err_info``.  The ``updated`` flag of every rendered
    server is reset to False, so a server that delivers no fresh data before
    the next call is marked with a red 'X'.

    The returned text starts with a timestamp line and ends with a newline.
    """
    widths = cell_width_list
    w_title, w_name, w_mem, w_util = widths[0], widths[1], widths[2], widths[3]
    # Width of the last three columns merged into one (plus two inner borders).
    len_last3 = w_name + w_mem + w_util + 2
    temperature_len = 4

    lines = [time.strftime(_TIME_FORMAT, time.localtime())]

    # Header ------------------------------------------
    lines.append(_table_line('up', widths))
    lines.append(_join_row([
        clamp_str('Title', w_title, True),
        clamp_str('Name, State, Temperature', w_name, True),
        clamp_str('Memory-Usage', w_mem, True),
        clamp_str('GPU-Util', w_util, True),
    ]))

    # Body ------------------------------------------
    for data in data_list:
        lines.append(_table_line('middle', widths))
        title = data['server_data']['title']
        info_list = data.get('info_list', None)
        if info_list:
            updated = data.get('updated', False)
            data['updated'] = False
            last = len(info_list) - 1
            for j, info in enumerate(info_list):
                used_mem = info['used_mem']
                total_mem = info['total_mem']
                util_gpu = info['util_gpu']
                mem_ratio = _mem_ratio(used_mem, total_mem)

                # First row: freshness marker, GPU name, memory and utilisation.
                if updated:
                    marker = clamp_str(' ', w_title, True)
                else:
                    marker = f"{COLOR_RED}{clamp_str('X', w_title, True)}{END_COLOR}"
                mem_str = f'{used_mem} / {total_mem} MiB '
                lines.append(_join_row([
                    marker,
                    clamp_str(f" {info['idx']} : {info['gpu_name']}", w_name, True, 'left'),
                    clamp_str(mem_str, w_mem, True, 'right'),
                    clamp_str(f"{util_gpu} % ", w_util, True, 'right'),
                ]))

                # Second row: server title (first GPU only), state, temperature, bars.
                is_free = used_mem < 1000 and util_gpu < 20
                status = clamp_str(' free' if is_free else ' occupied',
                                   w_name - temperature_len, True, 'left')
                if is_free:
                    status = COLOR_GREEN + status + END_COLOR
                elif mem_ratio < 0.5:
                    status = COLOR_YELLOW + status + END_COLOR
                else:
                    status = COLOR_RED + status + END_COLOR
                temperature = clamp_str(f"{info['temperature']}C ", temperature_len, True, 'right')
                lines.append(_join_row([
                    clamp_str(title if j == 0 else ' ', w_title, True),
                    status + temperature,
                    get_bar(mem_ratio, w_mem, True),
                    get_bar(util_gpu / 100, w_util, True),
                ]))

                # Dashed separator before the next GPU of the same server.
                if j != last:
                    lines.append(_table_line('middle', widths, [0], dash=True))
        else:
            # No data: draw as many rows as the server's GPUs would occupy
            # (at least one block, so the server never vanishes from the table).
            blank_row = _join_row([' ' * w_title, ' ' * len_last3])
            err_info = str(data.get('err_info', 'error')).replace('\n', ' ')
            for g in range(max(1, data.get('maxGPU', 1))):
                if g != 0:
                    lines.append(blank_row)  # stands in for the dashed separator
                lines.append(blank_row)
                first_cell = clamp_str(title, w_title, True) if g == 0 else ' ' * w_title
                lines.append(_join_row([first_cell, clamp_str(err_info, len_last3, True)]))

    # Footer
    lines.append(_table_line('bottom', widths))
    return '\n'.join(lines) + '\n'


def _keep_running(stop_event):
    """Return True while polling should go on."""
    if stop_event is not None and stop_event.is_set():
        return False
    return run_realtime


def _pause(seconds, stop_event):
    """Sleep for *seconds*, waking early when *stop_event* gets set."""
    if stop_event is None:
        time.sleep(seconds)
    else:
        stop_event.wait(seconds)


def keep_check_one(server: dict, shared_data_list: list, server_idx: int, interval: float,
                   re_connect_time: float = 5, stop_event=None):
    """Poll one server until stopped, publishing results into the shared list.

    Intended to run in its own thread.  Results go to
    ``shared_data_list[server_idx]`` under ``data_list_lock``:
    ``info_list`` (GPU dicts), ``updated`` (True after a fresh result),
    ``maxGPU`` (number of GPUs last seen) and ``err_info`` (last error text).

    Args:
        server: server entry, see ``check_gpu_utilization``.
        shared_data_list: list of per-server dicts shared with the renderer.
        server_idx: index of this server's dict in *shared_data_list*.
        interval: seconds between two queries; three times this value is
            used as connect / command timeout.
        re_connect_time: seconds to wait after a failed connection attempt.
        stop_event: optional ``threading.Event``; polling ends once it is
            set.  Polling also ends when the global ``run_realtime`` is False.
    """
    slot = shared_data_list[server_idx]
    re_try_count = 0
    # Connection loop: reconnect whenever the session breaks.
    while _keep_running(stop_event):
        client = None
        try:
            client = _open_client(server, interval * 3)
            with data_list_lock:
                slot['err_info'] = ' '
            re_try_count = 0
            # Query loop: reuse the connection until a query fails.
            while _keep_running(stop_event):
                try:
                    _stdin, stdout, _stderr = client.exec_command(_GPU_QUERY_CMD, timeout=interval * 3)
                    result = _parse_gpu_csv(stdout.read().decode())
                except Exception as e:
                    with data_list_lock:
                        slot['err_info'] = f'{e}'
                        slot.pop('info_list', None)
                    _pause(interval, stop_event)
                    break  # drop this connection and reconnect
                with data_list_lock:
                    slot['info_list'] = result
                    slot['updated'] = True
                    slot['maxGPU'] = len(result)
                    if not result:
                        # Nothing to draw: say so instead of showing a blank row.
                        slot['err_info'] = 'no GPU data'
                _pause(interval, stop_event)
        except Exception as e:
            with data_list_lock:
                slot['err_info'] = f'retry:{re_try_count}, {e}'
            _pause(re_connect_time, stop_event)
            re_try_count += 1
        finally:
            if client is not None:
                client.close()


def realtime(args):
    """Run the refreshing view until interrupted with Ctrl-C.

    Args:
        args: list of command-line style options:
            ``-n SECONDS`` refresh period (default 2),
            ``-e/--exclude`` comma-separated server titles to hide,
            ``-t/--table`` draw a table, ``--f2`` move the cursor up instead
            of clearing the screen (table mode only).

    One daemon thread per server is started; all of them are told to stop
    when this function returns.
    """
    global run_realtime
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', type=float, default=2, help='多久刷新一次')
    parser.add_argument('-e', '--exclude', type=str, default='', help='不需要显示的服务器(title)用,分割')
    parser.add_argument('-t', '--table', action='store_true', help='以表格形式绘制')
    parser.add_argument('--f2', action='store_true', help='使用第二种刷新方式')
    try:
        args = parser.parse_args(args)
    except SystemExit as exc:
        # argparse exits on bad options and after printing --help.
        if exc.code not in (0, None):
            print('参数有误!')
        return
    if args.n <= 0:
        print('参数有误!')
        return

    exclude_list = args.exclude.split(',')
    server_list = _load_server_list()

    # Per-server dicts shared with the polling threads.
    data_list = []
    # A per-session event, so threads of an earlier session stay stopped even
    # when a new session sets run_realtime to True again.
    stop_event = threading.Event()
    run_realtime = True
    try:
        # Start one polling thread per displayed server.
        for server_data in server_list:
            if server_data['title'] in exclude_list:
                continue
            data_list.append({'server_data': server_data})
            thread = threading.Thread(
                target=keep_check_one,
                args=(server_data, data_list, len(data_list) - 1, args.n * 0.95),
                kwargs={'stop_event': stop_event},
            )
            thread.daemon = True
            thread.start()

        # Draw loop.
        first_time = True
        while True:
            # Build the frame under the lock, print it after releasing it.
            with data_list_lock:
                if args.table:
                    frame = get_table_res(data_list)
                else:
                    lines = [time.strftime(_TIME_FORMAT, time.localtime())]
                    for data in data_list:
                        lines.append(data['server_data']['title'] + ' ---------------')
                        info_list = data.get('info_list', None)
                        if info_list:
                            lines.extend(_format_gpu_lines(info_list))
                        else:
                            lines.append('出错')
                    frame = '\n'.join(lines)

            # Clear the previous frame.
            if args.table and args.f2:
                try:
                    rows_full = os.get_terminal_size().lines
                except OSError:
                    # Not attached to a terminal: fall back to a plain clear.
                    _clear_screen()
                else:
                    print("\033[F" * rows_full)
            else:
                _clear_screen()
            print(frame)

            if first_time:
                # Refresh quickly once, the first results arrive shortly.
                first_time = False
                time.sleep(1)
            else:
                time.sleep(args.n)
    except KeyboardInterrupt:
        print()
    finally:
        stop_event.set()
        run_realtime = False


def check_all(show_type='list'):
    """Query every listed server once and print the result.

    Args:
        show_type: 'list' prints one line per GPU; 'table' is not
            implemented yet and prints nothing.
    """
    server_list = _load_server_list()

    if show_type == 'list':
        for server_data in server_list:
            print('* ' + server_data['title'] + ' --------------------')
            try:
                res = check_gpu_utilization(server_data)
                print_res(res)
            except paramiko.AuthenticationException:
                print("Authentication failed.")
            except paramiko.SSHException as ssh_err:
                print(f"SSH error: {ssh_err}")
            except socket.timeout:
                print("Connection timed out.")
            except Exception as e:
                print(f"错误 {e}")
            print()
    elif show_type == 'table':
        pass  # TODO


def print_server_list():
    """Print title, address and user name of every listed server."""
    server_list = _load_server_list()
    for s in server_list:
        print('----------------------')
        print(f"title:{s['title']}")
        print(f"ip:{s['ip']}, port:{s.get('port', 22)}, usr:{s['username']}")


def cell_width(args):
    """Show or change the table's column widths.

    Args:
        args: ``[]`` shows the current widths; ``['-']`` restores the
            defaults; ``['a,b,c,d']`` sets all widths, where an item of
            ``-`` keeps that column's current width.
    """
    global cell_width_list
    if len(args) == 0:
        print(f'current cells width: {cell_width_list}')
        return
    if len(args) > 1:
        print('参数错误')
        return

    if args[0] == '-':
        print(f'原始值为:{cell_width_list}')
        # Copy, so the defaults can never be modified through the active list.
        cell_width_list = list(default_cell_width_list)
        print(f'恢复默认值为:{cell_width_list}')
        return

    widths = args[0].split(',')
    if len(widths) != len(cell_width_list):
        print('参数长度不正确')
        return
    temp_target = []
    for i, width in enumerate(widths):
        if width == '-':
            temp_w = cell_width_list[i]
        else:
            try:
                temp_w = int(width)
            except ValueError:
                print('宽度值必须为整数或\'-\'')
                return
            if temp_w < 1:
                # Zero or negative widths cannot be drawn.
                print('宽度值必须大于0')
                return
        temp_target.append(temp_w)
    print(f'原始值为:{cell_width_list}')
    cell_width_list = temp_target
    print(f'修改为:{cell_width_list}')


def main():
    """Read commands from the prompt and run them until 'exit' or end of input."""
    while True:
        try:
            msg = input(' -> ')
        except (EOFError, KeyboardInterrupt):
            print()
            break
        # split() without argument ignores repeated and surrounding blanks.
        parts = msg.split()
        if not parts:
            continue
        cmd = parts[0]
        args = parts[1:]
        try:
            if cmd == 'check':
                check_all()
            elif cmd == 'clear' or cmd == 'cls':
                _clear_screen()
            elif cmd == 'realtime' or cmd == 'rt':
                realtime(args)
            elif cmd == 'list':
                print_server_list()
            elif cmd == 'cell-width' or cmd == 'cw':
                cell_width(args)
            elif cmd == 'exit':
                break
            elif cmd == 'help':
                print(f'使用方法: 需要把待查看的服务器信息放至{server_list_path}')
                print()
                print('check, 查看所有显卡状态')
                print('realtime, 实时显示(rt同)')
                print('list, 查看文件中记录的所有服务器信息')
                print('cell-width, 查看或设置cell的宽度')
                print('clear, 清空屏幕(cls同)')
                print('exit, 退出')
            else:
                print('没有该指令, 输入help查看帮助')
        except (OSError, json.JSONDecodeError) as err:
            # A missing or malformed server list must not end the session.
            print(f'错误 {err}')


if __name__ == '__main__':
    main()