diff options
Diffstat (limited to 'ATRI/plugins/console/driver/api.py')
-rw-r--r-- | ATRI/plugins/console/driver/api.py | 262 |
1 files changed, 73 insertions, 189 deletions
diff --git a/ATRI/plugins/console/driver/api.py b/ATRI/plugins/console/driver/api.py index 4750148..4b78906 100644 --- a/ATRI/plugins/console/driver/api.py +++ b/ATRI/plugins/console/driver/api.py @@ -1,202 +1,86 @@ -import os -import json -import time -import psutil -from pathlib import Path -from datetime import datetime +import asyncio -from ATRI.service import ServiceTools, SERVICES_DIR -from ATRI.exceptions import GetStatusError -from ..models import PlatformRuntimeInfo, BotRuntimeInfo, ServiceInfo +from fastapi import Depends, status +from starlette.websockets import WebSocket, WebSocketState +from websockets.exceptions import ConnectionClosedOK - -def get_processing_data() -> tuple: - try: - p_cpu = psutil.cpu_percent(interval=1) - p_mem = psutil.virtual_memory().percent - disk = psutil.disk_usage("/").percent - inte_send = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inte_recv = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore - - process = psutil.Process(os.getpid()) - b_cpu = process.cpu_percent(interval=1) - b_mem = process.memory_percent(memtype="rss") - - now = time.time() - boot = psutil.boot_time() - b = process.create_time() - boot_time = str( - datetime.utcfromtimestamp(now).replace(microsecond=0) - - datetime.utcfromtimestamp(boot).replace(microsecond=0) - ) - bot_time = str( - datetime.utcfromtimestamp(now).replace(microsecond=0) - - datetime.utcfromtimestamp(b).replace(microsecond=0) - ) - except Exception: - raise GetStatusError("Getting runtime failed.") - - if p_cpu > 90: # type: ignore - msg = "咱感觉有些头晕..." - if p_mem > 90: - msg = "咱感觉有点头晕并且有点累..." - elif p_mem > 90: - msg = "咱感觉有点累..." - elif disk > 90: - msg = "咱感觉身体要被塞满了..." - else: - msg = "アトリは、高性能ですから!" - - return ( - PlatformRuntimeInfo( - stat_msg=msg, - cpu_percent=str(p_cpu), - mem_percent=str(p_mem), - disk_percent=str(disk), - inte_send=str(inte_send), - inte_recv=str(inte_recv), - boot_time=boot_time, - ).dict(), - BotRuntimeInfo( - cpu_percent=str(b_cpu), mem_percent=str(b_mem), bot_run_time=bot_time - ).dict(), - ) - - -def get_service_list() -> dict: - result = dict() - - files = os.listdir(SERVICES_DIR) - for f in files: - # Thank you, MacOS - if f == ".DS_Store": - continue - - serv_path = SERVICES_DIR / f - data = json.loads(serv_path.read_bytes()) - - serv_name = data["service"] - serv_docs = data["docs"] - serv_is_enabled = data["enabled"] - serv_disable_user = data["disable_user"] - serv_disable_group = data["disable_group"] - - result[serv_name] = ServiceInfo( - service_name=serv_name, - service_docs=serv_docs, - is_enabled=serv_is_enabled, - disable_user=serv_disable_user, - disable_group=serv_disable_group, - ).dict() - - return result - - -def control_service( - serv_name: str, is_enab: int, enab_u: str, enab_g: str, disab_u: str, disab_g: str -) -> tuple: - try: - serv_data = ServiceTools().load_service(serv_name) - except Exception: - return False, dict() - - if is_enab != 1: - if is_enab == 0: - serv_data["enabled"] = False - else: - serv_data["enabled"] = True - - if enab_u: - if enab_u not in serv_data["disable_user"]: - return False, {"msg": "Target not in list"} - serv_data["disable_user"].remove(enab_u) - - if enab_g: - if enab_g not in serv_data["disable_user"]: - return False, {"msg": "Target not in list"} - serv_data["disable_group"].remove(enab_g) - - if disab_u: - if disab_u in serv_data["disable_user"]: - return False, {"msg": "Target already exists in list"} - serv_data["disable_user"].append(disab_u) - - if disab_g: - if disab_g in serv_data["disable_user"]: - return False, {"msg": "Target already exists in list"} - serv_data["disable_group"].append(disab_g) - - try: - ServiceTools().save_service(serv_data, serv_name) - except Exception: - return False, dict() - - return True, serv_data - - -MANEGE_DIR = Path(".") / "data" / "plugins" / "manege" +from .depends import * +from .data_source import ( + get_process_info, + get_service_list, + edit_service as _edit_service, + get_block_list, + edit_block_list as _edit_block_list, +) +from ..listener import get_message_info -def get_block_list() -> dict: - u_data = dict() - g_data = dict() +def base_url(_=Depends(http_author)): + return {"status": status.HTTP_204_NO_CONTENT, "msg": "该路径仅供控制台加载"} - u_f = "block_user.json" - path = MANEGE_DIR / u_f - if path.is_file(): - u_data = json.loads(path.read_bytes()) - g_f = "block_group.json" - path = MANEGE_DIR / g_f - if path.is_file(): - g_data = json.loads(path.read_bytes()) +def auth_info(_=Depends(http_author)): + return {"status": status.HTTP_200_OK, "detail": "OK"} - return {"user": u_data, "group": g_data} - -def edit_block_list(is_enab: bool, user_id: str, group_id: str) -> tuple: - d = get_block_list() - u_d = d["user"] - g_d = d["group"] - - now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - if is_enab: - if user_id: - if user_id in u_d: - return False, {"msg": "Target already exists in list"} - u_d[user_id] = now_time - - if group_id: - if group_id in g_d: - return False, {"msg": "Target already exists in list"} - g_d[group_id] = now_time - else: - if user_id: - if user_id not in u_d: - return False, {"msg": "Target not in list"} - del u_d[user_id] - - if group_id: - if group_id not in g_d: - return False, {"msg": "Target not in list"} - del g_d[group_id] +async def runtime_info(websocket: WebSocket, _pass=Depends(websocket_author)): + if not _pass: + return + await websocket.accept() try: - u_f = "block_user.json" - path = MANEGE_DIR / u_f - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(u_d)) - - g_f = "block_group.json" - path = MANEGE_DIR / g_f - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(g_d)) - except Exception: - return False, dict() + while websocket.client_state == WebSocketState.CONNECTED: + await websocket.send_json(get_process_info()) + await asyncio.sleep(1) + except ConnectionClosedOK: + pass + finally: + await websocket.close() + return - return True, {"user": u_d, "group": g_d} +async def message_info(websocket: WebSocket, _pass=Depends(websocket_author)): + if not _pass: + return + await websocket.accept() -def get_group_list(): - ... + try: + while websocket.client_state == WebSocketState.CONNECTED: + await websocket.send_json(get_message_info()) + await asyncio.sleep(1) + except ConnectionClosedOK: + pass + finally: + await websocket.close() + return + + +def service_list(_=Depends(http_author)): + return {"status": status.HTTP_200_OK, "data": get_service_list()} + + +def edit_service( + service: str, + global_enabled: str = "2", + enabled: str = "2", + user: str = str(), + group: str = str(), + _=Depends(http_author), +): + return { + "status": status.HTTP_200_OK, + "data": _edit_service( + service, int(global_enabled), bool(int(enabled)), user, group + ), + } + + +def block_list_info(_=Depends(http_author)): + return {"status": status.HTTP_200_OK, "data": get_block_list()} + + +def edit_block_list(enabled: str, user_id: str = str(), group_id: str = str(), _=Depends(http_author)): + return { + "status": status.HTTP_200_OK, + "data": _edit_block_list(bool(int(enabled)), user_id, group_id), + } |