diff options
author | Kyomotoi <[email protected]> | 2022-05-13 23:59:19 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2022-05-13 23:59:19 +0800 |
commit | 817b55b84ab65e0404b15195bed97a46d977f3b9 (patch) | |
tree | b957216155b870eebd8a81f9a4d6cc18b59e97da /ATRI/plugins/status/__init__.py | |
parent | 7042d94213532191a0c72d34d7c85193184c079f (diff) | |
download | ATRI-817b55b84ab65e0404b15195bed97a46d977f3b9.tar.gz ATRI-817b55b84ab65e0404b15195bed97a46d977f3b9.tar.bz2 ATRI-817b55b84ab65e0404b15195bed97a46d977f3b9.zip |
🚚 独立控制台
Diffstat (limited to 'ATRI/plugins/status/__init__.py')
-rw-r--r-- | ATRI/plugins/status/__init__.py | 210 |
1 files changed, 4 insertions, 206 deletions
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py index 9aa5364..ae7702a 100644 --- a/ATRI/plugins/status/__init__.py +++ b/ATRI/plugins/status/__init__.py @@ -1,25 +1,13 @@ -import json - -from nonebot.params import ArgPlainText -from nonebot.permission import SUPERUSER -from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent - from ATRI.log import logger as log -from ATRI.config import BotSelfConfig -from ATRI.exceptions import ReadFileError, WriteError from ATRI.utils.apscheduler import scheduler -from .data_source import Status, STATUS_DIR -from .models import ForAuthData -from .driver import init - -init() +from .data_source import Status ping = Status().on_command("/ping", "检测bot简单信息处理速度") @ping.handle() -async def _ping(): +async def _(): await ping.finish(Status.ping()) @@ -27,7 +15,7 @@ status = Status().on_command("/status", "查看运行资源占用") @status.handle() -async def _status(): +async def _(): msg, _ = Status.get_status() await status.finish(msg) @@ -36,198 +24,8 @@ info_msg = "アトリは高性能ですから!" @scheduler.scheduled_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) # type: ignore -async def _check_runtime(): +async def _(): log.info("开始检查资源消耗...") msg, stat = Status().get_status() if not stat: await status.finish(msg) - - -get_console_key = Status().on_command("/auth", "获取进入网页后台的凭证", permission=SUPERUSER) - - -@get_console_key.got("is_pub_n", "咱的运行环境是否有公网(y/n)") -async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")): - if is_pub_n != "y": - ip = str(await Status().get_host_ip(False)) - await get_console_key.send("没有公网吗...嗯知道了") - else: - ip = str(await Status().get_host_ip(True)) - - p = BotSelfConfig.port - rs = Status().get_random_str(20) - - df = STATUS_DIR / "data.json" - try: - if not df.is_file(): - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - d = json.loads(df.read_bytes()) - - ca = d.get("data", None) - if ca: - # 此处原本想用 matcher.finish 但这是在 try 里啊! - await get_console_key.send("咱已经告诉你了嗷!啊!忘了.../gauth 获取吧") - return - - d["data"] = ForAuthData(ip=ip, port=str(p), token=rs).dict() - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(d)) - except WriteError: - msg = f""" - 哦吼!写入文件失败了...还请自行记下哦... - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await get_console_key.send(msg) - - raise WriteError("Writing file: " + str(df) + " failed!") - - msg = f""" - 该信息已保存!可通过 /gauth 获取~ - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await get_console_key.finish(msg) - - -@get_console_key.handle() -async def _(event: GroupMessageEvent): - await get_console_key.finish("请私戳咱获取(") - - -load_console_key = Status().on_command("/gauth", "获取已生成的后台凭证", permission=SUPERUSER) - - -@load_console_key.handle() -async def _(event: PrivateMessageEvent): - df = STATUS_DIR / "data.json" - if not df.is_file(): - await load_console_key.finish("你还没有问咱索要奥!/auth 以获取") - - try: - d = json.loads(df.read_bytes()) - except ReadFileError: - await load_console_key.send("获取数据失败了...请自行打开文件查看吧:\n" + str(df)) - raise ReadFileError("Reading file: " + str(df) + " failed!") - - data = d["data"] - msg = f""" - 诶嘿嘿嘿——凭证信息来咯! - IP: {data['ip']} - PORT: {data['port']} - TOKEN: {data['token']} - 切记!不要告诉他人!! - """.strip() - await load_console_key.finish(msg) - - -@load_console_key.handle() -async def _(event: GroupMessageEvent): - await load_console_key.finish("请私戳咱获取(") - - -del_console_key = Status().on_command("/deauth", "销毁进入网页后台的凭证", permission=SUPERUSER) - - -@del_console_key.got("is_sure_d", "...你确定吗(y/n)") -async def _(is_sure: str = ArgPlainText("is_sure_d")): - if is_sure != "y": - await del_console_key.finish("反悔了呢...") - - df = STATUS_DIR / "data.json" - if not df.is_file(): - await del_console_key.finish("你还没向咱索取凭证呢.../auth 以获取") - - try: - data: dict = json.loads(df.read_bytes()) - - del data["data"] - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(data)) - except WriteError: - await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df)) - raise WriteError("Writing / Reading file: " + str(df) + " failed!") - - await del_console_key.finish("销毁成功!如需再次获取: /auth") - - -res_console_key = Status().on_command("/reauth", "重置进入网页后台的凭证", permission=SUPERUSER) - - -@res_console_key.got("is_sure_r", "...你确定吗(y/n)") -async def _(is_sure: str = ArgPlainText("is_sure_r")): - if is_sure != "y": - await res_console_key.finish("反悔了呢...") - - df = STATUS_DIR / "data.json" - if not df.is_file(): - await del_console_key.finish("你还没向咱索取凭证呢.../auth 以获取") - - try: - data: dict = json.loads(df.read_bytes()) - - del data["data"] - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(data)) - except WriteError: - await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df)) - raise WriteError("Writing / Reading file: " + str(df) + " failed!") - - -@res_console_key.got("is_pub_r_n", "咱的运行环境是否有公网(y/n)") -async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")): - if is_pub_n != "y": - ip = str(await Status().get_host_ip(False)) - await res_console_key.send("没有公网吗...嗯知道了") - else: - ip = str(await Status().get_host_ip(True)) - - p = BotSelfConfig.port - rs = Status().get_random_str(20) - - df = STATUS_DIR / "data.json" - try: - if not df.is_file(): - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - d = json.loads(df.read_bytes()) - - ca = d.get("data", None) - if ca: - await res_console_key.send("咱已经告诉你了嗷!啊!忘了.../gauth 获取吧") - return - - d["data"] = ForAuthData(ip=ip, port=str(p), token=rs).dict() - - with open(df, "w", encoding="utf-8") as w: - w.write(json.dumps(d)) - except WriteError: - msg = f""" - 哦吼!写入文件失败了...还请自行记下哦... - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await res_console_key.send(msg) - - raise WriteError("Writing file: " + str(df) + " failed!") - - msg = f""" - 该信息已保存!可通过 /gauth 获取~ - IP: {ip} - PORT: {p} - TOKEN: {rs} - 一定要保管好哦!切勿告诉他人哦! - """.strip() - await res_console_key.finish(msg) |