You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
321 lines
14 KiB
321 lines
14 KiB
from version import version
|
|
import threading
|
|
import paramiko
|
|
import time
|
|
import json
|
|
import re
|
|
|
|
class Connector:
    """Poll a set of servers over SSH and publish their latest metrics.

    One daemon thread per configured server keeps an SSH session open and
    periodically samples network, CPU, memory, storage and GPU information
    into a private working dict (``tmp_data_dict``, guarded by ``tmp_lock``).
    A further daemon thread periodically copies that working dict into the
    caller-owned ``data_dict`` (guarded by the caller-supplied ``lock``).
    """

    def __init__(self, data_dict: dict, server_cfg: list, note_dict: dict,
                 lock: threading.Lock, connect_check_interval: float,
                 reconnect_interval: float, multiple_of_timeout: float = 2):
        """Store the configuration; no thread is started until ``run()``.

        Args:
            data_dict: Published result mapping (server title -> metrics).
            server_cfg: Iterable of per-server config dicts. The code reads
                the keys ``title``, ``ip``, ``port`` and ``username`` and the
                optional keys ``password``, ``key_filename``,
                ``storage_list``, ``network_interface_name`` and
                ``ignore_gpu``.
            note_dict: Mapping of server title -> note attached to samples.
            lock: Lock that guards ``data_dict``.
            connect_check_interval: Seconds between two samples.
            reconnect_interval: Seconds to wait after a failed connection.
            multiple_of_timeout: The SSH timeout is
                ``connect_check_interval * multiple_of_timeout``.
        """
        self.data_dict = data_dict
        self.tmp_data_dict = dict()
        self.server_cfg = server_cfg
        self.note_dict = note_dict
        self.lock = lock
        self.tmp_lock = threading.Lock()
        self.connect_check_interval = connect_check_interval
        self.reconnect_interval = reconnect_interval
        self.multiple_of_timeout = multiple_of_timeout

    def run(self):
        """Start one polling thread per server plus the publishing thread."""
        # Start the query threads.
        for server_data in self.server_cfg:
            title = server_data['title']
            self.tmp_data_dict[title] = {}
            thread_check = threading.Thread(
                target=self.__keep_check_one,
                args=(server_data, self.tmp_data_dict, title,
                      self.connect_check_interval, self.reconnect_interval),
            )
            thread_check.daemon = True
            thread_check.start()

        # Start the data synchronisation thread.
        thread_transmit = threading.Thread(
            target=self.__transmit_data,
            args=(self.connect_check_interval,),
        )
        thread_transmit.daemon = True
        thread_transmit.start()

    def __keep_check_one(self, server: dict, shared_data_list: dict,
                         server_title: str, interval: float,
                         re_connect_time: float = 5):
        """Keep sampling one server forever, reconnecting after failures.

        Args:
            server: Config dict of the server (see ``__init__``).
            shared_data_list: Working dict; only the ``server_title`` entry
                is written, always while holding ``tmp_lock``.
            server_title: Key of this server in ``shared_data_list``.
            interval: Seconds between two samples.
            re_connect_time: Seconds to wait after a failed connection.
        """
        # Normalise the storage paths to check: '/' is always reported first.
        # The config dict is updated in place, as it always has been.
        storage_paths = server.setdefault('storage_list', [])
        if '/' not in storage_paths:
            storage_paths.insert(0, '/')

        timeout = interval * self.multiple_of_timeout
        re_try_count = 0
        # Connection loop.
        while True:
            client = None
            connect_error = None
            try:
                # Open the SSH connection.
                client = paramiko.SSHClient()
                # NOTE(review): AutoAddPolicy accepts unknown host keys
                # without verification; confirm this is acceptable here.
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(
                    server['ip'],
                    port=server['port'],
                    username=server['username'],
                    password=server.get('password', None),
                    key_filename=server.get('key_filename', None),
                    timeout=timeout,
                )

                with self.tmp_lock:
                    shared_data_list[server_title].pop('error_dict', None)
                re_try_count = 0

                # Sampling loop; returns after the first failed sample.
                self.__poll_server(client, server, shared_data_list,
                                   server_title, interval)
            except Exception as e:
                connect_error = e
            finally:
                # Always release the SSH session, including when connect()
                # itself failed; otherwise every retry leaks a client.
                if client is not None:
                    try:
                        client.close()
                    except Exception as close_error:
                        if connect_error is None:
                            connect_error = close_error

            if connect_error is not None:
                with self.tmp_lock:
                    errors = shared_data_list[server_title].setdefault(
                        'error_dict', dict())
                    errors['connect'] = f'retry:{re_try_count}, {connect_error}'
                time.sleep(re_connect_time)
                re_try_count += 1

    def __poll_server(self, client, server, shared_data_list, server_title,
                      interval):
        """Sample ``client`` every ``interval`` seconds until a sample fails.

        On failure the error is stored under ``error_dict['connect']``, any
        ``gpu_list`` is dropped, and the method returns after one more
        ``interval`` so that the caller can reconnect.
        """
        timeout = interval * self.multiple_of_timeout
        while True:
            try:
                error_info_dict = dict()
                interface_name = server.get('network_interface_name', None)
                # Network information.
                network_info = self.__get_network_info(
                    client, timeout, interface_name, error_info_dict)
                # CPU information.
                cpu_info = self.__get_cpu_info(client, timeout, error_info_dict)
                # Memory information.
                memory_info = self.__get_memory_info(
                    client, timeout, error_info_dict)
                # Storage information.
                storage_info = self.__get_storage_info(
                    client, timeout, server['storage_list'], error_info_dict)
                # GPU information.
                gpu_info = self.__get_gpus_info(
                    client, timeout, error_info_dict,
                    ignore_gpu=server.get('ignore_gpu', False))

                # Record the sample.
                with self.tmp_lock:
                    entry = shared_data_list[server_title]
                    # Attach the note, if any.
                    if server_title in self.note_dict:
                        entry['note'] = self.note_dict[server_title]
                    entry['interval'] = interval
                    entry['title'] = server_title
                    entry['version'] = version
                    entry['update_time_stamp'] = int(time.time())
                    self.__apply_sample(
                        entry, cpu_info, memory_info, storage_info, gpu_info,
                        network_info, interface_name, error_info_dict)
            except Exception as e:
                with self.tmp_lock:
                    entry = shared_data_list[server_title]
                    # 'error_dict' may be absent here (it is removed after a
                    # successful connect), so create it on demand.
                    entry.setdefault('error_dict', dict())['connect'] = f'{e}'
                    entry.pop('gpu_list', None)
                time.sleep(interval)
                return

            time.sleep(interval)

    @staticmethod
    def __apply_sample(entry, cpu_info, memory_info, storage_info, gpu_info,
                       network_info, interface_name, error_info_dict):
        """Merge one sample into ``entry`` (the caller holds ``tmp_lock``).

        Metrics that are ``None`` keep their previous value, except the GPU
        list, which is removed so that stale GPU data is not shown.
        ``error_dict`` is replaced by this sample's errors and removed when
        the sample had none.
        """
        if cpu_info is not None:
            entry['cpu'] = cpu_info
        if gpu_info is not None:
            entry['gpu_list'] = gpu_info
        else:
            entry.pop('gpu_list', None)
        if memory_info is not None:
            entry['memory'] = memory_info
        if storage_info is not None:
            entry['storage_list'] = storage_info
        if network_info is not None:
            entry['network_list'] = [{
                'default': False,
                'name': interface_name,
                'in': network_info['in'],
                'out': network_info['out'],
            }]
        if error_info_dict:
            entry['error_dict'] = error_info_dict
        else:
            entry.pop('error_dict', None)

    def __transmit_data(self, interval):
        """Publish the working dict into ``data_dict`` every ``interval`` s.

        A deep copy is published so that the polling threads, which hold only
        ``tmp_lock``, never mutate objects reachable from ``data_dict``.
        """
        import copy

        # Wait a little before starting.
        time.sleep(interval * 0.5)
        while True:
            time.sleep(interval)
            # Copy the working data into the published data.
            with self.tmp_lock:
                snapshot = copy.deepcopy(self.tmp_data_dict)
            with self.lock:
                for title, payload in snapshot.items():
                    self.data_dict[title] = payload

    #region metric collection methods

    def __get_cpu_info(self, client, timeout, info_dict: dict = None):
        """Return CPU name, temperatures and average usage, or ``None``.

        Args:
            client: Connected SSH client exposing ``exec_command``.
            timeout: Timeout in seconds applied to every remote command.
            info_dict: Optional dict that receives the error text under the
                key ``'cpu'`` when collection fails.

        Raises:
            paramiko.ssh_exception.SSHException: Re-raised from the ``lscpu``
                step so that the caller can reconnect.
        """

        def get_cpu_temp_via_sysfs():
            # Best effort: any failure yields an empty temperature list.
            try:
                command = r'''
for zone in /sys/class/thermal/thermal_zone*; do
    if [ -f "$zone/type" ] && [ -f "$zone/temp" ]; then
        type=$(cat "$zone/type")
        temp=$(cat "$zone/temp")
        echo "$type:$temp"
    fi
done
'''
                stdin, stdout, stderr = client.exec_command(
                    command, timeout=timeout)
                output = stdout.read().decode().strip()
                temperatures = []
                for line in output.splitlines():
                    if ':' not in line:
                        continue
                    type_str, temp_str = line.split(':', 1)
                    zone_type = type_str.lower()
                    if 'cpu' in zone_type or 'pkg' in zone_type:
                        try:
                            temp = int(temp_str.strip())
                        except ValueError:
                            continue
                        # sysfs reports millidegrees; convert to Celsius.
                        temperatures.append(round(temp / 1000, 1))
                return temperatures
            except Exception:
                return []

        def get_cpu_avg_usage_via_procstat(interval_sec=0.3):
            # Best effort: any failure yields None.
            def read_cpu_stat_line():
                stdin, stdout, _ = client.exec_command(
                    "grep '^cpu ' /proc/stat", timeout=timeout)
                line = stdout.read().decode().strip()
                parts = list(map(int, line.split()[1:]))
                idle = parts[3] + parts[4]  # idle + iowait
                total = sum(parts)
                return idle, total

            try:
                idle1, total1 = read_cpu_stat_line()
                time.sleep(interval_sec)
                idle2, total2 = read_cpu_stat_line()

                total_delta = total2 - total1
                idle_delta = idle2 - idle1

                if total_delta == 0:
                    return 0.0
                usage = 100.0 * (1.0 - idle_delta / total_delta)
                return round(usage, 1)
            except Exception:
                return None

        try:
            result = dict()
            # 1. CPU model name.
            stdin, stdout, stderr = client.exec_command(
                "lscpu", timeout=timeout)
            lscpu_output = stdout.read().decode()
            model_match = re.search(r"Model name\s*:\s*(.+)", lscpu_output)
            if model_match is None:
                # Fall back to the Chinese label; accept both the ASCII and
                # the full-width colon.
                model_match = re.search(
                    r'型号名称\s*[::]\s*(.+)', lscpu_output)
            if model_match:
                result["name"] = model_match.group(1).strip()
            else:
                result["name"] = "未知CPU"

            # 2. CPU temperatures.
            result["temperature_list"] = get_cpu_temp_via_sysfs()

            # 3. CPU usage.
            result["core_avg_occupy"] = get_cpu_avg_usage_via_procstat()
            # TODO: querying every core this way is costly and of little
            # use, so per-core usage is not collected for now.
            result["core_occupy_list"] = [-1]

            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['cpu'] = f'{e}'
            return None

    def __get_memory_info(self, client, timeout, info_dict: dict = None):
        """Return ``{"total", "used"}`` parsed from line 2 of ``free``.

        Returns ``None`` when that line is empty or on a non-SSH error; in
        the latter case the error text is stored in ``info_dict['memory']``.

        Raises:
            paramiko.ssh_exception.SSHException: Propagated to the caller.
        """
        try:
            stdin, stdout, stderr = client.exec_command('free', timeout=timeout)
            mem_line = stdout.read().decode().split('\n')[1]
            if mem_line == "":
                return None
            fields = mem_line.split()
            return {
                "total": int(fields[1]),
                "used": int(fields[2]),
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['memory'] = f'{e}'
            return None

    def __get_storage_info(self, client, timeout, path_list,
                           info_dict: dict = None):
        """Return a list of ``{"path", "total", "available"}`` from ``df``.

        Paths for which the filtered ``df`` output is empty are skipped.
        Returns ``None`` on a non-SSH error, storing the error text in
        ``info_dict['storage']``.

        Raises:
            paramiko.ssh_exception.SSHException: Propagated to the caller.
        """
        try:
            result = []
            for target_path in path_list:
                stdin, stdout, stderr = client.exec_command(
                    f'df {target_path} | grep \'{target_path}\'',
                    timeout=timeout)
                output = stdout.read().decode()
                if output == "":
                    continue
                fields = output.split()
                result.append({
                    "path": target_path,
                    "total": int(fields[1]),
                    "available": int(fields[3]),
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['storage'] = f'{e}'
            return None

    def __get_network_info(self, client, timeout, interface_name,
                           info_dict: dict = None):
        """Return ``{"in", "out"}`` read from line 3 of ``ifstat`` output.

        Returns ``None`` when ``interface_name`` is ``None`` or on a non-SSH
        error (the error text is stored in ``info_dict['network']``).

        Raises:
            paramiko.ssh_exception.SSHException: Propagated to the caller.
        """
        try:
            if interface_name is None:
                return None
            stdin, stdout, stderr = client.exec_command(
                f'ifstat -i {interface_name} 0.1 1', timeout=timeout)
            rate_line = stdout.read().decode().split('\n')[2]
            fields = rate_line.split()
            return {
                "in": float(fields[0]),
                "out": float(fields[1]),
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['network'] = f'{e}'
            return None

    def __get_gpus_info(self, client, timeout, info_dict: dict = None,
                        ignore_gpu=False):
        """Return per-GPU information parsed from ``gpustat --json``.

        Returns ``None`` when ``ignore_gpu`` is true or on a non-SSH error
        (the error text is stored in ``info_dict['gpu']``).

        Raises:
            paramiko.ssh_exception.SSHException: Propagated to the caller.
        """
        if ignore_gpu:
            return None

        try:
            stdin, stdout, stderr = client.exec_command(
                "gpustat --json", timeout=timeout)
            gpus_info = json.loads(stdout.read().decode())

            result = []
            for gpu_info in gpus_info['gpus']:
                # Shorten the product name.
                gpu_name = gpu_info['name'].replace('NVIDIA ', '').replace('GeForce ', '')
                process_list = []
                for process_info in gpu_info.get('processes', []):
                    cmd = process_info.get('command', '')
                    if 'full_command' in process_info:
                        cmd = ' '.join(process_info["full_command"])
                    process_list.append({
                        "user": process_info.get('username'),
                        "memory": process_info.get('gpu_memory_usage'),
                        "cmd": cmd,
                    })
                # Add the GPU to the result list.
                result.append({
                    "idx": gpu_info['index'],
                    "name": gpu_name,
                    "temperature": gpu_info['temperature.gpu'],
                    "used_memory": gpu_info['memory.used'],
                    "total_memory": gpu_info['memory.total'],
                    "utilization": gpu_info['utilization.gpu'],
                    "process_list": process_list,
                })

            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['gpu'] = f'{e}'
            return None

    #endregion
|