import argparse
import json
import os
import threading
import time

import paramiko

# ANSI colour codes used for the status strings
COLOR_GREEN = '\033[0;32m'
COLOR_RED = '\033[0;31m'
COLOR_YELLOW = '\033[0;33m'
END_COLOR = '\033[0m'

server_list_path = 'serverList.json'
run_realtime = False
data_list_lock = threading.Lock()

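# Example serverList.json (illustrative only -- hosts, ports and credentials
# are placeholders; the fields read by this script are title, ip, port,
# username, and optionally password or key_filename):
#
# [
#     {
#         "title": "lab-a",
#         "ip": "192.168.1.10",
#         "port": 22,
#         "username": "alice",
#         "password": "secret"
#     },
#     {
#         "title": "lab-b",
#         "ip": "192.168.1.11",
#         "port": 22,
#         "username": "alice",
#         "key_filename": "/home/alice/.ssh/id_rsa"
#     }
# ]
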

def check_gpu_utilization(server: dict):
    # Open the SSH connection
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(server['ip'], port=server['port'], username=server['username'],
                   password=server.get('password', None),
                   key_filename=server.get('key_filename', None))

    # Query GPU status with nvidia-smi
    cmd = 'nvidia-smi --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,utilization.memory,temperature.gpu --format=csv'
    stdin, stdout, stderr = client.exec_command(cmd)
    output = stdout.read().decode()

    # Skip everything up to and including the CSV header line
    output = output.split('\n')
    start_idx = 0
    for i in range(len(output)):
        if output[i] == 'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], utilization.gpu [%], utilization.memory [%], temperature.gpu':
            start_idx = i + 1
            break
    output = output[start_idx:-1]

    # Parse one CSV line per GPU -----------------------------
    result = []
    for data in output:
        data_list = data.split(', ')
        idx = int(data_list[0])
        gpu_name = data_list[1]
        total_mem = int(data_list[2].split(' ')[0])
        used_mem = int(data_list[3].split(' ')[0])
        free_mem = int(data_list[4].split(' ')[0])
        util_gpu = int(data_list[5].split(' ')[0])
        util_mem = int(data_list[6].split(' ')[0])
        temperature = int(data_list[7])

        # Shorten the GPU name
        if gpu_name.startswith('NVIDIA '):
            gpu_name = gpu_name[7:]
        if gpu_name.startswith('GeForce '):
            gpu_name = gpu_name[8:]

        result.append({
            'idx': idx,
            'gpu_name': gpu_name,
            'total_mem': total_mem,
            'used_mem': used_mem,
            'free_mem': free_mem,
            'util_gpu': util_gpu,
            'util_mem': util_mem,
            'temperature': temperature
        })

    # Close the connection
    client.close()
    return result


def print_res(data_list):
    for data in data_list:
        idx = data['idx']
        gpu_name = data['gpu_name']
        used_mem = data['used_mem']
        total_mem = data['total_mem']
        util_gpu = data['util_gpu']
        # Pad so the "used / total" column lines up across GPUs
        diff_len_space = ' ' * (len(str(total_mem)) - len(str(used_mem)))
        if used_mem < 500:
            status = COLOR_GREEN + 'free' + END_COLOR
        elif used_mem / total_mem < 0.5:
            status = COLOR_YELLOW + 'used' + END_COLOR
        else:
            status = COLOR_RED + 'used' + END_COLOR
        res = f'{idx}: {status} - {gpu_name} - {diff_len_space}{used_mem} / {total_mem} MiB, GPU Util: {util_gpu} %'
        print(res)


# Box-drawing characters for the table view
table_icon = {
    'hline': '─',
    'vline': '│',
    'left': {
        'up': '┌',
        'middle': '├',
        'bottom': '└',
    },
    'mid': {
        'up': '┬',
        'middle': '┼',
        'bottom': '┴',
    },
    'right': {
        'up': '┐',
        'middle': '┤',
        'bottom': '┘',
    },
}


def clamp_str(input_str, length, fill=False, fill_type='center', len_is=None):
    """Clamp or pad input_str to `length` visible characters.

    `len_is` overrides the measured length, which is useful when the string
    contains ANSI colour codes that should not count towards its width.
    (`fill` is accepted for call-site readability but unused.)
    """
    if len_is:
        ori_len = len_is
    else:
        ori_len = len(input_str)
    if ori_len > length:
        input_str = input_str[:length - 1]
        input_str += '…'
    else:
        diff = length - ori_len
        if fill_type == 'center':
            left = diff // 2
            right = diff - left
        elif fill_type == 'left':
            left = 0
            right = diff
        elif fill_type == 'right':
            left = diff
            right = 0
        tmp_list = []
        tmp_list.append(' ' * left)
        tmp_list.append(input_str)
        tmp_list.append(' ' * right)
        input_str = ''.join(tmp_list)
    return input_str


def get_bar(bar_ratio, max_len, color=False):
    """Render a plain text progress bar, e.g. get_bar(0.25, 8) -> '||      '."""
    assert 0 <= bar_ratio <= 1
    res = []
    used_len = int(bar_ratio * max_len)
    res.append('|' * used_len)
    res.append(' ' * (max_len - used_len))
    return ''.join(res)


def print_table_res(data_list):

    def print_line(line_type, width_list, spaces=()):
        """Print a horizontal rule; columns listed in `spaces` stay blank."""
        assert line_type in ['up', 'middle', 'bottom']
        str_list = []
        for i, cell in enumerate(width_list):
            # left edge
            if i == 0:
                if i in spaces:
                    str_list.append(table_icon['vline'])
                else:
                    str_list.append(table_icon['left'][line_type])
            # cell body
            str_list.extend((' ' if i in spaces else table_icon['hline']) * cell)
            # right edge / column separator
            if i == len(width_list) - 1:
                if i in spaces:
                    str_list.append(table_icon['vline'])
                else:
                    str_list.append(table_icon['right'][line_type])
            else:
                if i in spaces and (i + 1) in spaces:
                    str_list.append(table_icon['vline'])
                elif i in spaces:
                    str_list.append(table_icon['left']['middle'])
                elif (i + 1) in spaces:
                    str_list.append(table_icon['right']['middle'])
                else:
                    str_list.append(table_icon['mid'][line_type])
        print(''.join(str_list))

    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    cell_width_list = [10, 20, 24, 15]
    len_last3 = cell_width_list[1] + cell_width_list[2] + cell_width_list[3] + 2

    # Header row ------------------------------------------
    print_line('up', cell_width_list)
    str_list = []
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('Title', cell_width_list[0], True))
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('Name, State, Temperature', cell_width_list[1], True))
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('Memory-Usage', cell_width_list[2], True))
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('GPU-Util', cell_width_list[3], True))
    str_list.append(table_icon['vline'])
    print(''.join(str_list))

    # Body rows ------------------------------------------
    for i, data in enumerate(data_list):
        print_line('middle', cell_width_list)
        title = data['server_data']['title']
        info_list = data.get('info_list', None)
        if info_list:
            for j, info in enumerate(info_list):
                str_list = []
                str_list.append(table_icon['vline'])
                str_list.append(clamp_str(' ', cell_width_list[0], True))
                str_list.append(table_icon['vline'])
                # GPU index and name
                str_list.append(clamp_str(f" {info['idx']} : {info['gpu_name']}", cell_width_list[1], True, 'left'))
                str_list.append(table_icon['vline'])
                # memory usage
                used_mem = info['used_mem']
                total_mem = info['total_mem']
                mem_str = f'{used_mem} / {total_mem} MiB '
                str_list.append(clamp_str(mem_str, cell_width_list[2], True, 'right'))
                str_list.append(table_icon['vline'])
                # GPU utilization
                util_gpu = info['util_gpu']
                str_list.append(clamp_str(f"{util_gpu} % ", cell_width_list[3], True, 'right'))
                str_list.append(table_icon['vline'])
                print(''.join(str_list))

                # second row for this GPU
                str_list = []
                # server title (only next to the first GPU)
                str_list.append(table_icon['vline'])
                str_list.append(clamp_str(title if j == 0 else ' ', cell_width_list[0], True))
                str_list.append(table_icon['vline'])
                # busy / free status
                if used_mem < 500:
                    status = COLOR_GREEN + 'free' + END_COLOR
                    text_len = 5
                elif used_mem / total_mem < 0.5:
                    status = COLOR_YELLOW + 'occupied' + END_COLOR
                    text_len = 9
                else:
                    status = COLOR_RED + 'occupied' + END_COLOR
                    text_len = 9
                str_list.append(clamp_str(f" {status}", cell_width_list[1] - 5, True, 'left', text_len))
                # temperature
                str_list.append(clamp_str(f"{info['temperature']}C ", 5, True, 'right'))
                str_list.append(table_icon['vline'])
                # memory usage bar
                str_list.append(get_bar(used_mem / total_mem, cell_width_list[2]))
                str_list.append(table_icon['vline'])
                # utilization bar
                str_list.append(get_bar(util_gpu / 100, cell_width_list[3]))
                str_list.append(table_icon['vline'])
                print(''.join(str_list))

                # separator before the next GPU
                if j != len(info_list) - 1:
                    print_line('middle', cell_width_list, [0])
        else:
            str_list = []
            str_list.append(table_icon['vline'])
            str_list.append(clamp_str(title, cell_width_list[0], True))
            str_list.append(table_icon['vline'])
            str_list.append(clamp_str('error', len_last3, True))
            str_list.append(table_icon['vline'])
            print(''.join(str_list))

    print_line('bottom', cell_width_list)

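# Illustrative only (not output captured from a real host): with --format=csv,
# nvidia-smi prints the header line matched above followed by one line per
# GPU, roughly like
#
#   0, NVIDIA GeForce RTX 3090, 24576 MiB, 312 MiB, 24264 MiB, 5 %, 1 %, 34
#
# which check_gpu_utilization() turns into one dict per GPU. print_res()
# takes a list of these dicts; print_table_res() takes
# [{'server_data': <entry from serverList.json>, 'info_list': [<dicts>]}, ...].
# The constant below is unused by the script and kept only as documentation.
_EXAMPLE_GPU_INFO = {
    'idx': 0,
    'gpu_name': 'RTX 3090',   # 'NVIDIA ' / 'GeForce ' prefixes stripped
    'total_mem': 24576,       # MiB
    'used_mem': 312,          # MiB
    'free_mem': 24264,        # MiB
    'util_gpu': 5,            # %
    'util_mem': 1,            # %
    'temperature': 34,        # degrees C
}
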

def keep_check_one(server: dict, shared_data_list: list, server_idx: int, interval: float):
    try:
        # Open the SSH connection
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(server['ip'], port=server['port'], username=server['username'],
                       password=server.get('password', None),
                       key_filename=server.get('key_filename', None))
        cmd = 'nvidia-smi --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,utilization.memory,temperature.gpu --format=csv'

        # Poll until run_realtime is cleared
        while run_realtime:
            stdin, stdout, stderr = client.exec_command(cmd)
            output = stdout.read().decode()
            output = output.split('\n')
            start_idx = 0
            for i in range(len(output)):
                if output[i] == 'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], utilization.gpu [%], utilization.memory [%], temperature.gpu':
                    start_idx = i + 1
                    break
            output = output[start_idx:-1]

            # Parse one CSV line per GPU -----------------------------
            result = []
            for data in output:
                data_list = data.split(', ')
                idx = int(data_list[0])
                gpu_name = data_list[1]
                total_mem = int(data_list[2].split(' ')[0])
                used_mem = int(data_list[3].split(' ')[0])
                free_mem = int(data_list[4].split(' ')[0])
                util_gpu = int(data_list[5].split(' ')[0])
                util_mem = int(data_list[6].split(' ')[0])
                temperature = int(data_list[7])

                # Shorten the GPU name
                if gpu_name.startswith('NVIDIA '):
                    gpu_name = gpu_name[7:]
                if gpu_name.startswith('GeForce '):
                    gpu_name = gpu_name[8:]

                result.append({
                    'idx': idx,
                    'gpu_name': gpu_name,
                    'total_mem': total_mem,
                    'used_mem': used_mem,
                    'free_mem': free_mem,
                    'util_gpu': util_gpu,
                    'util_mem': util_mem,
                    'temperature': temperature
                })

            # Publish the result under the lock so the drawing loop
            # never sees a half-updated entry
            with data_list_lock:
                shared_data_list[server_idx]['info_list'] = result

            time.sleep(interval)

        # Close the connection
        client.close()
    except Exception as e:
        print(e)


def realtime(args):
    global run_realtime
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument('-n', type=float, default=2, help='how often to poll each server (seconds)')
        parser.add_argument('-f', type=float, default=2, help='how often to redraw the display (seconds)')
        parser.add_argument('-e', '--exclude', type=str, default='', help='comma-separated server titles to skip')
        parser.add_argument('-t', '--table', action='store_true', help='render as a table')
        args = parser.parse_args(args)
    except SystemExit:
        print('Invalid arguments!')
        return
    exclude_list = args.exclude.split(',')

    # Load the server list
    with open(server_list_path, 'r') as f:
        server_list = json.load(f)

    # Shared state: the polling workers are threads, so a plain list is enough
    data_list = []
    run_realtime = True

    # Start one polling thread per (non-excluded) server
    idx = 0
    for i, server_data in enumerate(server_list):
        if server_data['title'] in exclude_list:
            continue
        data_list.append(dict())
        data_list[-1]['server_data'] = server_data
        thread = threading.Thread(target=keep_check_one, args=(server_data, data_list, idx, args.n))
        idx += 1
        thread.daemon = True
        thread.start()

    # Draw loop; Ctrl+C drops back to the prompt and stops the workers
    try:
        while True:
            with data_list_lock:
                os.system('cls' if os.name == 'nt' else 'clear')
                if args.table:
                    print_table_res(data_list)
                else:
                    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
                    for data in data_list:
                        print(data['server_data']['title'] + ' ---------------')
                        info_list = data.get('info_list', None)
                        if info_list:
                            print_res(info_list)
                        else:
                            print('error')
            time.sleep(args.f)
    except KeyboardInterrupt:
        pass
    run_realtime = False

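# Example invocations at the interactive prompt (server titles are
# placeholders for whatever is in serverList.json):
#
#   rt                 # list view, poll and redraw every 2 s
#   rt -t -n 1         # table view, poll every second
#   rt -e lab-a,lab-b  # skip the servers titled "lab-a" and "lab-b"
#
# Ctrl+C leaves the live view and returns to the prompt.
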

def check_all(show_type='list'):
    # Load the server list
    with open(server_list_path, 'r') as f:
        server_list = json.load(f)

    if show_type == 'list':
        # Query and print each server
        for server_data in server_list:
            print('* ' + server_data['title'] + ' --------------------')
            try:
                res = check_gpu_utilization(server_data)
                print_res(res)
            except Exception:
                print('Connection failed.')
            print()
    elif show_type == 'table':
        pass  # TODO


def print_server_list():
    # Load the server list
    with open(server_list_path, 'r') as f:
        server_list = json.load(f)
    for s in server_list:
        print('----------------------')
        print(f"title: {s['title']}")
        print(f"ip: {s['ip']}, port: {s['port']}, usr: {s['username']}")


def main():
    # Read commands from the prompt
    while True:
        msg = input(' -> ')
        msg = msg.split()
        if not msg:
            continue
        cmd = msg[0]
        args = msg[1:]
        if cmd == 'check':
            check_all()
        elif cmd == 'clear' or cmd == 'cls':
            os.system('cls' if os.name == 'nt' else 'clear')
        elif cmd == 'realtime' or cmd == 'rt':
            realtime(args)
        elif cmd == 'list':
            print_server_list()
        elif cmd == 'exit':
            break
        elif cmd == 'help':
            print(f'Usage: put the servers you want to monitor into {server_list_path}')
            print()
            print('check, show GPU status on every server once')
            print('realtime, live view (rt works too)')
            print('list, show every server recorded in the file')
            print('clear, clear the screen (cls works too)')
            print('exit, quit')
        else:
            print('Unknown command; type help for the command list')


if __name__ == '__main__':
    main()
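
# Example session (assuming the script is saved as gpu_monitor.py -- the file
# name is arbitrary) with serverList.json in the working directory:
#
#   $ python gpu_monitor.py
#    -> list        # show the configured servers
#    -> check       # one-off status of every GPU on every server
#    -> rt -t       # live table view until Ctrl+C
#    -> exit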