summaryrefslogtreecommitdiff
path: root/ATRI/plugins/console/driver/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/console/driver/api.py')
-rw-r--r--ATRI/plugins/console/driver/api.py262
1 files changed, 73 insertions, 189 deletions
diff --git a/ATRI/plugins/console/driver/api.py b/ATRI/plugins/console/driver/api.py
index 4750148..4b78906 100644
--- a/ATRI/plugins/console/driver/api.py
+++ b/ATRI/plugins/console/driver/api.py
@@ -1,202 +1,86 @@
-import os
-import json
-import time
-import psutil
-from pathlib import Path
-from datetime import datetime
+import asyncio
-from ATRI.service import ServiceTools, SERVICES_DIR
-from ATRI.exceptions import GetStatusError
-from ..models import PlatformRuntimeInfo, BotRuntimeInfo, ServiceInfo
+from fastapi import Depends, status
+from starlette.websockets import WebSocket, WebSocketState
+from websockets.exceptions import ConnectionClosedOK
-
-def get_processing_data() -> tuple:
- try:
- p_cpu = psutil.cpu_percent(interval=1)
- p_mem = psutil.virtual_memory().percent
- disk = psutil.disk_usage("/").percent
- inte_send = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inte_recv = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
-
- process = psutil.Process(os.getpid())
- b_cpu = process.cpu_percent(interval=1)
- b_mem = process.memory_percent(memtype="rss")
-
- now = time.time()
- boot = psutil.boot_time()
- b = process.create_time()
- boot_time = str(
- datetime.utcfromtimestamp(now).replace(microsecond=0)
- - datetime.utcfromtimestamp(boot).replace(microsecond=0)
- )
- bot_time = str(
- datetime.utcfromtimestamp(now).replace(microsecond=0)
- - datetime.utcfromtimestamp(b).replace(microsecond=0)
- )
- except Exception:
- raise GetStatusError("Getting runtime failed.")
-
- if p_cpu > 90: # type: ignore
- msg = "咱感觉有些头晕..."
- if p_mem > 90:
- msg = "咱感觉有点头晕并且有点累..."
- elif p_mem > 90:
- msg = "咱感觉有点累..."
- elif disk > 90:
- msg = "咱感觉身体要被塞满了..."
- else:
- msg = "アトリは、高性能ですから!"
-
- return (
- PlatformRuntimeInfo(
- stat_msg=msg,
- cpu_percent=str(p_cpu),
- mem_percent=str(p_mem),
- disk_percent=str(disk),
- inte_send=str(inte_send),
- inte_recv=str(inte_recv),
- boot_time=boot_time,
- ).dict(),
- BotRuntimeInfo(
- cpu_percent=str(b_cpu), mem_percent=str(b_mem), bot_run_time=bot_time
- ).dict(),
- )
-
-
-def get_service_list() -> dict:
- result = dict()
-
- files = os.listdir(SERVICES_DIR)
- for f in files:
- # Thank you, MacOS
- if f == ".DS_Store":
- continue
-
- serv_path = SERVICES_DIR / f
- data = json.loads(serv_path.read_bytes())
-
- serv_name = data["service"]
- serv_docs = data["docs"]
- serv_is_enabled = data["enabled"]
- serv_disable_user = data["disable_user"]
- serv_disable_group = data["disable_group"]
-
- result[serv_name] = ServiceInfo(
- service_name=serv_name,
- service_docs=serv_docs,
- is_enabled=serv_is_enabled,
- disable_user=serv_disable_user,
- disable_group=serv_disable_group,
- ).dict()
-
- return result
-
-
-def control_service(
- serv_name: str, is_enab: int, enab_u: str, enab_g: str, disab_u: str, disab_g: str
-) -> tuple:
- try:
- serv_data = ServiceTools().load_service(serv_name)
- except Exception:
- return False, dict()
-
- if is_enab != 1:
- if is_enab == 0:
- serv_data["enabled"] = False
- else:
- serv_data["enabled"] = True
-
- if enab_u:
- if enab_u not in serv_data["disable_user"]:
- return False, {"msg": "Target not in list"}
- serv_data["disable_user"].remove(enab_u)
-
- if enab_g:
- if enab_g not in serv_data["disable_user"]:
- return False, {"msg": "Target not in list"}
- serv_data["disable_group"].remove(enab_g)
-
- if disab_u:
- if disab_u in serv_data["disable_user"]:
- return False, {"msg": "Target already exists in list"}
- serv_data["disable_user"].append(disab_u)
-
- if disab_g:
- if disab_g in serv_data["disable_user"]:
- return False, {"msg": "Target already exists in list"}
- serv_data["disable_group"].append(disab_g)
-
- try:
- ServiceTools().save_service(serv_data, serv_name)
- except Exception:
- return False, dict()
-
- return True, serv_data
-
-
-MANEGE_DIR = Path(".") / "data" / "plugins" / "manege"
+from .depends import *
+from .data_source import (
+ get_process_info,
+ get_service_list,
+ edit_service as _edit_service,
+ get_block_list,
+ edit_block_list as _edit_block_list,
+)
+from ..listener import get_message_info
-def get_block_list() -> dict:
- u_data = dict()
- g_data = dict()
+def base_url(_=Depends(http_author)):
+ return {"status": status.HTTP_204_NO_CONTENT, "msg": "该路径仅供控制台加载"}
- u_f = "block_user.json"
- path = MANEGE_DIR / u_f
- if path.is_file():
- u_data = json.loads(path.read_bytes())
- g_f = "block_group.json"
- path = MANEGE_DIR / g_f
- if path.is_file():
- g_data = json.loads(path.read_bytes())
+def auth_info(_=Depends(http_author)):
+ return {"status": status.HTTP_200_OK, "detail": "OK"}
- return {"user": u_data, "group": g_data}
-
-def edit_block_list(is_enab: bool, user_id: str, group_id: str) -> tuple:
- d = get_block_list()
- u_d = d["user"]
- g_d = d["group"]
-
- now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- if is_enab:
- if user_id:
- if user_id in u_d:
- return False, {"msg": "Target already exists in list"}
- u_d[user_id] = now_time
-
- if group_id:
- if group_id in g_d:
- return False, {"msg": "Target already exists in list"}
- g_d[group_id] = now_time
- else:
- if user_id:
- if user_id not in u_d:
- return False, {"msg": "Target not in list"}
- del u_d[user_id]
-
- if group_id:
- if group_id not in g_d:
- return False, {"msg": "Target not in list"}
- del g_d[group_id]
+async def runtime_info(websocket: WebSocket, _pass=Depends(websocket_author)):
+ if not _pass:
+ return
+ await websocket.accept()
try:
- u_f = "block_user.json"
- path = MANEGE_DIR / u_f
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(u_d))
-
- g_f = "block_group.json"
- path = MANEGE_DIR / g_f
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(g_d))
- except Exception:
- return False, dict()
+ while websocket.client_state == WebSocketState.CONNECTED:
+ await websocket.send_json(get_process_info())
+ await asyncio.sleep(1)
+ except ConnectionClosedOK:
+ pass
+ finally:
+ await websocket.close()
+ return
- return True, {"user": u_d, "group": g_d}
+async def message_info(websocket: WebSocket, _pass=Depends(websocket_author)):
+ if not _pass:
+ return
+ await websocket.accept()
-def get_group_list():
- ...
+ try:
+ while websocket.client_state == WebSocketState.CONNECTED:
+ await websocket.send_json(get_message_info())
+ await asyncio.sleep(1)
+ except ConnectionClosedOK:
+ pass
+ finally:
+ await websocket.close()
+ return
+
+
+def service_list(_=Depends(http_author)):
+ return {"status": status.HTTP_200_OK, "data": get_service_list()}
+
+
+def edit_service(
+ service: str,
+ global_enabled: str = "2",
+ enabled: str = "2",
+ user: str = str(),
+ group: str = str(),
+ _=Depends(http_author),
+):
+ return {
+ "status": status.HTTP_200_OK,
+ "data": _edit_service(
+ service, int(global_enabled), bool(int(enabled)), user, group
+ ),
+ }
+
+
+def block_list_info(_=Depends(http_author)):
+ return {"status": status.HTTP_200_OK, "data": get_block_list()}
+
+
+def edit_block_list(enabled: str, user_id: str = str(), group_id: str = str(), _=Depends(http_author)):
+ return {
+ "status": status.HTTP_200_OK,
+ "data": _edit_block_list(bool(int(enabled)), user_id, group_id),
+ }