批量查看gpu状态
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

608 lines
22 KiB

import threading
import paramiko
import argparse
import socket
import json
import time
import os
# ANSI escape sequences used to colour terminal output.
COLOR_GREEN = '\033[0;32m'
COLOR_RED = '\033[0;31m'
COLOR_YELLOW = '\033[0;33m'
# Appended after coloured text to end the colour.
END_COLOR = '\033[0m'
# JSON file with the server entries; opened as a relative path.
server_list_path = 'serverList.json'
# Polled by the loops in keep_check_one; realtime() switches it on.
run_realtime = False
# Taken by the workers when publishing results and by realtime() while drawing.
data_list_lock = threading.Lock()
# Widths of the four table columns, in header order:
# Title | Name, State, Temperature | Memory-Usage | GPU-Util.
default_cell_width_list = [7, 20, 24, 15]
# Widths currently in use; rebound by cell_width().
cell_width_list = [7, 20, 24, 15]
# Glyph sets used to draw the table frame. Each set provides:
#   'hline' / 'hline-d' : horizontal rule glyph (solid / dashed variant)
#   'vline'             : vertical separator
#   'left'/'mid'/'right': junction glyphs for the 'up', 'middle' and 'bottom' rules
# NOTE(review): every glyph string is empty in this copy of the file, and the
# three sets are identical as a result - presumably the box-drawing characters
# were lost somewhere along the way; confirm against the original source.
table_icon_1 = {
    'hline': '',
    'vline': '',
    'hline-d': '',
    'left':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
    'mid':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
    'right':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
}
table_icon_2 = {
    'hline': '',
    'vline': '',
    'hline-d': '',
    'left':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
    'mid':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
    'right':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
}
table_icon_3 = {
    'hline': '',
    'vline': '',
    'hline-d': '',
    'left':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
    'mid':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
    'right':{
        'up' : '',
        'middle' : '',
        'bottom' : '',
    },
}
# Glyph set actually used by get_table_res.
table_icon = table_icon_3
def _parse_gpu_csv(raw_output):
    """Turn the CSV text printed by the nvidia-smi query into GPU records.

    Lines up to and including the CSV header are skipped; when the header is
    not present, parsing starts at the first line. Blank lines are ignored, so
    the result does not depend on whether the output ends with a newline.

    Returns:
        A list of dicts with the keys 'idx', 'gpu_name', 'total_mem',
        'used_mem', 'free_mem', 'util_gpu', 'util_mem' and 'temperature'.

    Raises:
        ValueError, IndexError: if a data line does not have the expected shape.
    """
    header = 'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], utilization.gpu [%], utilization.memory [%], temperature.gpu'
    lines = raw_output.splitlines()
    start_idx = lines.index(header) + 1 if header in lines else 0

    result = []
    for line in lines[start_idx:]:
        if not line.strip():
            continue
        fields = line.split(', ')
        # Shorten the GPU name by dropping the vendor / product-line prefixes.
        gpu_name = fields[1]
        if gpu_name.startswith('NVIDIA '):
            gpu_name = gpu_name[7:]
        if gpu_name.startswith('GeForce '):
            gpu_name = gpu_name[8:]
        result.append({
            'idx': int(fields[0]),
            'gpu_name': gpu_name,
            # Memory and utilisation fields carry a unit suffix ("123 MiB", "45 %").
            'total_mem': int(fields[2].split(' ')[0]),
            'used_mem': int(fields[3].split(' ')[0]),
            'free_mem': int(fields[4].split(' ')[0]),
            'util_gpu': int(fields[5].split(' ')[0]),
            'util_mem': int(fields[6].split(' ')[0]),
            'temperature': int(fields[7]),
        })
    return result


def check_gpu_utilization(server: dict, timeout=2):
    """Query the GPUs of one server over SSH and return their current state.

    Args:
        server: Server entry; 'ip', 'port' and 'username' are required,
            'password' and 'key_filename' are optional.
        timeout: Passed to both the SSH connect and the remote command.

    Returns:
        One dict per GPU, see _parse_gpu_csv.

    Raises:
        Whatever paramiko raises for connection/authentication problems, and
        ValueError/IndexError when the command output cannot be parsed.
    """
    client = paramiko.SSHClient()
    try:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(server['ip'], port=server['port'], username=server['username'], password=server.get('password', None), key_filename=server.get('key_filename', None), timeout=timeout)
        # Ask nvidia-smi for the fields we need, as CSV.
        cmd = 'nvidia-smi --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,utilization.memory,temperature.gpu --format=csv'
        _stdin, stdout, _stderr = client.exec_command(cmd, timeout=timeout)
        return _parse_gpu_csv(stdout.read().decode())
    finally:
        # Always release the connection, also when connect/exec/parse failed.
        client.close()
