diff options
Diffstat (limited to 'ATRI/plugins')
25 files changed, 590 insertions, 806 deletions
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py index a62ed5e..1c7e5a3 100644 --- a/ATRI/plugins/anime_search.py +++ b/ATRI/plugins/anime_search.py @@ -24,45 +24,40 @@ __doc__ = """ 以图搜番 (pic) """ -anime_search = sv.on_command( - cmd="以图搜番", - docs=__doc__, - rule=is_in_service('以图搜番') -) +anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番")) + @anime_search.args_parser # type: ignore async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message) - quit_list = ['算了', '罢了', '不搜了', '取消'] + quit_list = ["算了", "罢了", "不搜了", "取消"] if msg in quit_list: - await anime_search.finish('好吧...') + await anime_search.finish("好吧...") if not msg: - await anime_search.reject('图呢?') + await anime_search.reject("图呢?") else: - state['pic_anime'] = msg + state["pic_anime"] = msg + @anime_search.handle() -async def _anime_search(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['pic_anime'] = msg + state["pic_anime"] = msg + -@anime_search.got('pic_anime', prompt='图呢?') -async def _deal_search(bot: Bot, - event: MessageEvent, - state: T_State) -> None: - msg = state['pic_anime'] +@anime_search.got("pic_anime", prompt="图呢?") +async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = state["pic_anime"] img = re.findall(r"url=(.*?)]", msg) if not img: await anime_search.reject("请发送图片而不是其它东西!!") - + try: req = await get_bytes(URL + img[0]) except RequestTimeOut: raise RequestTimeOut("Request failed!") - + data = json.loads(req)["docs"] try: d = dict() @@ -72,28 +67,24 @@ async def _deal_search(bot: Bot, else: m = data[i]["at"] / 60 s = data[i]["at"] % 60 - + if not data[i]["episode"]: n = 1 else: n = data[i]["episode"] - + d[to_simple_string(data[i]["title_chinese"])] = [ data[i]["similarity"], f"第{n}集", - f"{int(m)}分{int(s)}秒处" + f"{int(m)}分{int(s)}秒处", ] except Exception as err: raise Exception(f"Invalid data.\n{err}") - - result = sorted( - d.items(), - key=lambda x:x[1], - reverse=True - ) - + + result = sorted(d.items(), key=lambda x: x[1], reverse=True) + t = 0 - + msg0 = f"> {event.sender.nickname}" for i in result: t += 1 @@ -104,16 +95,16 @@ async def _deal_search(bot: Bot, f"Name: {i[0]}\n" f"Time: {i[1][1]} {i[1][2]}" ) - + if len(result) == 2: await anime_search.finish(Message(msg0)) else: data = FormData() - data.add_field('poster', 'ATRI running log') - data.add_field('syntax', 'text') - data.add_field('expiration', 'day') - data.add_field('content', msg0) + data.add_field("poster", "ATRI running log") + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", msg0) repo = f"> {event.sender.nickname}\n" repo = repo + f"详细请移步此处~\n{await paste(data)}" - await anime_search.finish(repo)
\ No newline at end of file + await anime_search.finish(repo) diff --git a/ATRI/plugins/call_owner.py b/ATRI/plugins/call_owner.py index cd79d1c..d282824 100644 --- a/ATRI/plugins/call_owner.py +++ b/ATRI/plugins/call_owner.py @@ -1,9 +1,6 @@ from nonebot.permission import SUPERUSER from nonebot.typing import T_State -from nonebot.adapters.cqhttp import ( - Bot, - MessageEvent -) +from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.service import Service as sv from ATRI.config import Config @@ -21,48 +18,44 @@ __doc__ = """ repo = sv.on_command(cmd="来杯红茶", docs=__doc__) + @repo.args_parser # type: ignore async def _repo_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message) if msg == "算了": - await repo.finish('好吧') - + await repo.finish("好吧") + if not msg: - await repo.reject('话呢?') + await repo.reject("话呢?") else: - state['msg_repo'] = msg + state["msg_repo"] = msg + @repo.handle() async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['msg_repo'] = msg + state["msg_repo"] = msg + [email protected]('msg_repo', prompt="请告诉咱需要反馈的内容~!") [email protected]("msg_repo", prompt="请告诉咱需要反馈的内容~!") async def _repo_deal(bot: Bot, event: MessageEvent, state: T_State) -> None: global repo_list - msg = state['msg_repo'] + msg = state["msg_repo"] user = event.user_id - + if count_list(repo_list, user) == 5: await repo.finish("吾辈已经喝了五杯红茶啦!明天再来吧。") - + repo_list.append(user) for sup in Config.BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, - message=f"来自用户[{user}]反馈:\n{msg}" - ) - + await bot.send_private_msg(user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}") + await repo.finish("吾辈的心愿已由咱转告给咱的维护者了~!") [email protected]_job( - "cron", - hour=0, - misfire_grace_time=60 -) [email protected]_job("cron", hour=0, misfire_grace_time=60) async def _() -> None: global repo_list repo_list.clear() @@ -75,11 +68,8 @@ __doc__ = """ /重置红茶 """ -reset_repo = sv.on_command( - cmd="重置红茶", - docs=__doc__, - permission=SUPERUSER -) +reset_repo = sv.on_command(cmd="重置红茶", docs=__doc__, permission=SUPERUSER) + @reset_repo.handle() async def _reset_repo(bot: Bot, event: MessageEvent) -> None: diff --git a/ATRI/plugins/code_runner.py b/ATRI/plugins/code_runner.py index b8f1106..d34872f 100644 --- a/ATRI/plugins/code_runner.py +++ b/ATRI/plugins/code_runner.py @@ -10,33 +10,33 @@ from ATRI.utils.request import post_bytes from ATRI.exceptions import RequestTimeOut -RUN_API_URL_FORMAT = 'https://glot.io/run/{}?version=latest' +RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest" SUPPORTED_LANGUAGES = { - 'assembly': {'ext': 'asm'}, - 'bash': {'ext': 'sh'}, - 'c': {'ext': 'c'}, - 'clojure': {'ext': 'clj'}, - 'coffeescript': {'ext': 'coffe'}, - 'cpp': {'ext': 'cpp'}, - 'csharp': {'ext': 'cs'}, - 'erlang': {'ext': 'erl'}, - 'fsharp': {'ext': 'fs'}, - 'go': {'ext': 'go'}, - 'groovy': {'ext': 'groovy'}, - 'haskell': {'ext': 'hs'}, - 'java': {'ext': 'java', 'name': 'Main'}, - 'javascript': {'ext': 'js'}, - 'julia': {'ext': 'jl'}, - 'kotlin': {'ext': 'kt'}, - 'lua': {'ext': 'lua'}, - 'perl': {'ext': 'pl'}, - 'php': {'ext': 'php'}, - 'python': {'ext': 'py'}, - 'ruby': {'ext': 'rb'}, - 'rust': {'ext': 'rs'}, - 'scala': {'ext': 'scala'}, - 'swift': {'ext': 'swift'}, - 'typescript': {'ext': 'ts'}, + "assembly": {"ext": "asm"}, + "bash": {"ext": "sh"}, + "c": {"ext": "c"}, + "clojure": {"ext": "clj"}, + "coffeescript": {"ext": "coffe"}, + "cpp": {"ext": "cpp"}, + "csharp": {"ext": "cs"}, + "erlang": {"ext": "erl"}, + "fsharp": {"ext": "fs"}, + "go": {"ext": "go"}, + "groovy": {"ext": "groovy"}, + "haskell": {"ext": "hs"}, + "java": {"ext": "java", "name": "Main"}, + "javascript": {"ext": "js"}, + "julia": {"ext": "jl"}, + "kotlin": {"ext": "kt"}, + "lua": {"ext": "lua"}, + "perl": {"ext": "pl"}, + "php": {"ext": "php"}, + "python": {"ext": "py"}, + "ruby": {"ext": "rb"}, + "rust": {"ext": "rs"}, + "scala": {"ext": "scala"}, + "swift": {"ext": "swift"}, + "typescript": {"ext": "ts"}, } @@ -50,54 +50,55 @@ __doc__ = """ print('Hello world!') """ -code_runner = sv.on_command( - cmd="/code", - docs=__doc__, - rule=is_in_service('code') -) +code_runner = sv.on_command(cmd="/code", docs=__doc__, rule=is_in_service("code")) + @code_runner.handle() async def _code_runner(bot: Bot, event: MessageEvent) -> None: msg = str(event.message).split("\n") - + if msg[0] == "list": msg0 = "咱现在支持的语言如下:\n" msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys())) - + await code_runner.finish(msg0) elif not msg[0]: await code_runner.finish("请键入/help以获取更多支持...") - + laug = msg[0].replace("\r", "") if laug not in SUPPORTED_LANGUAGES: await code_runner.finish("该语言暂不支持...") - + del msg[0] code = "\n".join(map(str, msg)) try: req = await post_bytes( RUN_API_URL_FORMAT.format(laug), json={ - "files": [{ - "name": (SUPPORTED_LANGUAGES[laug].get("name", "main") + - f".{SUPPORTED_LANGUAGES[laug]['ext']}"), - "content": code - }], + "files": [ + { + "name": ( + SUPPORTED_LANGUAGES[laug].get("name", "main") + + f".{SUPPORTED_LANGUAGES[laug]['ext']}" + ), + "content": code, + } + ], "stdin": "", - "command": "" - } + "command": "", + }, ) except RequestTimeOut: raise RequestTimeOut("Failed to request!") - + payload = json.loads(req) sent = False - for k in ['stdout', 'stderr', 'error']: + for k in ["stdout", "stderr", "error"]: v = payload.get(k) lines = v.splitlines() lines, remained_lines = lines[:10], lines[10:] - out = '\n'.join(lines) - out, remained_out = out[:60 * 10], out[60 * 10:] + out = "\n".join(lines) + out, remained_out = out[: 60 * 10], out[60 * 10 :] if remained_lines or remained_out: out += f"\n(太多了太多了...)" @@ -105,6 +106,6 @@ async def _code_runner(bot: Bot, event: MessageEvent) -> None: if out: await bot.send(event, f"{k}:\n\n{out}") sent = True - + if not sent: await code_runner.finish("Running success! Nothing print.") diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py index 5085dbf..acfaa9b 100644 --- a/ATRI/plugins/curse/__init__.py +++ b/ATRI/plugins/curse/__init__.py @@ -19,23 +19,17 @@ __doc__ = """ """ curse = sv.on_command( - cmd='口臭', - docs=__doc__, - aliases={'口臭一下,骂我'}, - rule=is_in_service('口臭') + cmd="口臭", docs=__doc__, aliases={"口臭一下,骂我"}, rule=is_in_service("口臭") ) + @curse.handle() async def _curse(bot: Bot, event: MessageEvent) -> None: global sick_list user = event.get_user_id() if count_list(sick_list, user) == 3: sick_list.append(user) - repo = ( - "不是??你这么想被咱骂的嘛??" - "被咱骂就这么舒服的吗?!" - "该......你该不会是.....M吧!" - ) + repo = "不是??你这么想被咱骂的嘛??" "被咱骂就这么舒服的吗?!" "该......你该不会是.....M吧!" await curse.finish(repo) elif count_list(sick_list, user) == 6: sick_list = del_list_aim(sick_list, user) diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 3ebaa36..7f14882 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -24,7 +24,7 @@ from nonebot.adapters.cqhttp import ( LuckyKingNotifyEvent, GroupUploadNoticeEvent, GroupRecallNoticeEvent, - FriendRecallNoticeEvent + FriendRecallNoticeEvent, ) import ATRI @@ -35,8 +35,8 @@ from ATRI.service import Service as sv from ATRI.utils.cqcode import coolq_code_check -PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services' -ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' +PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services" +ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" os.makedirs(PLUGIN_INFO_DIR, exist_ok=True) os.makedirs(ESSENTIAL_DIR, exist_ok=True) @@ -57,11 +57,7 @@ async def shutdown() -> None: shutil.rmtree(PLUGIN_INFO_DIR) logger.debug("成功!") except Exception: - repo = ( - '清理插件信息失败', - '请前往 ATRI/data/service/services 下', - '将 services 整个文件夹删除' - ) + repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除") time.sleep(10) raise Exception(repo) @@ -69,59 +65,49 @@ async def shutdown() -> None: @driver.on_bot_connect async def connect(bot) -> None: for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - int(superuser), - "WebSocket 成功连接,数据开始传输。" - ) + await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。") @driver.on_bot_disconnect async def disconnect(bot) -> None: for superuser in Config.BotSelfConfig.superusers: try: - await sv.NetworkPost.send_private_msg( - int(superuser), - "WebSocket 貌似断开了呢..." - ) + await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...") except: logger.error("WebSocket 已断开,等待重连") @run_preprocessor # type: ignore -async def _check_block(matcher: Matcher, - bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _check_block( + matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State +) -> None: user = str(event.user_id) try: msg = str(event.message) except: msg = "" if not sv.BlockSystem.auth_user(user): - raise IgnoredException(f'Block user: {user}') - + raise IgnoredException(f"Block user: {user}") + if not sv.Dormant.is_dormant(): if "/dormant" not in msg: - raise IgnoredException('Bot has been dormant.') - + raise IgnoredException("Bot has been dormant.") + if isinstance(event, GroupMessageEvent): group = str(event.group_id) if not sv.BlockSystem.auth_group(group): - raise IgnoredException(f'Block group: {group}') + raise IgnoredException(f"Block group: {group}") @run_preprocessor # type: ignore -async def _store_message(matcher: Matcher, - bot: Bot, - event, - state: T_State) -> None: +async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None: if isinstance(event, GroupMessageEvent): if event.sub_type == "normal": - now_time = datetime.now().strftime('%Y-%m-%d') - GROUP_DIR = ESSENTIAL_DIR / 'chat_history' / f'{event.group_id}' + now_time = datetime.now().strftime("%Y-%m-%d") + GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}" os.makedirs(GROUP_DIR, exist_ok=True) path = GROUP_DIR / f"{now_time}.chat.json" - now_time = datetime.now().strftime('%Y%m%d-%H%M%S') + now_time = datetime.now().strftime("%Y%m%d-%H%M%S") try: data = json.loads(path.read_bytes()) @@ -147,12 +133,12 @@ async def _store_message(matcher: Matcher, "area": event.sender.area, "level": event.sender.level, "role": event.sender.role, - "title": event.sender.title + "title": event.sender.title, }, - "to_me": str(event.to_me) + "to_me": str(event.to_me), } try: - with open(path, 'w', encoding='utf-8') as r: + with open(path, "w", encoding="utf-8") as r: r.write(json.dumps(data, indent=4)) logger.debug(f"写入消息成功,id: {event.message_id}") except WriteError: @@ -168,30 +154,24 @@ async def _store_message(matcher: Matcher, # 处理:好友请求 request_friend_event = sv.on_request() + @request_friend_event.handle() async def _request_friend_event(bot, event: FriendRequestEvent) -> None: file_name = "request_friend.json" path = ESSENTIAL_DIR / file_name path.parent.mkdir(exist_ok=True, parents=True) - + try: data = json.loads(path.read_bytes()) except: data = dict() - data[event.flag] = { - "user_id": event.user_id, - "comment": event.comment - } + data[event.flag] = {"user_id": event.user_id, "comment": event.comment} try: - with open(path, 'w', encoding='utf-8') as r: - r.write( - json.dumps( - data, indent=4 - ) - ) + with open(path, "w", encoding="utf-8") as r: + r.write(json.dumps(data, indent=4)) except WriteError: raise WriteError("Writing file failed!") - + for superuser in Config.BotSelfConfig.superusers: msg = ( "主人,收到一条好友请求:\n" @@ -199,21 +179,19 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None: f"申请信息:{event.comment}\n" f"申请码:{event.flag}" ) - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) # 处理:邀请入群,如身为管理,还附有入群请求 request_group_event = sv.on_request() + @request_group_event.handle() async def _request_group_event(bot, event: GroupRequestEvent) -> None: file_name = "request_group.json" path = ESSENTIAL_DIR / file_name path.parent.mkdir(exist_ok=True, parents=True) - + try: data = json.loads(path.read_bytes()) except: @@ -222,18 +200,14 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: "user_id": event.user_id, "group_id": event.group_id, "sub_type": event.sub_type, - "comment": event.comment + "comment": event.comment, } try: - with open(path, 'w', encoding='utf-8') as r: - r.write( - json.dumps( - data, indent=4 - ) - ) + with open(path, "w", encoding="utf-8") as r: + r.write(json.dumps(data, indent=4)) except WriteError: raise WriteError("Writing file failed!") - + for superuser in Config.BotSelfConfig.superusers: msg = ( "主人,收到一条入群请求:\n" @@ -241,37 +215,27 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: f"申请信息:{event.comment}\n" f"申请码:{event.flag}" ) - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) - + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + # 处理群成员变动 group_member_event = sv.on_notice() + @group_member_event.handle() async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: - msg = ( - "好欸!事新人!\n" - f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!" - ) + msg = "好欸!事新人!\n" f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!" await group_member_event.finish(msg) + @group_member_event.handle() async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: if event.is_tome(): if event.user_id != event.self_id: return - msg = ( - "呜呜呜,主人" - f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." - ) + msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) else: await group_member_event.finish("阿!有人离开了我们...") @@ -279,26 +243,27 @@ async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: # 处理群管理事件 group_admin_event = sv.on_notice() + @group_admin_event.handle() async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: if not event.is_tome(): return - + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" + user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" ) # 处理群禁言事件 group_ban_event = sv.on_notice() + @group_ban_event.handle() async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: if not event.is_tome(): return - + if event.duration: msg = ( "那个..。,主人\n" @@ -306,71 +271,54 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: f"时长...是 {event.duration} 秒" ) for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) else: - msg = ( - "好欸!主人\n" - f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!" - ) + msg = "好欸!主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!" for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) # 处理群红包运气王事件 lucky_read_bag_event = sv.on_notice() + @lucky_read_bag_event.handle() async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: - msg = ( - "8行,这可忍?" - f"gkd [CQ:at,qq={event.user_id}] 发一个!" - ) + msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!" await lucky_read_bag_event.finish(Message(msg)) # 处理群文件上传事件 group_file_upload_event = sv.on_notice() + @group_file_upload_event.handle() -async def _group_file_upload_event(bot, - event: GroupUploadNoticeEvent) -> None: +async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None: await group_file_upload_event.finish("让我康康传了啥好东西") # 处理撤回事件 recall_event = sv.on_notice() + @recall_event.handle() async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: try: repo = await bot.get_msg(message_id=event.message_id) except: return - + group = event.group_id repo = str(repo["message"]) check = await coolq_code_check(repo, group=group) if not check: repo = repo.replace("CQ", "QC") - msg = ( - "主人,咱拿到了一条撤回信息!\n" - f"{event.user_id}@[群:{event.group_id}]\n" - "撤回了\n" - f"{repo}" - ) + msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}" for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + @recall_event.handle() async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: @@ -378,23 +326,15 @@ async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: repo = await bot.get_msg(message_id=event.message_id) except: return - + user = event.user_id repo = str(repo["message"]) check = await coolq_code_check(repo, user) if not check: repo = repo.replace("CQ", "QC") - msg = ( - "主人,咱拿到了一条撤回信息!\n" - f"{event.user_id}@[私聊]" - "撤回了\n" - f"{repo}" - ) + msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}" await bot.send(event, "咱看到惹~!") for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) + await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py index 03b844a..72e5abb 100644 --- a/ATRI/plugins/funny.py +++ b/ATRI/plugins/funny.py @@ -22,28 +22,26 @@ __doc__ = """ 来句笑话 """ -get_laugh = sv.on_command( - cmd='来句笑话', - docs=__doc__, - rule=is_in_service('来句笑话') -) +get_laugh = sv.on_command(cmd="来句笑话", docs=__doc__, rule=is_in_service("来句笑话")) + @get_laugh.handle() async def _get_laugh(bot: Bot, event: MessageEvent) -> None: user_name = event.sender.nickname laugh_list = [] - - FILE = Path('.') / 'ATRI' / 'data' / 'database' / 'funny' / 'laugh.txt' - with open(FILE, 'r', encoding='utf-8') as r: + + FILE = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt" + with open(FILE, "r", encoding="utf-8") as r: for line in r: - laugh_list.append(line.strip('\n')) - + laugh_list.append(line.strip("\n")) + result = choice(laugh_list) await get_laugh.finish(result.replace("%name", user_name)) me_to_you = sv.on_message() + @me_to_you.handle() async def _me_to_you(bot: Bot, event: MessageEvent) -> None: if randint(0, 15) == 5: @@ -59,38 +57,28 @@ __doc__ = """ 抽老婆 """ -roll_wife = sv.on_command( - cmd='抽老婆', - docs=__doc__, - rule=is_in_service('抽老婆') -) +roll_wife = sv.on_command(cmd="抽老婆", docs=__doc__, rule=is_in_service("抽老婆")) + @roll_wife.handle() async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: user = event.user_id group = event.group_id - - user_name = await bot.get_group_member_info(group_id=group, - user_id=user) - user_name = user_name['nickname'] + + user_name = await bot.get_group_member_info(group_id=group, user_id=user) + user_name = user_name["nickname"] run = await is_too_exciting(user, group, 5, True) if not run: return - + luck_list = await bot.get_group_member_list(group_id=group) luck_user = choice(luck_list) - luck_qq = luck_user['user_id'] - luck_user = luck_user['nickname'] - msg = ( - "5秒后咱将随机抽取一位群友成为\n" - f"{user_name} 的老婆!究竟是谁呢~?" - ) + luck_qq = luck_user["user_id"] + luck_user = luck_user["nickname"] + msg = "5秒后咱将随机抽取一位群友成为\n" f"{user_name} 的老婆!究竟是谁呢~?" await bot.send(event, msg) await asyncio.sleep(5) - msg = ( - f"> {luck_user}({luck_qq})\n" - f"恭喜成为 {user_name} 的老婆~⭐" - ) + msg = f"> {luck_user}({luck_qq})\n" f"恭喜成为 {user_name} 的老婆~⭐" await bot.send(event, msg) @@ -107,43 +95,34 @@ __doc__ = """ /fakemsg 123456789*生草人*草 114514*仙贝*臭死了 """ -fake_msg = sv.on_command( - cmd="/fakemsg", - docs=__doc__, - rule=is_in_service('fakemsg') -) +fake_msg = sv.on_command(cmd="/fakemsg", docs=__doc__, rule=is_in_service("fakemsg")) + @fake_msg.handle() async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None: - msg = str(event.message).split(' ') + msg = str(event.message).split(" ") user = event.user_id group = event.group_id node = [] check = await is_too_exciting(user, group, 1, True) - + if check: for i in msg: - args = i.split('*') + args = i.split("*") qq = args[0] - name = args[1].replace('[', '[') - name = name.replace(']', ']') - repo = args[2].replace('[', '[') - repo = repo.replace(']', ']') - dic = { - "type": "node", - "data": { - "name": name, - "uin": qq, - "content": repo - } - } + name = args[1].replace("[", "[") + name = name.replace("]", "]") + repo = args[2].replace("[", "[") + repo = repo.replace("]", "]") + dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}} node.append(dic) await bot.send_group_forward_msg(group_id=group, messages=node) EAT_URL = "https://wtf.hiigara.net/api/run/{}" -eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service('今天吃什么')) +eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service("今天吃什么")) + @eat_wat.handle() async def _eat(bot: Bot, event: MessageEvent) -> None: @@ -152,7 +131,7 @@ async def _eat(bot: Bot, event: MessageEvent) -> None: user_n = event.sender.nickname arg = re.findall(r"[今|明|后|大后]天(.*?)吃什么", msg)[0] nd = re.match(r"[今|明|后|大后][天]", msg)[0] - + if arg == "中午": a = f"LdS4K6/{randint(0, 999999)}" url = EAT_URL.format(a) @@ -160,12 +139,12 @@ async def _eat(bot: Bot, event: MessageEvent) -> None: try: data = json.loads(await post_bytes(url, params)) except RequestTimeOut: - raise RequestTimeOut('Request failed!') - - text = to_simple_string(data['text']).replace('今天', nd) + raise RequestTimeOut("Request failed!") + + text = to_simple_string(data["text"]).replace("今天", nd) get_a = re.search(r"非常(.*?)的", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, '') - + result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, "") + elif arg == "晚上": a = f"KaTMS/{randint(0, 999999)}" url = EAT_URL.format(a) @@ -173,11 +152,11 @@ async def _eat(bot: Bot, event: MessageEvent) -> None: try: data = json.loads(await post_bytes(url, params)) except RequestTimeOut: - raise RequestTimeOut('Request failed!') - - text = to_simple_string(data['text']).replace('今天', '') + raise RequestTimeOut("Request failed!") + + text = to_simple_string(data["text"]).replace("今天", "") result = f"> {MessageSegment.at(user)}\n" + text - + else: rd = randint(1, 10) if rd == 5: @@ -189,10 +168,12 @@ async def _eat(bot: Bot, event: MessageEvent) -> None: try: data = json.loads(await post_bytes(url, params)) except RequestTimeOut: - raise RequestTimeOut('Request failed!') - - text = to_simple_string(data['text']).replace('今天', nd) + raise RequestTimeOut("Request failed!") + + text = to_simple_string(data["text"]).replace("今天", nd) get_a = re.match(r"(.*?)的智商", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, f'{user_n}的智商') - + result = f"> {MessageSegment.at(user)}\n" + text.replace( + get_a, f"{user_n}的智商" + ) + await eat_wat.finish(Message(result)) diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py index c238b69..e7c0d80 100644 --- a/ATRI/plugins/github.py +++ b/ATRI/plugins/github.py @@ -12,6 +12,7 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" github_issues = sv.on_message() + @github_issues.handle() async def _github_issues(bot: Bot, event: MessageEvent) -> None: msg = str(event.message) @@ -19,21 +20,19 @@ async def _github_issues(bot: Bot, event: MessageEvent) -> None: need_info = re.findall(patt, msg) if not need_info: return - + for i in need_info: need_info = list(i) owner = need_info[0] repo = need_info[1] issue_number = need_info[2] - url = URL.format(owner=owner, - repo=repo, - issue_number=issue_number) - + url = URL.format(owner=owner, repo=repo, issue_number=issue_number) + try: data = await get_bytes(url) except RequestTimeOut: return - + data = json.loads(data) msg0 = ( f"{repo}: #{issue_number} {data['state']}\n" diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py index 32d3f65..d2754b1 100644 --- a/ATRI/plugins/help.py +++ b/ATRI/plugins/help.py @@ -7,7 +7,7 @@ from ATRI.service import SERVICE_DIR from ATRI.service import Service as sv -SERVICE_DIR = SERVICE_DIR / 'services' +SERVICE_DIR = SERVICE_DIR / "services" __doc__ = """ @@ -19,14 +19,12 @@ __doc__ = """ /help info (cmd) """ -help = sv.on_command( - cmd="/help", - docs=__doc__ -) +help = sv.on_command(cmd="/help", docs=__doc__) + @help.handle() async def _help(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(' ') + msg = str(event.message).split(" ") if msg[0] == "": msg = ( "呀?找不到路了?\n" @@ -41,7 +39,7 @@ async def _help(bot: Bot, event: MessageEvent) -> None: for _, _, i in os.walk(SERVICE_DIR): for a in i: f = SERVICE_DIR / a - files.append(json.loads(f.read_bytes())['command']) + files.append(json.loads(f.read_bytes())["command"]) cmds = " | ".join(map(str, files)) msg = "咱能做很多事!比如:\n" + cmds msg0 = msg + "\n没反应可能是没权限...或者为探测类型...不属于可直接触发命令..." @@ -53,13 +51,9 @@ async def _help(bot: Bot, event: MessageEvent) -> None: try: data = json.loads(path.read_bytes()) except: - await help.finish('未找到相关命令...') + await help.finish("未找到相关命令...") - msg = ( - f"{cmd} INFO:\n" - f"Enabled: {data['enabled']}\n" - f"{data['docs']}" - ) + msg = f"{cmd} INFO:\n" f"Enabled: {data['enabled']}\n" f"{data['docs']}" await help.finish(msg) else: - await help.finish('请检查输入...') + await help.finish("请检查输入...") diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py index 2b8d81f..1078ca3 100644 --- a/ATRI/plugins/hitokoto.py +++ b/ATRI/plugins/hitokoto.py @@ -11,7 +11,7 @@ from ATRI.utils.request import get_bytes URL = [ "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/a.json", "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/b.json", - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json" + "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json", ] sick_list = [] @@ -24,12 +24,10 @@ __doc__ = """ """ hitokoto = sv.on_command( - cmd='一言', - aliases={'抑郁一下', '网抑云'}, - docs=__doc__, - rule=is_in_service('一言') & to_bot() + cmd="一言", aliases={"抑郁一下", "网抑云"}, docs=__doc__, rule=is_in_service("一言") & to_bot() ) + @hitokoto.handle() async def _hitokoto(bot: Bot, event: MessageEvent) -> None: global sick_list @@ -41,10 +39,7 @@ async def _hitokoto(bot: Bot, event: MessageEvent) -> None: await hitokoto.finish("额......需要咱安慰一下嘛~?") elif count_list(sick_list, user) == 6: sick_list = del_list_aim(sick_list, user) - msg = ( - "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" - "我...我会守在你身边的!...嗯..一定" - ) + msg = "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" "我...我会守在你身边的!...嗯..一定" await hitokoto.finish(msg) else: sick_list.append(user) @@ -53,4 +48,4 @@ async def _hitokoto(bot: Bot, event: MessageEvent) -> None: data = json.loads(await get_bytes(url)) except RequestTimeOut: raise RequestTimeOut("Request failed!") - await hitokoto.finish(data[randint(1, len(data) - 1)]['hitokoto']) + await hitokoto.finish(data[randint(1, len(data) - 1)]["hitokoto"]) diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py index d9f1ffe..b316020 100644 --- a/ATRI/plugins/manage/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -4,5 +4,4 @@ from pathlib import Path _sub_plugins = set() -_sub_plugins |= nonebot.load_plugins( - str((Path(__file__).parent / 'modules').resolve())) +_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve())) diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py index 65a6a86..0c2fcec 100644 --- a/ATRI/plugins/manage/modules/block.py +++ b/ATRI/plugins/manage/modules/block.py @@ -12,28 +12,24 @@ __doc__ = """ 封禁用户 QQ号 """ -block_user = sv.on_command( - cmd="封禁用户", - docs=__doc__, - permission=SUPERUSER -) +block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER) + @block_user.args_parser # type: ignore -async def _block_user_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - cancel = ['算了', '罢了'] + cancel = ["算了", "罢了"] if msg in cancel: - await block_user.finish('好吧...') + await block_user.finish("好吧...") if not msg: - await block_user.reject('是谁呢?!GKD!') + await block_user.reject("是谁呢?!GKD!") else: - state['noob'] = msg + state["noob"] = msg + @block_user.handle() async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob = state['noob'] + noob = state["noob"] sv.BlockSystem.control_list(True, user=noob) msg = f"用户[{noob}]已被封禁(;′⌒`)" await block_user.finish(msg) @@ -46,28 +42,24 @@ __doc__ = """ 解封用户 QQ号 """ -unblock_user = sv.on_command( - cmd="解封用户", - docs=__doc__, - permission=SUPERUSER -) +unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER) + @unblock_user.args_parser # type: ignore -async def _unblock_user_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - cancel = ['算了', '罢了'] + cancel = ["算了", "罢了"] if msg in cancel: - await unblock_user.finish('好吧...') + await unblock_user.finish("好吧...") if not msg: - await unblock_user.reject('要原谅谁呢...') + await unblock_user.reject("要原谅谁呢...") else: - state['forgive'] = msg + state["forgive"] = msg + @unblock_user.handle() async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive = state['forgive'] + forgive = state["forgive"] sv.BlockSystem.control_list(False, user=forgive) msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ" await unblock_user.finish(msg) @@ -80,28 +72,24 @@ __doc__ = """ 封禁群 群号 """ -block_group = sv.on_command( - cmd="封禁群", - docs=__doc__, - permission=SUPERUSER -) +block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER) + @block_group.args_parser # type: ignore -async def _block_group_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - cancel = ['算了', '罢了'] + cancel = ["算了", "罢了"] if msg in cancel: - await block_user.finish('好吧...') + await block_user.finish("好吧...") if not msg: - await block_user.reject('是哪个群?!GKD!') + await block_user.reject("是哪个群?!GKD!") else: - state['noob_g'] = msg + state["noob_g"] = msg + @block_group.handle() async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob_g = state['noob_g'] + noob_g = state["noob_g"] sv.BlockSystem.control_list(True, group=noob_g) msg = f"群[{noob_g}]已被封禁(;′⌒`)" await block_user.finish(msg) @@ -114,30 +102,24 @@ __doc__ = """ 解封 群号 """ -unblock_group = sv.on_command( - cmd="解封群", - docs=__doc__, - permission=SUPERUSER -) +unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER) + @unblock_group.args_parser # type: ignore -async def _unblock_group_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - cancel = ['算了', '罢了'] + cancel = ["算了", "罢了"] if msg in cancel: - await block_user.finish('好吧...') + await block_user.finish("好吧...") if not msg: - await block_user.reject('要原谅哪个群呢...') + await block_user.reject("要原谅哪个群呢...") else: - state['forgive_g'] = msg + state["forgive_g"] = msg + @unblock_group.handle() -async def _unblock_group(bot: Bot, - event: MessageEvent, - state: T_State) -> None: - forgive_g = state['forgive_g'] +async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None: + forgive_g = state["forgive_g"] sv.BlockSystem.control_list(False, group=forgive_g) msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ" await unblock_user.finish(msg) diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py index 5086fcf..7f7816d 100644 --- a/ATRI/plugins/manage/modules/broadcast.py +++ b/ATRI/plugins/manage/modules/broadcast.py @@ -15,52 +15,46 @@ __doc__ = """ 广播 内容 """ -broadcast = sv.on_command( - cmd="广播", - docs=__doc__, - permission=SUPERUSER -) +broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER) + @broadcast.args_parser # type: ignore -async def _broadcast_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message) - quit_list = ['算了', '罢了', '取消'] + quit_list = ["算了", "罢了", "取消"] if msg in quit_list: - await broadcast.finish('好吧...') + await broadcast.finish("好吧...") if not msg: - await broadcast.reject('想群发啥呢0w0') + await broadcast.reject("想群发啥呢0w0") else: - state['msg'] = msg + state["msg"] = msg + @broadcast.handle() async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['msg'] = msg - [email protected]('msg', prompt='请告诉咱需要群发的内容~!') -async def _deal_broadcast(bot: Bot, - event: MessageEvent, - state: T_State) -> None: - msg = state['msg'] + state["msg"] = msg + + [email protected]("msg", prompt="请告诉咱需要群发的内容~!") +async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = state["msg"] group_list = await bot.get_group_list() succ_list = [] err_list = [] - + for group in group_list: await asyncio.sleep(randint(0, 2)) try: - await bot.send_group_msg(group_id=group["group_id"], - message=msg) + await bot.send_group_msg(group_id=group["group_id"], message=msg) except BaseException: err_list.append(group["group_id"]) - + msg0 = "" for i in err_list: msg0 += f" {i}\n" - + repo_msg = ( f"推送消息:\n{msg}\n" "————————\n" diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py index 53b75b7..7bd3992 100644 --- a/ATRI/plugins/manage/modules/debug.py +++ b/ATRI/plugins/manage/modules/debug.py @@ -11,7 +11,7 @@ from ATRI.utils.ub_paste import paste from ATRI.exceptions import load_error -level_list = ['info', 'warning', 'error', 'debug'] +level_list = ["info", "warning", "error", "debug"] __doc__ = """ @@ -25,44 +25,47 @@ __doc__ = """ get_console = sv.on_command( cmd="获取log", - aliases={'获取LOG', '获取控制台', '获取控制台信息'}, + aliases={"获取LOG", "获取控制台", "获取控制台信息"}, docs=__doc__, - permission=SUPERUSER + permission=SUPERUSER, ) + @get_console.handle() async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['level'] = msg + state["level"] = msg + -@get_console.got('level', prompt='需要获取的等级是?') +@get_console.got("level", prompt="需要获取的等级是?") async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None: - quit_list = ['算了', '罢了', '不了'] - if state['level'] in quit_list: - await get_console.finish('好吧...') + quit_list = ["算了", "罢了", "不了"] + if state["level"] in quit_list: + await get_console.finish("好吧...") -@get_console.got('line', prompt='需要获取的行数是?') + +@get_console.got("line", prompt="需要获取的行数是?") async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None: - level = state['level'] - line = state['line'] + level = state["level"] + line = state["line"] repo = str() - + path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log" - logs = await open_file(path, 'readlines') - + logs = await open_file(path, "readlines") + try: - content = logs[int(line):] # type: ignore + content = logs[int(line) :] # type: ignore repo = "\n".join(content).replace("[36mATRI[0m", "ATRI") except IndexError: - await get_console.finish(f'行数错误...max: {len(logs)}') # type: ignore - + await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore + data = FormData() - data.add_field('poster', 'ATRI running log') - data.add_field('syntax', 'text') - data.add_field('expiration', 'day') - data.add_field('content', repo) - + data.add_field("poster", "ATRI running log") + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", repo) + msg0 = f"> {event.sender.nickname}\n" msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" await track_error.finish(msg0) @@ -76,41 +79,39 @@ __doc__ = """ """ track_error = sv.on_command( - cmd="track", - aliases={'追踪'}, - docs=__doc__, - permission=SUPERUSER + cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER ) + @track_error.args_parser # type: ignore -async def _track_error_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - cancel = ['算了', '罢了'] + cancel = ["算了", "罢了"] if msg in cancel: - await track_error.finish('好吧...') + await track_error.finish("好吧...") if not msg: - await track_error.reject('欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ') + await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") else: - state['track'] = msg + state["track"] = msg + @track_error.handle() async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['track'] = msg + state["track"] = msg -@track_error.got('track', prompt='欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ') + +@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: - track_id = state['track'] + track_id = state["track"] data = dict() - + try: data = load_error(track_id) except BaseException: - await track_error.finish('未发现对应ID呢...(⇀‸↼‶)') - + await track_error.finish("未发现对应ID呢...(⇀‸↼‶)") + msg = ( f"ID: [{track_id}]\n" f"Time: [{data['time']}]\n" @@ -118,13 +119,13 @@ async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: "——————\n" f"{data['content']}" ) - + data = FormData() - data.add_field('poster', track_id) - data.add_field('syntax', 'text') - data.add_field('expiration', 'day') - data.add_field('content', msg) - + data.add_field("poster", track_id) + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", msg) + msg0 = f"> {event.sender.nickname}\n" msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" await track_error.finish(msg0) diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py index d5ae321..cd72d47 100644 --- a/ATRI/plugins/manage/modules/dormant.py +++ b/ATRI/plugins/manage/modules/dormant.py @@ -13,12 +13,10 @@ __doc__ = """ """ dormant_enabled = sv.on_command( - cmd='休眠', - docs=__doc__, - rule=to_bot(), - permission=SUPERUSER + cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER ) + @dormant_enabled.handle() async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None: sv.Dormant.control_dormant(True) @@ -34,12 +32,10 @@ __doc__ = """ """ dormant_disabled = sv.on_command( - cmd='苏醒', - docs=__doc__, - rule=to_bot(), - permission=SUPERUSER + cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER ) + @dormant_disabled.handle() async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None: sv.Dormant.control_dormant(False) diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py index 843a94e..762e918 100644 --- a/ATRI/plugins/manage/modules/request.py +++ b/ATRI/plugins/manage/modules/request.py @@ -9,7 +9,7 @@ from ATRI.service import Service as sv from ATRI.exceptions import ReadFileError, FormatError -ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' +ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" __doc__ = """ @@ -19,16 +19,13 @@ __doc__ = """ 查看申请列表 """ -req_list = sv.on_command( - cmd="申请列表", - docs=__doc__, - permission=SUPERUSER -) +req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER) + @req_list.handle() async def _req_list(bot: Bot, event: MessageEvent) -> None: - path_f = ESSENTIAL_DIR / 'request_friend.json' - path_g = ESSENTIAL_DIR / 'request_group.json' + path_f = ESSENTIAL_DIR / "request_friend.json" + path_g = ESSENTIAL_DIR / "request_group.json" data_f, data_g = dict() try: data_f = json.loads(path_f.read_bytes()) @@ -38,7 +35,7 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None: data_g = json.loads(path_g.read_bytes()) except ReadFileError: msg_g = "[读取文件失败]" - + msg_f = str() for i in data_f.keys(): msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n" @@ -46,75 +43,65 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None: msg_g = str() for i in data_g.keys(): msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n" - - msg = ( - "好友/群申请列表如下:\n" - "· 好友:\n" - f"{msg_f}" - "· 群:\n" - f"{msg_g}" - ) + + msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}" await req_list.finish(msg) -req_deal = sv.on_regex( - r'[同意|拒绝][好友|群]申请', - permission=SUPERUSER -) +req_deal = sv.on_regex(r"[同意|拒绝][好友|群]申请", permission=SUPERUSER) + @req_deal.handle() async def _req_deal(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(' ') - arg = re.findall(r'[同意|拒绝][好友|群]申请', msg[0]) + msg = str(event.message).split(" ") + arg = re.findall(r"[同意|拒绝][好友|群]申请", msg[0]) app = arg[0] _type = arg[1] - + if not msg[1]: - await req_deal.finish(f'正确用法!速看\n{app}{_type}申请 (reqid)') - + await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)") + path = ESSENTIAL_DIR / "request_group.json" data_g = dict() try: data_g = json.loads(path.read_bytes()) except FileExistsError: await req_deal.finish("读取群数据失败,可能并没有请求...") - + reqid = msg[1] if app == "同意": if _type == "好友": try: - await bot.set_friend_add_request(flag=reqid, - approve=True) - await req_deal.finish('完成~!已同意申请') + await bot.set_friend_add_request(flag=reqid, approve=True) + await req_deal.finish("完成~!已同意申请") except FormatError: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") elif _type == "群": try: - await bot.set_group_add_request(flag=reqid, - sub_type=data_g[reqid]['sub_type'], - approve=True) - await req_deal.finish('完成~!已同意申请') + await bot.set_group_add_request( + flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True + ) + await req_deal.finish("完成~!已同意申请") except FormatError: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") else: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") elif app == "拒绝": if _type == "好友": try: - await bot.set_friend_add_request(flag=reqid, - approve=False) - await req_deal.finish('完成~!已拒绝申请') + await bot.set_friend_add_request(flag=reqid, approve=False) + await req_deal.finish("完成~!已拒绝申请") except FormatError: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") elif _type == "群": try: - await bot.set_group_add_request(flag=reqid, - sub_type=data_g[reqid]['sub_type'], - approve=False) - await req_deal.finish('完成~!已拒绝申请') + await bot.set_group_add_request( + flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False + ) + await req_deal.finish("完成~!已拒绝申请") except FormatError: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") else: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") else: - await req_deal.finish('请检查输入的值是否正确——!') + await req_deal.finish("请检查输入的值是否正确——!") diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py index 6489ad6..a3624df 100644 --- a/ATRI/plugins/manage/modules/service.py +++ b/ATRI/plugins/manage/modules/service.py @@ -5,7 +5,7 @@ from nonebot.adapters.cqhttp import ( Bot, MessageEvent, GroupMessageEvent, - PrivateMessageEvent + PrivateMessageEvent, ) from ATRI.service import Service as sv @@ -21,46 +21,40 @@ __doc__ = """ """ cur_service_ena = sv.on_command( - cmd="启用功能", - docs=__doc__, - permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER + cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER ) + @cur_service_ena.args_parser # type: ignore -async def _cur_ena_load(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: +async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: msg = str(event.message).strip() - quit_list = ['算了', '罢了', '取消'] + quit_list = ["算了", "罢了", "取消"] if msg in quit_list: - await cur_service_ena.finish('好吧...') + await cur_service_ena.finish("好吧...") if not msg: - await cur_service_ena.reject('请告诉咱目标命令!') + await cur_service_ena.reject("请告诉咱目标命令!") else: - state['service_e'] = msg + state["service_e"] = msg + @cur_service_ena.handle() -async def _cur_ena(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: +async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['service_e'] = msg + state["service_e"] = msg -@cur_service_ena.got('service_e', prompt='请告诉咱目标命令!') -async def _deal_cur_ena(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: - cmd = state['service_e'] + +@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!") +async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: + cmd = state["service_e"] group = str(event.group_id) sv.control_service(cmd, False, True, group=group) - await cur_service_ena.finish(f'成功!本群已启用:{cmd}') + await cur_service_ena.finish(f"成功!本群已启用:{cmd}") + @cur_service_ena.handle() -async def _refuse_cur_ena(bot: Bot, - event: PrivateMessageEvent, - state: T_State) -> None: - await cur_service_ena.finish('只能在群聊中决定哦...') +async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: + await cur_service_ena.finish("只能在群聊中决定哦...") __doc__ = """ @@ -73,46 +67,40 @@ __doc__ = """ """ cur_service_dis = sv.on_command( - cmd="禁用功能", - docs=__doc__, - permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER + cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER ) + @cur_service_dis.args_parser # type: ignore -async def _cur_dis_load(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: +async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: msg = str(event.message).strip() - quit_list = ['算了', '罢了', '取消'] + quit_list = ["算了", "罢了", "取消"] if msg in quit_list: - await cur_service_dis.finish('好吧...') + await cur_service_dis.finish("好吧...") if not msg: - await cur_service_dis.reject('请告诉咱目标命令!') + await cur_service_dis.reject("请告诉咱目标命令!") else: - state['service_d'] = msg + state["service_d"] = msg + @cur_service_dis.handle() -async def _cur_dis(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: +async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['service_d'] = msg + state["service_d"] = msg -@cur_service_dis.got('service_d', prompt='请告诉咱目标命令!') -async def _deal_cur_dis(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: - cmd = state['service_d'] + +@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!") +async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: + cmd = state["service_d"] group = str(event.group_id) sv.control_service(cmd, False, False, group=group) - await cur_service_dis.finish(f'成功!本群已禁用:{cmd}') + await cur_service_dis.finish(f"成功!本群已禁用:{cmd}") + @cur_service_dis.handle() -async def _refuse_cur_dis(bot: Bot, - event: PrivateMessageEvent, - state: T_State) -> None: - await cur_service_dis.finish('只能在群聊中决定哦...') +async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: + await cur_service_dis.finish("只能在群聊中决定哦...") __doc__ = """ @@ -124,40 +112,33 @@ __doc__ = """ 全局启用 以图搜图 """ -glo_service_ena = sv.on_command( - cmd="全局启用", - docs=__doc__, - permission=SUPERUSER -) +glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER) + @glo_service_ena.args_parser # type: ignore -async def _glo_ena_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - quit_list = ['算了', '罢了', '取消'] + quit_list = ["算了", "罢了", "取消"] if msg in quit_list: - await glo_service_ena.finish('好吧...') + await glo_service_ena.finish("好吧...") if not msg: - await glo_service_ena.reject('请告诉咱目标命令!') + await glo_service_ena.reject("请告诉咱目标命令!") else: - state['service_e_g'] = msg + state["service_e_g"] = msg + @glo_service_ena.handle() -async def _glo_ena(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['service_e_g'] = msg + state["service_e_g"] = msg + -@glo_service_ena.got('service_e_g', prompt='请告诉咱目标命令!') -async def _deal_glo_ena(bot: Bot, - event: MessageEvent, - state: T_State) -> None: - cmd = state['service_e_g'] +@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!") +async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: + cmd = state["service_e_g"] sv.control_service(cmd, True, True) - await glo_service_ena.finish(f'成功!已全局启用:{cmd}') + await glo_service_ena.finish(f"成功!已全局启用:{cmd}") __doc__ = """ @@ -169,37 +150,30 @@ __doc__ = """ 全局禁用 以图搜图 """ -glo_service_dis = sv.on_command( - cmd="全局禁用", - docs=__doc__, - permission=SUPERUSER -) +glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER) + @glo_service_dis.args_parser # type: ignore -async def _glo_dis_load(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() - quit_list = ['算了', '罢了', '取消'] + quit_list = ["算了", "罢了", "取消"] if msg in quit_list: - await glo_service_dis.finish('好吧...') + await glo_service_dis.finish("好吧...") if not msg: - await glo_service_dis.reject('请告诉咱目标命令!') + await glo_service_dis.reject("请告诉咱目标命令!") else: - state['service_d_g'] = msg + state["service_d_g"] = msg + @glo_service_dis.handle() -async def _glo_dis(bot: Bot, - event: MessageEvent, - state: T_State) -> None: +async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['service_d_g'] = msg + state["service_d_g"] = msg + -@glo_service_dis.got('service_d_g', prompt='请告诉咱目标命令!') -async def _deal_glo_dis(bot: Bot, - event: MessageEvent, - state: T_State) -> None: - cmd = state['service_d_g'] +@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!") +async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: + cmd = state["service_d_g"] sv.control_service(cmd, True, False) - await glo_service_dis.finish(f'成功!已全局禁用:{cmd}') + await glo_service_dis.finish(f"成功!已全局禁用:{cmd}") diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py index 11b2b1b..78f23e8 100644 --- a/ATRI/plugins/manage/modules/shutdown.py +++ b/ATRI/plugins/manage/modules/shutdown.py @@ -12,11 +12,8 @@ __doc__ = """ @ 关机 """ -shutdown = sv.on_command( - cmd="关机", - docs=__doc__, - permission=SUPERUSER -) +shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER) + @shutdown.handle() async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: @@ -24,9 +21,10 @@ async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: if msg: state["msg"] = msg + @shutdown.got("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)") async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: - t = ['y', 'Y', '是'] + t = ["y", "Y", "是"] if state["msg"] in t: await bot.send(event, "咱还会醒来的,一定") exit(0) diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py index fe5bf78..076ce03 100644 --- a/ATRI/plugins/nsfw.py +++ b/ATRI/plugins/nsfw.py @@ -13,14 +13,12 @@ from ATRI.utils.request import get_bytes from ATRI.utils.cqcode import coolq_code_check -nsfw_url = ( - f"http://{Config.NsfwCheck.host}:" - f"{Config.NsfwCheck.port}/?url=" -) +nsfw_url = f"http://{Config.NsfwCheck.host}:" f"{Config.NsfwCheck.port}/?url=" nsfw_checking = sv.on_message() + @nsfw_checking.handle() async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: if Config.NsfwCheck.enabled: @@ -28,23 +26,25 @@ async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: user = event.user_id group = event.group_id check = await coolq_code_check(msg, user, group) - + if check: if "image" not in msg: return - + url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0] try: data = json.loads(await get_bytes(url)) except: - log.warning('检测涩图失败,请查阅文档以获取帮助') + log.warning("检测涩图失败,请查阅文档以获取帮助") return - if round(data['score'], 4) > Config.NsfwCheck.passing_rate: - score = "{:.2%}".format(round(data['score'], 4)) - log.debug(f'截获涩图,得分:{score}') - await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!') + if round(data["score"], 4) > Config.NsfwCheck.passing_rate: + score = "{:.2%}".format(round(data["score"], 4)) + log.debug(f"截获涩图,得分:{score}") + await bot.send(event, f"好涩哦!涩值:{score}\n不行了咱要发给主人看!") for sup in Config.BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}") + await bot.send_private_msg( + user_id=sup, message=f"{msg}\n涩值: {score}" + ) else: pass @@ -60,60 +60,57 @@ __doc__ = """ /nsfw 然后Bot会向你索取图片 """ -nsfw_reading = sv.on_command( - cmd="/nsfw", - docs=__doc__, - rule=is_in_service('nsfw') -) +nsfw_reading = sv.on_command(cmd="/nsfw", docs=__doc__, rule=is_in_service("nsfw")) + @nsfw_reading.args_parser # type: ignore async def _nsfw(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: msg = str(event.message) - quit_list = ['算了', '罢了', '不搜了'] + quit_list = ["算了", "罢了", "不搜了"] if msg in quit_list: - await nsfw_reading.finish('好吧') - + await nsfw_reading.finish("好吧") + if not msg: - await nsfw_reading.reject('图呢?') + await nsfw_reading.reject("图呢?") else: - state['pic_nsfw'] = msg + state["pic_nsfw"] = msg + @nsfw_reading.handle() -async def _nsfw_r(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: +async def _nsfw_r(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: user = event.user_id group = event.group_id msg = str(event.message).strip() check = await coolq_code_check(msg, user, group) if check and msg: - state['pic_nsfw'] = msg + state["pic_nsfw"] = msg + -@nsfw_reading.got('pic_nsfw', prompt='图呢?') -async def _nsfw_reading(bot: Bot, - event: GroupMessageEvent, - state: T_State) -> None: - msg = state['pic_nsfw'] +@nsfw_reading.got("pic_nsfw", prompt="图呢?") +async def _nsfw_reading(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: + msg = state["pic_nsfw"] pic = re.findall(r"url=(.*?)]", msg) if not pic: - await nsfw_reading.reject('请发送图片而不是其它东西!!') - + await nsfw_reading.reject("请发送图片而不是其它东西!!") + url = nsfw_url + pic[0] try: data = json.loads(await get_bytes(url)) except RequestTimeOut: - raise RequestTimeOut('Time out!') - - score = round(data['score'], 4) - result = "{:.2%}".format(round(data['score'], 4)) + raise RequestTimeOut("Time out!") + + score = round(data["score"], 4) + result = "{:.2%}".format(round(data["score"], 4)) if score > 0.9: level = "hso! 我要发给主人看!" for sup in Config.BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}") + await bot.send_private_msg( + user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}" + ) elif 0.9 > score >= 0.6: level = "嗯,可冲" else: level = "?能不能换张55完全冲不起来" - + repo = f"涩值:{result}\n{level}" await nsfw_reading.finish(repo) diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index cd4c5bd..14b9534 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -16,22 +16,23 @@ from .data_source import dec temp_list = [] img_url = [ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png", - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg" + "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg", ] bilibili_rich = sv.on_message() + @bilibili_rich.handle() async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: global temp_list try: msg = str(event.raw_message).replace("\\", "") bv = False - + if "qqdocurl" not in msg: if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace('av', '') + av = re.findall(r"(av\d+)", msg)[0].replace("av", "") else: bv = re.findall(r"(BV\w+)", msg) av = str(dec(bv[0])) @@ -43,31 +44,29 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: async with session.get(url=bv_url) as r: bv = re.findall(r"(BV\w+)", str(r.url)) av = dec(bv[0]) - + if not bv: if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace('av', '') + av = re.findall(r"(av\d+)", msg)[0].replace("av", "") else: return - + if count_list(temp_list, av) == 4: await bot.send(event, "你是怕别人看不到么发这么多次?") temp_list = del_list_aim(temp_list, av) return - + temp_list.append(av) - + URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}" - data = json.loads(await get_bytes(URL))['data'] + data = json.loads(await get_bytes(URL))["data"] repo = ( f"{data['bvid']} INFO:\n" f"Title: {data['title']}\n" f"Link: {data['short_link']}\n" "にまねげぴのTencent rich!" ) - await bot.send( - event, - MessageSegment.image(file=choice(img_url))) + await bot.send(event, MessageSegment.image(file=choice(img_url))) await bilibili_rich.finish(repo) except BaseException: return diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py index 32ac219..59474ff 100644 --- a/ATRI/plugins/rich/data_source.py +++ b/ATRI/plugins/rich/data_source.py @@ -1,4 +1,4 @@ -table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF' +table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF" tr = {} for i in range(58): tr[table[i]] = i @@ -10,13 +10,13 @@ add = 8728348608 def dec(x) -> int: r = 0 for i in range(6): - r += tr[x[s[i]]] * 58**i + r += tr[x[s[i]]] * 58 ** i return (r - add) ^ xor def enc(x) -> str: x = (x ^ xor) + add - r = list('BV1 4 1 7 ') + r = list("BV1 4 1 7 ") for i in range(6): - r[s[i]] = table[x // 58**i % 58] - return ''.join(r) + r[s[i]] = table[x // 58 ** i % 58] + return "".join(r) diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index f158cf9..b4b1497 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -21,55 +21,53 @@ __doc__ = """ 以图搜图 (pic) """ -saucenao = sv.on_command( - cmd='以图搜图', - docs=__doc__, - rule=is_in_service('以图搜图') -) +saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图")) + @saucenao.args_parser # type: ignore -async def _load_saucenao(bot: Bot, event: MessageEvent, - state: T_State) -> None: +async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message) - quit_list = ['算了', '罢了', '不搜了'] + quit_list = ["算了", "罢了", "不搜了"] if msg in quit_list: - await saucenao.finish('好吧...') - + await saucenao.finish("好吧...") + if not msg: - await saucenao.reject('图呢?') + await saucenao.reject("图呢?") else: - state['pic_sau'] = msg + state["pic_sau"] = msg + @saucenao.handle() async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state['pic_sau'] = msg + state["pic_sau"] = msg [email protected]('pic_sau', prompt='图呢?') + [email protected]("pic_sau", prompt="图呢?") async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state['pic_sau'] - img = re.findall(r'url=(.*?)]', msg) + msg = state["pic_sau"] + img = re.findall(r"url=(.*?)]", msg) if not img: - await saucenao.finish('请发送图片而不是其他东西!!') - + await saucenao.finish("请发送图片而不是其他东西!!") + try: task = SauceNao(api_key=Config.SauceNAO.key) data = json.loads(await task.search(img[0])) except RequestTimeOut: - raise RequestTimeOut('Request failed!') - - res = data['results'] + raise RequestTimeOut("Request failed!") + + res = data["results"] result = list() for i in range(0, 3): data = res[i] - + _result = dict() - _result['similarity'] = data['header']['similarity'] - _result['index_name'] = data['header']['index_name'] - _result['url'] = choice(data['data'].get('ext_urls', ['None'])) + _result["similarity"] = data["header"]["similarity"] + _result["index_name"] = data["header"]["index_name"] + _result["url"] = choice(data["data"].get("ext_urls", ["None"])) result.append(_result) - + msg0 = f"> {MessageSegment.at(event.user_id)}" for i in result: msg0 = msg0 + ( @@ -78,5 +76,5 @@ async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: f"Name: {i['index_name']}\n" f"URL: {i['url'].replace('https://', '')}" ) - + await saucenao.finish(Message(msg0)) diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index cd88554..efe8fe1 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -5,23 +5,19 @@ URL = "https://saucenao.com/search.php" class SauceNao: - def __init__(self, - api_key: str, - output_type=2, - testmode=1, - dbmaski=32768, - db=5, - numres=5) -> None: + def __init__( + self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5 + ) -> None: params = dict() - params['api_key'] = api_key - params['output_type'] = output_type - params['testmode'] = testmode - params['dbmaski'] = dbmaski - params['db'] = db - params['numres'] = numres + params["api_key"] = api_key + params["output_type"] = output_type + params["testmode"] = testmode + params["dbmaski"] = dbmaski + params["db"] = db + params["numres"] = numres self.params = params - + async def search(self, url: str): - self.params['url'] = url + self.params["url"] = url res = await post_bytes(url=URL, params=self.params) return res diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py index 5e48fc3..bd5d9c4 100644 --- a/ATRI/plugins/status.py +++ b/ATRI/plugins/status.py @@ -16,10 +16,8 @@ __doc__ = """ /ping """ -ping = sv.on_command( - cmd="/ping", - docs="测试机器人", - rule=is_in_service('ping')) +ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping")) + @ping.handle() async def _ping(bot: Bot, event: MessageEvent) -> None: @@ -33,11 +31,8 @@ __doc__ = """ /status """ -status = sv.on_command( - cmd="/status", - docs=__doc__, - rule=is_in_service('status') -) +status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status")) + @status.handle() async def _status(bot: Bot, event: MessageEvent) -> None: @@ -45,13 +40,13 @@ async def _status(bot: Bot, event: MessageEvent) -> None: cpu = psutil.cpu_percent(interval=1) mem = psutil.virtual_memory().percent disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore except GetStatusError: raise GetStatusError("Failed to get status.") - + msg = "アトリは、高性能ですから!" - + if cpu > 80: # type: ignore msg = "咱感觉有些头晕..." if mem > 80: @@ -60,7 +55,7 @@ async def _status(bot: Bot, event: MessageEvent) -> None: msg = "咱感觉有点累..." elif disk > 80: msg = "咱感觉身体要被塞满了..." - + msg0 = ( "Self status:\n" f"* CPU: {cpu}%\n" @@ -69,26 +64,22 @@ async def _status(bot: Bot, event: MessageEvent) -> None: f"* netSENT: {inteSENT}MB\n" f"* netRECV: {inteRECV}MB\n" ) + msg - + await status.finish(msg0) [email protected]_job( - 'interval', - minutes=5, - misfire_grace_time=10 -) [email protected]_job("interval", minutes=5, misfire_grace_time=10) async def _(): log.info("开始自检") try: cpu = psutil.cpu_percent(interval=1) mem = psutil.virtual_memory().percent disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore except GetStatusError: raise GetStatusError("Failed to get status.") - + msg = "" if cpu > 80: # type: ignore msg = "咱感觉有些头晕..." @@ -101,7 +92,7 @@ async def _(): else: log.info("运作正常") return - + msg0 = ( "Self status:\n" f"* CPU: {cpu}%\n" @@ -110,9 +101,6 @@ async def _(): f"* netSENT: {inteSENT}MB\n" f"* netRECV: {inteRECV}MB\n" ) + msg - + for sup in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( - user_id=sup, - message=msg0 - ) + await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0) diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py index 5d1860a..ff28e59 100644 --- a/ATRI/plugins/utils/__init__.py +++ b/ATRI/plugins/utils/__init__.py @@ -17,26 +17,24 @@ roll一下 /roll 1d10+10d9+4d5+2d3 """ -roll = sv.on_command( - cmd="/roll", - docs=__doc__, - rule=is_in_service('roll') -) +roll = sv.on_command(cmd="/roll", docs=__doc__, rule=is_in_service("roll")) + @roll.handle() async def _roll(bot: Bot, event: MessageEvent, state: dict) -> None: args = str(event.message).strip() if args: - state['resu'] = args + state["resu"] = args + @roll.got("resu", prompt="roll 参数不能为空~!\ndemo:1d10 或 2d10+2d10") async def _(bot: Bot, event: MessageEvent, state: dict) -> None: - resu = state['resu'] - match = re.match(r'^([\dd+\s]+?)$', resu) - + resu = state["resu"] + match = re.match(r"^([\dd+\s]+?)$", resu) + if not match: await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10") - + await roll.finish(roll_dice(resu)) @@ -52,22 +50,19 @@ __doc__ = """ /enc e アトリは高性能ですから! """ -encrypt = sv.on_command( - cmd="/enc", - docs=__doc__, - rule=is_in_service('enc') -) +encrypt = sv.on_command(cmd="/enc", docs=__doc__, rule=is_in_service("enc")) + @encrypt.handle() async def _encrypt(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(' ') + msg = str(event.message).split(" ") _type = msg[0] s = msg[1] e = Encrypt() - + if _type == "e": await encrypt.finish(e.encode(s)) elif _type == "d": await encrypt.finish(e.decode(s)) else: - await encrypt.finish('请检查输入~!') + await encrypt.finish("请检查输入~!") diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py index 18c0492..4ef46a3 100644 --- a/ATRI/plugins/utils/data_source.py +++ b/ATRI/plugins/utils/data_source.py @@ -6,41 +6,41 @@ from typing import Union def roll_dice(par: str) -> str: result = 0 - proc = '' + proc = "" proc_list = [] p = par.split("+") - + for i in p: args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i) args = list(args[0]) - + args[0] = args[0] or 1 if int(args[0]) >= 5000 or int(args[2]) >= 5000: return "阿...好大......" - + for a in range(1, int(args[0]) + 1): rd = random.randint(1, int(args[2])) result = result + rd - + if len(proc_list) <= 10: proc_list.append(rd) - + if len(proc_list) <= 10: proc += "+".join(map(str, proc_list)) elif len(proc_list) > 10: proc += "太长了不展示了就酱w" else: proc += str(result) - + result = f"{par}=({proc})={result}" return result -class Encrypt(): - cr = 'ĀāĂ㥹ÀÁÂÃÄÅ' - cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ' - cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr' - cb = 'ĨĩĪīĬĭĮįİı' +class Encrypt: + cr = "ĀāĂ㥹ÀÁÂÃÄÅ" + cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ" + cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr" + cb = "ĨĩĪīĬĭĮįİı" sr = len(cr) sc = len(cc) @@ -55,7 +55,7 @@ class Encrypt(): def _encodeByte(self, i) -> Union[str, None]: if i > 0xFF: - raise ValueError('ERROR! at/ri overflow') + raise ValueError("ERROR! at/ri overflow") if i > 0x7F: i = i & 0x7F @@ -65,7 +65,7 @@ class Encrypt(): def _encodeShort(self, i) -> str: if i > 0xFFFF: - raise ValueError('ERROR! atri overflow') + raise ValueError("ERROR! atri overflow") reverse = False if i > 0x7FFF: @@ -75,17 +75,15 @@ class Encrypt(): char = [ self._div(i, self.scnb), self._div(i % self.scnb, self.snb), - self._div(i % self.snb, self.sb), i % self.sb - ] - char = [ - self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], - self.cb[char[3]] + self._div(i % self.snb, self.sb), + i % self.sb, ] + char = [self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], self.cb[char[3]]] if reverse: return char[2] + char[3] + char[0] + char[1] - return ''.join(char) + return "".join(char) def _decodeByte(self, c) -> int: nb = False @@ -93,12 +91,11 @@ class Encrypt(): if idx[0] < 0 or idx[1] < 0: idx = [self.cn.index(c[0]), self.cb.index(c[1])] nb = True - raise ValueError('ERROR! at/ri overflow') + raise ValueError("ERROR! at/ri overflow") - result = idx[0] * self.sb + idx[1] \ - if nb else idx[0] * self.sc + idx[1] + result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1] if result > 0x7F: - raise ValueError('ERROR! at/ri overflow') + raise ValueError("ERROR! at/ri overflow") return result | 0x80 if nb else 0 @@ -109,23 +106,22 @@ class Encrypt(): self.cr.index(c[0]), self.cc.index(c[1]), self.cn.index(c[2]), - self.cb.index(c[3]) + self.cb.index(c[3]), ] else: idx = [ self.cr.index(c[2]), self.cc.index(c[3]), self.cn.index(c[0]), - self.cb.index(c[1]) + self.cb.index(c[1]), ] if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0: - raise ValueError('ERROR! not atri') + raise ValueError("ERROR! not atri") - result = idx[0] * self.scnb + idx[1] * self.snb + idx[ - 2] * self.sb + idx[3] + result = idx[0] * self.scnb + idx[1] * self.snb + idx[2] * self.sb + idx[3] if result > 0x7FFF: - raise ValueError('ERROR! atri overflow') + raise ValueError("ERROR! atri overflow") result |= 0x8000 if reverse else 0 return result @@ -138,37 +134,36 @@ class Encrypt(): if len(b) & 1 == 1: result.append(self._encodeByte(b[-1])) - return ''.join(result) + return "".join(result) - def encode(self, s: str, encoding: str = 'utf-8'): + def encode(self, s: str, encoding: str = "utf-8"): if not isinstance(s, str): - raise ValueError('Please enter str instead of other') + raise ValueError("Please enter str instead of other") return self._encodeBytes(s.encode(encoding)) def _decodeBytes(self, s: str): if not isinstance(s, str): - raise ValueError('Please enter str instead of other') + raise ValueError("Please enter str instead of other") if len(s) & 1: - raise ValueError('ERROR length') + raise ValueError("ERROR length") result = [] for i in range(0, (len(s) >> 2)): - result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8])) - result.append(bytes([ - self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF])) + result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) >> 8])) + result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) & 0xFF])) if (len(s) & 2) == 2: result.append(bytes([self._decodeByte(s[-2:])])) - return b''.join(result) + return b"".join(result) - def decode(self, s: str, encoding: str = 'utf-8') -> str: + def decode(self, s: str, encoding: str = "utf-8") -> str: if not isinstance(s, str): - raise ValueError('Please enter str instead of other') + raise ValueError("Please enter str instead of other") try: return self._decodeBytes(s).decode(encoding) except UnicodeDecodeError: - raise ValueError('Decoding failed') + raise ValueError("Decoding failed") |