class Connector:
    """Poll remote servers over SSH and publish their status into a shared dict.

    One daemon thread per configured server keeps an SSH session open and
    periodically collects network, memory, storage and GPU readings into a
    private staging dict. A separate daemon thread periodically copies the
    staging dict into the caller-supplied ``data_dict``.
    """

    def __init__(self, data_dict: dict, server_cfg: list, lock: threading.Lock, connect_check_interval: float, reconnect_interval: float, multiple_of_timeout: float = 2):
        """Store the configuration; no thread is started until ``run()``.

        Args:
            data_dict: Destination dict, keyed by server title, shared with the caller.
            server_cfg: Iterable of per-server config dicts (``title``, ``ip``,
                ``port``, ``username`` and optional ``password``, ``key_filename``,
                ``network_interface_name``, ``storage_list``, ``ignore_gpu``).
            lock: Lock the caller uses to guard ``data_dict``.
            connect_check_interval: Seconds between two polls of one server.
            reconnect_interval: Seconds to wait after a failed connection attempt.
            multiple_of_timeout: SSH/command timeout expressed as a multiple of
                ``connect_check_interval``.
        """
        self.data_dict = data_dict
        self.tmp_data_dict = dict()
        self.server_cfg = server_cfg
        self.lock = lock
        self.tmp_lock = threading.Lock()
        self.connect_check_interval = connect_check_interval
        self.reconnect_interval = reconnect_interval
        self.multiple_of_timeout = multiple_of_timeout

    def run(self):
        """Start one polling thread per server plus the publishing thread."""
        # Start the polling threads.
        for server_data in self.server_cfg:
            title = server_data['title']
            self.tmp_data_dict[title] = {}
            thread_check = threading.Thread(
                target=self.__keep_check_one,
                args=(server_data, self.tmp_data_dict, title, self.connect_check_interval, self.reconnect_interval),
            )
            thread_check.daemon = True
            thread_check.start()

        # Start the thread that copies staged data into the shared dict.
        thread_transmit = threading.Thread(target=self.__transmit_data, args=(self.connect_check_interval,))
        thread_transmit.daemon = True
        thread_transmit.start()

    @staticmethod
    def _merge_poll_result(entry: dict, interface_name, network_info, memory_info, storage_info, gpu_info, errors: dict):
        """Merge one round of sensor readings into ``entry`` in place.

        A reading that is ``None`` leaves the previously stored value untouched.
        ``error_dict`` is replaced by ``errors`` when it is non-empty and removed
        otherwise, so errors from an earlier round do not linger.
        """
        if gpu_info is not None:
            entry['gpu_list'] = gpu_info
        if memory_info is not None:
            entry['memory'] = memory_info
        if storage_info is not None:
            entry['storage_list'] = storage_info
        if network_info is not None:
            entry['network_list'] = [{
                'default': False,
                'name': interface_name,
                'in': network_info['in'],
                'out': network_info['out'],
            }]
        if errors:
            entry['error_dict'] = errors
        else:
            entry.pop('error_dict', None)

    def _publish_snapshot(self):
        """Copy every staged server entry into the shared ``data_dict``.

        Deep copies are stored so that readers holding only ``self.lock`` never
        observe a dict that a polling thread is mutating under ``self.tmp_lock``.
        """
        import copy

        with self.tmp_lock:
            with self.lock:
                for title, entry in self.tmp_data_dict.items():
                    self.data_dict[title] = copy.deepcopy(entry)

    # Keep collecting the status of a single server.
    def __keep_check_one(self, server: dict, shared_data_list: dict, server_title: str, interval: float, re_connect_time: float = 5):
        """Connect to one server and poll it forever, reconnecting on failure.

        Args:
            server: Config dict of the server to poll.
            shared_data_list: Staging dict; this method writes only to
                ``shared_data_list[server_title]``, always under ``self.tmp_lock``.
            server_title: Key of this server inside ``shared_data_list``.
            interval: Seconds between two polls.
            re_connect_time: Seconds to wait after a failed connection attempt.
        """
        # Paths whose usage is reported; '/' is always included and comes first.
        # A copy is used so the caller's config is not modified.
        storage_paths = list(server.get('storage_list', []))
        if '/' not in storage_paths:
            storage_paths.insert(0, '/')

        interface_name = server.get('network_interface_name', None)
        timeout = interval * self.multiple_of_timeout
        re_try_count = 0

        # Connection loop.
        while True:
            try:
                client = paramiko.SSHClient()
                try:
                    # NOTE(review): AutoAddPolicy accepts unknown host keys without
                    # verification - confirm this is acceptable for the deployment.
                    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                    client.connect(
                        server['ip'],
                        port=server['port'],
                        username=server['username'],
                        password=server.get('password', None),
                        key_filename=server.get('key_filename', None),
                        timeout=timeout,
                    )

                    # Connected: drop errors left over from earlier attempts.
                    with self.tmp_lock:
                        shared_data_list[server_title].pop('error_dict', None)
                    re_try_count = 0

                    # Polling loop; left on the first unexpected failure.
                    while True:
                        try:
                            errors = dict()
                            network_info = self.__get_network_info(client, timeout, interface_name, errors)
                            # TODO: CPU information
                            memory_info = self.__get_memory_info(client, timeout, errors)
                            storage_info = self.__get_storage_info(client, timeout, storage_paths, errors)
                            gpu_info = self.__get_gpus_info(client, timeout, errors, ignore_gpu=server.get('ignore_gpu', False))

                            # Record the readings.
                            with self.tmp_lock:
                                entry = shared_data_list[server_title]
                                entry['interval'] = interval
                                entry['note'] = ""  # TODO: notes are not supported yet
                                entry['title'] = server_title
                                entry['version'] = version
                                entry['update_time_stamp'] = int(time.time())
                                self._merge_poll_result(entry, interface_name, network_info, memory_info, storage_info, gpu_info, errors)
                        except Exception as e:
                            with self.tmp_lock:
                                entry = shared_data_list[server_title]
                                # error_dict may not exist yet; create it on demand.
                                entry.setdefault('error_dict', dict())['connect'] = f'{e}'
                                entry.pop('gpu_list', None)
                            time.sleep(interval)
                            break

                        time.sleep(interval)
                finally:
                    # Always release the SSH session, whether connect or a poll failed.
                    client.close()
            except Exception as e:
                with self.tmp_lock:
                    entry = shared_data_list[server_title]
                    entry.setdefault('error_dict', dict())['connect'] = f'retry:{re_try_count}, {e}'
                time.sleep(re_connect_time)
                re_try_count += 1

    def __transmit_data(self, interval):
        """Publish the staged data into the shared dict every ``interval`` seconds."""
        # Wait a little before the first round.
        time.sleep(interval * 0.5)
        while True:
            time.sleep(interval)
            self._publish_snapshot()

    #region Information getters

    def __get_memory_info(self, client, timeout, info_dict: dict = None):
        """Run ``free`` remotely and return ``{'total', 'used'}`` from its second output line.

        Values are integers in whatever unit ``free`` prints by default.
        Returns ``None`` when that line is empty or cannot be parsed; parse
        errors are recorded under ``info_dict['memory']``. ``SSHException``
        is re-raised.
        """
        try:
            stdin, stdout, stderr = client.exec_command('free', timeout=timeout)
            output = stdout.read().decode().split('\n')[1]
            if output == "":
                return None
            data = output.split()
            result = {
                "total": int(data[1]),
                "used": int(data[2])
            }

            return result
        except paramiko.ssh_exception.SSHException:
            # SSH-level failures still propagate to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['memory'] = f'{e}'
            return None

    def __get_storage_info(self, client, timeout, path_list, info_dict: dict = None):
        """Run ``df`` for every path and return a list of ``{'path', 'total', 'available'}``.

        ``total`` and ``available`` are the second and fourth columns of the
        ``df`` row that contains the path. Paths that produce no matching row
        are skipped. On a parse error the whole result is ``None`` and the
        error is recorded under ``info_dict['storage']``. ``SSHException`` is
        re-raised.
        """
        try:
            result = []
            for target_path in path_list:
                stdin, stdout, stderr = client.exec_command(f'df {target_path} | grep \'{target_path}\'', timeout=timeout)
                output = stdout.read().decode()
                if output == "":
                    continue
                data = output.split()
                tmp_res = {
                    "path": target_path,
                    "total": int(data[1]),
                    "available": int(data[3])
                }
                result.append(tmp_res)
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH-level failures still propagate to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['storage'] = f'{e}'
            return None

    def __get_network_info(self, client, timeout, interface_name, info_dict: dict = None):
        """Run ``ifstat`` for one interface and return ``{'in', 'out'}`` as floats.

        The values are the first two columns of the third output line.
        Returns ``None`` when ``interface_name`` is ``None`` or the output
        cannot be parsed; parse errors are recorded under
        ``info_dict['network']``. ``SSHException`` is re-raised.
        """
        try:
            if interface_name is None:
                return None
            stdin, stdout, stderr = client.exec_command(f'ifstat -i {interface_name} 0.1 1', timeout=timeout)
            output = stdout.read().decode().split('\n')[2]
            data = output.split()
            result = {
                "in": float(data[0]),
                "out": float(data[1])
            }
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH-level failures still propagate to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['network'] = f'{e}'
            return None

    def __get_gpus_info(self, client, timeout, info_dict: dict = None, ignore_gpu=False):
        """Run ``gpustat --json`` remotely and return one dict per GPU.

        Returns ``None`` when ``ignore_gpu`` is true or the output cannot be
        parsed; parse errors are recorded under ``info_dict['gpu']``.
        ``SSHException`` is re-raised.
        """
        if ignore_gpu:
            return None

        try:
            # Apply the same timeout as the other commands so a hung gpustat
            # cannot block the polling thread indefinitely.
            stdin, stdout, stderr = client.exec_command("gpustat --json", timeout=timeout)
            output = stdout.read().decode()
            gpus_info = json.loads(output)

            result = []
            for gpu_info in gpus_info['gpus']:
                # Shorten the vendor prefixes in the GPU name.
                gpu_name = gpu_info['name'].replace('NVIDIA ', '').replace('GeForce ', '')
                process_list = []
                for process_info in gpu_info.get('processes', []):
                    cmd = process_info.get('command', '')
                    if 'full_command' in process_info:
                        cmd = ' '.join(process_info["full_command"])
                    process_list.append({
                        "user": process_info.get('username'),
                        "memory": process_info.get('gpu_memory_usage'),
                        "cmd": cmd
                    })
                result.append({
                    "idx": gpu_info['index'],
                    "name": gpu_name,
                    "temperature": gpu_info['temperature.gpu'],
                    "used_memory": gpu_info['memory.used'],
                    "total_memory": gpu_info['memory.total'],
                    "utilization": gpu_info['utilization.gpu'],
                    "process_list": process_list
                })

            return result
        except paramiko.ssh_exception.SSHException:
            # SSH-level failures still propagate to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['gpu'] = f'{e}'
            return None

    #endregion