def print_res(data_list):
    """Print one coloured status line for every GPU record in *data_list*.

    A GPU using less than 500 MiB is shown as idle (green); otherwise it is
    shown as occupied, in yellow below 50 % memory usage and in red above.
    """
    wanted_keys = ('idx', 'gpu_name', 'used_mem', 'total_mem', 'util_gpu')
    for gpu in data_list:
        idx, gpu_name, used_mem, total_mem, util_gpu = (gpu[key] for key in wanted_keys)
        # Pad the used figure so it is as wide as the total figure.
        diff_len_space = ' ' * (len(str(total_mem)) - len(str(used_mem)))
        if used_mem < 500:
            colour, label = COLOR_GREEN, '空闲'
        elif used_mem / total_mem < 0.5:
            colour, label = COLOR_YELLOW, '占用'
        else:
            colour, label = COLOR_RED, '占用'
        status = colour + label + END_COLOR
        print(f'{idx}: {status} - {gpu_name} - {diff_len_space}{used_mem} / {total_mem} MiB, GPU Util: {util_gpu} %')
def clamp_str(input_str, length, fill=False, fill_type='center', len_is=None):
    """Fit *input_str* into a cell that is *length* characters wide.

    Text that is too long is cut to ``length - 1`` characters followed by the
    truncation marker; shorter text is padded with spaces.

    Args:
        input_str: Text to fit.
        length: Target width.
        fill: Accepted for compatibility with existing callers; not used.
        fill_type: Where the text sits when padded: 'center', 'left' or 'right'.
        len_is: Optional width to assume for *input_str* instead of
            ``len(input_str)``; ignored when falsy.

    Returns:
        The truncated or padded string.

    Raises:
        ValueError: if padding is needed and *fill_type* is not one of the
            three supported values.
    """
    ori_len = len_is if len_is else len(input_str)

    if ori_len > length:
        # Reserve one column for the marker. The cut is clamped at 0 because a
        # plain ``[:length - 1]`` turns into ``[:-1]`` for length 0 and would
        # return nearly the whole string instead of an empty cell.
        keep = max(length - 1, 0)
        # NOTE(review): the marker literal is empty in this file - presumably
        # an ellipsis-like character was intended; confirm before changing.
        return input_str[:keep] + ''

    diff = length - ori_len
    if fill_type == 'center':
        left = diff // 2
        right = diff - left
    elif fill_type == 'left':
        left, right = 0, diff
    elif fill_type == 'right':
        left, right = diff, 0
    else:
        raise ValueError(f'unknown fill_type: {fill_type!r}')
    return ' ' * left + input_str + ' ' * right
def get_bar(bar_ratio, max_len, color=False):
    """Build a horizontal bar of *max_len* cells, *bar_ratio* of them filled.

    With ``color=True`` colour codes are spliced in so that the section from
    40 % of the width onward is yellow and the section from 80 % onward is
    red, followed by a final colour reset.
    """
    assert 0 <= bar_ratio <= 1
    filled = int(bar_ratio * max_len)
    # Candidate fill glyphs noted by the author: █ ▒ ▶ ▀ ■ ━
    cells = [''] * filled + [' '] * (max_len - filled)
    if color:
        cells.insert(int(max_len * 0.4), COLOR_YELLOW)
        # +1 compensates for the yellow code inserted just above.
        cells.insert(int(max_len * 0.8) + 1, f'{END_COLOR}{COLOR_RED}')
        cells.append(END_COLOR)
    return ''.join(cells)
