批量查看gpu状态
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

466 lines
16 KiB

import multiprocessing
import threading
import paramiko
import argparse
import json
import time
import os
COLOR_GREEN = '\033[0;32m'
COLOR_RED = '\033[0;31m'
COLOR_YELLOW = '\033[0;33m'
END_COLOR = '\033[0m'
server_list_path = 'serverList.json'
run_realtime = False
data_list_lock = threading.Lock()
# data_list = []
def check_gpu_utilization(server:dict):
# 建立SSH连接
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(server['ip'], port=server['port'], username=server['username'], password=server.get('password', None), key_filename=server.get('key_filename', None))
# 执行命令查看显卡占用情况(这里以nvidia-smi为例)
cmd = 'nvidia-smi --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,utilization.memory,temperature.gpu --format=csv'
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().decode()
output = output.split('\n')
start_idx = 0
for i in range(len(output)):
if output[i] == 'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], utilization.gpu [%], utilization.memory [%], temperature.gpu':
start_idx = i + 1
break
output = output[start_idx:-1]
# 解析数据 -----------------------------
result = []
for data in output:
data_list = data.split(', ')
idx = int(data_list[0])
gpu_name = data_list[1]
total_mem = int(data_list[2].split(' ')[0])
used_mem = int(data_list[3].split(' ')[0])
free_mem = int(data_list[4].split(' ')[0])
util_gpu = int(data_list[5].split(' ')[0])
util_mem = int(data_list[6].split(' ')[0])
temperature = int(data_list[7])
# 简化GPU名称
if gpu_name.startswith('NVIDIA '):
gpu_name = gpu_name[7:]
if gpu_name.startswith('GeForce '):
gpu_name = gpu_name[8:]
result.append({
'idx': idx,
'gpu_name': gpu_name,
'total_mem': total_mem,
'used_mem': used_mem,
'free_mem': free_mem,
'util_gpu': util_gpu,
'util_mem': util_mem,
'temperature': temperature
})
# 关闭连接
client.close()
return result
def print_res(data_list):
for data in data_list:
idx = data['idx']
gpu_name = data['gpu_name']
used_mem = data['used_mem']
total_mem = data['total_mem']
util_gpu = data['util_gpu']
diff_len_space = ' ' * (len(str(total_mem)) - len(str(used_mem)))
if used_mem < 500:
status = COLOR_GREEN + '空闲' + END_COLOR
elif used_mem / total_mem < 0.5:
status = COLOR_YELLOW + '占用' + END_COLOR
else:
status = COLOR_RED + '占用' + END_COLOR
res = f'{idx}: {status} - {gpu_name} - {diff_len_space}{used_mem} / {total_mem} MiB, GPU Util: {util_gpu} %'
print(res)
table_icon = {
'hline': '',
'vline': '',
'left':{
'up' : '',
'middle' : '',
'bottom' : '',
},
'mid':{
'up' : '',
'middle' : '',
'bottom' : '',
},
'right':{
'up' : '',
'middle' : '',
'bottom' : '',
},
}
def clamp_str(input_str, length, fill=False, fill_type='center', len_is=None):
if len_is:
ori_len = len_is
else:
ori_len = len(input_str)
if ori_len > length:
input_str = input_str[:length-1]
input_str += ''
else:
diff = length - ori_len
if fill_type == 'center':
left = diff // 2
right = diff - left
elif fill_type == 'left':
left = 0
right = diff
elif fill_type == 'right':
left = diff
right = 0
tmp_list = []
tmp_list.append(' ' * left)
tmp_list.append(input_str)
tmp_list.append(' ' * right)
input_str = ''.join(tmp_list)
return input_str
def get_bar(bar_ratio, max_len, color=False):
assert 0 <= bar_ratio <= 1
res = []
used_len = int(bar_ratio * max_len)
res.extend(['|'] * used_len)
res.extend([' '] * (max_len-used_len))
if color:
res.insert(int(max_len*0.4), COLOR_YELLOW)
res.insert(int(max_len*0.8)+1, f'{END_COLOR}{COLOR_RED}')
res.append(END_COLOR)
return ''.join(res)
def print_table_res(data_list):
def print_line(line_type, width_list, spaces=[]):
assert line_type in ['up', 'middle', 'bottom']
str_list = []
for i, cell in enumerate(width_list):
# 开头
if i == 0:
if i in spaces:
str_list.append(table_icon['vline'])
else:
str_list.append(table_icon['left'][line_type])
# 中间
str_list.extend((' ' if i in spaces else table_icon['hline']) * cell)
# 右边
if i == len(width_list) - 1:
if i in spaces:
str_list.append(table_icon['vline'])
else:
str_list.append(table_icon['right'][line_type])
else:
if i in spaces and (i+1) in spaces:
str_list.append(table_icon['vline'])
elif i in spaces:
str_list.append(table_icon['left']['middle'])
elif (i + 1) in spaces:
str_list.append(table_icon['right']['middle'])
else:
str_list.append(table_icon['mid'][line_type])
print(''.join(str_list))
# print('TODO')
print(time.strftime("%Y%m-%d%H:%M:%S", time.localtime(time.time())))
cell_width_list = [10, 20, 24, 15]
len_last3 = cell_width_list[1] + cell_width_list[2] + cell_width_list[3] + 2
# 输出head ------------------------------------------
print_line('up', cell_width_list)
str_list = []
str_list.append(table_icon['vline'])
str_list.append(clamp_str('Title', cell_width_list[0], True))
str_list.append(table_icon['vline'])
str_list.append(clamp_str('Name, State, Temperature', cell_width_list[1], True))
str_list.append(table_icon['vline'])
str_list.append(clamp_str('Memory-Usage', cell_width_list[2], True))
str_list.append(table_icon['vline'])
str_list.append(clamp_str('GPU-Util', cell_width_list[3], True))
str_list.append(table_icon['vline'])
print(''.join(str_list))
# 输出内容 ------------------------------------------
for i, data in enumerate(data_list):
print_line('middle', cell_width_list)
title = data['server_data']['title']
info_list = data.get('info_list', None)
if info_list:
for j, info in enumerate(info_list):
str_list = []
str_list.append(table_icon['vline'])
str_list.append(clamp_str(' ', cell_width_list[0], True))
str_list.append(table_icon['vline'])
# 显卡名称
str_list.append(clamp_str(f" {info['idx']} : {info['gpu_name']}", cell_width_list[1], True, 'left'))
str_list.append(table_icon['vline'])
# 显存占用
used_mem = info['used_mem']
total_mem = info['total_mem']
mem_str = f'{used_mem} / {total_mem} MiB '
str_list.append(clamp_str(mem_str, cell_width_list[2], True, 'right'))
str_list.append(table_icon['vline'])
# 显卡利用率
util_gpu = info['util_gpu']
str_list.append(clamp_str(f"{util_gpu} % ", cell_width_list[3], True, 'right'))
str_list.append(table_icon['vline'])
print(''.join(str_list))
# 第二行
str_list = []
# 服务器标题
str_list.append(table_icon['vline'])
str_list.append(clamp_str(title if j==0 else ' ', cell_width_list[0], True))
str_list.append(table_icon['vline'])
# 占用情况
if used_mem < 500:
status = COLOR_GREEN + 'free' + END_COLOR
text_len = 5
elif used_mem / total_mem < 0.5:
status = COLOR_YELLOW + 'occupied' + END_COLOR
text_len = 9
else:
status = COLOR_RED + 'occupied' + END_COLOR
text_len = 9
str_list.append(clamp_str(f" {status}", cell_width_list[1]-5, True, 'left', text_len))
# 温度
str_list.append(clamp_str(f"{info['temperature']}C ", 5, True, 'right'))
str_list.append(table_icon['vline'])
# 显存进度条
str_list.append(get_bar(used_mem/total_mem, cell_width_list[2], True))
str_list.append(table_icon['vline'])
# 利用率进度条
str_list.append(get_bar(util_gpu/100, cell_width_list[3], True))
str_list.append(table_icon['vline'])
print(''.join(str_list))
# 下一张卡
if j != len(info_list)-1:
print_line('middle', cell_width_list, [0])
else:
str_list = []
str_list.append(table_icon['vline'])
str_list.append(clamp_str(title, cell_width_list[0], True))
str_list.append(table_icon['vline'])
str_list.append(clamp_str('erro', len_last3, True))
str_list.append(table_icon['vline'])
print(''.join(str_list))
print_line('bottom', cell_width_list)
def keep_check_one(server: dict, shared_data_list: list, server_idx: int, interval: float):
try:
# 建立SSH连接
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(server['ip'], port=server['port'], username=server['username'], password=server.get('password', None), key_filename=server.get('key_filename', None))
cmd = 'nvidia-smi --query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,utilization.memory,temperature.gpu --format=csv'
# 循环检测
while run_realtime:
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().decode()
output = output.split('\n')
start_idx = 0
for i in range(len(output)):
if output[i] == 'index, name, memory.total [MiB], memory.used [MiB], memory.free [MiB], utilization.gpu [%], utilization.memory [%], temperature.gpu':
start_idx = i + 1
break
output = output[start_idx:-1]
# 解析数据 -----------------------------
result = []
for data in output:
data_list = data.split(', ')
idx = int(data_list[0])
gpu_name = data_list[1]
total_mem = int(data_list[2].split(' ')[0])
used_mem = int(data_list[3].split(' ')[0])
free_mem = int(data_list[4].split(' ')[0])
util_gpu = int(data_list[5].split(' ')[0])
util_mem = int(data_list[6].split(' ')[0])
temperature = int(data_list[7])
# 简化GPU名称
if gpu_name.startswith('NVIDIA '):
gpu_name = gpu_name[7:]
if gpu_name.startswith('GeForce '):
gpu_name = gpu_name[8:]
result.append({
'idx': idx,
'gpu_name': gpu_name,
'total_mem': total_mem,
'used_mem': used_mem,
'free_mem': free_mem,
'util_gpu': util_gpu,
'util_mem': util_mem,
'temperature': temperature
})
# locked = False
with data_list_lock:
# locked = True
shared_data_list[server_idx]['info_list'] = result
# locked = False
time.sleep(interval)
# 关闭连接
client.close()
except Exception as e:
# if data_list_lock.locked and locked:
# data_list_lock.release()
print(e)
def realtime(args):
global run_realtime
try:
parser = argparse.ArgumentParser()
parser.add_argument('-n', type=float, default=2, help='服务器多久刷新一次')
parser.add_argument('-f', type=float, default=2, help='显示多久刷新一次')
parser.add_argument('-e', '--exclude', type=str, default='', help='不需要显示的服务器(title)用,分割')
parser.add_argument('-t', '--table', action='store_true', help='以表格形式绘制')
args = parser.parse_args(args)
except:
print('参数有误!')
return
exclude_list = args.exclude.split(',')
# 加载json
with open(server_list_path, 'r') as f:
server_list = json.load(f)
# 共享list
manager = multiprocessing.Manager()
data_list = []
run_realtime = True
# 开启线程
idx = 0
for i, server_data in enumerate(server_list):
if server_data['title'] in exclude_list:
continue
data_list.append(dict())
data_list[-1]['server_data'] = server_data
thread = threading.Thread(target=keep_check_one, args=(server_data, data_list, idx, args.n))
idx += 1
thread.daemon = True
thread.start()
# 绘制
while True:
with data_list_lock:
os.system('cls' if os.name == 'nt' else 'clear')
if args.table:
print_table_res(data_list)
else:
print(time.strftime("%Y%m-%d%H:%M:%S", time.localtime(time.time())))
for data in data_list:
print(data['server_data']['title'] + ' ---------------')
info_list = data.get('info_list', None)
if info_list:
print_res(info_list)
# print(info_list)
else:
print('出错')
time.sleep(args.f)
run_realtime = False
def check_all(show_type='list'):
# 加载json
with open(server_list_path, 'r') as f:
server_list = json.load(f)
if show_type == 'list':
# 处理每一个结果
for server_data in server_list:
print('* ' + server_data['title'] + ' --------------------')
try:
res = check_gpu_utilization(server_data)
print_res(res)
except:
print('连接出现问题.')
print()
elif show_type == 'table':
pass
# TODO
def print_server_list():
# 加载json
with open(server_list_path, 'r') as f:
server_list = json.load(f)
for s in server_list:
print('----------------------')
print(f"title:{s['title']}")
print(f"ip:{s['ip']}, port:{s['port']}, usr:{s['username']}")
def main():
# 接受输入
while True:
msg = input(' -> ')
msg = msg.split(' ')
cmd = msg[0]
args = msg[1:]
if cmd == 'check':
check_all()
elif cmd == 'clear' or cmd == 'cls':
os.system('cls' if os.name == 'nt' else 'clear')
elif cmd == 'realtime' or cmd == 'rt':
realtime(args)
elif cmd == 'list':
print_server_list()
elif cmd == 'exit':
break
elif cmd == 'help':
print(f'使用方法: 需要把待查看的服务器信息放至{server_list_path}')
print()
print('check, 查看所有显卡状态')
print('realtime, 实时显示(rt同)')
print('list, 查看文件中记录的所有服务器信息')
print('clear, 清空屏幕(cls同)')
print('exit, 退出')
else:
print('没有该指令, 输入help查看帮助')
if __name__ == '__main__':
main()