import argparse
import json
import os
import re
import subprocess
import time

import psutil
import requests

from version import version


# region get data

def get_gpus_info(error_dict):
    """Collect per-GPU status by running ``gpustat --json``.

    Args:
        error_dict: Dict that receives an error message under the key
            ``'gpu'`` if collection fails.

    Returns:
        A list with one dict per GPU (``idx``, ``name``, ``temperature``,
        ``used_memory``, ``total_memory``, ``utilization``, ``process_list``).
        GPUs gathered before a failure are still returned.
    """
    result_list = []
    try:
        # Run gpustat directly (no shell) so the pipe is always cleaned up.
        completed = subprocess.run(['gpustat', '--json'],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   text=True)
        gpus_info = json.loads(completed.stdout)

        for gpu_info in gpus_info['gpus']:
            # Shorten the model name by dropping vendor/brand prefixes.
            gpu_name = gpu_info['name']
            gpu_name = gpu_name.replace('NVIDIA ', '').replace('GeForce ', '')

            process_list = []
            # NOTE(review): gpustat appears to emit null for 'processes' when
            # process details are unavailable; treat that as "no processes"
            # instead of failing the whole GPU list.
            for process_info in gpu_info['processes'] or []:
                cmd = process_info['command']
                if 'full_command' in process_info:
                    cmd = ' '.join(process_info['full_command'])
                process_list.append({
                    "user": process_info['username'],
                    "memory": process_info['gpu_memory_usage'],
                    "cmd": cmd,
                })

            result_list.append({
                "idx": gpu_info['index'],
                "name": gpu_name,
                "temperature": gpu_info['temperature.gpu'],
                "used_memory": gpu_info['memory.used'],
                "total_memory": gpu_info['memory.total'],
                "utilization": gpu_info['utilization.gpu'],
                "process_list": process_list,
            })
    except Exception as e:
        # Recorded under 'gpu' so it cannot be confused with (or overwritten
        # by) errors from the other collectors.
        error_dict['gpu'] = f'{e}'

    return result_list


# Cached CPU model name; filled in on the first successful lookup.
cpu_name = None