def get_table_res(data_list):
    """Render the collected per-server data as a text table.

    Each entry of *data_list* is a dict with 'server_data' (whose 'title' is
    shown in the first column) and, optionally, 'info_list' (GPU records),
    'updated' and 'err_info'. Servers with a non-empty 'info_list' get two
    rows per GPU (figures, then status/bars); all others get a single row
    showing 'err_info'.

    Side effect: the 'updated' flag of every entry that has GPU records is
    reset to False, so a red 'X' appears on the next draw unless new data
    arrived in between.

    Returns:
        The whole table, preceded by a timestamp line, as one string.
    """
    def get_line(line_type, width_list, spaces=[], dash=False):
        """Build one horizontal rule of the table.

        line_type selects the junction glyphs ('up', 'middle' or 'bottom');
        columns whose index is in *spaces* are left blank instead of ruled;
        dash=True uses the dashed rule glyph.
        """
        assert line_type in ['up', 'middle', 'bottom']
        str_list = []
        for i, cell in enumerate(width_list):
            # Left edge of the table
            if i == 0:
                if i in spaces:
                    str_list.append(table_icon['vline'])
                else:
                    str_list.append(table_icon['left'][line_type])
            # Body of the cell
            if dash:
                str_list.extend((' ' if i in spaces else table_icon['hline-d']) * cell)
            else:
                str_list.extend((' ' if i in spaces else table_icon['hline']) * cell)
            # Right edge of the cell: table border for the last column,
            # otherwise a junction that depends on which neighbours are blank
            if i == len(width_list) - 1:
                if i in spaces:
                    str_list.append(table_icon['vline'])
                else:
                    str_list.append(table_icon['right'][line_type])
            else:
                if i in spaces and (i+1) in spaces:
                    str_list.append(table_icon['vline'])
                elif i in spaces:
                    str_list.append(table_icon['left']['middle'])
                elif (i + 1) in spaces:
                    str_list.append(table_icon['right']['middle'])
                else:
                    str_list.append(table_icon['mid'][line_type])
        return ''.join(str_list)
    result_str = []
    result_str.extend([time.strftime("%Y%m-%d%H:%M:%S", time.localtime(time.time())), '\n'])
    # Combined width of the last three columns plus the two separators between them
    len_last3 = cell_width_list[1] + cell_width_list[2] + cell_width_list[3] + 2
    # Header ------------------------------------------
    result_str.extend([get_line('up', cell_width_list), '\n'])
    str_list = []
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('Title', cell_width_list[0], True))
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('Name, State, Temperature', cell_width_list[1], True))
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('Memory-Usage', cell_width_list[2], True))
    str_list.append(table_icon['vline'])
    str_list.append(clamp_str('GPU-Util', cell_width_list[3], True))
    str_list.append(table_icon['vline'])
    result_str.extend([''.join(str_list), '\n'])
    # Body ------------------------------------------
    for i, data in enumerate(data_list):
        result_str.extend([get_line('middle', cell_width_list), '\n'])
        title = data['server_data']['title']
        info_list = data.get('info_list', None)
        if info_list:
            updated = data.get('updated', False)
            # Consume the flag; it has to be set again before the next draw
            data['updated'] = False
            for j, info in enumerate(info_list):
                str_list = []
                # First column: blank when refreshed since the last draw, red X otherwise
                str_list.append(table_icon['vline'])
                if updated:
                    str_list.append(clamp_str(' ', cell_width_list[0], True))
                else:
                    updated_str = clamp_str('X', cell_width_list[0], True)
                    updated_str = f'{COLOR_RED}{updated_str}{END_COLOR}'
                    str_list.append(updated_str)
                str_list.append(table_icon['vline'])
                # GPU index and name
                str_list.append(clamp_str(f" {info['idx']} : {info['gpu_name']}", cell_width_list[1], True, 'left'))
                str_list.append(table_icon['vline'])
                # Memory usage
                used_mem = info['used_mem']
                total_mem = info['total_mem']
                mem_str = f'{used_mem} / {total_mem} MiB '
                str_list.append(clamp_str(mem_str, cell_width_list[2], True, 'right'))
                str_list.append(table_icon['vline'])
                # GPU utilisation
                util_gpu = info['util_gpu']
                str_list.append(clamp_str(f"{util_gpu} % ", cell_width_list[3], True, 'right'))
                str_list.append(table_icon['vline'])
                result_str.extend([''.join(str_list), '\n'])
                # Second row of this GPU
                str_list = []
                # Server title, only next to the first GPU
                str_list.append(table_icon['vline'])
                str_list.append(clamp_str(title if j==0 else ' ', cell_width_list[0], True))
                str_list.append(table_icon['vline'])
                # Occupancy status; the last temperature_len columns of the
                # cell are reserved for the temperature
                temperature_len = 4
                # Status text
                if used_mem < 1000 and util_gpu < 20:
                    status = clamp_str(f" free", cell_width_list[1]-temperature_len, True, 'left')
                else:
                    status = clamp_str(f" occupied", cell_width_list[1]-temperature_len, True, 'left')
                # Status colour: green when free, else yellow below 50 % memory use, red otherwise
                if used_mem < 1000 and util_gpu < 20:
                    status = COLOR_GREEN + status + END_COLOR
                elif used_mem / total_mem < 0.5:
                    status = COLOR_YELLOW + status + END_COLOR
                else:
                    status = COLOR_RED + status + END_COLOR
                str_list.append(status)
                # Temperature
                str_list.append(clamp_str(f"{info['temperature']}C ", temperature_len, True, 'right'))
                str_list.append(table_icon['vline'])
                # Memory usage bar
                str_list.append(get_bar(used_mem/total_mem, cell_width_list[2], True))
                str_list.append(table_icon['vline'])
                # Utilisation bar
                str_list.append(get_bar(util_gpu/100, cell_width_list[3], True))
                str_list.append(table_icon['vline'])
                result_str.extend([''.join(str_list), '\n'])
                # Dashed separator before the next GPU; the title column stays blank
                if j != len(info_list)-1:
                    result_str.extend([get_line('middle', cell_width_list, [0], dash=True), '\n'])
        else:
            # No GPU data: one row with the title and the error text spanning
            # the remaining three columns
            str_list = []
            str_list.append(table_icon['vline'])
            str_list.append(clamp_str(title, cell_width_list[0], True))
            str_list.append(table_icon['vline'])
            err_info = data.get('err_info', 'error')
            str_list.append(clamp_str(err_info, len_last3, True))
            str_list.append(table_icon['vline'])
            result_str.extend([''.join(str_list), '\n'])
    result_str.extend([get_line('bottom', cell_width_list), '\n'])
    return ''.join(result_str)
