Browse Source

新增主动连接服务器的方式,可以与客户端发送的方式同时使用

lxb 3 weeks ago
parent
commit
8070e8d2c2
  1. 4
      .gitignore
  2. 230
      active_connector.py
  3. 42
      server.py
  4. 19
      server_config.json
  5. 2
      version.py

4
.gitignore

@ -1,5 +1,9 @@
serverList.json
**/__pycache__/
__pycache__/
.vscode/
# build
build/
dist/

230
active_connector.py

@ -0,0 +1,230 @@
from version import version
import threading
import paramiko
import time
import json
class Connector:
    """Actively poll remote servers over SSH and publish their status.

    One daemon thread per configured server keeps an SSH session open
    (via paramiko), periodically runs ``free``, ``df``, ``ifstat`` and
    ``gpustat`` on it, and writes the parsed results into a private
    staging dict guarded by ``tmp_lock``.  A further daemon thread
    periodically copies the staging dict into the caller-supplied
    ``data_dict`` while holding the caller-supplied ``lock``.
    """

    def __init__(self, data_dict: dict, server_cfg: list, lock: threading.Lock,
                 connect_check_interval: float, reconnect_interval: float,
                 multiple_of_timeout: float = 2):
        """Store the configuration; no thread is started until ``run()``.

        Args:
            data_dict: Destination dict, keyed by server title, that
                receives the published status entries.
            server_cfg: Iterable of per-server dicts.  The keys read by
                this class are ``title``, ``ip``, ``port``, ``username``
                and the optional ``password``, ``key_filename``,
                ``network_interface_name``, ``storage_list`` and
                ``ignore_gpu``.
            lock: Lock held while ``data_dict`` is written.
            connect_check_interval: Seconds between two polls of a
                server, and between two publications into ``data_dict``.
            reconnect_interval: Seconds to wait after a failed connection
                before trying again.
            multiple_of_timeout: Factor applied to
                ``connect_check_interval`` to obtain the SSH timeouts.
        """
        self.data_dict = data_dict
        self.tmp_data_dict = dict()
        self.server_cfg = server_cfg
        self.lock = lock
        self.tmp_lock = threading.Lock()
        self.connect_check_interval = connect_check_interval
        self.reconnect_interval = reconnect_interval
        self.multiple_of_timeout = multiple_of_timeout

    def run(self):
        """Start one polling thread per server plus the publishing thread.

        All threads are daemon threads; the method returns immediately.
        """
        # Start the polling threads.
        for server_data in self.server_cfg:
            title = server_data['title']
            self.tmp_data_dict[title] = {}
            thread_check = threading.Thread(
                target=self.__keep_check_one,
                args=(server_data, self.tmp_data_dict, title,
                      self.connect_check_interval, self.reconnect_interval),
            )
            thread_check.daemon = True
            thread_check.start()
        # Start the thread that publishes the staged data.
        thread_transmit = threading.Thread(
            target=self.__transmit_data,
            args=(self.connect_check_interval,),
        )
        thread_transmit.daemon = True
        thread_transmit.start()

    def __keep_check_one(self, server: dict, shared_data_list: dict, server_title: str,
                         interval: float, re_connect_time: float = 5):
        """Poll one server forever, reconnecting whenever the session fails.

        Args:
            server: Configuration dict of the server (not modified).
            shared_data_list: Staging dict; the entry under
                ``server_title`` is updated while ``tmp_lock`` is held.
            server_title: Key of this server in ``shared_data_list``.
            interval: Seconds between two polls.
            re_connect_time: Seconds to wait after a failed connection.
        """
        # Work on a copy so the caller's configuration is left untouched,
        # and make sure the root filesystem is always reported first.
        storage_list = list(server.get('storage_list', []))
        if '/' not in storage_list:
            storage_list.insert(0, '/')
        interface_name = server.get('network_interface_name', None)
        timeout = interval * self.multiple_of_timeout
        re_try_count = 0
        # Connection loop.
        while True:
            try:
                client = paramiko.SSHClient()
                try:
                    # NOTE(review): AutoAddPolicy accepts unknown host keys
                    # without verification; confirm this is acceptable.
                    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                    client.connect(server['ip'], port=server['port'],
                                   username=server['username'],
                                   password=server.get('password', None),
                                   key_filename=server.get('key_filename', None),
                                   timeout=timeout)
                    with self.tmp_lock:
                        shared_data_list[server_title].pop('error_dict', None)
                    re_try_count = 0
                    # Polling loop: runs until any step raises.
                    keep_run = True
                    while keep_run:
                        try:
                            error_info_dict = dict()
                            # Network
                            network_info = self.__get_network_info(client, timeout, interface_name, error_info_dict)
                            # TODO CPU information
                            # Memory
                            memory_info = self.__get_memory_info(client, timeout, error_info_dict)
                            # Storage
                            storage_info = self.__get_storage_info(client, timeout, storage_list, error_info_dict)
                            # GPU
                            gpu_info = self.__get_gpus_info(client, timeout, error_info_dict,
                                                            ignore_gpu=server.get('ignore_gpu', False))
                            # Record the results.
                            with self.tmp_lock:
                                entry = shared_data_list[server_title]
                                entry['interval'] = interval
                                entry['note'] = ""  # TODO announcements are not supported yet
                                entry['title'] = server_title
                                entry['version'] = version
                                entry['update_time_stamp'] = int(time.time())
                                self.__merge_metrics(entry, interface_name, network_info, memory_info,
                                                     storage_info, gpu_info, error_info_dict)
                        except Exception as e:
                            keep_run = False
                            with self.tmp_lock:
                                entry = shared_data_list[server_title]
                                # 'error_dict' may be absent (it is removed on
                                # every successful connect), so create it on demand.
                                entry.setdefault('error_dict', dict())['connect'] = f'{e}'
                                entry.pop('gpu_list', None)
                        time.sleep(interval)
                finally:
                    # Always release the session, including when connect()
                    # itself failed.
                    client.close()
            except Exception as e:
                with self.tmp_lock:
                    entry = shared_data_list[server_title]
                    entry.setdefault('error_dict', dict())['connect'] = f'retry:{re_try_count}, {e}'
                time.sleep(re_connect_time)
                re_try_count += 1

    @staticmethod
    def __merge_metrics(entry: dict, interface_name, network_info, memory_info,
                        storage_info, gpu_info, error_info_dict: dict):
        """Write the metrics of one poll into ``entry``.

        A metric whose value is ``None`` leaves the previous value in
        place.  ``error_dict`` always reflects the latest poll only: it
        is replaced when the poll reported errors and removed otherwise.
        """
        if gpu_info is not None:
            entry['gpu_list'] = gpu_info
        if memory_info is not None:
            entry['memory'] = memory_info
        if storage_info is not None:
            entry['storage_list'] = storage_info
        if network_info is not None:
            entry['network_list'] = [{
                'default': False,
                'name': interface_name,
                'in': network_info['in'],
                'out': network_info['out'],
            }]
        if error_info_dict:
            entry['error_dict'] = error_info_dict
        else:
            entry.pop('error_dict', None)

    def __transmit_data(self, interval):
        """Publish the staged data into ``data_dict`` every ``interval`` seconds."""
        # Wait a little before the first publication.
        time.sleep(interval * 0.5)
        while True:
            time.sleep(interval)
            self.__publish_snapshot()

    def __publish_snapshot(self):
        """Copy the staging dict into ``data_dict``.

        The entries are deep-copied so that readers of ``data_dict``
        never share objects with the polling threads, which keep mutating
        the staged entries under ``tmp_lock`` only.
        """
        import copy

        with self.tmp_lock:
            snapshot = copy.deepcopy(self.tmp_data_dict)
        with self.lock:
            self.data_dict.update(snapshot)

    #region Information getters
    def __get_memory_info(self, client, timeout, info_dict: dict = None):
        """Run ``free`` remotely and parse its second output line.

        Returns:
            ``{"total": int, "used": int}`` taken from columns 1 and 2 of
            that line, or ``None`` when the line is empty or parsing
            fails (the error text is then stored in
            ``info_dict['memory']`` if ``info_dict`` is given).

        Raises:
            paramiko.ssh_exception.SSHException: Propagated unchanged.
        """
        try:
            _, stdout, _ = client.exec_command('free', timeout=timeout)
            output = stdout.read().decode().split('\n')[1]
            if output == "":
                return None
            data = output.split()
            return {
                "total": int(data[1]),
                "used": int(data[2]),
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still raised to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['memory'] = f'{e}'
            return None

    def __get_storage_info(self, client, timeout, path_list, info_dict: dict = None):
        """Run ``df`` remotely for every path in ``path_list``.

        Returns:
            A list of ``{"path", "total", "available"}`` dicts built from
            columns 1 and 3 of the matching ``df`` line; paths that
            produce no output are skipped.  ``None`` when parsing fails
            (the error text is then stored in ``info_dict['storage']`` if
            ``info_dict`` is given).

        Raises:
            paramiko.ssh_exception.SSHException: Propagated unchanged.
        """
        try:
            result = []
            for target_path in path_list:
                _, stdout, _ = client.exec_command(f'df {target_path} | grep \'{target_path}\'', timeout=timeout)
                output = stdout.read().decode()
                if output == "":
                    continue
                data = output.split()
                result.append({
                    "path": target_path,
                    "total": int(data[1]),
                    "available": int(data[3]),
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still raised to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['storage'] = f'{e}'
            return None

    def __get_network_info(self, client, timeout, interface_name, info_dict: dict = None):
        """Run ``ifstat`` remotely for one interface and parse its third line.

        Returns:
            ``{"in": float, "out": float}`` from the first two columns,
            or ``None`` when ``interface_name`` is ``None`` or parsing
            fails (the error text is then stored in
            ``info_dict['network']`` if ``info_dict`` is given).

        Raises:
            paramiko.ssh_exception.SSHException: Propagated unchanged.
        """
        try:
            if interface_name is None:
                return None
            _, stdout, _ = client.exec_command(f'ifstat -i {interface_name} 0.1 1', timeout=timeout)
            output = stdout.read().decode().split('\n')[2]
            data = output.split()
            return {
                "in": float(data[0]),
                "out": float(data[1]),
            }
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still raised to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['network'] = f'{e}'
            return None

    def __get_gpus_info(self, client, timeout, info_dict: dict = None, ignore_gpu=False):
        """Run ``gpustat --json`` remotely and summarise every GPU.

        Returns:
            A list with one dict per GPU (``idx``, ``name``,
            ``temperature``, ``used_memory``, ``total_memory``,
            ``utilization``, ``process_list``), or ``None`` when
            ``ignore_gpu`` is true or parsing fails (the error text is
            then stored in ``info_dict['gpu']`` if ``info_dict`` is given).

        Raises:
            paramiko.ssh_exception.SSHException: Propagated unchanged.
        """
        if ignore_gpu:
            return None
        try:
            # Apply the same timeout as the other probes so a hanging
            # gpustat cannot block the polling thread indefinitely.
            _, stdout, _ = client.exec_command("gpustat --json", timeout=timeout)
            output = stdout.read().decode()
            gpus_info = json.loads(output)
            result = []
            for gpu_info in gpus_info['gpus']:
                # Shorten the vendor prefix of the model name.
                gpu_name = gpu_info['name'].replace('NVIDIA ', '').replace('GeForce ', '')
                process_list = []
                for process_info in gpu_info.get('processes', []):
                    cmd = process_info.get('command', '')
                    if 'full_command' in process_info:
                        cmd = ' '.join(process_info["full_command"])
                    process_list.append({
                        "user": process_info.get('username'),
                        "memory": process_info.get('gpu_memory_usage'),
                        "cmd": cmd,
                    })
                result.append({
                    "idx": gpu_info['index'],
                    "name": gpu_name,
                    "temperature": gpu_info['temperature.gpu'],
                    "used_memory": gpu_info['memory.used'],
                    "total_memory": gpu_info['memory.total'],
                    "utilization": gpu_info['utilization.gpu'],
                    "process_list": process_list,
                })
            return result
        except paramiko.ssh_exception.SSHException:
            # SSH errors are still raised to the caller.
            raise
        except Exception as e:
            if info_dict is not None:
                info_dict['gpu'] = f'{e}'
            return None
    #endregion

42
server.py

@ -1,8 +1,10 @@
from flask import Flask, jsonify, request
from flask_cors import CORS
from version import version
from active_connector import Connector
import json
import argparse
import threading
#region 全局
@ -10,6 +12,8 @@ app = Flask(__name__)
CORS(app)
# Server configuration; starts as None (NOTE(review): presumably loaded from the --cfg file elsewhere — confirm).
server_cfg = None
# Shared state returned by the get_data endpoint and updated by update_data.
data_dict = dict()
# Thread lock held while data_dict is read or written.
data_lock = threading.Lock()
parser = argparse.ArgumentParser()
parser.add_argument('--cfg', default='server_config.json', type=str, help='the path of config json.')
@ -31,33 +35,45 @@ def hello():
@app.route(f'/{api_name}/get_data', methods=['GET'])
def get_data():
    """Return the whole shared ``data_dict`` as JSON.

    The response is built while ``data_lock`` is held so that it is not
    serialised while another thread is updating the dict.
    """
    with data_lock:
        return jsonify(data_dict)
@app.route(f'/{api_name}/update_data', methods=['POST'])
def receive_data():
    """Store a status report pushed by a client.

    The JSON body is stored under its ``title`` only when that title is
    listed in ``server_cfg['server_list']``.  If the server configuration
    has a note for that title, it is put in front of the client's own
    note.  The update is performed once, while ``data_lock`` is held.

    Returns:
        A JSON ``{"status": "success"}`` response, whether or not the
        report was stored.
    """
    data = request.json
    # Update the record only when the title is a known server.
    if data['title'] in server_cfg['server_list']:
        with data_lock:
            data_dict['server_dict'][data['title']] = data
            # Merge the configured note with the note sent by the client.
            if data['title'] in server_cfg['note_dict']:
                client_note = data_dict['server_dict'][data['title']]['note']
                server_note = server_cfg['note_dict'][data['title']]
                note = server_note if client_note == '' \
                    else server_note + '\n' + client_note
                data_dict['server_dict'][data['title']]['note'] = note
    return jsonify({"status": "success"})
#endregion
def init():
    """Reset ``data_dict`` to its initial layout.

    Creates an empty ``server_dict`` with a ``None`` placeholder for every
    title in ``server_cfg['server_list']`` and records the program
    version.  Runs once, while ``data_lock`` is held.
    """
    with data_lock:
        data_dict['server_dict'] = dict()
        data_dict['version'] = version
        for server_name in server_cfg['server_list']:
            data_dict['server_dict'][server_name] = None
def main():
    """Initialise the shared state, start active polling, then run Flask.

    When ``server_cfg`` has a non-empty ``connect_server`` list, a
    ``Connector`` is started on it; the connector writes into
    ``data_dict['server_dict']`` under ``data_lock``.  The Flask app is
    then served on the configured host and port.
    """
    init()
    # Active connections
    if 'connect_server' in server_cfg and len(server_cfg['connect_server']) > 0:
        connector = Connector(data_dict['server_dict'], server_cfg['connect_server'], data_lock,
                              server_cfg['connect_check_interval'], server_cfg['reconnect_interval'])
        connector.run()
        # Separate the titles so the startup message stays readable when
        # more than one server is configured.
        print('开启主动服务器主动连接 : ' + ', '.join([s['title'] for s in server_cfg['connect_server']]))
    else:
        print('未设置主动连接的服务器')
    # flask
    app.run(debug=False, host=server_cfg['host'], port=server_cfg['port'])

19
server_config.json

@ -4,5 +4,22 @@
"server_list":["76", "174", "233", "222"],
"note_dict":{
},
"api_name": "api"
"api_name": "api",
"reconnect_interval" : 10,
"connect_check_interval" : 3,
"connect_server" : [
{
"title": "SERVER_76",
"ip": "lxblxb.top",
"port": 66666,
"username": "lxb",
"key_filename": "/home/lxb/.ssh/id_rsa",
"network_interface_name": "eno2",
"storage_list": [
"/media/D",
"/media/F"
]
}
]
}

2
version.py

@ -1 +1 @@
version = "0.1.1.20250317_beta"
version = "0.2.0.20250626_beta"
Loading…
Cancel
Save