summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/plugins/status/__init__.py210
-rw-r--r--ATRI/plugins/status/data_source.py79
-rw-r--r--ATRI/plugins/status/driver/__init__.py41
-rw-r--r--ATRI/plugins/status/driver/view.py39
-rw-r--r--ATRI/plugins/status/listener.py49
-rw-r--r--ATRI/plugins/status/models.py32
6 files changed, 10 insertions, 440 deletions
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py
index 9aa5364..ae7702a 100644
--- a/ATRI/plugins/status/__init__.py
+++ b/ATRI/plugins/status/__init__.py
@@ -1,25 +1,13 @@
-import json
-
-from nonebot.params import ArgPlainText
-from nonebot.permission import SUPERUSER
-from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent
-
from ATRI.log import logger as log
-from ATRI.config import BotSelfConfig
-from ATRI.exceptions import ReadFileError, WriteError
from ATRI.utils.apscheduler import scheduler
-from .data_source import Status, STATUS_DIR
-from .models import ForAuthData
-from .driver import init
-
-init()
+from .data_source import Status
ping = Status().on_command("/ping", "检测bot简单信息处理速度")
@ping.handle()
-async def _ping():
+async def _():
await ping.finish(Status.ping())
@@ -27,7 +15,7 @@ status = Status().on_command("/status", "查看运行资源占用")
@status.handle()
-async def _status():
+async def _():
msg, _ = Status.get_status()
await status.finish(msg)
@@ -36,198 +24,8 @@ info_msg = "アトリは高性能ですから!"
@scheduler.scheduled_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) # type: ignore
-async def _check_runtime():
+async def _():
log.info("开始检查资源消耗...")
msg, stat = Status().get_status()
if not stat:
await status.finish(msg)
-
-
-get_console_key = Status().on_command("/auth", "获取进入网页后台的凭证", permission=SUPERUSER)
-
-
-@get_console_key.got("is_pub_n", "咱的运行环境是否有公网(y/n)")
-async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")):
- if is_pub_n != "y":
- ip = str(await Status().get_host_ip(False))
- await get_console_key.send("没有公网吗...嗯知道了")
- else:
- ip = str(await Status().get_host_ip(True))
-
- p = BotSelfConfig.port
- rs = Status().get_random_str(20)
-
- df = STATUS_DIR / "data.json"
- try:
- if not df.is_file():
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
-
- d = json.loads(df.read_bytes())
-
- ca = d.get("data", None)
- if ca:
- # 此处原本想用 matcher.finish 但这是在 try 里啊!
- await get_console_key.send("咱已经告诉你了嗷!啊!忘了.../gauth 获取吧")
- return
-
- d["data"] = ForAuthData(ip=ip, port=str(p), token=rs).dict()
-
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps(d))
- except WriteError:
- msg = f"""
- 哦吼!写入文件失败了...还请自行记下哦...
- IP: {ip}
- PORT: {p}
- TOKEN: {rs}
- 一定要保管好哦!切勿告诉他人哦!
- """.strip()
- await get_console_key.send(msg)
-
- raise WriteError("Writing file: " + str(df) + " failed!")
-
- msg = f"""
- 该信息已保存!可通过 /gauth 获取~
- IP: {ip}
- PORT: {p}
- TOKEN: {rs}
- 一定要保管好哦!切勿告诉他人哦!
- """.strip()
- await get_console_key.finish(msg)
-
-
-@get_console_key.handle()
-async def _(event: GroupMessageEvent):
- await get_console_key.finish("请私戳咱获取(")
-
-
-load_console_key = Status().on_command("/gauth", "获取已生成的后台凭证", permission=SUPERUSER)
-
-
-@load_console_key.handle()
-async def _(event: PrivateMessageEvent):
- df = STATUS_DIR / "data.json"
- if not df.is_file():
- await load_console_key.finish("你还没有问咱索要奥!/auth 以获取")
-
- try:
- d = json.loads(df.read_bytes())
- except ReadFileError:
- await load_console_key.send("获取数据失败了...请自行打开文件查看吧:\n" + str(df))
- raise ReadFileError("Reading file: " + str(df) + " failed!")
-
- data = d["data"]
- msg = f"""
- 诶嘿嘿嘿——凭证信息来咯!
- IP: {data['ip']}
- PORT: {data['port']}
- TOKEN: {data['token']}
- 切记!不要告诉他人!!
- """.strip()
- await load_console_key.finish(msg)
-
-
-@load_console_key.handle()
-async def _(event: GroupMessageEvent):
- await load_console_key.finish("请私戳咱获取(")
-
-
-del_console_key = Status().on_command("/deauth", "销毁进入网页后台的凭证", permission=SUPERUSER)
-
-
-@del_console_key.got("is_sure_d", "...你确定吗(y/n)")
-async def _(is_sure: str = ArgPlainText("is_sure_d")):
- if is_sure != "y":
- await del_console_key.finish("反悔了呢...")
-
- df = STATUS_DIR / "data.json"
- if not df.is_file():
- await del_console_key.finish("你还没向咱索取凭证呢.../auth 以获取")
-
- try:
- data: dict = json.loads(df.read_bytes())
-
- del data["data"]
-
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps(data))
- except WriteError:
- await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df))
- raise WriteError("Writing / Reading file: " + str(df) + " failed!")
-
- await del_console_key.finish("销毁成功!如需再次获取: /auth")
-
-
-res_console_key = Status().on_command("/reauth", "重置进入网页后台的凭证", permission=SUPERUSER)
-
-
-@res_console_key.got("is_sure_r", "...你确定吗(y/n)")
-async def _(is_sure: str = ArgPlainText("is_sure_r")):
- if is_sure != "y":
- await res_console_key.finish("反悔了呢...")
-
- df = STATUS_DIR / "data.json"
- if not df.is_file():
- await del_console_key.finish("你还没向咱索取凭证呢.../auth 以获取")
-
- try:
- data: dict = json.loads(df.read_bytes())
-
- del data["data"]
-
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps(data))
- except WriteError:
- await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df))
- raise WriteError("Writing / Reading file: " + str(df) + " failed!")
-
-
-@res_console_key.got("is_pub_r_n", "咱的运行环境是否有公网(y/n)")
-async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")):
- if is_pub_n != "y":
- ip = str(await Status().get_host_ip(False))
- await res_console_key.send("没有公网吗...嗯知道了")
- else:
- ip = str(await Status().get_host_ip(True))
-
- p = BotSelfConfig.port
- rs = Status().get_random_str(20)
-
- df = STATUS_DIR / "data.json"
- try:
- if not df.is_file():
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
-
- d = json.loads(df.read_bytes())
-
- ca = d.get("data", None)
- if ca:
- await res_console_key.send("咱已经告诉你了嗷!啊!忘了.../gauth 获取吧")
- return
-
- d["data"] = ForAuthData(ip=ip, port=str(p), token=rs).dict()
-
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps(d))
- except WriteError:
- msg = f"""
- 哦吼!写入文件失败了...还请自行记下哦...
- IP: {ip}
- PORT: {p}
- TOKEN: {rs}
- 一定要保管好哦!切勿告诉他人哦!
- """.strip()
- await res_console_key.send(msg)
-
- raise WriteError("Writing file: " + str(df) + " failed!")
-
- msg = f"""
- 该信息已保存!可通过 /gauth 获取~
- IP: {ip}
- PORT: {p}
- TOKEN: {rs}
- 一定要保管好哦!切勿告诉他人哦!
- """.strip()
- await res_console_key.finish(msg)
diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py
index 1658468..ddd3568 100644
--- a/ATRI/plugins/status/data_source.py
+++ b/ATRI/plugins/status/data_source.py
@@ -1,24 +1,13 @@
import os
import time
-import json
import psutil
-import socket
-import string
-from pathlib import Path
-from random import sample
from datetime import datetime
from ATRI.service import Service
-from ATRI.log import logger as log
from ATRI.rule import is_in_service
-from ATRI.utils import request
-from ATRI.exceptions import GetStatusError, WriteError
-from .models import PlatfromRuntimeInfo, BotRuntimeInfo
+from ATRI.exceptions import GetStatusError
-STATUS_DIR = Path(".") / "data" / "database" / "status"
-STATUS_DIR.mkdir(exist_ok=True)
-
_status_msg = """
> Status Overview
@@ -44,15 +33,13 @@ class Status(Service):
return "I'm fine."
@staticmethod
- def get_status(is_for_fn: bool = False) -> tuple:
- data_p = Path(".") / "data"
-
+ def get_status() -> tuple:
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
- inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ inte_send = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
+ inte_recv = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
process = psutil.Process(os.getpid())
b_cpu = process.cpu_percent(interval=1)
@@ -88,71 +75,17 @@ class Status(Service):
else:
is_ok = True
- if is_for_fn:
- return (
- PlatfromRuntimeInfo(
- stat_msg=msg,
- cpu_percent=str(cpu),
- mem_percent=mem,
- disk_percent=str(disk),
- inte_send=str(inteSENT),
- inte_recv=str(inteRECV),
- boot_time=boot_time,
- ).dict(),
- BotRuntimeInfo(
- cpu_percent=str(b_cpu),
- mem_percent=str(b_mem),
- bot_run_time=bot_time,
- ).dict(),
- )
-
msg0 = _status_msg.format(
p_cpu=cpu,
p_mem=mem,
p_disk=disk,
b_cpu=f"{b_cpu}%",
b_mem="%.1f%%" % b_mem,
- inteSENT=inteSENT,
- inteRECV=inteRECV,
+ inteSENT=inte_send,
+ inteRECV=inte_recv,
bot_time=bot_time,
boot_time=boot_time,
msg=msg,
)
return msg0, is_ok
-
- @staticmethod
- async def get_host_ip(is_pub: bool):
- if is_pub:
- data = await request.get("https://ifconfig.me/ip")
- return data.text
-
- s = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(('8.8.8.8', 80))
- ip = s.getsockname()[0]
- return ip
- finally:
- if s:
- s.close()
-
- @staticmethod
- def get_random_str(k: int) -> str:
- return "".join(sample(string.ascii_letters + string.digits, k))
-
- @staticmethod
- def get_auth_info() -> dict:
- df = STATUS_DIR / "data.json"
- if not df.is_file():
- try:
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- except WriteError:
- raise WriteError("Writing file: " + str(df) + " failed!")
-
- base_data: dict = json.loads(df.read_bytes())
- data = base_data.get("data", None)
- if not data:
- return {"data": None}
- return data
diff --git a/ATRI/plugins/status/driver/__init__.py b/ATRI/plugins/status/driver/__init__.py
deleted file mode 100644
index 199fb1e..0000000
--- a/ATRI/plugins/status/driver/__init__.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from nonebot import get_driver
-from nonebot.drivers.fastapi import Driver
-
-from fastapi.middleware.cors import CORSMiddleware
-
-from .view import (
- handle_auther,
- handle_base_uri,
- handle_runtime_info,
- handle_message_deal_info,
-)
-
-
-CONSOLE_API_URI = "/capi" # base point
-CONSOLE_API_AUTH_URI = "/capi/auth" # 验证后台许可
-CONSOLE_API_RUNTIME_URI = "/capi/runtime" # 获取运行占用信息
-CONSOLE_API_MESSAGE_URI = "/capi/message"
-# CONSOLE_API_AUTH_COOKIES_URI = "/capi/auth/cookies" # 验证cookies
-
-
-def register_routes(driver: Driver):
- app = driver.server_app
-
- origins = ["*"]
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
-
- app.get(CONSOLE_API_URI)(handle_base_uri)
- app.get(CONSOLE_API_RUNTIME_URI)(handle_runtime_info)
- app.get(CONSOLE_API_MESSAGE_URI)(handle_message_deal_info)
- app.get(CONSOLE_API_AUTH_URI)(handle_auther)
-
-
-def init():
- driver = get_driver()
- register_routes(driver) # type: ignore
diff --git a/ATRI/plugins/status/driver/view.py b/ATRI/plugins/status/driver/view.py
deleted file mode 100644
index 5d8c56f..0000000
--- a/ATRI/plugins/status/driver/view.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from ..data_source import Status
-from ..listener import get_message_deal_info
-
-
-def handle_base_uri():
- return {"status": 204, "msg": "This path just for console load."}
-
-
-def handle_runtime_info(token: str):
- auth, data = auth_token(token)
- plat, bot = Status().get_status(True)
- if auth:
- return {"status": 200, "data": {"platform": plat, "bot": bot}}
- else:
- return data
-
-
-def handle_message_deal_info(token: str):
- auth, data = auth_token(token)
- if auth:
- return {"status": 200, "data": get_message_deal_info()}
- else:
- return data
-
-
-def handle_auther(token: str):
- auth, data = auth_token(token)
- return data if auth else data
-
-
-def auth_token(token: str) -> tuple:
- auth_data: dict = Status().get_auth_info()
- if not auth_data.get("token", None):
- return False, {"status": 500, "msg": "This bot is not create auth data yet."}
- _token = auth_data["token"]
- if token != _token:
- return False, {"status": 403, "msg": "Token error, please check again."}
- else:
- return True, {"status": 200, "msg": "OK"}
diff --git a/ATRI/plugins/status/listener.py b/ATRI/plugins/status/listener.py
deleted file mode 100644
index 0ae5c25..0000000
--- a/ATRI/plugins/status/listener.py
+++ /dev/null
@@ -1,49 +0,0 @@
-from typing import Optional
-
-from nonebot.message import run_postprocessor
-
-from ATRI.utils.apscheduler import scheduler
-from .models import MessageDealerInfo
-
-
-recv_msg = int()
-deal_msg = int()
-failed_deal_msg = int()
-
-total_r_m = int()
-total_d_m = int()
-total_f_m = int()
-
-
-@run_postprocessor
-async def _(exception: Optional[Exception]):
- global recv_msg, deal_msg, failed_deal_msg, total_r_m, total_d_m, total_f_m
-
- if exception:
- failed_deal_msg += 1
- total_f_m += 1
- else:
- deal_msg += 1
- total_d_m += 1
-
- recv_msg += 1
- total_r_m += 1
-
-
-def get_message_deal_info() -> dict:
- return MessageDealerInfo(
- recv_msg=str(recv_msg),
- deal_msg=str(deal_msg),
- failed_deal_msg=str(failed_deal_msg),
- total_r_m=str(total_r_m),
- total_d_m=str(total_d_m),
- total_f_m=str(total_f_m),
- ).dict()
-
-
[email protected]_job("interval", name="信息数据重置", seconds=15, misfire_grace_time=1) # type: ignore
-async def _():
- global recv_msg, deal_msg, failed_deal_msg
- recv_msg = int()
- deal_msg = int()
- failed_deal_msg = int()
diff --git a/ATRI/plugins/status/models.py b/ATRI/plugins/status/models.py
deleted file mode 100644
index 9026b2d..0000000
--- a/ATRI/plugins/status/models.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from pydantic import BaseModel
-
-
-class ForAuthData(BaseModel):
- ip: str
- port: str
- token: str
-
-
-class PlatfromRuntimeInfo(BaseModel):
- stat_msg: str
- cpu_percent: str
- mem_percent: str
- disk_percent: str
- inte_send: str
- inte_recv: str
- boot_time: str
-
-
-class BotRuntimeInfo(BaseModel):
- cpu_percent: str
- mem_percent: str
- bot_run_time: str
-
-
-class MessageDealerInfo(BaseModel):
- recv_msg: str
- deal_msg: str
- failed_deal_msg: str
- total_r_m: str
- total_d_m: str
- total_f_m: str