def _parse_gpu_rows(raw_output):
    """Parse the CSV text of the nvidia-smi query into a list of GPU dicts.

    Lines up to and including the CSV header are skipped (parsing starts at
    the first line when the header is absent) and blank lines are ignored.
    Raises ValueError/IndexError when a data line has an unexpected shape.
    """
    header = 'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], utilization.gpu [%], utilization.memory [%], temperature.gpu'
    lines = raw_output.splitlines()
    start_idx = lines.index(header) + 1 if header in lines else 0

    result = []
    for line in lines[start_idx:]:
        if not line.strip():
            continue
        fields = line.split(', ')
        # Shorten the GPU name by dropping the vendor / product-line prefixes.
        gpu_name = fields[1]
        if gpu_name.startswith('NVIDIA '):
            gpu_name = gpu_name[7:]
        if gpu_name.startswith('GeForce '):
            gpu_name = gpu_name[8:]
        result.append({
            'idx': int(fields[0]),
            'gpu_name': gpu_name,
            # Memory and utilisation fields carry a unit suffix ("123 MiB", "45 %").
            'total_mem': int(fields[2].split(' ')[0]),
            'used_mem': int(fields[3].split(' ')[0]),
            'free_mem': int(fields[4].split(' ')[0]),
            'util_gpu': int(fields[5].split(' ')[0]),
            'util_mem': int(fields[6].split(' ')[0]),
            'temperature': int(fields[7]),
        })
    return result