def _detect_cpu_name():
    """Return the CPU model name parsed from ``lscpu`` output."""
    result = subprocess.run(['lscpu'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            text=True)
    output = result.stdout

    # Try the English label first, then the Chinese-locale label.
    # NOTE(review): the Chinese pattern also accepts a full-width colon,
    # which localized lscpu output presumably uses - confirm on a zh_CN host.
    for pattern in (r'Model name\s*:\s*(.+)', r'型号名称\s*[:：]\s*(.+)'):
        match = re.search(pattern, output)
        if match:
            return match.group(1).strip()
    return "CPU型号信息未找到"


def get_cpu_info(error_dict):
    """Collect CPU model name, package temperatures and utilisation.

    Args:
        error_dict: Dict that receives an error message under the key
            ``'cpu'`` if collection fails.

    Returns:
        A dict with ``name``, ``temperature_list``, ``core_avg_occupy`` and
        ``core_occupy_list``; empty if collection failed.
    """
    global cpu_name
    result_dict = {}
    try:
        # The model name is looked up once and then reused.
        if cpu_name is None:
            cpu_name = _detect_cpu_name()

        # One temperature per 'coretemp' sensor whose label starts with
        # 'Package'.
        temperatures = psutil.sensors_temperatures()
        temperature_list = [
            entry.current
            for entry in temperatures.get('coretemp', [])
            if entry.label.startswith('Package')
        ]

        result_dict["name"] = cpu_name
        result_dict["temperature_list"] = temperature_list
        result_dict["core_avg_occupy"] = psutil.cpu_percent(interval=None, percpu=False)
        result_dict["core_occupy_list"] = psutil.cpu_percent(interval=None, percpu=True)
    except Exception as e:
        error_dict['cpu'] = f'{e}'

    return result_dict


def _query_storage(target_path):
    """Return ``path``/``total``/``available`` for *target_path* using ``df``.

    Raises:
        RuntimeError: If ``df`` fails or prints no data row.
    """
    # -P keeps each filesystem on a single line; -k forces 1024-byte blocks.
    completed = subprocess.run(['df', '-P', '-k', target_path],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               text=True)
    lines = completed.stdout.splitlines()
    if completed.returncode != 0 or len(lines) < 2:
        raise RuntimeError(completed.stderr.strip() or 'unexpected df output')

    # Line 0 is the header; line 1 is the row for the requested path.
    fields = lines[1].split()
    return {
        "path": target_path,
        "total": int(fields[1]),
        "available": int(fields[3]),
    }


def get_storages_info(error_dict, path_list):
    """Collect total/available space for each configured path.

    Args:
        error_dict: Dict that receives an error message under the key
            ``'storage'`` if any path fails.
        path_list: Iterable of filesystem paths to query.

    Returns:
        A list of dicts with ``path``, ``total`` and ``available`` (values as
        printed by ``df`` in 1024-byte blocks). Paths that fail are skipped
        so the remaining paths are still reported.
    """
    result_list = []
    failures = []
    try:
        for target_path in path_list:
            try:
                result_list.append(_query_storage(target_path))
            except Exception as e:
                failures.append(f'{target_path}: {e}')
    except Exception as e:
        # e.g. path_list itself is not iterable.
        failures.append(f'{e}')

    if failures:
        error_dict['storage'] = '; '.join(failures)
    return result_list


def get_memory_info(error_dict):
    """Collect total and used system memory.

    Args:
        error_dict: Dict that receives an error message under the key
            ``'memory'`` if collection fails.

    Returns:
        A dict with ``total`` and ``used``: the psutil values divided by
        1024. Empty if collection failed.
    """
    result_dict = {}
    try:
        mem = psutil.virtual_memory()
        result_dict["total"] = mem.total / 1024
        result_dict["used"] = mem.used / 1024
    except Exception as e:
        error_dict['memory'] = f'{e}'

    return result_dict


# Counters and timestamp from the previous network sample; rates are computed
# from the difference between two consecutive samples.
last_network_stats = None
last_network_time = None


def get_networks_info(error_dict):
    """Collect per-interface network throughput since the previous call.

    Args:
        error_dict: Dict that receives an error message under the key
            ``'network'`` if collection fails.

    Returns:
        A list of dicts with ``name``, ``default`` (always ``False``), ``in``
        and ``out``. Rates are byte deltas divided by the elapsed seconds and
        by 1000. The loopback interface ``lo`` is skipped. An interface with
        no previous sample (first call, or newly appeared) reports 0.
    """
    global last_network_stats
    global last_network_time

    result_list = []
    try:
        current_stats = psutil.net_io_counters(pernic=True)
        now = time.time()

        previous_stats = last_network_stats or {}
        elapsed = 0 if last_network_time is None else now - last_network_time

        for name, counters in current_stats.items():
            if name == 'lo':
                continue

            previous = previous_stats.get(name)
            if previous is None or elapsed <= 0:
                # No baseline for this interface (or no measurable interval):
                # report zero instead of raising and losing the whole sample.
                rate_in = 0
                rate_out = 0
            else:
                rate_in = (counters.bytes_recv - previous.bytes_recv) / elapsed / 1000
                rate_out = (counters.bytes_sent - previous.bytes_sent) / elapsed / 1000

            result_list.append({
                "name": name,
                "default": False,
                "in": rate_in,
                "out": rate_out,
            })

        # Remember this sample as the baseline for the next call.
        last_network_stats = current_stats
        last_network_time = now
    except Exception as e:
        error_dict['network'] = f'{e}'

    return result_list

# endregion


# Client configuration loaded from the JSON config file in main().
client_cfg = None


def collect_data():
    """Build one report payload according to ``client_cfg``.

    Only the sections listed in ``client_cfg['enable']`` are collected.
    Collector failures are reported in the payload's ``error_dict`` rather
    than raised.

    Returns:
        A dict containing the enabled sections plus ``update_time_stamp``,
        ``error_dict``, ``note``, ``title``, ``interval`` and ``version``.
    """
    result_dict = {}
    error_dict = {}
    enabled = client_cfg['enable']

    # Collect only what the configuration asks for.
    if 'gpu' in enabled:
        result_dict['gpu_list'] = get_gpus_info(error_dict)
    if 'cpu' in enabled:
        result_dict['cpu'] = get_cpu_info(error_dict)
    if 'storage' in enabled:
        result_dict['storage_list'] = get_storages_info(error_dict, client_cfg['storage_list'])
    if 'memory' in enabled:
        result_dict['memory'] = get_memory_info(error_dict)
    if 'network' in enabled:
        result_dict['network_list'] = get_networks_info(error_dict)

    # Metadata sent along with every report.
    result_dict['update_time_stamp'] = int(time.time())
    result_dict['error_dict'] = error_dict
    result_dict['note'] = client_cfg['note']
    result_dict['title'] = client_cfg['title']
    result_dict['interval'] = client_cfg['interval']
    result_dict['version'] = version

    return result_dict


def main():
    """Load the config file and post collected data to the server forever.

    The config path comes from ``--cfg`` (default ``client_config.json``).
    Data is posted to ``<server_url>/<api_name>/update_data`` every
    ``interval`` seconds. An optional ``request_timeout`` config key
    (seconds, default 10) bounds each HTTP request.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--cfg', default='client_config.json', type=str,
                        help='the path of config json.')
    args = parser.parse_args()

    # Load the configuration file.
    cfg_path = args.cfg
    global client_cfg
    with open(cfg_path, 'r', encoding='utf-8') as f:
        client_cfg = json.load(f)

    send_interval = client_cfg['interval']
    api_name = client_cfg['api_name']
    api_url = client_cfg['server_url'] + f'/{api_name}/update_data'
    # Without a timeout a stalled server would block this loop indefinitely.
    request_timeout = client_cfg.get('request_timeout', 10)

    # Send reports continuously.
    while True:
        data = collect_data()
        try:
            requests.post(api_url, json=data, timeout=request_timeout)
        except Exception as e:
            # Best effort: report the failure and retry on the next cycle.
            print(e)
        time.sleep(send_interval)


if __name__ == '__main__':
    main()