diff options
author | Kyomotoi <0w0@imki.moe> | 2023-03-10 01:12:02 +0800 |
---|---|---|
committer | Kyomotoi <0w0@imki.moe> | 2023-03-10 01:12:02 +0800 |
commit | 7c79376f40e1a23b2bc82b8b89957b6f42a73b14 (patch) | |
tree | 0077f2f1e079573ed7d66c2bcadf873d3460a182 | |
parent | 33178079549121d60a9703ad6bf584aa71f5481b (diff) | |
download | ATRI-7c79376f40e1a23b2bc82b8b89957b6f42a73b14.tar.gz ATRI-7c79376f40e1a23b2bc82b8b89957b6f42a73b14.tar.bz2 ATRI-7c79376f40e1a23b2bc82b8b89957b6f42a73b14.zip |
♻️ 重构 console, 使之代码可读性提高
-rw-r--r-- | ATRI/plugins/console/__init__.py | 9 | ||||
-rw-r--r-- | ATRI/plugins/console/backend/__init__.py | 20 | ||||
-rw-r--r-- | ATRI/plugins/console/backend/api.py | 211 | ||||
-rw-r--r-- | ATRI/plugins/console/backend/models.py | 15 | ||||
-rw-r--r-- | ATRI/plugins/console/data_source.py | 36 | ||||
-rw-r--r-- | ATRI/plugins/console/driver/__init__.py | 53 | ||||
-rw-r--r-- | ATRI/plugins/console/driver/api.py | 92 | ||||
-rw-r--r-- | ATRI/plugins/console/driver/data_source.py | 167 | ||||
-rw-r--r-- | ATRI/plugins/console/driver/depends.py | 38 | ||||
-rw-r--r-- | ATRI/plugins/console/driver/models.py | 38 | ||||
-rw-r--r-- | ATRI/plugins/console/driver/path.py | 14 | ||||
-rw-r--r-- | ATRI/plugins/console/listener.py | 70 |
12 files changed, 309 insertions, 454 deletions
diff --git a/ATRI/plugins/console/__init__.py b/ATRI/plugins/console/__init__.py index 3711f86..39fcc26 100644 --- a/ATRI/plugins/console/__init__.py +++ b/ATRI/plugins/console/__init__.py @@ -102,10 +102,9 @@ async def _(is_sure: str = ArgPlainText("is_sure_d")): await del_console_key.finish("销毁成功!如需再次获取: /con.auth") -from ATRI import driver as dr -from .data_source import init_resource -from .driver import init_driver +import nonebot +from .backend import app -dr().on_startup(init_resource) -dr().on_startup(init_driver) +driver = nonebot.get_app() +driver.mount("/", app, name="test") diff --git a/ATRI/plugins/console/backend/__init__.py b/ATRI/plugins/console/backend/__init__.py new file mode 100644 index 0000000..4fb5a37 --- /dev/null +++ b/ATRI/plugins/console/backend/__init__.py @@ -0,0 +1,20 @@ +from fastapi import FastAPI + +# from fastapi.staticfiles import StaticFiles +from fastapi.middleware.cors import CORSMiddleware + +from .api import router as api_router + +# from ..data_source import FRONTEND_DIR + + +app = FastAPI( + title="Console for ATRI", + description="A admin UI controller for ATRI", +) + +app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["GET"]) + +app.include_router(api_router, prefix="/capi") + +# app.mount("/", StaticFiles(directory=FRONTEND_DIR, html=True), name="Console for ATRI") diff --git a/ATRI/plugins/console/backend/api.py b/ATRI/plugins/console/backend/api.py new file mode 100644 index 0000000..ffb1853 --- /dev/null +++ b/ATRI/plugins/console/backend/api.py @@ -0,0 +1,211 @@ +import os +import asyncio +from typing import Union +from datetime import datetime + +from fastapi import APIRouter, status, Depends, Query, HTTPException +from fastapi.websockets import WebSocket, WebSocketState +from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError + +from ATRI.utils import machine +from ATRI.service import SERVICES_DIR, ServiceInfo, ServiceTools + +from ..data_source import * +from ..listener import get_message_info +from . import models + + +def _author(token: Union[str, None] = Query(default=None)): + data = AuthDealer.get() + if data is None: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="缺少信息") + + now_time = datetime.now().timestamp() + if token != data.token: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="密钥不正确") + elif now_time > data.dead_time: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="密钥已过期") + else: + return "OK" + + +router = APIRouter(tags=["capi"], dependencies=[Depends(_author)]) + + +@router.get("/", response_model=models.Response) +async def _(): + return models.Response(status=status.HTTP_200_OK, detail="控制台 API 路径", data=dict()) + + +@router.get("/auth", response_model=models.Response) +async def _(): + return models.Response(status=status.HTTP_200_OK, detail="OK", data=dict()) + + +@router.websocket("/status") +async def _(websocket: WebSocket): + await websocket.accept() + + try: + while websocket.client_state == WebSocketState.CONNECTED: + await websocket.send_json( + models.Response( + status=status.HTTP_200_OK, + detail="OK", + data=models.StatusInfo( + platform=machine.get_platform_info().dict(), + cpu=machine.get_cpu_info().dict(), + mem=machine.get_mem_info().dict(), + disk=machine.get_disk_info().dict(), + net=machine.get_net_info().dict(), + ).dict(), + ).dict() + ) + await asyncio.sleep(2) + except ConnectionClosedOK: + pass + except ConnectionClosedError: + pass + finally: + await websocket.close() + return + + +@router.websocket("/status/message") +async def _(websocket: WebSocket): + await websocket.accept() + + try: + while websocket.client_state == WebSocketState.CONNECTED: + await websocket.send_json( + models.Response( + status=status.HTTP_200_OK, + detail="OK", + data=get_message_info().dict() + ).dict() + ) + await asyncio.sleep(1) + except ConnectionClosedOK: + pass + except ConnectionClosedError: + pass + finally: + await websocket.close() + return + + +@router.get("/service/list", response_model=models.Response) +async def _(): + result = dict() + files = os.listdir(SERVICES_DIR) + for f in files: + if f == ".DS_Store": + continue + serv_path = SERVICES_DIR / f + data = ServiceInfo.parse_file(serv_path) + result[data.service] = data.dict() + + return models.Response(status=status.HTTP_200_OK, detail="OK", data=result) + + +@router.get("/service/edit", response_model=models.Response) +async def _( + service: str, + enabled: int, + for_global: int = 1, + user_id: int = int(), + group_id: int = int(), +): + msg = "OK" + data = ServiceTools(service).load_service() + if enabled and for_global: + data.enabled = True + elif not enabled and for_global: + data.enabled = False + + if user_id or group_id: + if enabled: + if user_id not in data.disable_user: + msg = "用户不存在于禁用名单" + else: + data.disable_user.remove(user_id) + + if group_id not in data.disable_group: + msg = "群不存在于禁用名单" + else: + data.disable_group.remove(group_id) + else: + if user_id in data.disable_user: + msg = "用户已存在于禁用名单" + else: + data.disable_user.append(user_id) + + if group_id in data.disable_group: + msg = "群已存在于禁用名单" + else: + data.disable_group.append(group_id) + + ServiceTools(service).save_service(data.dict()) + + return models.Response(status=status.HTTP_200_OK, detail=msg, data=dict()) + + +def _get_block_list(): + # 该处有一 typo + file_dir = Path(".") / "data" / "plugins" / "manege" + path = file_dir / "block_user.json" + user_data = json.loads(path.read_bytes()) + + path = file_dir / "block_group.json" + group_data = json.loads(path.read_bytes()) + + return {"user": user_data, "group": group_data} + + +@router.get("/block/list", response_model=models.Response) +async def _(): + return models.Response( + status=status.HTTP_200_OK, detail="OK", data=_get_block_list() + ) + + +@router.get("/block/edit", response_model=models.Response) +async def _(enabled: int, user_id: int = int(), group_id: int = int()): + data = _get_block_list() + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + msg = "OK" + if enabled: + if user_id: + if user_id in data["user"]: + msg = "用户已存在于黑名单" + else: + data["user"][user_id] = now_time + if group_id: + if group_id in data["group"]: + msg = "群已存在于黑名单" + else: + data["group"][group_id] = now_time + else: + if user_id: + if user_id not in data["user"]: + msg = "用户不存在于黑名单" + else: + del data["user"][user_id] + if group_id: + if group_id not in data["group"]: + msg = "群不存在于黑名单" + else: + del data["group"][group_id] + + file_dir = Path(".") / "data" / "plugins" / "manege" + path = file_dir / "block_user.json" + await FileDealer(path).write(json.dumps(data["user"])) + + path = file_dir / "block_group.json" + await FileDealer(path).write(json.dumps(data["group"])) + + return models.Response( + status=status.HTTP_200_OK, + detail=msg, + data=data, + ) diff --git a/ATRI/plugins/console/backend/models.py b/ATRI/plugins/console/backend/models.py new file mode 100644 index 0000000..e7aeb54 --- /dev/null +++ b/ATRI/plugins/console/backend/models.py @@ -0,0 +1,15 @@ +from pydantic import BaseModel + + +class Response(BaseModel): + status: int + detail: str + data: dict + + +class StatusInfo(BaseModel): + platform: dict + cpu: dict + mem: dict + disk: dict + net: dict diff --git a/ATRI/plugins/console/data_source.py b/ATRI/plugins/console/data_source.py index b61ab10..55fd0e8 100644 --- a/ATRI/plugins/console/data_source.py +++ b/ATRI/plugins/console/data_source.py @@ -68,27 +68,27 @@ async def get_host_ip(is_pub: bool): s.close() -FRONTEND_DIR = CONSOLE_DIR / "frontend" -FRONTEND_DIR.mkdir(parents=True, exist_ok=True) -__CONSOLE_RESOURCE_URL = ( - "https://guc.imki.moe/kyomotoi/Project-ATRI-Console/main/archive/dist.zip" -) +# FRONTEND_DIR = CONSOLE_DIR / "frontend" +# FRONTEND_DIR.mkdir(parents=True, exist_ok=True) +# __CONSOLE_RESOURCE_URL = ( +# "https://guc.imki.moe/kyomotoi/Project-ATRI-Console/main/archive/dist.zip" +# ) -async def init_resource(): - log.info("控制台初始化中...") +# async def init_resource(): +# log.info("控制台初始化中...") - try: - resp = await request.get(__CONSOLE_RESOURCE_URL) - except Exception: - log.error("控制台资源装载失败, 将无法访问管理界面") - return +# try: +# resp = await request.get(__CONSOLE_RESOURCE_URL) +# except Exception: +# log.error("控制台资源装载失败, 将无法访问管理界面") +# return - file_path = CONSOLE_DIR / "dist.zip" - with open(file_path, "wb") as w: - w.write(resp.read()) +# file_path = CONSOLE_DIR / "dist.zip" +# with open(file_path, "wb") as w: +# w.write(resp.read()) - with zipfile.ZipFile(file_path, "r") as zr: - zr.extractall(FRONTEND_DIR) +# with zipfile.ZipFile(file_path, "r") as zr: +# zr.extractall(FRONTEND_DIR) - log.success("控制台初始化完成") +# log.success("控制台初始化完成") diff --git a/ATRI/plugins/console/driver/__init__.py b/ATRI/plugins/console/driver/__init__.py deleted file mode 100644 index b6b9f89..0000000 --- a/ATRI/plugins/console/driver/__init__.py +++ /dev/null @@ -1,53 +0,0 @@ -from nonebot.drivers.fastapi import Driver - -from fastapi.staticfiles import StaticFiles -from fastapi.middleware.cors import CORSMiddleware - -from ATRI import conf -from ATRI.log import log - -from .path import * -from .api import * - -from ..data_source import FRONTEND_DIR - - -def register_driver(driver: Driver): - app = driver.server_app - - origins = ["*"] - app.add_middleware( - CORSMiddleware, - allow_origins=origins, - allow_credentials=True, - allow_methods=["GET", "POST"], - allow_headers=["Content-Type"], - ) - - app.get(CONSOLE_BASE_URL)(base_url) - - app.get(CONSOLE_AUTH_URL)(auth_info) - - app.websocket(CONSOLE_RUNTIME_INFO_URL)(runtime_info) - app.websocket(CONSOLE_MESSAGE_INFO_URL)(message_info) - - app.get(CONSOLE_SERVICE_LIST_URL)(service_list) - app.get(CONSOLE_SERVICE_EDIT_URL)(edit_service) - - app.get(CONSOLE_BLOCK_LIST_URL)(block_list_info) - app.get(CONSOLE_BLOCK_EDIT_URL)(edit_block_list) - - static_path = str(FRONTEND_DIR) - app.mount( - "/", - StaticFiles(directory=static_path, html=True), - name="atri-console", - ) - - -def init_driver(): - from ATRI import driver - - register_driver(driver()) # type: ignore - c_url = f"{conf.BotConfig.host}:{conf.BotConfig.port}" - log.success(f"控制台将运行于: http://{c_url} 对应API节点为: /capi") diff --git a/ATRI/plugins/console/driver/api.py b/ATRI/plugins/console/driver/api.py deleted file mode 100644 index 668a308..0000000 --- a/ATRI/plugins/console/driver/api.py +++ /dev/null @@ -1,92 +0,0 @@ -import asyncio - -from fastapi import Depends, status -from starlette.websockets import WebSocket, WebSocketState -from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError - -from .depends import * -from .data_source import ( - get_process_info, - get_service_list, - edit_service as _edit_service, - get_block_list, - edit_block_list as _edit_block_list, -) -from ..listener import get_message_info - - -def base_url(_=Depends(http_author)): - return {"status": status.HTTP_204_NO_CONTENT, "msg": "该路径仅供控制台加载"} - - -def auth_info(_=Depends(http_author)): - return {"status": status.HTTP_200_OK, "detail": "OK"} - - -async def runtime_info(websocket: WebSocket, _pass=Depends(websocket_author)): - if not _pass: - return - await websocket.accept() - - try: - while websocket.client_state == WebSocketState.CONNECTED: - await websocket.send_json(get_process_info()) - await asyncio.sleep(1) - except ConnectionClosedOK: - pass - except ConnectionClosedError: - pass - finally: - await websocket.close() - return - - -async def message_info(websocket: WebSocket, _pass=Depends(websocket_author)): - if not _pass: - return - await websocket.accept() - - try: - while websocket.client_state == WebSocketState.CONNECTED: - await websocket.send_json(get_message_info()) - await asyncio.sleep(1) - except ConnectionClosedOK: - pass - except ConnectionClosedError: - pass - finally: - await websocket.close() - return - - -def service_list(_=Depends(http_author)): - return {"status": status.HTTP_200_OK, "data": get_service_list()} - - -def edit_service( - service: str, - global_enabled: str = "2", - enabled: str = "2", - user: str = str(), - group: str = str(), - _=Depends(http_author), -): - return { - "status": status.HTTP_200_OK, - "data": _edit_service( - service, int(global_enabled), bool(int(enabled)), user, group - ), - } - - -def block_list_info(_=Depends(http_author)): - return {"status": status.HTTP_200_OK, "data": get_block_list()} - - -async def edit_block_list( - enabled: str, user_id: str = str(), group_id: str = str(), _=Depends(http_author) -): - return { - "status": status.HTTP_200_OK, - "data": await _edit_block_list(bool(int(enabled)), user_id, group_id), - } diff --git a/ATRI/plugins/console/driver/data_source.py b/ATRI/plugins/console/driver/data_source.py deleted file mode 100644 index 45b07e6..0000000 --- a/ATRI/plugins/console/driver/data_source.py +++ /dev/null @@ -1,167 +0,0 @@ -import os -import json -import psutil -from pathlib import Path -from datetime import datetime - -from ATRI.exceptions import GetStatusError - -from ATRI.utils import FileDealer -from ATRI.service import ServiceTools, SERVICES_DIR - -from . import models - - -def get_process_info() -> dict: - try: - platform_cpu = psutil.cpu_percent(interval=1) - platform_mem = psutil.virtual_memory().percent - platform_disk = psutil.disk_usage("/").percent - platform_net_sent = str(psutil.net_io_counters().bytes_sent / 1000000) - platform_net_recv = str(psutil.net_io_counters().bytes_recv / 1000000) - - process = psutil.Process(os.getpid()) - bot_cpu = str(process.cpu_percent(interval=1)) - bot_mem = str(process.memory_percent(memtype="rss")) - - now_time = datetime.now().timestamp() - _boot_time = psutil.boot_time() - _bot_run_time = process.create_time() - boot_time = str( - datetime.utcfromtimestamp(now_time).replace(microsecond=0) - - datetime.utcfromtimestamp(_boot_time).replace(microsecond=0) - ) - bot_run_time = str( - datetime.utcfromtimestamp(now_time).replace(microsecond=0) - - datetime.utcfromtimestamp(_bot_run_time).replace(microsecond=0) - ) - except Exception: - raise GetStatusError("获取实例运行信息失败") - - stat_msg = "アトリは、高性能ですから!" - if platform_cpu > 90: - stat_msg = "咱感觉有些头晕..." - if platform_mem > 90: - stat_msg = "咱感觉有点头晕并且有点累..." - elif platform_mem > 90: - stat_msg = "咱感觉有点累..." - elif platform_disk > 90: - stat_msg = "咱感觉身体要被塞满了..." - - platform_cpu = str(platform_cpu) - platform_mem = str(platform_mem) - platform_disk = str(platform_disk) - - return models.RuntimeInfo( - status_message=stat_msg, - platform_info=models.PlatformRuntimeInfo( - cpu_percent=platform_cpu, - mem_percent=platform_mem, - disk_percent=platform_disk, - net_sent=platform_net_sent, - net_recv=platform_net_recv, - boot_time=boot_time, - ), - bot_info=models.BotRuntimeInfo( - cpu_percent=bot_cpu, mem_percent=bot_mem, run_time=bot_run_time - ), - ).dict() - - -def get_service_list() -> dict: - result = dict() - - files = os.listdir(SERVICES_DIR) - for file in files: - # Thank you, MacOS - if file == ".DS_Store": - continue - - service_path = SERVICES_DIR / file - data = models.ServiceInfo.parse_file(service_path) - result[data.service] = data.dict() - - return result - - -def edit_service( - service: str, global_enabled: int, enabled: bool, user_id: str, group_id: str -): - data = ServiceTools(service).load_service() - - if global_enabled != 2 and global_enabled: - data.enabled = bool(global_enabled) - else: - data.enabled = False - if user_id or group_id: - if enabled: - if user_id not in data.disable_user: - return {"detail": "用户不存在于禁用名单"} - else: - data.disable_user.remove(user_id) - - if group_id not in data.disable_group: - return {"detail": "群不存在于禁用名单"} - else: - data.disable_group.remove(group_id) - else: - if user_id not in data.disable_user: - data.disable_user.append(user_id) - else: - return {"detail": "用户已存在于禁用名单"} - - if group_id not in data.disable_group: - data.disable_group.append(group_id) - else: - return {"detail": "群已存在于禁用名单"} - - ServiceTools(service).save_service(data.dict()) - - return {"detail": "操作完成~"} - - -def get_block_list() -> models.BlockInfo: - file_dir = Path(".") / "data" / "plugins" / "manege" - path = file_dir / "block_user.json" - user_data = json.loads(path.read_bytes()) - - path = file_dir / "block_group.json" - group_data = json.loads(path.read_bytes()) - - return models.BlockInfo(user=user_data, group=group_data) - - -async def edit_block_list(enabled: bool, user_id: str, group_id: str): - data = get_block_list() - now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if enabled: - if user_id: - if user_id in data.user: - return {"detail": "用户已存在于黑名单"} - else: - data.user[user_id] = now_time - if group_id: - if group_id in data.group: - return {"detail": "群已存在于黑名单"} - else: - data.group[group_id] = now_time - else: - if user_id: - if user_id in data.user: - del data.user[user_id] - else: - return {"detail": "用户不存在于黑名单"} - if group_id: - if group_id in data.group: - del data.group[group_id] - else: - return {"detail": "群不存在于黑名单"} - - file_dir = Path(".") / "data" / "plugins" / "manege" - path = file_dir / "block_user.json" - await FileDealer(path).write(json.dumps(data.user)) - - path = file_dir / "block_group.json" - await FileDealer(path).write(json.dumps(data.group)) - - return {"detail": "操作完成~"} diff --git a/ATRI/plugins/console/driver/depends.py b/ATRI/plugins/console/driver/depends.py deleted file mode 100644 index 8205f44..0000000 --- a/ATRI/plugins/console/driver/depends.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Union -from datetime import datetime - -from fastapi import Query, HTTPException, status -from starlette.websockets import WebSocket - -from ..data_source import AuthDealer - - -def http_author(token: Union[str, None] = Query(default=None)): - data = AuthDealer.get() - if data is None: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="验证信息不存在") - - now_time = datetime.now().timestamp() - if token != data.token: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="密钥不匹配, 请检查") - elif now_time > data.dead_time: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="密钥已过期") - else: - return token - - -async def websocket_author( - websocket: WebSocket, token: Union[str, None] = Query(default=None) -): - data = AuthDealer.get() - if not data: - await websocket.close(code=status.WS_1008_POLICY_VIOLATION) - return - - now_time = datetime.now().timestamp() - if token != data.token: - await websocket.close(code=status.WS_1008_POLICY_VIOLATION) - elif now_time > data.dead_time: - await websocket.close(code=status.WS_1008_POLICY_VIOLATION) - else: - return token diff --git a/ATRI/plugins/console/driver/models.py b/ATRI/plugins/console/driver/models.py deleted file mode 100644 index cbc110d..0000000 --- a/ATRI/plugins/console/driver/models.py +++ /dev/null @@ -1,38 +0,0 @@ -from pydantic import BaseModel - - -class PlatformRuntimeInfo(BaseModel): - cpu_percent: str - mem_percent: str - disk_percent: str - net_sent: str - net_recv: str - boot_time: str - - -class BotRuntimeInfo(BaseModel): - cpu_percent: str - mem_percent: str - run_time: str - - -class RuntimeInfo(BaseModel): - status_message: str - platform_info: PlatformRuntimeInfo - bot_info: BotRuntimeInfo - - -class ServiceInfo(BaseModel): - service: str - docs: str - permission: list - cmd_list: dict - enabled: bool - only_admin: bool - disable_user: list - disable_group: list - - -class BlockInfo(BaseModel): - user: dict - group: dict diff --git a/ATRI/plugins/console/driver/path.py b/ATRI/plugins/console/driver/path.py deleted file mode 100644 index 7009eb7..0000000 --- a/ATRI/plugins/console/driver/path.py +++ /dev/null @@ -1,14 +0,0 @@ -CONSOLE_BASE_URL = "/capi" - -CONSOLE_AUTH_URL = CONSOLE_BASE_URL + "/auth" - -CONSOLE_RUNTIME_INFO_URL = CONSOLE_BASE_URL + "/runtime" -CONSOLE_MESSAGE_INFO_URL = CONSOLE_BASE_URL + "/message" - -CONSOLE_SERVICE_BASE_URL = CONSOLE_BASE_URL + "/service" -CONSOLE_SERVICE_LIST_URL = CONSOLE_SERVICE_BASE_URL + "/list" -CONSOLE_SERVICE_EDIT_URL = CONSOLE_SERVICE_BASE_URL + "/edit" - -CONSOLE_BLOCK_BASE_URL = CONSOLE_BASE_URL + "/block" -CONSOLE_BLOCK_LIST_URL = CONSOLE_BLOCK_BASE_URL + "/list" -CONSOLE_BLOCK_EDIT_URL = CONSOLE_BLOCK_BASE_URL + "/edit" diff --git a/ATRI/plugins/console/listener.py b/ATRI/plugins/console/listener.py index 2c12c75..1bd553c 100644 --- a/ATRI/plugins/console/listener.py +++ b/ATRI/plugins/console/listener.py @@ -1,49 +1,61 @@ from typing import Optional +from pydantic import BaseModel from nonebot.message import run_postprocessor from ATRI.utils.apscheduler import scheduler -from .models import MessageDealerInfo -recv_msg = int() -deal_msg = int() -failed_deal_msg = int() +interval_deal_message = int() +interval_recv_message = int() +interval_failed_message = int() -total_r_m = int() -total_d_m = int() -total_f_m = int() +total_deal_message = int() +total_recv_message = int() +total_failed_message = int() @run_postprocessor async def _(exception: Optional[Exception]): - global recv_msg, deal_msg, failed_deal_msg, total_r_m, total_d_m, total_f_m + global interval_deal_message, interval_recv_message, interval_failed_message + global total_deal_message, total_recv_message, total_failed_message if exception: - failed_deal_msg += 1 - total_f_m += 1 + interval_failed_message += 1 + total_failed_message += 1 else: - deal_msg += 1 - total_d_m += 1 + interval_deal_message += 1 + total_deal_message += 1 - recv_msg += 1 - total_r_m += 1 + interval_recv_message += 1 + total_recv_message += 1 -def get_message_info() -> dict: - return MessageDealerInfo( - recv_msg=str(recv_msg), - deal_msg=str(deal_msg), - failed_deal_msg=str(failed_deal_msg), - total_r_m=str(total_r_m), - total_d_m=str(total_d_m), - total_f_m=str(total_f_m), - ).dict() +@scheduler.scheduled_job("interval", name="消息统计数据重置", seconds=15, misfire_grace_time=30) +async def _(): + global interval_deal_message, interval_recv_message, interval_failed_message + interval_deal_message = 0 + interval_recv_message = 0 + interval_failed_message = 0 -@scheduler.scheduled_job("interval", name="信息数据重置", seconds=15, misfire_grace_time=1) # type: ignore -async def _(): - global recv_msg, deal_msg, failed_deal_msg - recv_msg = int() - deal_msg = int() - failed_deal_msg = int() + +class MessageInfo(BaseModel): + interval_deal: int + interval_recv: int + interval_failed: int + + total_deal: int + total_recv: int + total_failed: int + + +def get_message_info() -> MessageInfo: + return MessageInfo( + interval_deal=interval_deal_message, + interval_recv=interval_recv_message, + interval_failed=interval_failed_message, + total_deal=total_deal_message, + total_recv=total_deal_message, + total_failed=total_failed_message, + ) |