def keep_check_one(server: dict, shared_data_list: list, server_idx: int, interval: float, re_connect_time: float = 5):
    """Poll one server's GPUs until the module-level ``run_realtime`` flag is cleared.

    Meant to run in its own thread. Results are published into
    ``shared_data_list[server_idx]``:
      * 'info_list' / 'updated' after every successful poll,
      * 'err_info' (and removal of 'info_list') when a poll fails,
      * 'err_info' prefixed with the retry count when connecting fails.

    Args:
        server: Server entry; 'ip', 'port' and 'username' are required,
            'password' and 'key_filename' are optional.
        shared_data_list: List of per-server dicts shared with the renderer.
        server_idx: Index of this server's dict in *shared_data_list*.
        interval: Seconds between polls; three times this value is used as
            the SSH connect and command timeout.
        re_connect_time: Seconds to wait after a failed connection attempt.
    """
    cmd = 'nvidia-smi --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,utilization.memory,temperature.gpu --format=csv'
    entry = shared_data_list[server_idx]
    re_try_count = 0
    # Connection loop: (re)connect until asked to stop.
    while run_realtime:
        try:
            client = paramiko.SSHClient()
            try:
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(server['ip'], port=server['port'], username=server['username'], password=server.get('password', None), key_filename=server.get('key_filename', None), timeout=interval*3)
                with data_list_lock:
                    entry['err_info'] = ' '
                re_try_count = 0
                # Polling loop: reuse the connection until a poll fails.
                keep_run = True
                while run_realtime and keep_run:
                    try:
                        _stdin, stdout, _stderr = client.exec_command(cmd, timeout=interval*3)
                        result = _parse_gpu_rows(stdout.read().decode())
                        with data_list_lock:
                            entry['info_list'] = result
                            entry['updated'] = True
                    except Exception as e:
                        # Drop the stale data and fall back to the connection
                        # loop, which opens a fresh connection.
                        keep_run = False
                        with data_list_lock:
                            entry['err_info'] = f'{e}'
                            entry.pop('info_list', None)
                    time.sleep(interval)
            finally:
                # Close on every path, including a failed connect, so retries
                # do not pile up half-open clients.
                client.close()
        except Exception as e:
            with data_list_lock:
                entry['err_info'] = f'retry:{re_try_count}, {e}'
            time.sleep(re_connect_time)
            re_try_count += 1
def realtime(args):
    """Continuously display the GPU state of all configured servers.

    Starts one daemon polling thread (keep_check_one) per server and redraws
    the screen in a loop. The loop has no exit condition of its own; it ends
    when an exception such as KeyboardInterrupt propagates out of it, at
    which point ``run_realtime`` is cleared so the polling threads stop.

    Args:
        args: Command-line style tokens:
            -n SECONDS       refresh interval (default 2)
            -e/--exclude A,B server titles to leave out
            -t/--table       draw a table instead of a list
            --f2             redraw by moving the cursor up instead of clearing
    """
    global run_realtime
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', type=float, default=2, help='多久刷新一次')
    parser.add_argument('-e', '--exclude', type=str, default='', help='不需要显示的服务器(title)用,分割')
    parser.add_argument('-t', '--table', action='store_true', help='以表格形式绘制')
    parser.add_argument('--f2', action='store_true', help='使用第二种刷新方式')
    try:
        args = parser.parse_args(args)
    except SystemExit as exc:
        # argparse reports bad arguments AND -h/--help by raising SystemExit;
        # only a non-zero exit code is an actual argument error.
        if exc.code not in (0, None):
            print('参数有误!')
        return
    exclude_list = args.exclude.split(',')
    # Load the server list
    with open(server_list_path, 'r') as f:
        server_list = json.load(f)
    # List shared between the polling threads and the drawing loop
    data_list = []
    run_realtime = True
    try:
        # Start one polling thread per server that is not excluded
        for server_data in server_list:
            if server_data['title'] in exclude_list:
                continue
            data_list.append({'server_data': server_data})
            # Poll slightly faster than the redraw rate
            worker = threading.Thread(
                target=keep_check_one,
                args=(server_data, data_list, len(data_list) - 1, args.n * 0.95),
            )
            worker.daemon = True
            worker.start()
        # Drawing loop
        first_time = True
        while True:
            with data_list_lock:
                if args.table:
                    res_str = get_table_res(data_list)
                    # Clear the screen
                    if args.f2:
                        rows_full = os.get_terminal_size().lines
                        print("\033[F"*rows_full)
                    else:
                        os.system('cls' if os.name == 'nt' else 'clear')
                    # Output
                    print(res_str)
                else:
                    os.system('cls' if os.name == 'nt' else 'clear')
                    print(time.strftime("%Y%m-%d%H:%M:%S", time.localtime(time.time())))
                    for data in data_list:
                        print(data['server_data']['title'] + ' ---------------')
                        info_list = data.get('info_list', None)
                        if info_list:
                            print_res(info_list)
                        else:
                            print('出错')
            # The first frame is drawn before any data arrived, so refresh sooner
            if first_time:
                first_time = False
                time.sleep(1)
            else:
                time.sleep(args.n)
    finally:
        # Tell the polling threads to stop, however the loop was left.
        run_realtime = False
