diff options
Diffstat (limited to 'ATRI/plugins/status')
-rw-r--r-- | ATRI/plugins/status/__init__.py | 210 | ||||
-rw-r--r-- | ATRI/plugins/status/data_source.py | 79 | ||||
-rw-r--r-- | ATRI/plugins/status/driver/__init__.py | 41 | ||||
-rw-r--r-- | ATRI/plugins/status/driver/view.py | 39 | ||||
-rw-r--r-- | ATRI/plugins/status/listener.py | 49 | ||||
-rw-r--r-- | ATRI/plugins/status/models.py | 32 |
6 files changed, 10 insertions, 440 deletions
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py index 9aa5364..ae7702a 100644 --- a/ATRI/plugins/status/__init__.py +++ b/ATRI/plugins/status/__init__.py @@ -1,25 +1,13 @@ -import json - -from nonebot.params import ArgPlainText -from nonebot.permission import SUPERUSER -from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent - from ATRI.log import logger as log -from ATRI.config import BotSelfConfig -from ATRI.exceptions import ReadFileError, WriteError from ATRI.utils.apscheduler import scheduler -from .data_source import Status, STATUS_DIR -from .models import ForAuthData -from .driver import init - -init() +from .data_source import Status ping = Status().on_command("/ping", "检测bot简单信息处理速度") @ping.handle() -async def _ping(): +async def _(): await ping.finish(Status.ping()) @@ -27,7 +15,7 @@ status = Status().on_command("/status", "查看运行资源占用") @status.handle() -async def _status(): +async def _(): msg, _ = Status.get_status() await status.finish(msg) @@ -36,198 +24,8 @@ info_msg = "アトリは高性能ですから!" @scheduler.scheduled_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) # type: ignore -async def _check_runtime(): +async def _(): log.info("开始检查资源消耗...") msg, stat = Status().get_status() if not stat: await status.finish(msg) - - -get_console_key = Status().on_command("/auth", "获取进入网页后台的凭证", permission=SUPERUSER) - - -@get_console_key.got("is_pub_n", "咱的运行环境是否有公网(y/n)") -async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")): - if is_pub_n != "y": - ip = str(await Status().get_host_ip(False)) - await get_console_key.send("没有公网吗...嗯知道了") - else: - ip = str(await Status().get_host_ip(True)) - - p = BotSelfConfig.port - rs = Status().get_random_str(20) - - df = STATUS_DIR / "data.json" - try: - if not df.is_file(): - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - d = json.loads(df.read_bytes()) - - ca = d.get("data", None) - if ca: - # 此处原本想用 matcher.finish 但这是在 try 里啊! - await get_console_key.send("咱已经告诉你了嗷!啊!忘了.../gauth 获取吧") - return - - d["data"] = ForAuthData(ip=ip, port=str(p), token=rs).dict() - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(d)) - except WriteError: - msg = f""" - 哦吼!写入文件失败了...还请自行记下哦... - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await get_console_key.send(msg) - - raise WriteError("Writing file: " + str(df) + " failed!") - - msg = f""" - 该信息已保存!可通过 /gauth 获取~ - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await get_console_key.finish(msg) - - -@get_console_key.handle() -async def _(event: GroupMessageEvent): - await get_console_key.finish("请私戳咱获取(") - - -load_console_key = Status().on_command("/gauth", "获取已生成的后台凭证", permission=SUPERUSER) - - -@load_console_key.handle() -async def _(event: PrivateMessageEvent): - df = STATUS_DIR / "data.json" - if not df.is_file(): - await load_console_key.finish("你还没有问咱索要奥!/auth 以获取") - - try: - d = json.loads(df.read_bytes()) - except ReadFileError: - await load_console_key.send("获取数据失败了...请自行打开文件查看吧:\n" + str(df)) - raise ReadFileError("Reading file: " + str(df) + " failed!") - - data = d["data"] - msg = f""" - 诶嘿嘿嘿——凭证信息来咯! - IP: {data['ip']} - PORT: {data['port']} - TOKEN: {data['token']} - 切记!不要告诉他人!! - """.strip() - await load_console_key.finish(msg) - - -@load_console_key.handle() -async def _(event: GroupMessageEvent): - await load_console_key.finish("请私戳咱获取(") - - -del_console_key = Status().on_command("/deauth", "销毁进入网页后台的凭证", permission=SUPERUSER) - - -@del_console_key.got("is_sure_d", "...你确定吗(y/n)") -async def _(is_sure: str = ArgPlainText("is_sure_d")): - if is_sure != "y": - await del_console_key.finish("反悔了呢...") - - df = STATUS_DIR / "data.json" - if not df.is_file(): - await del_console_key.finish("你还没向咱索取凭证呢.../auth 以获取") - - try: - data: dict = json.loads(df.read_bytes()) - - del data["data"] - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(data)) - except WriteError: - await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df)) - raise WriteError("Writing / Reading file: " + str(df) + " failed!") - - await del_console_key.finish("销毁成功!如需再次获取: /auth") - - -res_console_key = Status().on_command("/reauth", "重置进入网页后台的凭证", permission=SUPERUSER) - - -@res_console_key.got("is_sure_r", "...你确定吗(y/n)") -async def _(is_sure: str = ArgPlainText("is_sure_r")): - if is_sure != "y": - await res_console_key.finish("反悔了呢...") - - df = STATUS_DIR / "data.json" - if not df.is_file(): - await del_console_key.finish("你还没向咱索取凭证呢.../auth 以获取") - - try: - data: dict = json.loads(df.read_bytes()) - - del data["data"] - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(data)) - except WriteError: - await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df)) - raise WriteError("Writing / Reading file: " + str(df) + " failed!") - - -@res_console_key.got("is_pub_r_n", "咱的运行环境是否有公网(y/n)") -async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")): - if is_pub_n != "y": - ip = str(await Status().get_host_ip(False)) - await res_console_key.send("没有公网吗...嗯知道了") - else: - ip = str(await Status().get_host_ip(True)) - - p = BotSelfConfig.port - rs = Status().get_random_str(20) - - df = STATUS_DIR / "data.json" - try: - if not df.is_file(): - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - d = json.loads(df.read_bytes()) - - ca = d.get("data", None) - if ca: - await res_console_key.send("咱已经告诉你了嗷!啊!忘了.../gauth 获取吧") - return - - d["data"] = ForAuthData(ip=ip, port=str(p), token=rs).dict() - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(d)) - except WriteError: - msg = f""" - 哦吼!写入文件失败了...还请自行记下哦... - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await res_console_key.send(msg) - - raise WriteError("Writing file: " + str(df) + " failed!") - - msg = f""" - 该信息已保存!可通过 /gauth 获取~ - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await res_console_key.finish(msg) diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py index 1658468..ddd3568 100644 --- a/ATRI/plugins/status/data_source.py +++ b/ATRI/plugins/status/data_source.py @@ -1,24 +1,13 @@ import os import time -import json import psutil -import socket -import string -from pathlib import Path -from random import sample from datetime import datetime from ATRI.service import Service -from ATRI.log import logger as log from ATRI.rule import is_in_service -from ATRI.utils import request -from ATRI.exceptions import GetStatusError, WriteError -from .models import PlatfromRuntimeInfo, BotRuntimeInfo +from ATRI.exceptions import GetStatusError -STATUS_DIR = Path(".") / "data" / "database" / "status" -STATUS_DIR.mkdir(exist_ok=True) - _status_msg = """ > Status Overview @@ -44,15 +33,13 @@ class Status(Service): return "I'm fine." @staticmethod - def get_status(is_for_fn: bool = False) -> tuple: - data_p = Path(".") / "data" - + def get_status() -> tuple: try: cpu = psutil.cpu_percent(interval=1) mem = psutil.virtual_memory().percent disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + inte_send = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inte_recv = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore process = psutil.Process(os.getpid()) b_cpu = process.cpu_percent(interval=1) @@ -88,71 +75,17 @@ class Status(Service): else: is_ok = True - if is_for_fn: - return ( - PlatfromRuntimeInfo( - stat_msg=msg, - cpu_percent=str(cpu), - mem_percent=mem, - disk_percent=str(disk), - inte_send=str(inteSENT), - inte_recv=str(inteRECV), - boot_time=boot_time, - ).dict(), - BotRuntimeInfo( - cpu_percent=str(b_cpu), - mem_percent=str(b_mem), - bot_run_time=bot_time, - ).dict(), - ) - msg0 = _status_msg.format( p_cpu=cpu, p_mem=mem, p_disk=disk, b_cpu=f"{b_cpu}%", b_mem="%.1f%%" % b_mem, - inteSENT=inteSENT, - inteRECV=inteRECV, + inteSENT=inte_send, + inteRECV=inte_recv, bot_time=bot_time, boot_time=boot_time, msg=msg, ) return msg0, is_ok - - @staticmethod - async def get_host_ip(is_pub: bool): - if is_pub: - data = await request.get("https://ifconfig.me/ip") - return data.text - - s = None - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(('8.8.8.8', 80)) - ip = s.getsockname()[0] - return ip - finally: - if s: - s.close() - - @staticmethod - def get_random_str(k: int) -> str: - return "".join(sample(string.ascii_letters + string.digits, k)) - - @staticmethod - def get_auth_info() -> dict: - df = STATUS_DIR / "data.json" - if not df.is_file(): - try: - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - except WriteError: - raise WriteError("Writing file: " + str(df) + " failed!") - - base_data: dict = json.loads(df.read_bytes()) - data = base_data.get("data", None) - if not data: - return {"data": None} - return data diff --git a/ATRI/plugins/status/driver/__init__.py b/ATRI/plugins/status/driver/__init__.py deleted file mode 100644 index 199fb1e..0000000 --- a/ATRI/plugins/status/driver/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -from nonebot import get_driver -from nonebot.drivers.fastapi import Driver - -from fastapi.middleware.cors import CORSMiddleware - -from .view import ( - handle_auther, - handle_base_uri, - handle_runtime_info, - handle_message_deal_info, -) - - -CONSOLE_API_URI = "/capi" # base point -CONSOLE_API_AUTH_URI = "/capi/auth" # 验证后台许可 -CONSOLE_API_RUNTIME_URI = "/capi/runtime" # 获取运行占用信息 -CONSOLE_API_MESSAGE_URI = "/capi/message" -# CONSOLE_API_AUTH_COOKIES_URI = "/capi/auth/cookies" # 验证cookies - - -def register_routes(driver: Driver): - app = driver.server_app - - origins = ["*"] - app.add_middleware( - CORSMiddleware, - allow_origins=origins, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - app.get(CONSOLE_API_URI)(handle_base_uri) - app.get(CONSOLE_API_RUNTIME_URI)(handle_runtime_info) - app.get(CONSOLE_API_MESSAGE_URI)(handle_message_deal_info) - app.get(CONSOLE_API_AUTH_URI)(handle_auther) - - -def init(): - driver = get_driver() - register_routes(driver) # type: ignore diff --git a/ATRI/plugins/status/driver/view.py b/ATRI/plugins/status/driver/view.py deleted file mode 100644 index 5d8c56f..0000000 --- a/ATRI/plugins/status/driver/view.py +++ /dev/null @@ -1,39 +0,0 @@ -from ..data_source import Status -from ..listener import get_message_deal_info - - -def handle_base_uri(): - return {"status": 204, "msg": "This path just for console load."} - - -def handle_runtime_info(token: str): - auth, data = auth_token(token) - plat, bot = Status().get_status(True) - if auth: - return {"status": 200, "data": {"platform": plat, "bot": bot}} - else: - return data - - -def handle_message_deal_info(token: str): - auth, data = auth_token(token) - if auth: - return {"status": 200, "data": get_message_deal_info()} - else: - return data - - -def handle_auther(token: str): - auth, data = auth_token(token) - return data if auth else data - - -def auth_token(token: str) -> tuple: - auth_data: dict = Status().get_auth_info() - if not auth_data.get("token", None): - return False, {"status": 500, "msg": "This bot is not create auth data yet."} - _token = auth_data["token"] - if token != _token: - return False, {"status": 403, "msg": "Token error, please check again."} - else: - return True, {"status": 200, "msg": "OK"} diff --git a/ATRI/plugins/status/listener.py b/ATRI/plugins/status/listener.py deleted file mode 100644 index 0ae5c25..0000000 --- a/ATRI/plugins/status/listener.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import Optional - -from nonebot.message import run_postprocessor - -from ATRI.utils.apscheduler import scheduler -from .models import MessageDealerInfo - - -recv_msg = int() -deal_msg = int() -failed_deal_msg = int() - -total_r_m = int() -total_d_m = int() -total_f_m = int() - - -@run_postprocessor -async def _(exception: Optional[Exception]): - global recv_msg, deal_msg, failed_deal_msg, total_r_m, total_d_m, total_f_m - - if exception: - failed_deal_msg += 1 - total_f_m += 1 - else: - deal_msg += 1 - total_d_m += 1 - - recv_msg += 1 - total_r_m += 1 - - -def get_message_deal_info() -> dict: - return MessageDealerInfo( - recv_msg=str(recv_msg), - deal_msg=str(deal_msg), - failed_deal_msg=str(failed_deal_msg), - total_r_m=str(total_r_m), - total_d_m=str(total_d_m), - total_f_m=str(total_f_m), - ).dict() - - [email protected]_job("interval", name="信息数据重置", seconds=15, misfire_grace_time=1) # type: ignore -async def _(): - global recv_msg, deal_msg, failed_deal_msg - recv_msg = int() - deal_msg = int() - failed_deal_msg = int() diff --git a/ATRI/plugins/status/models.py b/ATRI/plugins/status/models.py deleted file mode 100644 index 9026b2d..0000000 --- a/ATRI/plugins/status/models.py +++ /dev/null @@ -1,32 +0,0 @@ -from pydantic import BaseModel - - -class ForAuthData(BaseModel): - ip: str - port: str - token: str - - -class PlatfromRuntimeInfo(BaseModel): - stat_msg: str - cpu_percent: str - mem_percent: str - disk_percent: str - inte_send: str - inte_recv: str - boot_time: str - - -class BotRuntimeInfo(BaseModel): - cpu_percent: str - mem_percent: str - bot_run_time: str - - -class MessageDealerInfo(BaseModel): - recv_msg: str - deal_msg: str - failed_deal_msg: str - total_r_m: str - total_d_m: str - total_f_m: str |