网页上看GPU情况
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

321 lines
14 KiB

import copy
import json
import re
import shlex
import threading
import time

import paramiko

from version import version
class Connector:
    """Poll a set of servers over SSH and publish their resource usage.

    Each server in ``server_cfg`` gets its own daemon thread. That thread keeps
    an SSH session open and periodically samples network, CPU, memory, storage
    and GPU information into a private staging dict (``tmp_data_dict``, guarded
    by ``tmp_lock``). A separate daemon thread copies the staging dict into the
    caller-owned ``data_dict`` (guarded by the caller-supplied ``lock``).
    """

    def __init__(self, data_dict: dict, server_cfg: dict, note_dict: dict, lock: threading.Lock, connect_check_interval: float, reconnect_interval: float, multiple_of_timeout: float = 2):
        """
        Args:
            data_dict: Output dict shared with the caller; written only while
                holding ``lock``.
            server_cfg: Iterated as a sequence of per-server config dicts.
                Keys read: ``title``, ``ip``, ``port``, ``username``, and
                optionally ``password``, ``key_filename``, ``storage_list``,
                ``network_interface_name``, ``ignore_gpu``.
            note_dict: Maps a server title to a note attached to its data.
            lock: Lock protecting ``data_dict``.
            connect_check_interval: Seconds between samples of one server,
                and between syncs into ``data_dict``.
            reconnect_interval: Seconds to wait before retrying a failed
                SSH connection.
            multiple_of_timeout: Connect / remote-command timeout, expressed
                as a multiple of ``connect_check_interval``.
        """
        self.data_dict = data_dict
        self.tmp_data_dict = dict()
        self.server_cfg = server_cfg
        self.note_dict = note_dict
        self.lock = lock
        self.tmp_lock = threading.Lock()
        self.connect_check_interval = connect_check_interval
        self.reconnect_interval = reconnect_interval
        self.multiple_of_timeout = multiple_of_timeout

    def run(self):
        """Start the polling threads and the sync thread (all daemons).

        Returns immediately.
        """
        # One polling thread per server.
        for server_data in self.server_cfg:
            title = server_data['title']
            self.tmp_data_dict[title] = {}
            thread_check = threading.Thread(
                target=self.__keep_check_one,
                args=(server_data, self.tmp_data_dict, title, self.connect_check_interval, self.reconnect_interval),
                daemon=True,
            )
            thread_check.start()
        # Thread that periodically copies the staged data into data_dict.
        thread_transmit = threading.Thread(target=self.__transmit_data, args=(self.connect_check_interval,), daemon=True)
        thread_transmit.start()

    def __keep_check_one(self, server: dict, shared_data_list: dict, server_title: str, interval: float, re_connect_time: float = 5):
        """Keep one server connected and sampled forever (runs in its own thread).

        Samples are written into ``shared_data_list[server_title]``. When the
        SSH connection cannot be established, the error is stored in
        ``error_dict['connect']`` together with a retry counter, and the
        connection is retried every ``re_connect_time`` seconds.
        """
        # Always check the root filesystem, listed first. This mutates the
        # server config dict in place, as before.
        if 'storage_list' not in server:
            server['storage_list'] = []
        if '/' not in server['storage_list']:
            server['storage_list'].insert(0, '/')
        timeout = interval * self.multiple_of_timeout
        entry = shared_data_list[server_title]
        re_try_count = 0
        while True:
            client = paramiko.SSHClient()
            try:
                # NOTE(review): AutoAddPolicy accepts unknown host keys without
                # verification; consider a known_hosts-based policy.
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(server['ip'], port=server['port'], username=server['username'], password=server.get('password', None), key_filename=server.get('key_filename', None), timeout=timeout)
            except Exception as e:
                # Release the half-open client before waiting to retry.
                client.close()
                with self.tmp_lock:
                    entry.setdefault('error_dict', {})['connect'] = f'retry:{re_try_count}, {e}'
                time.sleep(re_connect_time)
                re_try_count += 1
                continue
            try:
                # Connected: drop the previous connection error.
                with self.tmp_lock:
                    entry.pop('error_dict', None)
                re_try_count = 0
                self.__poll_connected(client, server, entry, server_title, interval, timeout)
            finally:
                # Always close the session before reconnecting.
                client.close()

    def __poll_connected(self, client, server: dict, entry: dict, server_title: str, interval: float, timeout: float):
        """Sample ``server`` every ``interval`` seconds until an exception escapes.

        Failures the getters handle themselves are recorded per category in
        ``entry['error_dict']``, and polling continues. Any exception that
        escapes (the getters re-raise SSH errors) is stored as
        ``error_dict['connect']``. It also removes ``gpu_list`` and returns so
        the caller can reconnect.
        """
        while True:
            try:
                error_info_dict = dict()
                network_info = self.__get_network_info(client, timeout, server.get('network_interface_name', None), error_info_dict)
                cpu_info = self.__get_cpu_info(client, timeout, error_info_dict)
                memory_info = self.__get_memory_info(client, timeout, error_info_dict)
                storage_info = self.__get_storage_info(client, timeout, server['storage_list'], error_info_dict)
                gpu_info = self.__get_gpus_info(client, timeout, error_info_dict, ignore_gpu=server.get('ignore_gpu', False))
                self.__record_sample(entry, server_title, server, interval, network_info, cpu_info, memory_info, storage_info, gpu_info, error_info_dict)
            except Exception as e:
                with self.tmp_lock:
                    # error_dict may be absent (it is cleared on connect), so
                    # create it instead of indexing it.
                    entry.setdefault('error_dict', {})['connect'] = f'{e}'
                    entry.pop('gpu_list', None)
                time.sleep(interval)
                return
            time.sleep(interval)

    def __record_sample(self, entry: dict, server_title: str, server: dict, interval: float, network_info, cpu_info, memory_info, storage_info, gpu_info, error_info_dict: dict):
        """Write one sample into ``entry`` while holding ``tmp_lock``.

        A ``None`` result leaves the previously recorded value for that field
        in place.
        """
        with self.tmp_lock:
            # Attach the server's note, if any.
            if server_title in self.note_dict:
                entry['note'] = self.note_dict[server_title]
            entry['interval'] = interval
            entry['title'] = server_title
            entry['version'] = version
            entry['update_time_stamp'] = int(time.time())
            if cpu_info is not None:
                entry['cpu'] = cpu_info
            # Guarded by gpu_info. It used to be guarded by memory_info, which
            # stored None whenever the GPU query failed or was skipped.
            if gpu_info is not None:
                entry['gpu_list'] = gpu_info
            if memory_info is not None:
                entry['memory'] = memory_info
            if storage_info is not None:
                entry['storage_list'] = storage_info
            if network_info is not None:
                entry['network_list'] = [{
                    'default': False,
                    'name': server['network_interface_name'],
                    'in': network_info['in'],
                    'out': network_info['out'],
                }]
            # Replace or clear errors so resolved ones do not linger.
            if error_info_dict:
                entry['error_dict'] = error_info_dict
            else:
                entry.pop('error_dict', None)

    def __transmit_data(self, interval):
        """Every ``interval`` seconds, copy the staged per-server data into ``data_dict``."""
        # Wait a little before the first sync.
        time.sleep(interval * 0.5)
        while True:
            time.sleep(interval)
            # Deep-copy under tmp_lock. data_dict then never shares the dicts
            # the polling threads keep mutating, so readers holding self.lock
            # see a stable snapshot.
            with self.tmp_lock:
                snapshot = copy.deepcopy(self.tmp_data_dict)
            with self.lock:
                self.data_dict.update(snapshot)

    #region Methods that collect information
    def __get_cpu_info(self, client, timeout, info_dict: dict = None):
        """Return CPU model, temperatures and average utilisation.

        Returns a dict with the following keys, or None on failure (the error
        text is stored in ``info_dict['cpu']``). SSH errors are re-raised.

        - ``name``: CPU model, from ``lscpu``.
        - ``temperature_list``: degrees C, from sysfs thermal zones whose type
          contains "cpu" or "pkg".
        - ``core_avg_occupy``: percent, or None if it could not be read.
        - ``core_occupy_list``: placeholder ``[-1]``.
        """
        def run_remote(command):
            # Every remote command is bounded by ``timeout``.
            _, stdout, _ = client.exec_command(command, timeout=timeout)
            return stdout.read().decode()

        def get_cpu_temp_via_sysfs():
            # Best effort: any failure yields an empty list.
            try:
                command = r'''
for zone in /sys/class/thermal/thermal_zone*; do
    if [ -f "$zone/type" ] && [ -f "$zone/temp" ]; then
        type=$(cat "$zone/type")
        temp=$(cat "$zone/temp")
        echo "$type:$temp"
    fi
done
'''
                output = run_remote(command).strip()
                temperatures = []
                for line in output.splitlines():
                    if ':' not in line:
                        continue
                    type_str, temp_str = line.split(':', 1)
                    if 'cpu' in type_str.lower() or 'pkg' in type_str.lower():
                        try:
                            # sysfs reports millidegrees; convert to degrees C.
                            temperatures.append(round(int(temp_str.strip()) / 1000, 1))
                        except ValueError:
                            continue
                return temperatures
            except Exception:
                return []

        def get_cpu_avg_usage_via_procstat(interval_sec=0.3):
            # Busy percentage derived from two /proc/stat samples taken
            # interval_sec apart; None on failure.
            def read_cpu_stat_line():
                line = run_remote("grep '^cpu ' /proc/stat").strip()
                parts = list(map(int, line.split()[1:]))
                idle = parts[3] + parts[4]  # idle + iowait
                return idle, sum(parts)
            try:
                idle1, total1 = read_cpu_stat_line()
                time.sleep(interval_sec)
                idle2, total2 = read_cpu_stat_line()
                total_delta = total2 - total1
                idle_delta = idle2 - idle1
                if total_delta == 0:
                    return 0.0
                return round(100.0 * (1.0 - idle_delta / total_delta), 1)
            except Exception:
                return None

        try:
            lscpu_output = run_remote("lscpu")
            # English label first, then the Chinese-locale label.
            model_match = (re.search(r"Model name\s*:\s*(.+)", lscpu_output)
                           or re.search(r'型号名称\s*:\s*(.+)', lscpu_output))
            return {
                "name": model_match.group(1).strip() if model_match else "未知CPU",
                "temperature_list": get_cpu_temp_via_sysfs(),
                "core_avg_occupy": get_cpu_avg_usage_via_procstat(),
                # TODO: querying every core this way is expensive and of
                # little use, so it is skipped for now.
                "core_occupy_list": [-1],
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated so the caller reconnects.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['cpu'] = f'{e}'
            return None

    def __get_memory_info(self, client, timeout, info_dict: dict = None):
        """Return ``{"total", "used"}`` parsed from the second line of ``free``.

        Values are in ``free``'s default unit. Returns None when that line is
        empty or parsing fails (the error text is stored in
        ``info_dict['memory']``). SSH errors are re-raised.
        """
        try:
            _, stdout, _ = client.exec_command('free', timeout=timeout)
            output = stdout.read().decode().split('\n')[1]
            if output == "":
                return None
            data = output.split()
            return {
                "total": int(data[1]),
                "used": int(data[2]),
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated so the caller reconnects.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['memory'] = f'{e}'
            return None

    def __get_storage_info(self, client, timeout, path_list, info_dict: dict = None):
        """Return ``[{"path", "total", "available"}]`` (1 KiB blocks) for each path.

        A path is skipped when ``df`` prints no line containing it. Returns
        None on parse failure (the error text is stored in
        ``info_dict['storage']``). SSH errors are re-raised.
        """
        try:
            result = []
            for target_path in path_list:
                # Quote the configured path so spaces and shell metacharacters
                # are passed through literally.
                quoted = shlex.quote(target_path)
                # -P keeps each filesystem on exactly one line (long device
                # names would otherwise wrap); -k pins the unit to 1 KiB.
                _, stdout, _ = client.exec_command(f'df -Pk -- {quoted} | grep -F -- {quoted}', timeout=timeout)
                output = stdout.read().decode()
                if output == "":
                    continue
                data = output.split()
                result.append({
                    "path": target_path,
                    "total": int(data[1]),
                    "available": int(data[3]),
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated so the caller reconnects.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['storage'] = f'{e}'
            return None

    def __get_network_info(self, client, timeout, interface_name, info_dict: dict = None):
        """Return ``{"in", "out"}`` from one 0.1 s ``ifstat`` sample of ``interface_name``.

        Values are in ``ifstat``'s reported unit. Returns None when no
        interface is configured, or on failure (the error text is stored in
        ``info_dict['network']``). SSH errors are re-raised.
        """
        try:
            if interface_name is None:
                return None
            _, stdout, _ = client.exec_command(f'ifstat -i {shlex.quote(interface_name)} 0.1 1', timeout=timeout)
            # Lines 0-1 are headers; line 2 holds the in/out figures.
            data = stdout.read().decode().split('\n')[2].split()
            return {
                "in": float(data[0]),
                "out": float(data[1]),
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated so the caller reconnects.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['network'] = f'{e}'
            return None

    def __get_gpus_info(self, client, timeout, info_dict: dict = None, ignore_gpu=False):
        """Return per-GPU info parsed from ``gpustat --json``.

        Each item has these keys:

        - ``idx``, ``name`` ("NVIDIA " / "GeForce " stripped), ``temperature``
        - ``used_memory``, ``total_memory``, ``utilization``
        - ``process_list``: ``[{"user", "memory", "cmd"}]``

        Returns None when ``ignore_gpu`` is set, or on failure (the error text
        is stored in ``info_dict['gpu']``). SSH errors are re-raised.
        """
        if ignore_gpu:
            return None
        try:
            _, stdout, _ = client.exec_command("gpustat --json", timeout=timeout)
            gpus_info = json.loads(stdout.read().decode())
            result = []
            for gpu_info in gpus_info['gpus']:
                gpu_name = gpu_info['name'].replace('NVIDIA ', '').replace('GeForce ', '')
                process_list = []
                for process_info in gpu_info.get('processes', []):
                    # Prefer the full argv when gpustat provides it.
                    cmd = process_info.get('command', '')
                    if 'full_command' in process_info:
                        cmd = ' '.join(process_info["full_command"])
                    process_list.append({
                        "user": process_info.get('username'),
                        "memory": process_info.get('gpu_memory_usage'),
                        "cmd": cmd,
                    })
                result.append({
                    "idx": gpu_info['index'],
                    "name": gpu_name,
                    "temperature": gpu_info['temperature.gpu'],
                    "used_memory": gpu_info['memory.used'],
                    "total_memory": gpu_info['memory.total'],
                    "utilization": gpu_info['utilization.gpu'],
                    "process_list": process_list,
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated so the caller reconnects.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['gpu'] = f'{e}'
            return None
    #endregion