def check_all(show_type='list'):
    """Query every configured server once and print the result.

    Only ``show_type='list'`` produces output; 'table' is a placeholder and
    any other value does nothing. A failure on one server is reported and
    does not stop the remaining servers from being checked.
    """
    # Load the server list
    with open(server_list_path, 'r') as f:
        server_list = json.load(f)

    if show_type == 'table':
        # TODO: table output is not implemented yet
        return
    if show_type != 'list':
        return

    for server in server_list:
        print('* ' + server['title'] + ' --------------------')
        try:
            print_res(check_gpu_utilization(server))
        except paramiko.AuthenticationException:
            print("Authentication failed.")
        except paramiko.SSHException as ssh_err:
            print(f"SSH error: {ssh_err}")
        except socket.timeout:
            print("Connection timed out.")
        except Exception as e:
            print(f"错误 {e}")
        print()
def print_server_list():
    """Print title, address, port and user name of every configured server."""
    # Load the server list
    with open(server_list_path, 'r') as f:
        servers = json.load(f)
    for entry in servers:
        print('----------------------')
        print('title:{}'.format(entry['title']))
        print('ip:{}, port:{}, usr:{}'.format(entry['ip'], entry['port'], entry['username']))
def cell_width(args):
    """Show or change the column widths used for the table display.

    Args:
        args: Command tokens.
            []            print the current widths
            ['-']         restore the default widths
            ['a,b,c,d']   set the widths; each item is an integer, or '-' to
                          keep the current width of that column
            Anything longer than one token is rejected.
    """
    global cell_width_list
    if not args:
        print(f'current cells width: {cell_width_list}')
        return
    if len(args) > 1:
        print('参数错误')
        return

    spec = args[0]
    if spec == '-':
        print(f'原始值为:{cell_width_list}')
        # Copy, so the active list never aliases the defaults.
        cell_width_list = list(default_cell_width_list)
        print(f'恢复默认值为:{cell_width_list}')
        return

    widths = spec.split(',')
    if len(widths) != len(cell_width_list):
        print('参数长度不正确')
        return
    new_widths = []
    for current, width in zip(cell_width_list, widths):
        if width == '-':
            new_widths.append(current)
            continue
        try:
            new_widths.append(int(width))
        except ValueError:
            print('宽度值必须为整数或\'-\'')
            return
    print(f'原始值为:{cell_width_list}')
    cell_width_list = new_widths
    print(f'修改为:{cell_width_list}')
def main():
    """Run the interactive command prompt until 'exit' or end of input."""
    while True:
        try:
            msg = input(' -> ')
        except EOFError:
            # stdin was closed (Ctrl-D / end of piped input): leave quietly.
            print()
            break
        # split() without an argument ignores leading, trailing and repeated
        # whitespace, so no empty tokens reach the command handlers.
        parts = msg.split()
        cmd = parts[0] if parts else ''
        args = parts[1:]
        if cmd == 'check':
            check_all()
        elif cmd == 'clear' or cmd == 'cls':
            os.system('cls' if os.name == 'nt' else 'clear')
        elif cmd == 'realtime' or cmd == 'rt':
            realtime(args)
        elif cmd == 'list':
            print_server_list()
        elif cmd == 'cell-width' or cmd == 'cw':
            cell_width(args)
        elif cmd == 'exit':
            break
        elif cmd == 'help':
            print(f'使用方法: 需要把待查看的服务器信息放至{server_list_path}')
            print()
            print('check, 查看所有显卡状态')
            print('realtime, 实时显示(rt同)')
            print('list, 查看文件中记录的所有服务器信息')
            print('cell-width, 查看或设置cell的宽度')
            print('clear, 清空屏幕(cls同)')
            print('exit, 退出')
        else:
            print('没有该指令, 输入help查看帮助')


if __name__ == '__main__':
    main()