summaryrefslogtreecommitdiff
path: root/ATRI/plugins/console/driver
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-05-14 00:02:20 +0800
committerKyomotoi <[email protected]>2022-05-14 00:02:20 +0800
commitd8dde99fb3a6f742488aee09c20bec7b9f4a3a1b (patch)
tree6c24179fddb06676ad2ecc6c347fb1e9f8f96feb /ATRI/plugins/console/driver
parent2abccb5960cc57e6d8ef8823df5340cb357a6b38 (diff)
downloadATRI-d8dde99fb3a6f742488aee09c20bec7b9f4a3a1b.tar.gz
ATRI-d8dde99fb3a6f742488aee09c20bec7b9f4a3a1b.tar.bz2
ATRI-d8dde99fb3a6f742488aee09c20bec7b9f4a3a1b.zip
✨ 新增插件以支持控制台
Diffstat (limited to 'ATRI/plugins/console/driver')
-rw-r--r--ATRI/plugins/console/driver/__init__.py56
-rw-r--r--ATRI/plugins/console/driver/api.py193
-rw-r--r--ATRI/plugins/console/driver/view.py101
3 files changed, 350 insertions, 0 deletions
diff --git a/ATRI/plugins/console/driver/__init__.py b/ATRI/plugins/console/driver/__init__.py
new file mode 100644
index 0000000..fe4398a
--- /dev/null
+++ b/ATRI/plugins/console/driver/__init__.py
@@ -0,0 +1,56 @@
+from nonebot import get_driver
+from nonebot.drivers.fastapi import Driver
+
+from fastapi.middleware.cors import CORSMiddleware
+
+from .view import (
+ handle_auther,
+ handle_base_uri,
+ handle_control_service,
+ handle_edit_block,
+ handle_get_block_list,
+ handle_get_service_list,
+ handle_runtime_info,
+ handle_message_deal_info,
+)
+
+
+CONSOLE_API_URI = "/capi" # base point
+CONSOLE_API_AUTH_URI = "/capi/auth" # 验证后台许可
+CONSOLE_API_RUNTIME_URI = "/capi/runtime" # 获取运行占用信息
+CONSOLE_API_MESSAGE_URI = "/capi/message" # 获取信息处理信息
+
+CONSOLE_API_SERVICE_LIST_URI = "/capi/service/list" # 获取服务列表
+CONSOLE_API_SERVICE_CONTROL_URI = "/capi/service/control" # 对服务作出修改
+
+CONSOLE_API_BLOCK_LIST_URI = "/capi/block/list" # 获取封禁列表
+CONSOLE_API_BLOCK_EDIT_URI = "/capi/block/edit" # 编辑封禁列表
+
+
+def register_routes(driver: Driver):
+ app = driver.server_app
+
+ origins = ["*"]
+ app.add_middleware(
+ CORSMiddleware,
+ allow_origins=origins,
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+ )
+
+ app.get(CONSOLE_API_URI)(handle_base_uri)
+ app.get(CONSOLE_API_AUTH_URI)(handle_auther)
+ app.get(CONSOLE_API_RUNTIME_URI)(handle_runtime_info)
+ app.get(CONSOLE_API_MESSAGE_URI)(handle_message_deal_info)
+
+ app.get(CONSOLE_API_SERVICE_LIST_URI)(handle_get_service_list)
+ app.get(CONSOLE_API_SERVICE_CONTROL_URI)(handle_control_service)
+
+ app.get(CONSOLE_API_BLOCK_LIST_URI)(handle_get_block_list)
+ app.get(CONSOLE_API_BLOCK_EDIT_URI)(handle_edit_block)
+
+
+def init():
+ driver = get_driver()
+ register_routes(driver) # type: ignore
diff --git a/ATRI/plugins/console/driver/api.py b/ATRI/plugins/console/driver/api.py
new file mode 100644
index 0000000..ba5405a
--- /dev/null
+++ b/ATRI/plugins/console/driver/api.py
@@ -0,0 +1,193 @@
+import os
+import json
+import time
+import psutil
+from pathlib import Path
+from datetime import datetime
+
+from ATRI.service import ServiceTools, SERVICES_DIR
+from ATRI.exceptions import GetStatusError, ReadFileError, WriteFileError
+from ..models import PlatformRuntimeInfo, BotRuntimeInfo, ServiceInfo
+
+
+def get_processing_data() -> tuple:
+ try:
+ p_cpu = psutil.cpu_percent(interval=1)
+ p_mem = psutil.virtual_memory().percent
+ disk = psutil.disk_usage("/").percent
+ inte_send = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
+ inte_recv = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+
+ process = psutil.Process(os.getpid())
+ b_cpu = process.cpu_percent(interval=1)
+ b_mem = process.memory_percent(memtype="rss")
+
+ now = time.time()
+ boot = psutil.boot_time()
+ b = process.create_time()
+ boot_time = str(
+ datetime.utcfromtimestamp(now).replace(microsecond=0)
+ - datetime.utcfromtimestamp(boot).replace(microsecond=0)
+ )
+ bot_time = str(
+ datetime.utcfromtimestamp(now).replace(microsecond=0)
+ - datetime.utcfromtimestamp(b).replace(microsecond=0)
+ )
+ except GetStatusError:
+ raise GetStatusError("Getting runtime failed.")
+
+ if p_cpu > 90: # type: ignore
+ msg = "咱感觉有些头晕..."
+ if p_mem > 90:
+ msg = "咱感觉有点头晕并且有点累..."
+ elif p_mem > 90:
+ msg = "咱感觉有点累..."
+ elif disk > 90:
+ msg = "咱感觉身体要被塞满了..."
+ else:
+ msg = "アトリは、高性能ですから!"
+
+ return (
+ PlatformRuntimeInfo(
+ stat_msg=msg,
+ cpu_percent=str(p_cpu),
+ mem_percent=p_mem,
+ disk_percent=str(disk),
+ inte_send=str(inte_send),
+ inte_recv=str(inte_recv),
+ boot_time=boot_time,
+ ).dict(),
+ BotRuntimeInfo(
+ cpu_percent=str(b_cpu), mem_percent=str(b_mem), bot_run_time=bot_time
+ ).dict(),
+ )
+
+
+def get_service_list() -> dict:
+ result = dict()
+
+ files = os.listdir(SERVICES_DIR)
+ for f in files:
+ # Thank you, MacOS
+ if f == ".DS_Store":
+ continue
+
+ serv_path = SERVICES_DIR / f
+ data = json.loads(serv_path.read_bytes())
+
+ serv_name = data["service"]
+ serv_docs = data["docs"]
+ serv_is_enabled = data["enabled"]
+ serv_disable_user = data["disable_user"]
+ serv_disable_group = data["disable_group"]
+
+ result[serv_name] = ServiceInfo(
+ service_name=serv_name,
+ service_docs=serv_docs,
+ is_enabled=serv_is_enabled,
+ disable_user=serv_disable_user,
+ disable_group=serv_disable_group,
+ ).dict()
+
+ return result
+
+
+def control_service(
+ serv_name: str, is_enab: int, enab_u: str, enab_g: str, disab_u: str, disab_g: str
+) -> tuple:
+ try:
+ serv_data = ServiceTools().load_service(serv_name)
+ except ReadFileError:
+ return False, dict()
+
+ if is_enab != 1:
+ if is_enab == 0:
+ serv_data["enabled"] = False
+ else:
+ serv_data["enabled"] = True
+
+ if enab_u:
+ if enab_u not in serv_data["disable_user"]:
+ return False, {"msg": "Target not in list"}
+ serv_data["disable_user"].remove(enab_u)
+
+ if enab_g:
+ if enab_g not in serv_data["disable_user"]:
+ return False, {"msg": "Target not in list"}
+ serv_data["disable_group"].remove(enab_g)
+
+ if disab_u:
+ if disab_u in serv_data["disable_user"]:
+ return False, {"msg": "Target already exists in list"}
+ serv_data["disable_user"].append(disab_u)
+
+ if disab_g:
+ if disab_g in serv_data["disable_user"]:
+ return False, {"msg": "Target already exists in list"}
+ serv_data["disable_group"].append(disab_g)
+
+ try:
+ ServiceTools().save_service(serv_data, serv_name)
+ except WriteFileError:
+ return False, dict()
+
+ return True, serv_data
+
+
+MANEGE_DIR = Path(".") / "data" / "database" / "manege"
+
+
+def get_block_list() -> dict:
+ u_f = "block_user.json"
+ path = MANEGE_DIR / u_f
+ u_data = json.loads(path.read_bytes())
+
+ g_f = "block_group.json"
+ path = MANEGE_DIR / g_f
+ g_data = json.loads(path.read_bytes())
+
+ return {"user": u_data, "group": g_data}
+
+
+def edit_block_list(is_enab: int, user_id: str, group_id: str) -> tuple:
+ d = get_block_list()
+ u_d = d["user"]
+ g_d = d["group"]
+
+ now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+ if is_enab:
+ if user_id:
+ if user_id in u_d:
+ return False, {"msg": "Target already exists in list"}
+ u_d[user_id] = now_time
+
+ if group_id:
+ if group_id in g_d:
+ return False, {"msg": "Target already exists in list"}
+ g_d[group_id] = now_time
+ else:
+ if user_id:
+ if user_id not in u_d:
+ return False, {"msg": "Target not in list"}
+ del u_d[user_id]
+
+ if group_id:
+ if group_id not in g_d:
+ return False, {"msg": "Target not in list"}
+ del g_d[group_id]
+
+ try:
+ u_f = "block_user.json"
+ path = MANEGE_DIR / u_f
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(u_d))
+
+ g_f = "block_group.json"
+ path = MANEGE_DIR / g_f
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(g_d))
+ except WriteFileError:
+ return False, dict()
+
+ return True, {"user": u_d, "group": g_d}
diff --git a/ATRI/plugins/console/driver/view.py b/ATRI/plugins/console/driver/view.py
new file mode 100644
index 0000000..624e41f
--- /dev/null
+++ b/ATRI/plugins/console/driver/view.py
@@ -0,0 +1,101 @@
+from ..data_source import Console
+from ..listener import get_message_deal_info
+from .api import (
+ control_service,
+ edit_block_list,
+ get_block_list,
+ get_processing_data,
+ get_service_list,
+)
+
+
+def auth_token(token: str) -> tuple:
+ auth_data: dict = Console().get_auth_info()
+ if not auth_data.get("token", None):
+ return False, {"status": 500, "msg": "This bot is not create auth data yet."}
+ _token = auth_data["token"]
+ if token != _token:
+ return False, {"status": 403, "msg": "Token error, please check again."}
+ else:
+ return True, {"status": 200, "msg": "OK"}
+
+
+def handle_base_uri():
+ return {"status": 204, "msg": "This path just for console load."}
+
+
+def handle_auther(token: str):
+ auth, data = auth_token(token)
+ return data if auth else data
+
+
+def handle_runtime_info(token: str):
+ auth, data = auth_token(token)
+ if not auth:
+ return data
+
+ plat, bot = get_processing_data()
+ return {"status": 200, "data": {"platform": plat, "bot": bot}}
+
+
+def handle_message_deal_info(token: str):
+ auth, data = auth_token(token)
+ if not auth:
+ return data
+
+ return {"status": 200, "data": get_message_deal_info()}
+
+
+def handle_get_service_list(token: str):
+ auth, data = auth_token(token)
+ if not auth:
+ return data
+
+ return {"status": 200, "data": get_service_list()}
+
+
+def handle_control_service(
+ token: str,
+ service: str,
+ is_enabled: int = 1,
+ enabled_user: str = str(),
+ enabled_group: str = str(),
+ disable_user: str = str(),
+ disable_group: str = str(),
+):
+ auth, data = auth_token(token)
+ if not auth:
+ return data
+
+ is_ok, data = control_service(
+ service, is_enabled, enabled_user, enabled_group, disable_user, disable_group
+ )
+ if not is_ok:
+ return {"status": 422, "msg": "Dealing service data failed"}
+
+ return {"status": 200, "data": data}
+
+
+def handle_get_block_list(token: str):
+ auth, data = auth_token(token)
+ if not auth:
+ return data
+
+ return {"status": 200, "data": get_block_list()}
+
+
+def handle_edit_block(
+ token: str,
+ is_enabled: bool,
+ user_id: str = str(),
+ group_id: str = str(),
+):
+ auth, data = auth_token(token)
+ if not auth:
+ return data
+
+ is_ok, data = edit_block_list(is_enabled, user_id, group_id)
+ if not is_ok:
+ return {"status": 422, "msg": "Dealing block data failed"}
+
+ return {"status": 200, "data": data}