"""Monitoring client that periodically reports host hardware status.

The script reads ``client_config.json`` from the current working directory,
collects the metrics enabled there (GPU, CPU, storage, memory, network) and
POSTs them as JSON to ``<server_url>/api/update_data`` every ``interval``
seconds.
"""

import json
import os
import re
import subprocess
import time

import psutil
import requests

# Upper bound, in seconds, for a single upload. Without a timeout a stalled
# server would block the reporting loop forever.
REQUEST_TIMEOUT_SECONDS = 10


def _format_error(exc):
    """Return a JSON-serializable description of *exc*.

    The error dict is sent with ``requests.post(json=...)``; storing the raw
    exception object there makes the whole payload unserializable, so only
    text is kept.
    """
    return f"{type(exc).__name__}: {exc}"


# region get data

def get_gpus_info(error_dict):
    """Collect per-GPU status by parsing the output of ``gpustat --json``.

    Args:
        error_dict: Mutable dict; on failure the error text is stored under
            the key ``'gpu'``.

    Returns:
        A list with one dict per GPU (keys ``idx``, ``name``, ``temperature``,
        ``used_memory``, ``total_memory``, ``utilization``, ``process_list``).
        GPUs parsed before a failure are still returned.
    """
    result_list = []
    try:
        # The context manager closes the pipe and reaps the child process.
        with os.popen('gpustat --json') as pipe:
            gpus_info = json.load(pipe)

        for gpu_info in gpus_info['gpus']:
            # Shorten the model name by dropping the vendor/brand prefixes.
            gpu_name = gpu_info['name']
            gpu_name = gpu_name.replace('NVIDIA ', '').replace('GeForce ', '')

            process_list = [
                {
                    "user": process_info['username'],
                    "memory": process_info['gpu_memory_usage'],
                    "cmd": ' '.join(process_info["full_command"]),
                }
                for process_info in gpu_info['processes']
            ]

            result_list.append({
                "idx": gpu_info['index'],
                "name": gpu_name,
                "temperature": gpu_info['temperature.gpu'],
                "used_memory": gpu_info['memory.used'],
                "total_memory": gpu_info['memory.total'],
                "utilization": gpu_info['utilization.gpu'],
                "process_list": process_list,
            })
    except Exception as e:
        error_dict['gpu'] = _format_error(e)

    return result_list


# Cached CPU model name; filled in by the first get_cpu_info() call that
# manages to run lscpu.
cpu_name = None


