网页上看GPU情况
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

175 lines
5.6 KiB

import os
import json
import time
import psutil
import requests
import subprocess
# region get data
# Collect GPU information.
def get_gpus_info(error_dict):
    """Return one summary dict per GPU as reported by ``gpustat --json``.

    Args:
        error_dict: Mutable dict shared by the collectors. On failure the
            key ``'gpu'`` is set to a human-readable message (a plain
            string, so the dict stays JSON-serialisable).

    Returns:
        A list of dicts with the keys ``idx``, ``name``, ``temperature``,
        ``used_memory``, ``total_memory``, ``utilization`` and
        ``process_list``. GPUs parsed before a failure are still returned;
        the list is empty when gpustat could not be run at all.
    """
    result_list = list()
    try:
        # Argument-list form: no shell involved, and the pipes are closed
        # for us once the command finishes.
        completed = subprocess.run(
            ['gpustat', '--json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if completed.returncode != 0:
            # Surface gpustat's own message instead of an opaque JSON error.
            raise RuntimeError(
                completed.stderr.strip()
                or f"gpustat exited with status {completed.returncode}"
            )
        gpus_info = json.loads(completed.stdout)
        for gpu_info in gpus_info['gpus']:
            # Drop vendor prefixes so only the short model name remains.
            gpu_name = gpu_info['name']
            gpu_name = gpu_name.replace('NVIDIA ', '').replace('GeForce ', '')
            process_list = [
                {
                    "user": process_info['username'],
                    "memory": process_info['gpu_memory_usage'],
                    "cmd": ' '.join(process_info["full_command"]),
                }
                for process_info in gpu_info['processes']
            ]
            result_list.append({
                "idx": gpu_info['index'],
                "name": gpu_name,
                "temperature": gpu_info['temperature.gpu'],
                "used_memory": gpu_info['memory.used'],
                "total_memory": gpu_info['memory.total'],
                "utilization": gpu_info['utilization.gpu'],
                "process_list": process_list,
            })
    except Exception as e:
        # Best-effort collector: record the failure as text and carry on.
        error_dict['gpu'] = f"{type(e).__name__}: {e}"
    return result_list
# Collect CPU information.
cpu_name = None  # CPU model string; detected once via lscpu, then reused


def _detect_cpu_name():
    """Return the CPU model printed by ``lscpu``, or a placeholder string."""
    import re  # local import: `re` is not among the file's top-level imports

    result = subprocess.run(
        ['lscpu'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    # Accept the English and the Chinese field label, with either an ASCII
    # or a full-width colon. Anchoring at line start keeps other fields that
    # merely contain the label (e.g. "BIOS Model name") from matching.
    match = re.search(
        r'^[ \t]*(?:Model name|型号名称)\s*[:：]\s*(.+)$',
        result.stdout,
        re.MULTILINE,
    )
    if match:
        return match.group(1).strip()
    return "CPU型号信息未找到"


def get_cpu_info(error_dict):
    """Return CPU model, package temperatures and utilisation figures.

    Args:
        error_dict: Mutable dict shared by the collectors. On failure the
            key ``'cpu'`` is set to a human-readable message (a plain
            string, so the dict stays JSON-serialisable).

    Returns:
        A dict with the keys ``name``, ``temperature_list``,
        ``core_avg_occupy`` and ``core_occupy_list``; an empty or partial
        dict when collection failed part-way.
    """
    global cpu_name
    result_dict = dict()
    try:
        # The model name cannot change while running, so look it up once.
        if cpu_name is None:
            cpu_name = _detect_cpu_name()
        # One reading per physical package from the 'coretemp' sensor group.
        temperature_list = [
            entry.current
            for entry in psutil.sensors_temperatures().get('coretemp', [])
            if entry.label.startswith('Package')
        ]
        result_dict["name"] = cpu_name
        result_dict["temperature_list"] = temperature_list
        # interval=None is non-blocking: psutil reports utilisation since
        # the previous call (see the psutil docs for first-call behaviour).
        result_dict["core_avg_occupy"] = psutil.cpu_percent(interval=None, percpu=False)
        result_dict["core_occupy_list"] = psutil.cpu_percent(interval=None, percpu=True)
    except Exception as e:
        # Best-effort collector: record the failure as text and carry on.
        error_dict['cpu'] = f"{type(e).__name__}: {e}"
    return result_dict
# Collect storage information.
def get_storages_info(error_dict, path_list):
    """Return total/available space for each path, as reported by ``df``.

    Args:
        error_dict: Mutable dict shared by the collectors. If any path
            fails, the key ``'storage'`` is set to a ``'; '``-joined string
            naming each failing path and its error.
        path_list: Iterable of filesystem paths to inspect.

    Returns:
        A list of ``{"path", "total", "available"}`` dicts, one per path
        that could be inspected. Sizes are in 1024-byte blocks (``df -k``).
    """
    result_list = list()
    failures = list()
    for target_path in path_list:
        try:
            # No shell is involved, so a "| grep" in the argument list would
            # be handed to df as extra file operands; call df alone.
            # -P: one line per filesystem, -k: 1024-byte units,
            # --: a path starting with '-' is not parsed as an option.
            completed = subprocess.run(
                ['df', '-Pk', '--', target_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if completed.returncode != 0:
                raise RuntimeError(
                    completed.stderr.strip()
                    or f"df exited with status {completed.returncode}"
                )
            # Line 0 is the header; line 1 is the row for target_path.
            fields = completed.stdout.splitlines()[1].split()
            result_list.append({
                "path": target_path,
                "total": int(fields[1]),
                "available": int(fields[3]),
            })
        except Exception as e:
            # Keep going: one bad path must not hide the remaining ones.
            failures.append(f"{target_path}: {type(e).__name__}: {e}")
    if failures:
        error_dict['storage'] = '; '.join(failures)
    return result_list
# Collect memory information.
def get_memory_info(error_dict):
    """Return total and used system memory from ``psutil.virtual_memory()``.

    Args:
        error_dict: Mutable dict shared by the collectors. On failure the
            key ``'memory'`` is set to a human-readable message (a plain
            string, so the dict stays JSON-serialisable).

    Returns:
        A dict with the keys ``total`` and ``used``; empty on failure.
    """
    result_dict = dict()
    try:
        mem = psutil.virtual_memory()
        result_dict["total"] = mem.total
        result_dict["used"] = mem.used
    except Exception as e:
        # Best-effort collector: record the failure as text and carry on.
        error_dict['memory'] = f"{type(e).__name__}: {e}"
    return result_dict
# Collect network information (not implemented yet).
def get_networks_info(error_dict):
    """Placeholder for network statistics; currently collects nothing.

    ``error_dict`` is accepted but never modified, and the function always
    returns ``None``.
    """
    # Disabled draft of the intended data source:
    #   net_io = psutil.net_io_counters()
    #   print(net_io)
    return None
# endregion
client_cfg = None  # parsed client configuration; assigned by main()


def collect_data():
    """Run every collector enabled in ``client_cfg`` and build the payload.

    Reads ``client_cfg['enable']`` to decide which collectors run, and
    copies ``client_cfg['note']`` and ``client_cfg['api_key']`` into the
    result. ``client_cfg['storage_list']`` is read only when the storage
    collector is enabled.

    Returns:
        A dict holding the enabled sections (``gpu_list``, ``cpu``,
        ``storage_list``, ``memory``, ``network_list``) plus
        ``update_time_stamp`` (integer Unix time), ``error_dict``, ``note``
        and ``api_key``.
    """
    result_dict = dict()
    error_dict = dict()
    enabled = client_cfg['enable']
    # Collect only what the configuration asks for.
    if 'gpu' in enabled:
        result_dict['gpu_list'] = get_gpus_info(error_dict)
    if 'cpu' in enabled:
        result_dict['cpu'] = get_cpu_info(error_dict)
    if 'storage' in enabled:
        result_dict['storage_list'] = get_storages_info(error_dict, client_cfg['storage_list'])
    if 'memory' in enabled:
        result_dict['memory'] = get_memory_info(error_dict)
    if 'network' in enabled:
        result_dict['network_list'] = get_networks_info(error_dict)
    # Bookkeeping fields.
    result_dict['update_time_stamp'] = int(time.time())
    # Collectors may store exception objects, which cannot be serialised to
    # JSON; convert every value to text so the payload can always be sent.
    result_dict['error_dict'] = {key: str(value) for key, value in error_dict.items()}
    result_dict['note'] = client_cfg['note']
    result_dict['api_key'] = client_cfg['api_key']
    return result_dict
def main():
    """Load ``client_config.json`` and report collected data forever.

    The configuration must provide ``interval`` (seconds between reports)
    and ``server_url``; an optional ``timeout`` (seconds, default 10) bounds
    each HTTP request. Send failures are printed and the loop continues.
    """
    global client_cfg
    # Load the configuration file.
    cfg_path = "client_config.json"
    with open(cfg_path, 'r', encoding='utf-8') as f:
        client_cfg = json.load(f)
    # Report continuously.
    send_interval = client_cfg['interval']
    # Tolerate a trailing slash in server_url so the path never becomes '//api/...'.
    api_url = client_cfg['server_url'].rstrip('/') + '/api/update_data'
    request_timeout = client_cfg.get('timeout', 10)
    while True:
        data = collect_data()
        try:
            # Without a timeout an unresponsive server would block this loop
            # indefinitely and reporting would silently stop.
            response = requests.post(api_url, json=data, timeout=request_timeout)
            # Treat HTTP error statuses as failures so they get printed too.
            response.raise_for_status()
        except Exception as e:
            print(e)
        time.sleep(send_interval)


if __name__ == '__main__':
    main()