import json
import re
import threading
import time

import paramiko

from version import version


class Connector:
    """Poll servers over SSH and publish their status into a shared dict.

    One daemon thread per configured server keeps an SSH session open and
    periodically collects network / CPU / memory / storage / GPU figures into
    a private staging dict (``tmp_data_dict``, guarded by ``tmp_lock``).
    A further daemon thread periodically copies the staged entries into
    ``data_dict`` while holding the caller-supplied ``lock``.
    """

    def __init__(self, data_dict : dict, server_cfg : dict, note_dict : dict, lock : threading.Lock, connect_check_interval : float, reconnect_interval : float, multiple_of_timeout : float = 2):
        """Store the configuration; no thread is started until ``run()``.

        Args:
            data_dict: Output dict, written under ``lock``; keyed by server title.
            server_cfg: Iterated in ``run()`` as a collection of per-server
                dicts (each must at least provide ``'title'``).
            note_dict: Optional per-title note copied into the published entry.
            lock: Lock guarding ``data_dict``.
            connect_check_interval: Seconds between two polls of one server;
                also the period of the publishing thread.
            reconnect_interval: Seconds to wait after a failed connection.
            multiple_of_timeout: SSH timeouts are
                ``connect_check_interval * multiple_of_timeout``.
        """
        self.data_dict = data_dict
        self.tmp_data_dict = dict()
        self.server_cfg = server_cfg
        self.note_dict = note_dict
        self.lock = lock
        self.tmp_lock = threading.Lock()
        self.connect_check_interval = connect_check_interval
        self.reconnect_interval = reconnect_interval
        self.multiple_of_timeout = multiple_of_timeout

    def run(self):
        """Start one polling thread per server plus the publishing thread."""
        # Start the polling threads.
        for server_data in self.server_cfg:
            title = server_data['title']
            self.tmp_data_dict[title] = {}
            thread_check = threading.Thread(
                target=self.__keep_check_one,
                args=(server_data, self.tmp_data_dict, title,
                      self.connect_check_interval, self.reconnect_interval))
            thread_check.daemon = True
            thread_check.start()

        # Start the thread that synchronises staged data into data_dict.
        thread_transmit = threading.Thread(target=self.__transmit_data, args=(self.connect_check_interval,))
        thread_transmit.daemon = True
        thread_transmit.start()

    @staticmethod
    def _record_connect_error(entry: dict, message: str) -> None:
        """Set ``entry['error_dict']['connect']``, creating the dict if needed.

        The error dict is replaced by a fresh copy instead of being mutated in
        place, so shallow copies of ``entry`` that were already published keep
        a stable view.
        """
        errors = dict(entry.get('error_dict') or {})
        errors['connect'] = message
        entry['error_dict'] = errors

    # Continuously collect the information of one server.
    def __keep_check_one(self, server: dict, shared_data_list: dict, server_title: str, interval: float, re_connect_time: float=5):
        """Connect to one server and poll it forever, reconnecting on failure.

        Args:
            server: Per-server configuration. Reads ``ip``, ``port``,
                ``username`` and optionally ``password``, ``key_filename``,
                ``storage_list``, ``network_interface_name``, ``ignore_gpu``.
                ``storage_list`` is created/extended in place so that it
                always contains ``'/'``.
            shared_data_list: Staging dict; the entry ``[server_title]`` is
                updated under ``self.tmp_lock``.
            server_title: Key of this server in ``shared_data_list``.
            interval: Seconds between two polls.
            re_connect_time: Seconds to wait after a failed connection.
        """
        # Normalise the list of storage paths to check; '/' is always included.
        if 'storage_list' not in server:
            server['storage_list'] = []
        if '/' not in server['storage_list']:
            server['storage_list'].insert(0, '/')

        re_try_count = 0

        # Connection loop.
        while True:
            timeout = interval * self.multiple_of_timeout
            client = None
            connect_failed = False
            try:
                # Establish the SSH connection.
                client = paramiko.SSHClient()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(server['ip'], port=server['port'], username=server['username'],
                               password=server.get('password', None),
                               key_filename=server.get('key_filename', None),
                               timeout=timeout)
                with self.tmp_lock:
                    shared_data_list[server_title].pop('error_dict', None)
                re_try_count = 0

                # Polling loop.
                keep_run = True
                while keep_run:
                    try:
                        error_info_dict = dict()
                        interface_name = server.get('network_interface_name', None)

                        network_info = self.__get_network_info(client, timeout, interface_name, error_info_dict)
                        cpu_info = self.__get_cpu_info(client, timeout, error_info_dict)
                        memory_info = self.__get_memory_info(client, timeout, error_info_dict)
                        storage_info = self.__get_storage_info(client, timeout, server['storage_list'], error_info_dict)
                        gpu_info = self.__get_gpus_info(client, timeout, error_info_dict, ignore_gpu=server.get('ignore_gpu', False))

                        # Record the collected information.
                        with self.tmp_lock:
                            entry = shared_data_list[server_title]
                            # Attach the note, if one is configured.
                            if server_title in self.note_dict:
                                entry['note'] = self.note_dict[server_title]
                            entry['interval'] = interval
                            entry['title'] = server_title
                            entry['version'] = version
                            entry['update_time_stamp'] = int(time.time())
                            if cpu_info is not None:
                                entry['cpu'] = cpu_info
                            # Guard on gpu_info itself (the original tested
                            # memory_info here). When GPU data is unavailable
                            # the key is removed, as the connect-error path
                            # below already does, so no stale GPU data and no
                            # None value is published.
                            if gpu_info is not None:
                                entry['gpu_list'] = gpu_info
                            else:
                                entry.pop('gpu_list', None)
                            if memory_info is not None:
                                entry['memory'] = memory_info
                            if storage_info is not None:
                                entry['storage_list'] = storage_info
                            if network_info is not None:
                                entry['network_list'] = [{
                                    'default' : False,
                                    'name' : interface_name,
                                    'in' : network_info['in'],
                                    'out' : network_info['out'],
                                }]
                            # Publish this cycle's errors, or clear errors left
                            # over from an earlier cycle when there are none.
                            if len(error_info_dict) > 0:
                                entry['error_dict'] = error_info_dict
                            else:
                                entry.pop('error_dict', None)
                    except Exception as e:
                        # Typically an SSH failure re-raised by a probe: leave
                        # the polling loop so the connection is re-established.
                        keep_run = False
                        with self.tmp_lock:
                            entry = shared_data_list[server_title]
                            self._record_connect_error(entry, f'{e}')
                            entry.pop('gpu_list', None)
                    time.sleep(interval)
            except Exception as e:
                connect_failed = True
                with self.tmp_lock:
                    self._record_connect_error(shared_data_list[server_title], f'retry:{re_try_count}, {e}')
            finally:
                # Close the connection on every path, including failed connects.
                if client is not None:
                    try:
                        client.close()
                    except Exception:
                        # Best-effort cleanup; the session is being discarded.
                        pass

            if connect_failed:
                time.sleep(re_connect_time)
                re_try_count += 1

    # Continuously synchronise the actively collected data into the final dict.
    def __transmit_data(self, interval):
        """Periodically copy every staged entry into ``data_dict``.

        Both locks are held during the copy (``tmp_lock`` first, then
        ``lock``). Each entry is published as a shallow copy so that readers
        of ``data_dict`` do not share a dict object that polling threads keep
        mutating under ``tmp_lock``.
        """
        # Wait a little before starting.
        time.sleep(interval * 0.5)
        while True:
            time.sleep(interval)
            with self.tmp_lock:
                with self.lock:
                    for title, entry in self.tmp_data_dict.items():
                        self.data_dict[title] = dict(entry)

    #region Information probes
    def __get_cpu_info(self, client, timeout, info_dict:dict=None):
        """Return CPU name, temperatures and average usage, or None on error.

        Args:
            client: Connected SSH client exposing ``exec_command``.
            timeout: Timeout in seconds passed to every ``exec_command``.
            info_dict: If given, receives ``info_dict['cpu']`` with the error
                text when a non-SSH exception occurs.

        Returns:
            ``{'name', 'temperature_list', 'core_avg_occupy',
            'core_occupy_list'}`` or None.

        Raises:
            paramiko.ssh_exception.SSHException: re-raised from the ``lscpu``
                call so the caller can reconnect.
        """
        def get_cpu_temp_via_sysfs(client):
            # Best-effort: any failure yields an empty list.
            try:
                command = r'''
                for zone in /sys/class/thermal/thermal_zone*; do
                    if [ -f "$zone/type" ] && [ -f "$zone/temp" ]; then
                        type=$(cat "$zone/type")
                        temp=$(cat "$zone/temp")
                        echo "$type:$temp"
                    fi
                done
                '''
                stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
                output = stdout.read().decode().strip()

                temperatures = []
                for line in output.splitlines():
                    if ':' not in line:
                        continue
                    type_str, temp_str = line.split(':', 1)
                    lowered = type_str.lower()
                    if 'cpu' in lowered or 'pkg' in lowered:
                        try:
                            temp = int(temp_str.strip())
                        except ValueError:
                            continue
                        # The value is divided by 1000 to obtain degrees Celsius.
                        temperatures.append(round(temp / 1000, 1))
                return temperatures
            except Exception:
                return []

        def get_cpu_avg_usage_via_procstat(client, interval_sec=0.3):
            # Usage is derived from two samples of the aggregate "cpu" line of
            # /proc/stat taken interval_sec apart; None on any failure.
            def read_cpu_stat_line():
                stdin, stdout, _ = client.exec_command("grep '^cpu ' /proc/stat", timeout=timeout)
                line = stdout.read().decode().strip()
                parts = list(map(int, line.split()[1:]))
                idle = parts[3] + parts[4]  # idle + iowait
                total = sum(parts)
                return idle, total

            try:
                idle1, total1 = read_cpu_stat_line()
                time.sleep(interval_sec)
                idle2, total2 = read_cpu_stat_line()

                total_delta = total2 - total1
                idle_delta = idle2 - idle1
                if total_delta == 0:
                    return 0.0
                usage = 100.0 * (1.0 - idle_delta / total_delta)
                return round(usage, 1)
            except Exception:
                return None

        try:
            result = dict()

            # 1. CPU model name.
            stdin, stdout, stderr = client.exec_command("lscpu", timeout=timeout)
            lscpu_output = stdout.read().decode()
            # Try the English label first, then the Chinese one; both the ASCII
            # and the full-width colon are accepted as separator.
            result["name"] = "未知CPU"
            for pattern in (r"Model name\s*[:：]\s*(.+)", r'型号名称\s*[:：]\s*(.+)'):
                model_match = re.search(pattern, lscpu_output)
                if model_match:
                    result["name"] = model_match.group(1).strip()
                    break

            # 2. CPU temperatures.
            result["temperature_list"] = get_cpu_temp_via_sysfs(client)

            # 3. CPU usage.
            result["core_avg_occupy"] = get_cpu_avg_usage_via_procstat(client)
            # TODO: per-core usage is expensive to query this way and of little
            # use, so it is not collected for now.
            result["core_occupy_list"] = [-1]

            return result
        except paramiko.ssh_exception.SSHException:
            # SSH exceptions are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['cpu'] = f'{e}'
            return None

    def __get_memory_info(self, client, timeout, info_dict:dict=None):
        """Return ``{'total', 'used'}`` parsed from ``free``, or None.

        The second line of the ``free`` output is split on whitespace and its
        fields 1 and 2 are used. Non-SSH errors are stored in
        ``info_dict['memory']``; ``SSHException`` is re-raised.
        """
        try:
            stdin, stdout, stderr = client.exec_command('free', timeout=timeout)
            output = stdout.read().decode().split('\n')[1]
            if output == "":
                return None
            data = output.split()
            return {
                "total": int(data[1]),
                "used": int(data[2])
            }
        except paramiko.ssh_exception.SSHException:
            # SSH exceptions are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['memory'] = f'{e}'
            return None

    def __get_storage_info(self, client, timeout, path_list, info_dict:dict=None):
        """Return a list of ``{'path', 'total', 'available'}`` from ``df``.

        Paths for which the filtered ``df`` output is empty are skipped.
        Non-SSH errors are stored in ``info_dict['storage']`` and None is
        returned; ``SSHException`` is re-raised.
        """
        try:
            result = []
            for target_path in path_list:
                stdin, stdout, stderr = client.exec_command(f'df {target_path} | grep \'{target_path}\'', timeout=timeout)
                output = stdout.read().decode()
                if output == "":
                    continue
                data = output.split()
                result.append({
                    "path": target_path,
                    "total": int(data[1]),
                    "available": int(data[3])
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH exceptions are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['storage'] = f'{e}'
            return None

    def __get_network_info(self, client, timeout, interface_name, info_dict:dict=None):
        """Return ``{'in', 'out'}`` from ``ifstat`` for one interface, or None.

        Returns None immediately when ``interface_name`` is None. The third
        line of the ``ifstat`` output is split on whitespace and its first two
        fields are parsed as floats. Non-SSH errors are stored in
        ``info_dict['network']``; ``SSHException`` is re-raised.
        """
        try:
            if interface_name is None:
                return None
            stdin, stdout, stderr = client.exec_command(f'ifstat -i {interface_name} 0.1 1', timeout=timeout)
            output = stdout.read().decode().split('\n')[2]
            data = output.split()
            return {
                "in": float(data[0]),
                "out": float(data[1])
            }
        except paramiko.ssh_exception.SSHException:
            # SSH exceptions are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['network'] = f'{e}'
            return None

    def __get_gpus_info(self, client, timeout, info_dict:dict=None, ignore_gpu=False):
        """Return per-GPU status parsed from ``gpustat --json``, or None.

        Returns None without contacting the server when ``ignore_gpu`` is
        true. Non-SSH errors (including invalid JSON) are stored in
        ``info_dict['gpu']``; ``SSHException`` is re-raised.
        """
        if ignore_gpu:
            return None

        try:
            stdin, stdout, stderr = client.exec_command("gpustat --json", timeout=timeout)
            output = stdout.read().decode()
            gpus_info = json.loads(output)

            result = []
            for gpu_info in gpus_info['gpus']:
                # Shorten the name by dropping the vendor / product-line prefix.
                gpu_name = gpu_info['name'].replace('NVIDIA ', '').replace('GeForce ', '')
                process_list = []
                for process_info in gpu_info.get('processes', []):
                    cmd = process_info.get('command', '')
                    # Prefer the full command line when it is reported.
                    if 'full_command' in process_info:
                        cmd = ' '.join(process_info["full_command"])
                    process_list.append({
                        "user": process_info.get('username'),
                        "memory": process_info.get('gpu_memory_usage'),
                        "cmd": cmd
                    })
                result.append({
                    "idx": gpu_info['index'],
                    "name": gpu_name,
                    "temperature": gpu_info['temperature.gpu'],
                    "used_memory": gpu_info['memory.used'],
                    "total_memory": gpu_info['memory.total'],
                    "utilization": gpu_info['utilization.gpu'],
                    "process_list": process_list
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH exceptions are still propagated.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['gpu'] = f'{e}'
            return None
    #endregion