def _read_cpu_name():
    """Return the CPU model name reported by ``lscpu``.

    The English label ("Model name") is tried first, then the Chinese one
    ("型号名称"). A fixed placeholder string is returned when neither is found.
    """
    result = subprocess.run(
        ['lscpu'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    output = result.stdout

    for pattern in (r'Model name\s*:\s*(.+)', r'型号名称\s*:\s*(.+)'):
        match = re.search(pattern, output)
        if match:
            return match.group(1).strip()
    return "CPU型号信息未找到"


def get_cpu_info(error_dict):
    """Collect the CPU model name, package temperatures and utilization.

    Args:
        error_dict: Mutable dict; on failure the error text is stored under
            the key ``'cpu'``.

    Returns:
        A dict with keys ``name``, ``temperature_list``, ``core_avg_occupy``
        and ``core_occupy_list``. Keys assigned before a failure are kept,
        so the dict may be partial or empty.
    """
    global cpu_name
    result_dict = {}
    try:
        # lscpu is only executed until a name has been cached.
        if cpu_name is None:
            cpu_name = _read_cpu_name()

        # One temperature per physical package, taken from the 'coretemp'
        # sensor entries whose label starts with 'Package'.
        temperatures = psutil.sensors_temperatures()
        temperature_list = [
            entry.current
            for entry in temperatures.get('coretemp', [])
            if entry.label.startswith('Package')
        ]

        result_dict["name"] = cpu_name
        result_dict["temperature_list"] = temperature_list
        # interval=None makes both calls non-blocking.
        result_dict["core_avg_occupy"] = psutil.cpu_percent(interval=None, percpu=False)
        result_dict["core_occupy_list"] = psutil.cpu_percent(interval=None, percpu=True)
    except Exception as e:
        error_dict['cpu'] = _format_error(e)

    return result_dict


def get_storages_info(error_dict, path_list):
    """Collect total and available space for each path using ``df``.

    Args:
        error_dict: Mutable dict; on failure the error text is stored under
            the key ``'storage'``.
        path_list: Iterable of filesystem paths to query.

    Returns:
        A list of dicts with keys ``path``, ``total`` and ``available``; the
        sizes are in 1024-byte blocks as printed by ``df -k``. Processing
        stops at the first failing path; earlier entries are still returned.
    """
    result_list = []
    try:
        for target_path in path_list:
            # No shell is involved, so a '| grep ...' pipeline cannot be
            # expressed in the argument list; df already limits its output to
            # the requested path. -P keeps each filesystem on a single line,
            # -k pins the unit to 1024-byte blocks.
            completed = subprocess.run(
                ['df', '-kP', target_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if completed.returncode != 0:
                raise RuntimeError(
                    completed.stderr.strip() or f"df failed for {target_path}"
                )

            # Line 0 is the header; line 1 holds the values:
            # filesystem, total, used, available, capacity, mount point.
            fields = completed.stdout.splitlines()[1].split()
            result_list.append({
                "path": target_path,
                "total": int(fields[1]),
                "available": int(fields[3]),
            })
    except Exception as e:
        error_dict['storage'] = _format_error(e)

    return result_list


def get_memory_info(error_dict):
    """Collect total and used system memory.

    Args:
        error_dict: Mutable dict; on failure the error text is stored under
            the key ``'memory'``.

    Returns:
        A dict with keys ``total`` and ``used``: the values reported by
        ``psutil.virtual_memory()`` divided by 1024.
    """
    result_dict = {}
    try:
        mem = psutil.virtual_memory()
        result_dict["total"] = mem.total / 1024
        result_dict["used"] = mem.used / 1024
    except Exception as e:
        error_dict['memory'] = _format_error(e)

    return result_dict


def get_networks_info(error_dict):
    """Placeholder for network statistics; not implemented yet.

    Args:
        error_dict: Unused for now; kept for parity with the other collectors.

    Returns:
        Always ``None``.
    """
    # TODO: implement, e.g. on top of psutil.net_io_counters().
    return None

# endregion


# Parsed contents of client_config.json; assigned by main().
client_cfg = None


def collect_data():
    """Gather every metric enabled in ``client_cfg`` into one payload dict.

    Returns:
        A dict containing the enabled metric sections plus
        ``update_time_stamp`` (integer Unix time), ``error_dict`` (collector
        name -> error text) and the ``note``, ``title`` and ``interval``
        values copied from the configuration.
    """
    result_dict = {}
    error_dict = {}
    enabled = client_cfg['enable']

    # Collect only the sections switched on in the configuration.
    if 'gpu' in enabled:
        result_dict['gpu_list'] = get_gpus_info(error_dict)
    if 'cpu' in enabled:
        result_dict['cpu'] = get_cpu_info(error_dict)
    if 'storage' in enabled:
        result_dict['storage_list'] = get_storages_info(
            error_dict, client_cfg['storage_list']
        )
    if 'memory' in enabled:
        result_dict['memory'] = get_memory_info(error_dict)
    if 'network' in enabled:
        result_dict['network_list'] = get_networks_info(error_dict)

    # Bookkeeping fields sent along with the metrics.
    result_dict['update_time_stamp'] = int(time.time())
    result_dict['error_dict'] = error_dict
    result_dict['note'] = client_cfg['note']
    result_dict['title'] = client_cfg['title']
    result_dict['interval'] = client_cfg['interval']

    return result_dict


def main():
    """Load the configuration and upload collected data in an endless loop."""
    global client_cfg

    # Load the configuration file (JSON text is UTF-8).
    cfg_path = "client_config.json"
    with open(cfg_path, 'r', encoding='utf-8') as f:
        client_cfg = json.load(f)

    send_interval = client_cfg['interval']
    api_url = client_cfg['server_url'] + '/api/update_data'

    # Keep sending; one failed upload must not stop the loop.
    while True:
        data = collect_data()
        try:
            requests.post(api_url, json=data, timeout=REQUEST_TIMEOUT_SECONDS)
        except Exception as e:
            print(e)
        time.sleep(send_interval)


if __name__ == '__main__':
    main()