@app.route(f'/{api_name}/get_data', methods=['GET'])
def get_data():
    """Return the whole shared status dict as a JSON response."""
    # Serialize while holding the lock so writers cannot change the dict meanwhile.
    with data_lock:
        return jsonify(data_dict)


@app.route(f'/{api_name}/update_data', methods=['POST'])
def receive_data():
    """Store a status report pushed by a client.

    The body must be a JSON object with a ``title`` field; otherwise a 400
    response is returned instead of failing with an unhandled exception.
    Reports whose title is not listed in ``server_cfg['server_list']`` are
    accepted but ignored.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'title' not in data:
        return jsonify({"status": "error", "message": "a JSON object with a 'title' field is required"}), 400

    title = data['title']
    # Only update the record when the title is a configured server.
    if title in server_cfg['server_list']:
        # Merge the server-side note with the note sent by the client.
        if title in server_cfg['note_dict']:
            client_note = data.get('note') or ''
            server_note = server_cfg['note_dict'][title]
            data['note'] = server_note if client_note == '' \
                else server_note + '\n' + client_note
        with data_lock:
            data_dict['server_dict'][title] = data
    return jsonify({"status": "success"})

#endregion


def init():
    """Reset the shared dict: version string plus one empty slot per configured server."""
    with data_lock:
        data_dict['server_dict'] = dict()
        data_dict['version'] = version
        for server_name in server_cfg['server_list']:
            data_dict['server_dict'][server_name] = None


def main():
    """Initialise shared state, start the optional SSH connector, then run Flask."""
    init()

    # Active connections: servers this process polls itself over SSH.
    connect_targets = server_cfg.get('connect_server') or []
    if connect_targets:
        connector = Connector(data_dict['server_dict'], connect_targets, data_lock, server_cfg['connect_check_interval'], server_cfg['reconnect_interval'])
        connector.run()
        print('开启主动服务器主动连接 : ' + '、'.join([s['title'] for s in connect_targets]))
    else:
        print('未设置主动连接的服务器')

    # flask
    app.run(debug=False, host=server_cfg['host'], port=server_cfg['port'])