from aiohttp import FormData from nonebot.typing import T_State from nonebot.permission import SUPERUSER from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.service import Service as sv from ATRI.log import LOGGER_DIR, NOW_TIME from ATRI.utils.file import open_file from ATRI.utils.ub_paste import paste from ATRI.exceptions import load_error level_list = ['info', 'warning', 'error', 'debug'] __doc__ = """ 获取控制台信息 权限组:维护者 用法: 获取log 等级 行数 示例: 获取log info -20(最新20行) """ get_console = sv.on_command( cmd="获取log", aliases={'获取LOG', '获取控制台', '获取控制台信息'}, docs=__doc__, permission=SUPERUSER ) @get_console.handle() async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: state['level'] = msg @get_console.got('level', prompt='需要获取的等级是?') async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None: quit_list = ['算了', '罢了', '不了'] if state['level'] in quit_list: await get_console.finish('好吧...') @get_console.got('line', prompt='需要获取的行数是?') async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None: level = state['level'] line = state['line'] repo = str() path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log" logs = await open_file(path, 'readlines') try: content = logs[int(line):] # type: ignore repo = "\n".join(content).replace("ATRI", "ATRI") except IndexError: await get_console.finish(f'行数错误...max: {len(logs)}') # type: ignore data = FormData() data.add_field('poster', 'ATRI running log') data.add_field('syntax', 'text') data.add_field('expiration', 'day') data.add_field('content', repo) msg0 = f"> {event.sender.nickname}\n" msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" await track_error.finish(msg0) __doc__ = """ 追踪错误 权限组:维护者 用法: track 追踪ID """ track_error = sv.on_command( cmd="track", aliases={'追踪'}, docs=__doc__, permission=SUPERUSER ) @track_error.args_parser # type: ignore async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() cancel = ['算了', '罢了'] if msg in cancel: await track_error.finish('好吧...') if not msg: await track_error.reject('欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ') else: state['track'] = msg @track_error.handle() async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: state['track'] = msg @track_error.got('track', prompt='欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ') async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: track_id = state['track'] data = dict() try: data = load_error(track_id) except BaseException: await track_error.finish('未发现对应ID呢...(⇀‸↼‶)') msg = ( f"ID: [{track_id}]\n" f"Time: [{data['time']}]\n" f"Prompt: [{data['prompt']}]\n" "——————\n" f"{data['content']}" ) data = FormData() data.add_field('poster', track_id) data.add_field('syntax', 'text') data.add_field('expiration', 'day') data.add_field('content', msg) msg0 = f"> {event.sender.nickname}\n" msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" await track_error.finish(msg0)