diff options
Diffstat (limited to 'ATRI/plugins')
32 files changed, 1741 insertions, 685 deletions
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py index 1c7e5a3..68a5add 100644 --- a/ATRI/plugins/anime_search.py +++ b/ATRI/plugins/anime_search.py @@ -8,7 +8,7 @@ from nonebot.typing import T_State from ATRI.service import Service as sv from ATRI.rule import is_in_service -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError from ATRI.utils.request import get_bytes from ATRI.utils.translate import to_simple_string from ATRI.utils.ub_paste import paste @@ -24,40 +24,45 @@ __doc__ = """ 以图搜番 (pic) """ -anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番")) - +anime_search = sv.on_command( + cmd="以图搜番", + docs=__doc__, + rule=is_in_service('以图搜番') +) @anime_search.args_parser # type: ignore async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了", "取消"] + quit_list = ['算了', '罢了', '不搜了', '取消'] if msg in quit_list: - await anime_search.finish("好吧...") + await anime_search.finish('好吧...') if not msg: - await anime_search.reject("图呢?") + await anime_search.reject('图呢?') else: - state["pic_anime"] = msg - + state['pic_anime'] = msg @anime_search.handle() -async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _anime_search(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() if msg: - state["pic_anime"] = msg - + state['pic_anime'] = msg -@anime_search.got("pic_anime", prompt="图呢?") -async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["pic_anime"] +@anime_search.got('pic_anime', prompt='图呢?') +async def _deal_search(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + msg = state['pic_anime'] img = re.findall(r"url=(.*?)]", msg) if not img: await anime_search.reject("请发送图片而不是其它东西!!") - + try: req = await get_bytes(URL + img[0]) - except RequestTimeOut: - raise RequestTimeOut("Request failed!") - + except RequestError: + raise RequestError("Request failed!") + data = json.loads(req)["docs"] try: d = dict() @@ -67,24 +72,28 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: else: m = data[i]["at"] / 60 s = data[i]["at"] % 60 - + if not data[i]["episode"]: n = 1 else: n = data[i]["episode"] - + d[to_simple_string(data[i]["title_chinese"])] = [ data[i]["similarity"], f"第{n}集", - f"{int(m)}分{int(s)}秒处", + f"{int(m)}分{int(s)}秒处" ] except Exception as err: raise Exception(f"Invalid data.\n{err}") - - result = sorted(d.items(), key=lambda x: x[1], reverse=True) - + + result = sorted( + d.items(), + key=lambda x:x[1], + reverse=True + ) + t = 0 - + msg0 = f"> {event.sender.nickname}" for i in result: t += 1 @@ -95,15 +104,15 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: f"Name: {i[0]}\n" f"Time: {i[1][1]} {i[1][2]}" ) - + if len(result) == 2: await anime_search.finish(Message(msg0)) else: data = FormData() - data.add_field("poster", "ATRI running log") - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", msg0) + data.add_field('poster', 'ATRI running log') + data.add_field('syntax', 'text') + data.add_field('expiration', 'day') + data.add_field('content', msg0) repo = f"> {event.sender.nickname}\n" repo = repo + f"详细请移步此处~\n{await paste(data)}" diff --git a/ATRI/plugins/call_owner.py b/ATRI/plugins/call_owner.py index d282824..3fb5a01 100644 --- a/ATRI/plugins/call_owner.py +++ b/ATRI/plugins/call_owner.py @@ -1,9 +1,12 @@ from nonebot.permission import SUPERUSER from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp import ( + Bot, + MessageEvent +) from ATRI.service import Service as sv -from ATRI.config import Config +from ATRI.config import BotSelfConfig from ATRI.utils.apscheduler import scheduler from ATRI.utils.list import count_list @@ -18,44 +21,48 @@ __doc__ = """ repo = sv.on_command(cmd="来杯红茶", docs=__doc__) - @repo.args_parser # type: ignore async def _repo_load(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message) if msg == "算了": - await repo.finish("好吧") - + await repo.finish('好吧') + if not msg: - await repo.reject("话呢?") + await repo.reject('话呢?') else: - state["msg_repo"] = msg - + state['msg_repo'] = msg @repo.handle() async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state["msg_repo"] = msg - + state['msg_repo'] = msg [email protected]("msg_repo", prompt="请告诉咱需要反馈的内容~!") [email protected]('msg_repo', prompt="请告诉咱需要反馈的内容~!") async def _repo_deal(bot: Bot, event: MessageEvent, state: T_State) -> None: global repo_list - msg = state["msg_repo"] + msg = state['msg_repo'] user = event.user_id - + if count_list(repo_list, user) == 5: await repo.finish("吾辈已经喝了五杯红茶啦!明天再来吧。") - + repo_list.append(user) - for sup in Config.BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}") - + for sup in BotSelfConfig.superusers: + await bot.send_private_msg( + user_id=sup, + message=f"来自用户[{user}]反馈:\n{msg}" + ) + await repo.finish("吾辈的心愿已由咱转告给咱的维护者了~!") [email protected]_job("cron", hour=0, misfire_grace_time=60) [email protected]_job( + "cron", + hour=0, + misfire_grace_time=60 +) async def _() -> None: global repo_list repo_list.clear() @@ -68,8 +75,11 @@ __doc__ = """ /重置红茶 """ -reset_repo = sv.on_command(cmd="重置红茶", docs=__doc__, permission=SUPERUSER) - +reset_repo = sv.on_command( + cmd="重置红茶", + docs=__doc__, + permission=SUPERUSER +) @reset_repo.handle() async def _reset_repo(bot: Bot, event: MessageEvent) -> None: diff --git a/ATRI/plugins/code_runner.py b/ATRI/plugins/code_runner.py index d34872f..3c2b196 100644 --- a/ATRI/plugins/code_runner.py +++ b/ATRI/plugins/code_runner.py @@ -7,36 +7,36 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.rule import is_in_service from ATRI.service import Service as sv from ATRI.utils.request import post_bytes -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError -RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest" +RUN_API_URL_FORMAT = 'https://glot.io/run/{}?version=latest' SUPPORTED_LANGUAGES = { - "assembly": {"ext": "asm"}, - "bash": {"ext": "sh"}, - "c": {"ext": "c"}, - "clojure": {"ext": "clj"}, - "coffeescript": {"ext": "coffe"}, - "cpp": {"ext": "cpp"}, - "csharp": {"ext": "cs"}, - "erlang": {"ext": "erl"}, - "fsharp": {"ext": "fs"}, - "go": {"ext": "go"}, - "groovy": {"ext": "groovy"}, - "haskell": {"ext": "hs"}, - "java": {"ext": "java", "name": "Main"}, - "javascript": {"ext": "js"}, - "julia": {"ext": "jl"}, - "kotlin": {"ext": "kt"}, - "lua": {"ext": "lua"}, - "perl": {"ext": "pl"}, - "php": {"ext": "php"}, - "python": {"ext": "py"}, - "ruby": {"ext": "rb"}, - "rust": {"ext": "rs"}, - "scala": {"ext": "scala"}, - "swift": {"ext": "swift"}, - "typescript": {"ext": "ts"}, + 'assembly': {'ext': 'asm'}, + 'bash': {'ext': 'sh'}, + 'c': {'ext': 'c'}, + 'clojure': {'ext': 'clj'}, + 'coffeescript': {'ext': 'coffe'}, + 'cpp': {'ext': 'cpp'}, + 'csharp': {'ext': 'cs'}, + 'erlang': {'ext': 'erl'}, + 'fsharp': {'ext': 'fs'}, + 'go': {'ext': 'go'}, + 'groovy': {'ext': 'groovy'}, + 'haskell': {'ext': 'hs'}, + 'java': {'ext': 'java', 'name': 'Main'}, + 'javascript': {'ext': 'js'}, + 'julia': {'ext': 'jl'}, + 'kotlin': {'ext': 'kt'}, + 'lua': {'ext': 'lua'}, + 'perl': {'ext': 'pl'}, + 'php': {'ext': 'php'}, + 'python': {'ext': 'py'}, + 'ruby': {'ext': 'rb'}, + 'rust': {'ext': 'rs'}, + 'scala': {'ext': 'scala'}, + 'swift': {'ext': 'swift'}, + 'typescript': {'ext': 'ts'}, } @@ -50,55 +50,54 @@ __doc__ = """ print('Hello world!') """ -code_runner = sv.on_command(cmd="/code", docs=__doc__, rule=is_in_service("code")) - +code_runner = sv.on_command( + cmd="/code", + docs=__doc__, + rule=is_in_service('code') +) @code_runner.handle() async def _code_runner(bot: Bot, event: MessageEvent) -> None: msg = str(event.message).split("\n") - + if msg[0] == "list": msg0 = "咱现在支持的语言如下:\n" msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys())) - + await code_runner.finish(msg0) elif not msg[0]: await code_runner.finish("请键入/help以获取更多支持...") - + laug = msg[0].replace("\r", "") if laug not in SUPPORTED_LANGUAGES: await code_runner.finish("该语言暂不支持...") - + del msg[0] code = "\n".join(map(str, msg)) try: req = await post_bytes( RUN_API_URL_FORMAT.format(laug), json={ - "files": [ - { - "name": ( - SUPPORTED_LANGUAGES[laug].get("name", "main") - + f".{SUPPORTED_LANGUAGES[laug]['ext']}" - ), - "content": code, - } - ], + "files": [{ + "name": (SUPPORTED_LANGUAGES[laug].get("name", "main") + + f".{SUPPORTED_LANGUAGES[laug]['ext']}"), + "content": code + }], "stdin": "", - "command": "", - }, + "command": "" + } ) - except RequestTimeOut: - raise RequestTimeOut("Failed to request!") - + except RequestError: + raise RequestError("Failed to request!") + payload = json.loads(req) sent = False - for k in ["stdout", "stderr", "error"]: + for k in ['stdout', 'stderr', 'error']: v = payload.get(k) lines = v.splitlines() lines, remained_lines = lines[:10], lines[10:] - out = "\n".join(lines) - out, remained_out = out[: 60 * 10], out[60 * 10 :] + out = '\n'.join(lines) + out, remained_out = out[:60 * 10], out[60 * 10:] if remained_lines or remained_out: out += f"\n(太多了太多了...)" @@ -106,6 +105,6 @@ async def _code_runner(bot: Bot, event: MessageEvent) -> None: if out: await bot.send(event, f"{k}:\n\n{out}") sent = True - + if not sent: await code_runner.finish("Running success! Nothing print.") diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py index acfaa9b..a646eaa 100644 --- a/ATRI/plugins/curse/__init__.py +++ b/ATRI/plugins/curse/__init__.py @@ -4,7 +4,7 @@ from ATRI.service import Service as sv from ATRI.rule import is_in_service from ATRI.utils.list import count_list, del_list_aim from ATRI.utils.request import get_text -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn" @@ -19,17 +19,23 @@ __doc__ = """ """ curse = sv.on_command( - cmd="口臭", docs=__doc__, aliases={"口臭一下,骂我"}, rule=is_in_service("口臭") + cmd='口臭', + docs=__doc__, + aliases={'口臭一下,骂我'}, + rule=is_in_service('口臭') ) - @curse.handle() async def _curse(bot: Bot, event: MessageEvent) -> None: global sick_list user = event.get_user_id() if count_list(sick_list, user) == 3: sick_list.append(user) - repo = "不是??你这么想被咱骂的嘛??" "被咱骂就这么舒服的吗?!" "该......你该不会是.....M吧!" + repo = ( + "不是??你这么想被咱骂的嘛??" + "被咱骂就这么舒服的吗?!" + "该......你该不会是.....M吧!" + ) await curse.finish(repo) elif count_list(sick_list, user) == 6: sick_list = del_list_aim(sick_list, user) @@ -38,5 +44,5 @@ async def _curse(bot: Bot, event: MessageEvent) -> None: sick_list.append(user) try: await curse.finish(await get_text(URL)) - except RequestTimeOut: - raise RequestTimeOut("Time out!") + except RequestError: + raise RequestError("Time out!") diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 7f14882..72090e3 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -24,19 +24,19 @@ from nonebot.adapters.cqhttp import ( LuckyKingNotifyEvent, GroupUploadNoticeEvent, GroupRecallNoticeEvent, - FriendRecallNoticeEvent, + FriendRecallNoticeEvent ) import ATRI from ATRI.log import logger from ATRI.exceptions import WriteError -from ATRI.config import Config +from ATRI.config import BotSelfConfig from ATRI.service import Service as sv from ATRI.utils.cqcode import coolq_code_check -PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services" -ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" +PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services' +ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' os.makedirs(PLUGIN_INFO_DIR, exist_ok=True) os.makedirs(ESSENTIAL_DIR, exist_ok=True) @@ -57,57 +57,71 @@ async def shutdown() -> None: shutil.rmtree(PLUGIN_INFO_DIR) logger.debug("成功!") except Exception: - repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除") + repo = ( + '清理插件信息失败', + '请前往 ATRI/data/service/services 下', + '将 services 整个文件夹删除' + ) time.sleep(10) raise Exception(repo) @driver.on_bot_connect async def connect(bot) -> None: - for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。") + for superuser in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + int(superuser), + "WebSocket 成功连接,数据开始传输。" + ) @driver.on_bot_disconnect async def disconnect(bot) -> None: - for superuser in Config.BotSelfConfig.superusers: + for superuser in BotSelfConfig.superusers: try: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...") + await sv.NetworkPost.send_private_msg( + int(superuser), + "WebSocket 貌似断开了呢..." + ) except: logger.error("WebSocket 已断开,等待重连") @run_preprocessor # type: ignore -async def _check_block( - matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State -) -> None: +async def _check_block(matcher: Matcher, + bot: Bot, + event: MessageEvent, + state: T_State) -> None: user = str(event.user_id) try: msg = str(event.message) except: msg = "" if not sv.BlockSystem.auth_user(user): - raise IgnoredException(f"Block user: {user}") - + raise IgnoredException(f'Block user: {user}') + if not sv.Dormant.is_dormant(): if "/dormant" not in msg: - raise IgnoredException("Bot has been dormant.") - + raise IgnoredException('Bot has been dormant.') + if isinstance(event, GroupMessageEvent): group = str(event.group_id) if not sv.BlockSystem.auth_group(group): - raise IgnoredException(f"Block group: {group}") + raise IgnoredException(f'Block group: {group}') @run_preprocessor # type: ignore -async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None: +async def _store_message(matcher: Matcher, + bot: Bot, + event, + state: T_State) -> None: if isinstance(event, GroupMessageEvent): if event.sub_type == "normal": - now_time = datetime.now().strftime("%Y-%m-%d") - GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}" + now_time = datetime.now().strftime('%Y-%m-%d') + GROUP_DIR = ESSENTIAL_DIR / 'chat_history' / f'{event.group_id}' os.makedirs(GROUP_DIR, exist_ok=True) path = GROUP_DIR / f"{now_time}.chat.json" - now_time = datetime.now().strftime("%Y%m%d-%H%M%S") + now_time = datetime.now().strftime('%Y%m%d-%H%M%S') try: data = json.loads(path.read_bytes()) @@ -133,12 +147,12 @@ async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> N "area": event.sender.area, "level": event.sender.level, "role": event.sender.role, - "title": event.sender.title, + "title": event.sender.title }, - "to_me": str(event.to_me), + "to_me": str(event.to_me) } try: - with open(path, "w", encoding="utf-8") as r: + with open(path, 'w', encoding='utf-8') as r: r.write(json.dumps(data, indent=4)) logger.debug(f"写入消息成功,id: {event.message_id}") except WriteError: @@ -154,44 +168,52 @@ async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> N # 处理:好友请求 request_friend_event = sv.on_request() - @request_friend_event.handle() async def _request_friend_event(bot, event: FriendRequestEvent) -> None: file_name = "request_friend.json" path = ESSENTIAL_DIR / file_name path.parent.mkdir(exist_ok=True, parents=True) - + try: data = json.loads(path.read_bytes()) except: data = dict() - data[event.flag] = {"user_id": event.user_id, "comment": event.comment} + data[event.flag] = { + "user_id": event.user_id, + "comment": event.comment + } try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) + with open(path, 'w', encoding='utf-8') as r: + r.write( + json.dumps( + data, indent=4 + ) + ) except WriteError: raise WriteError("Writing file failed!") - - for superuser in Config.BotSelfConfig.superusers: + + for superuser in BotSelfConfig.superusers: msg = ( "主人,收到一条好友请求:\n" f"请求人:{event.get_user_id()}\n" f"申请信息:{event.comment}\n" f"申请码:{event.flag}" ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) # 处理:邀请入群,如身为管理,还附有入群请求 request_group_event = sv.on_request() - @request_group_event.handle() async def _request_group_event(bot, event: GroupRequestEvent) -> None: file_name = "request_group.json" path = ESSENTIAL_DIR / file_name path.parent.mkdir(exist_ok=True, parents=True) - + try: data = json.loads(path.read_bytes()) except: @@ -200,42 +222,56 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: "user_id": event.user_id, "group_id": event.group_id, "sub_type": event.sub_type, - "comment": event.comment, + "comment": event.comment } try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) + with open(path, 'w', encoding='utf-8') as r: + r.write( + json.dumps( + data, indent=4 + ) + ) except WriteError: raise WriteError("Writing file failed!") - - for superuser in Config.BotSelfConfig.superusers: + + for superuser in BotSelfConfig.superusers: msg = ( "主人,收到一条入群请求:\n" f"请求人:{event.get_user_id()}\n" f"申请信息:{event.comment}\n" f"申请码:{event.flag}" ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) + # 处理群成员变动 group_member_event = sv.on_notice() - @group_member_event.handle() async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: - msg = "好欸!事新人!\n" f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!" + msg = ( + "好欸!事新人!\n" + f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" + ) await group_member_event.finish(msg) - @group_member_event.handle() async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: if event.is_tome(): if event.user_id != event.self_id: return - msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." - for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + msg = ( + "呜呜呜,主人" + f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." + ) + for superuser in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) else: await group_member_event.finish("阿!有人离开了我们...") @@ -243,82 +279,98 @@ async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: # 处理群管理事件 group_admin_event = sv.on_notice() - @group_admin_event.handle() async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: if not event.is_tome(): return - - for superuser in Config.BotSelfConfig.superusers: + + for superuser in BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( - user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" + user_id=int(superuser), + message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" ) # 处理群禁言事件 group_ban_event = sv.on_notice() - @group_ban_event.handle() async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: if not event.is_tome(): return - + if event.duration: msg = ( "那个..。,主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n" f"时长...是 {event.duration} 秒" ) - for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + for superuser in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) else: - msg = "好欸!主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!" - for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + msg = ( + "好欸!主人\n" + f"咱在群 {event.group_id} 的口球被 {event.operator_id} 解除了!" + ) + for superuser in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) # 处理群红包运气王事件 lucky_read_bag_event = sv.on_notice() - @lucky_read_bag_event.handle() async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: - msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!" + msg = ( + "8行,这可忍?" + f"gkd [CQ:at,qq={event.user_id}] 发一个!" + ) await lucky_read_bag_event.finish(Message(msg)) # 处理群文件上传事件 group_file_upload_event = sv.on_notice() - @group_file_upload_event.handle() -async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None: +async def _group_file_upload_event(bot, + event: GroupUploadNoticeEvent) -> None: await group_file_upload_event.finish("让我康康传了啥好东西") # 处理撤回事件 recall_event = sv.on_notice() - @recall_event.handle() async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: try: repo = await bot.get_msg(message_id=event.message_id) except: return - + group = event.group_id repo = str(repo["message"]) check = await coolq_code_check(repo, group=group) if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}" - - for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{event.user_id}@[群:{event.group_id}]\n" + "撤回了\n" + f"{repo}" + ) + for superuser in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) @recall_event.handle() async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: @@ -326,15 +378,23 @@ async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: repo = await bot.get_msg(message_id=event.message_id) except: return - + user = event.user_id repo = str(repo["message"]) check = await coolq_code_check(repo, user) if not check: repo = repo.replace("CQ", "QC") - - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}" + + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{event.user_id}@[私聊]" + "撤回了\n" + f"{repo}" + ) await bot.send(event, "咱看到惹~!") - for superuser in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + for superuser in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg + ) diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py index 72e5abb..39b9f28 100644 --- a/ATRI/plugins/funny.py +++ b/ATRI/plugins/funny.py @@ -12,7 +12,7 @@ from ATRI.utils.limit import is_too_exciting from ATRI.rule import is_in_service from ATRI.utils.request import post_bytes from ATRI.utils.translate import to_simple_string -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError __doc__ = """ @@ -22,25 +22,27 @@ __doc__ = """ 来句笑话 """ -get_laugh = sv.on_command(cmd="来句笑话", docs=__doc__, rule=is_in_service("来句笑话")) - +get_laugh = sv.on_command( + cmd='来句笑话', + docs=__doc__, + rule=is_in_service('来句笑话') +) @get_laugh.handle() async def _get_laugh(bot: Bot, event: MessageEvent) -> None: user_name = event.sender.nickname laugh_list = [] - - FILE = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt" - with open(FILE, "r", encoding="utf-8") as r: + + FILE = Path('.') / 'ATRI' / 'data' / 'database' / 'funny' / 'laugh.txt' + with open(FILE, 'r', encoding='utf-8') as r: for line in r: - laugh_list.append(line.strip("\n")) - + laugh_list.append(line.strip('\n')) + result = choice(laugh_list) await get_laugh.finish(result.replace("%name", user_name)) -me_to_you = sv.on_message() - +me_to_you = sv.on_message(priority=5) @me_to_you.handle() async def _me_to_you(bot: Bot, event: MessageEvent) -> None: @@ -51,38 +53,6 @@ async def _me_to_you(bot: Bot, event: MessageEvent) -> None: __doc__ = """ -随机选择一位群友成为我的老婆! -权限组:所有人 -用法: - 抽老婆 -""" - -roll_wife = sv.on_command(cmd="抽老婆", docs=__doc__, rule=is_in_service("抽老婆")) - - -@roll_wife.handle() -async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: - user = event.user_id - group = event.group_id - - user_name = await bot.get_group_member_info(group_id=group, user_id=user) - user_name = user_name["nickname"] - run = await is_too_exciting(user, group, 5, True) - if not run: - return - - luck_list = await bot.get_group_member_list(group_id=group) - luck_user = choice(luck_list) - luck_qq = luck_user["user_id"] - luck_user = luck_user["nickname"] - msg = "5秒后咱将随机抽取一位群友成为\n" f"{user_name} 的老婆!究竟是谁呢~?" - await bot.send(event, msg) - await asyncio.sleep(5) - msg = f"> {luck_user}({luck_qq})\n" f"恭喜成为 {user_name} 的老婆~⭐" - await bot.send(event, msg) - - -__doc__ = """ 伪造转发 权限组:所有人 用法: @@ -95,34 +65,43 @@ __doc__ = """ /fakemsg 123456789*生草人*草 114514*仙贝*臭死了 """ -fake_msg = sv.on_command(cmd="/fakemsg", docs=__doc__, rule=is_in_service("fakemsg")) - +fake_msg = sv.on_command( + cmd="/fakemsg", + docs=__doc__, + rule=is_in_service('fakemsg') +) @fake_msg.handle() async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None: - msg = str(event.message).split(" ") + msg = str(event.message).split(' ') user = event.user_id group = event.group_id - node = [] - check = await is_too_exciting(user, group, 1, True) - + node = list() + check = is_too_exciting(user, 1, seconds=600) + if check: for i in msg: - args = i.split("*") + args = i.split('*') qq = args[0] - name = args[1].replace("[", "[") - name = name.replace("]", "]") - repo = args[2].replace("[", "[") - repo = repo.replace("]", "]") - dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}} + name = args[1].replace('[', '[') + name = name.replace(']', ']') + repo = args[2].replace('[', '[') + repo = repo.replace(']', ']') + dic = { + "type": "node", + "data": { + "name": name, + "uin": qq, + "content": repo + } + } node.append(dic) await bot.send_group_forward_msg(group_id=group, messages=node) EAT_URL = "https://wtf.hiigara.net/api/run/{}" -eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service("今天吃什么")) - +eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service('今天吃什么')) @eat_wat.handle() async def _eat(bot: Bot, event: MessageEvent) -> None: @@ -131,32 +110,32 @@ async def _eat(bot: Bot, event: MessageEvent) -> None: user_n = event.sender.nickname arg = re.findall(r"[今|明|后|大后]天(.*?)吃什么", msg)[0] nd = re.match(r"[今|明|后|大后][天]", msg)[0] - + if arg == "中午": a = f"LdS4K6/{randint(0, 999999)}" url = EAT_URL.format(a) params = {"event": "ManualRun"} try: data = json.loads(await post_bytes(url, params)) - except RequestTimeOut: - raise RequestTimeOut("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", nd) + except RequestError: + raise RequestError('Request failed!') + + text = to_simple_string(data['text']).replace('今天', nd) get_a = re.search(r"非常(.*?)的", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, "") - + result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, '') + elif arg == "晚上": a = f"KaTMS/{randint(0, 999999)}" url = EAT_URL.format(a) params = {"event": "ManualRun"} try: data = json.loads(await post_bytes(url, params)) - except RequestTimeOut: - raise RequestTimeOut("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", "") + except RequestError: + raise RequestError('Request failed!') + + text = to_simple_string(data['text']).replace('今天', '') result = f"> {MessageSegment.at(user)}\n" + text - + else: rd = randint(1, 10) if rd == 5: @@ -167,13 +146,11 @@ async def _eat(bot: Bot, event: MessageEvent) -> None: params = {"event": "ManualRun"} try: data = json.loads(await post_bytes(url, params)) - except RequestTimeOut: - raise RequestTimeOut("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", nd) + except RequestError: + raise RequestError('Request failed!') + + text = to_simple_string(data['text']).replace('今天', nd) get_a = re.match(r"(.*?)的智商", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace( - get_a, f"{user_n}的智商" - ) - + result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, f'{user_n}的智商') + await eat_wat.finish(Message(result)) diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py index e7c0d80..27c5328 100644 --- a/ATRI/plugins/github.py +++ b/ATRI/plugins/github.py @@ -4,7 +4,7 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.service import Service as sv from ATRI.utils.request import get_bytes -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" @@ -12,7 +12,6 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" github_issues = sv.on_message() - @github_issues.handle() async def _github_issues(bot: Bot, event: MessageEvent) -> None: msg = str(event.message) @@ -20,19 +19,21 @@ async def _github_issues(bot: Bot, event: MessageEvent) -> None: need_info = re.findall(patt, msg) if not need_info: return - + for i in need_info: need_info = list(i) owner = need_info[0] repo = need_info[1] issue_number = need_info[2] - url = URL.format(owner=owner, repo=repo, issue_number=issue_number) - + url = URL.format(owner=owner, + repo=repo, + issue_number=issue_number) + try: data = await get_bytes(url) - except RequestTimeOut: + except RequestError: return - + data = json.loads(data) msg0 = ( f"{repo}: #{issue_number} {data['state']}\n" diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py index d2754b1..32d3f65 100644 --- a/ATRI/plugins/help.py +++ b/ATRI/plugins/help.py @@ -7,7 +7,7 @@ from ATRI.service import SERVICE_DIR from ATRI.service import Service as sv -SERVICE_DIR = SERVICE_DIR / "services" +SERVICE_DIR = SERVICE_DIR / 'services' __doc__ = """ @@ -19,12 +19,14 @@ __doc__ = """ /help info (cmd) """ -help = sv.on_command(cmd="/help", docs=__doc__) - +help = sv.on_command( + cmd="/help", + docs=__doc__ +) @help.handle() async def _help(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") + msg = str(event.message).split(' ') if msg[0] == "": msg = ( "呀?找不到路了?\n" @@ -39,7 +41,7 @@ async def _help(bot: Bot, event: MessageEvent) -> None: for _, _, i in os.walk(SERVICE_DIR): for a in i: f = SERVICE_DIR / a - files.append(json.loads(f.read_bytes())["command"]) + files.append(json.loads(f.read_bytes())['command']) cmds = " | ".join(map(str, files)) msg = "咱能做很多事!比如:\n" + cmds msg0 = msg + "\n没反应可能是没权限...或者为探测类型...不属于可直接触发命令..." @@ -51,9 +53,13 @@ async def _help(bot: Bot, event: MessageEvent) -> None: try: data = json.loads(path.read_bytes()) except: - await help.finish("未找到相关命令...") + await help.finish('未找到相关命令...') - msg = f"{cmd} INFO:\n" f"Enabled: {data['enabled']}\n" f"{data['docs']}" + msg = ( + f"{cmd} INFO:\n" + f"Enabled: {data['enabled']}\n" + f"{data['docs']}" + ) await help.finish(msg) else: - await help.finish("请检查输入...") + await help.finish('请检查输入...') diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py index 1078ca3..09ff2c4 100644 --- a/ATRI/plugins/hitokoto.py +++ b/ATRI/plugins/hitokoto.py @@ -4,14 +4,14 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.rule import is_in_service, to_bot from ATRI.service import Service as sv -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError from ATRI.utils.list import count_list, del_list_aim from ATRI.utils.request import get_bytes URL = [ "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/a.json", "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/b.json", - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json", + "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json" ] sick_list = [] @@ -24,10 +24,12 @@ __doc__ = """ """ hitokoto = sv.on_command( - cmd="一言", aliases={"抑郁一下", "网抑云"}, docs=__doc__, rule=is_in_service("一言") & to_bot() + cmd='一言', + aliases={'抑郁一下', '网抑云'}, + docs=__doc__, + rule=is_in_service('一言') & to_bot() ) - @hitokoto.handle() async def _hitokoto(bot: Bot, event: MessageEvent) -> None: global sick_list @@ -39,13 +41,16 @@ async def _hitokoto(bot: Bot, event: MessageEvent) -> None: await hitokoto.finish("额......需要咱安慰一下嘛~?") elif count_list(sick_list, user) == 6: sick_list = del_list_aim(sick_list, user) - msg = "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" "我...我会守在你身边的!...嗯..一定" + msg = ( + "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" + "我...我会守在你身边的!...嗯..一定" + ) await hitokoto.finish(msg) else: sick_list.append(user) url = choice(URL) try: data = json.loads(await get_bytes(url)) - except RequestTimeOut: - raise RequestTimeOut("Request failed!") - await hitokoto.finish(data[randint(1, len(data) - 1)]["hitokoto"]) + except RequestError: + raise RequestError("Request failed!") + await hitokoto.finish(data[randint(1, len(data) - 1)]['hitokoto']) diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py index b316020..d9f1ffe 100644 --- a/ATRI/plugins/manage/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -4,4 +4,5 @@ from pathlib import Path _sub_plugins = set() -_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve())) +_sub_plugins |= nonebot.load_plugins( + str((Path(__file__).parent / 'modules').resolve())) diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py index 0c2fcec..7b982d4 100644 --- a/ATRI/plugins/manage/modules/block.py +++ b/ATRI/plugins/manage/modules/block.py @@ -12,24 +12,36 @@ __doc__ = """ 封禁用户 QQ号 """ -block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER) - +block_user = sv.on_command( + cmd="封禁用户", + docs=__doc__, + permission=SUPERUSER +) @block_user.args_parser # type: ignore -async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _block_user_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - cancel = ["算了", "罢了"] + cancel = ['算了', '罢了'] if msg in cancel: - await block_user.finish("好吧...") + await block_user.finish('好吧...') if not msg: - await block_user.reject("是谁呢?!GKD!") + await block_user.reject('是谁呢?!GKD!') else: - state["noob"] = msg - + state['noob'] = msg @block_user.handle() async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob = state["noob"] + msg = str(event.message).strip() + if msg: + state['noob'] = msg + +@block_user.got('noob', prompt='是谁呢?!GKD!') +async def _deal_block_user(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + noob = state['noob'] sv.BlockSystem.control_list(True, user=noob) msg = f"用户[{noob}]已被封禁(;′⌒`)" await block_user.finish(msg) @@ -42,24 +54,36 @@ __doc__ = """ 解封用户 QQ号 """ -unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER) - +unblock_user = sv.on_command( + cmd="解封用户", + docs=__doc__, + permission=SUPERUSER +) @unblock_user.args_parser # type: ignore -async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _unblock_user_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - cancel = ["算了", "罢了"] + cancel = ['算了', '罢了'] if msg in cancel: - await unblock_user.finish("好吧...") + await unblock_user.finish('好吧...') if not msg: - await unblock_user.reject("要原谅谁呢...") + await unblock_user.reject('要原谅谁呢...') else: - state["forgive"] = msg - + state['forgive'] = msg @unblock_user.handle() async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive = state["forgive"] + msg = str(event.message).strip() + if msg: + state['forgive'] = msg + +@unblock_user.got('forgive', prompt='要原谅谁呢...') +async def _deal_unblock_user(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + forgive = state['forgive'] sv.BlockSystem.control_list(False, user=forgive) msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ" await unblock_user.finish(msg) @@ -72,24 +96,36 @@ __doc__ = """ 封禁群 群号 """ -block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER) - +block_group = sv.on_command( + cmd="封禁群", + docs=__doc__, + permission=SUPERUSER +) @block_group.args_parser # type: ignore -async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _block_group_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - cancel = ["算了", "罢了"] + cancel = ['算了', '罢了'] if msg in cancel: - await block_user.finish("好吧...") + await block_user.finish('好吧...') if not msg: - await block_user.reject("是哪个群?!GKD!") + await block_user.reject('是哪个群?!GKD!') else: - state["noob_g"] = msg - + state['noob_g'] = msg @block_group.handle() async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob_g = state["noob_g"] + msg = str(event.message).strip() + if msg: + state['noob_g'] = msg + +@block_group.got('noob_g', prompt='是哪个群?!GKD!') +async def _deal_block_group(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + noob_g = state['noob_g'] sv.BlockSystem.control_list(True, group=noob_g) msg = f"群[{noob_g}]已被封禁(;′⌒`)" await block_user.finish(msg) @@ -102,24 +138,38 @@ __doc__ = """ 解封 群号 """ -unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER) - +unblock_group = sv.on_command( + cmd="解封群", + docs=__doc__, + permission=SUPERUSER +) @unblock_group.args_parser # type: ignore -async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _unblock_group_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - cancel = ["算了", "罢了"] + cancel = ['算了', '罢了'] if msg in cancel: - await block_user.finish("好吧...") + await block_user.finish('好吧...') if not msg: - await block_user.reject("要原谅哪个群呢...") + await block_user.reject('要原谅哪个群呢...') else: - state["forgive_g"] = msg - + state['forgive_g'] = msg @unblock_group.handle() -async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive_g = state["forgive_g"] +async def _unblock_group(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + msg = str(event.message).strip() + if msg: + state['forgive_g'] = msg + +@unblock_group.got('forgive_g', prompt='要原谅哪个群呢...') +async def _deal_unblock_group(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + forgive_g = state['forgive_g'] sv.BlockSystem.control_list(False, group=forgive_g) msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ" await unblock_user.finish(msg) diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py index 7f7816d..5086fcf 100644 --- a/ATRI/plugins/manage/modules/broadcast.py +++ b/ATRI/plugins/manage/modules/broadcast.py @@ -15,46 +15,52 @@ __doc__ = """ 广播 内容 """ -broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER) - +broadcast = sv.on_command( + cmd="广播", + docs=__doc__, + permission=SUPERUSER +) @broadcast.args_parser # type: ignore -async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _broadcast_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message) - quit_list = ["算了", "罢了", "取消"] + quit_list = ['算了', '罢了', '取消'] if msg in quit_list: - await broadcast.finish("好吧...") + await broadcast.finish('好吧...') if not msg: - await broadcast.reject("想群发啥呢0w0") + await broadcast.reject('想群发啥呢0w0') else: - state["msg"] = msg - + state['msg'] = msg @broadcast.handle() async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state["msg"] = msg - - [email protected]("msg", prompt="请告诉咱需要群发的内容~!") -async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["msg"] + state['msg'] = msg + [email protected]('msg', prompt='请告诉咱需要群发的内容~!') +async def _deal_broadcast(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + msg = state['msg'] group_list = await bot.get_group_list() succ_list = [] err_list = [] - + for group in group_list: await asyncio.sleep(randint(0, 2)) try: - await bot.send_group_msg(group_id=group["group_id"], message=msg) + await bot.send_group_msg(group_id=group["group_id"], + message=msg) except BaseException: err_list.append(group["group_id"]) - + msg0 = "" for i in err_list: msg0 += f" {i}\n" - + repo_msg = ( f"推送消息:\n{msg}\n" "————————\n" diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py index 7bd3992..259e791 100644 --- a/ATRI/plugins/manage/modules/debug.py +++ b/ATRI/plugins/manage/modules/debug.py @@ -11,7 +11,7 @@ from ATRI.utils.ub_paste import paste from ATRI.exceptions import load_error -level_list = ["info", "warning", "error", "debug"] +level_list = ['info', 'warning', 'error', 'debug'] __doc__ = """ @@ -25,47 +25,48 @@ __doc__ = """ get_console = sv.on_command( cmd="获取log", - aliases={"获取LOG", "获取控制台", "获取控制台信息"}, + aliases={'获取LOG', '获取控制台', '获取控制台信息'}, docs=__doc__, - permission=SUPERUSER, + permission=SUPERUSER ) - @get_console.handle() async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() + msg = str(event.message).split(' ') if msg: - state["level"] = msg + state['level'] = msg[0] + try: + state['line'] = msg[1] + except Exception: + pass - -@get_console.got("level", prompt="需要获取的等级是?") +@get_console.got('level', prompt='需要获取的等级是?') async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None: - quit_list = ["算了", "罢了", "不了"] - if state["level"] in quit_list: - await get_console.finish("好吧...") - + quit_list = ['算了', '罢了', '不了'] + if state['level'] in quit_list: + await get_console.finish('好吧...') -@get_console.got("line", prompt="需要获取的行数是?") +@get_console.got('line', prompt='需要获取的行数是?') async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None: - level = state["level"] - line = state["line"] + level = state['level'] + line = state['line'] repo = str() - + path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log" - logs = await open_file(path, "readlines") - + logs = await open_file(path, 'readlines') + try: - content = logs[int(line) :] # type: ignore + content = logs[int(line):] # type: ignore repo = "\n".join(content).replace("[36mATRI[0m", "ATRI") except IndexError: - await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore - + await get_console.finish(f'行数错误...max: {len(logs)}') # type: ignore + data = FormData() - data.add_field("poster", "ATRI running log") - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", repo) - + data.add_field('poster', 'ATRI running log') + data.add_field('syntax', 'text') + data.add_field('expiration', 'day') + data.add_field('content', repo) + msg0 = f"> {event.sender.nickname}\n" msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" await track_error.finish(msg0) @@ -79,39 +80,41 @@ __doc__ = """ """ track_error = sv.on_command( - cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER + cmd="track", + aliases={'追踪'}, + docs=__doc__, + permission=SUPERUSER ) - @track_error.args_parser # type: ignore -async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _track_error_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - cancel = ["算了", "罢了"] + cancel = ['算了', '罢了'] if msg in cancel: - await track_error.finish("好吧...") + await track_error.finish('好吧...') if not msg: - await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") + await track_error.reject('欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ') else: - state["track"] = msg - + state['track'] = msg @track_error.handle() async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state["track"] = msg - + state['track'] = msg -@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") +@track_error.got('track', prompt='欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ') async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: - track_id = state["track"] + track_id = state['track'] data = dict() - + try: data = load_error(track_id) except BaseException: - await track_error.finish("未发现对应ID呢...(⇀‸↼‶)") - + await track_error.finish('未发现对应ID呢...(⇀‸↼‶)') + msg = ( f"ID: [{track_id}]\n" f"Time: [{data['time']}]\n" @@ -119,13 +122,13 @@ async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: "——————\n" f"{data['content']}" ) - + data = FormData() - data.add_field("poster", track_id) - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", msg) - + data.add_field('poster', track_id) + data.add_field('syntax', 'text') + data.add_field('expiration', 'day') + data.add_field('content', msg) + msg0 = f"> {event.sender.nickname}\n" msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" await track_error.finish(msg0) diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py index cd72d47..d5ae321 100644 --- a/ATRI/plugins/manage/modules/dormant.py +++ b/ATRI/plugins/manage/modules/dormant.py @@ -13,10 +13,12 @@ __doc__ = """ """ dormant_enabled = sv.on_command( - cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER + cmd='休眠', + docs=__doc__, + rule=to_bot(), + permission=SUPERUSER ) - @dormant_enabled.handle() async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None: sv.Dormant.control_dormant(True) @@ -32,10 +34,12 @@ __doc__ = """ """ dormant_disabled = sv.on_command( - cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER + cmd='苏醒', + docs=__doc__, + rule=to_bot(), + permission=SUPERUSER ) - @dormant_disabled.handle() async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None: sv.Dormant.control_dormant(False) diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py index 762e918..a09b721 100644 --- a/ATRI/plugins/manage/modules/request.py +++ b/ATRI/plugins/manage/modules/request.py @@ -9,7 +9,7 @@ from ATRI.service import Service as sv from ATRI.exceptions import ReadFileError, FormatError -ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" +ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' __doc__ = """ @@ -19,13 +19,16 @@ __doc__ = """ 查看申请列表 """ -req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER) - +req_list = sv.on_command( + cmd="申请列表", + docs=__doc__, + permission=SUPERUSER +) @req_list.handle() async def _req_list(bot: Bot, event: MessageEvent) -> None: - path_f = ESSENTIAL_DIR / "request_friend.json" - path_g = ESSENTIAL_DIR / "request_group.json" + path_f = ESSENTIAL_DIR / 'request_friend.json' + path_g = ESSENTIAL_DIR / 'request_group.json' data_f, data_g = dict() try: data_f = json.loads(path_f.read_bytes()) @@ -35,7 +38,7 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None: data_g = json.loads(path_g.read_bytes()) except ReadFileError: msg_g = "[读取文件失败]" - + msg_f = str() for i in data_f.keys(): msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n" @@ -43,65 +46,82 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None: msg_g = str() for i in data_g.keys(): msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n" - - msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}" + + msg = ( + "好友/群申请列表如下:\n" + "· 好友:\n" + f"{msg_f}" + "· 群:\n" + f"{msg_g}" + ) await req_list.finish(msg) -req_deal = sv.on_regex(r"[同意|拒绝][好友|群]申请", permission=SUPERUSER) - +req_deal = sv.on_regex( + r'(同意|拒绝)(好友|群)申请', + permission=SUPERUSER +) @req_deal.handle() async def _req_deal(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - arg = re.findall(r"[同意|拒绝][好友|群]申请", msg[0]) + msg = str(event.message).split(' ') + arg = re.findall(r'(同意|拒绝)(好友|群)申请', msg[0])[0] app = arg[0] _type = arg[1] - + if not msg[1]: - await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)") - - path = ESSENTIAL_DIR / "request_group.json" - data_g = dict() - try: - data_g = json.loads(path.read_bytes()) - except FileExistsError: - await req_deal.finish("读取群数据失败,可能并没有请求...") - + await req_deal.finish(f'正确用法!速看\n{app}{_type}申请 (reqid)') + reqid = msg[1] if app == "同意": if _type == "好友": try: - await bot.set_friend_add_request(flag=reqid, approve=True) - await req_deal.finish("完成~!已同意申请") + await bot.set_friend_add_request(flag=reqid, + approve=True) + await req_deal.finish('完成~!已同意申请') except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') elif _type == "群": + path = ESSENTIAL_DIR / "request_group.json" + data_g = dict() try: - await bot.set_group_add_request( - flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True - ) - await req_deal.finish("完成~!已同意申请") + data_g = json.loads(path.read_bytes()) + except FileExistsError: + await req_deal.finish("读取群数据失败,可能并没有请求...") + + try: + await bot.set_group_add_request(flag=reqid, + sub_type=data_g[reqid]['sub_type'], + approve=True) + await req_deal.finish('完成~!已同意申请') except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') else: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') elif app == "拒绝": if _type == "好友": try: - await bot.set_friend_add_request(flag=reqid, approve=False) - await req_deal.finish("完成~!已拒绝申请") + await bot.set_friend_add_request(flag=reqid, + approve=False) + await req_deal.finish('完成~!已拒绝申请') except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') elif _type == "群": + path = ESSENTIAL_DIR / "request_group.json" + data_g = dict() + try: + data_g = json.loads(path.read_bytes()) + except FileExistsError: + await req_deal.finish("读取群数据失败,可能并没有请求...") + try: - await bot.set_group_add_request( - flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False - ) - await req_deal.finish("完成~!已拒绝申请") + await bot.set_group_add_request(flag=reqid, + sub_type=data_g[reqid]['sub_type'], + approve=False) + await req_deal.finish('完成~!已拒绝申请') except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') else: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') else: - await req_deal.finish("请检查输入的值是否正确——!") + await req_deal.finish('请检查输入的值是否正确——!') diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py index a3624df..6489ad6 100644 --- a/ATRI/plugins/manage/modules/service.py +++ b/ATRI/plugins/manage/modules/service.py @@ -5,7 +5,7 @@ from nonebot.adapters.cqhttp import ( Bot, MessageEvent, GroupMessageEvent, - PrivateMessageEvent, + PrivateMessageEvent ) from ATRI.service import Service as sv @@ -21,40 +21,46 @@ __doc__ = """ """ cur_service_ena = sv.on_command( - cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER + cmd="启用功能", + docs=__doc__, + permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER ) - @cur_service_ena.args_parser # type: ignore -async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: +async def _cur_ena_load(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] + quit_list = ['算了', '罢了', '取消'] if msg in quit_list: - await cur_service_ena.finish("好吧...") + await cur_service_ena.finish('好吧...') if not msg: - await cur_service_ena.reject("请告诉咱目标命令!") + await cur_service_ena.reject('请告诉咱目标命令!') else: - state["service_e"] = msg - + state['service_e'] = msg @cur_service_ena.handle() -async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: +async def _cur_ena(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: msg = str(event.message).strip() if msg: - state["service_e"] = msg + state['service_e'] = msg - -@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!") -async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - cmd = state["service_e"] +@cur_service_ena.got('service_e', prompt='请告诉咱目标命令!') +async def _deal_cur_ena(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + cmd = state['service_e'] group = str(event.group_id) sv.control_service(cmd, False, True, group=group) - await cur_service_ena.finish(f"成功!本群已启用:{cmd}") - + await cur_service_ena.finish(f'成功!本群已启用:{cmd}') @cur_service_ena.handle() -async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: - await cur_service_ena.finish("只能在群聊中决定哦...") +async def _refuse_cur_ena(bot: Bot, + event: PrivateMessageEvent, + state: T_State) -> None: + await cur_service_ena.finish('只能在群聊中决定哦...') __doc__ = """ @@ -67,40 +73,46 @@ __doc__ = """ """ cur_service_dis = sv.on_command( - cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER + cmd="禁用功能", + docs=__doc__, + permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER ) - @cur_service_dis.args_parser # type: ignore -async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: +async def _cur_dis_load(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] + quit_list = ['算了', '罢了', '取消'] if msg in quit_list: - await cur_service_dis.finish("好吧...") + await cur_service_dis.finish('好吧...') if not msg: - await cur_service_dis.reject("请告诉咱目标命令!") + await cur_service_dis.reject('请告诉咱目标命令!') else: - state["service_d"] = msg - + state['service_d'] = msg @cur_service_dis.handle() -async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: +async def _cur_dis(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: msg = str(event.message).strip() if msg: - state["service_d"] = msg + state['service_d'] = msg - -@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!") -async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - cmd = state["service_d"] +@cur_service_dis.got('service_d', prompt='请告诉咱目标命令!') +async def _deal_cur_dis(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + cmd = state['service_d'] group = str(event.group_id) sv.control_service(cmd, False, False, group=group) - await cur_service_dis.finish(f"成功!本群已禁用:{cmd}") - + await cur_service_dis.finish(f'成功!本群已禁用:{cmd}') @cur_service_dis.handle() -async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: - await cur_service_dis.finish("只能在群聊中决定哦...") +async def _refuse_cur_dis(bot: Bot, + event: PrivateMessageEvent, + state: T_State) -> None: + await cur_service_dis.finish('只能在群聊中决定哦...') __doc__ = """ @@ -112,33 +124,40 @@ __doc__ = """ 全局启用 以图搜图 """ -glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER) - +glo_service_ena = sv.on_command( + cmd="全局启用", + docs=__doc__, + permission=SUPERUSER +) @glo_service_ena.args_parser # type: ignore -async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _glo_ena_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] + quit_list = ['算了', '罢了', '取消'] if msg in quit_list: - await glo_service_ena.finish("好吧...") + await glo_service_ena.finish('好吧...') if not msg: - await glo_service_ena.reject("请告诉咱目标命令!") + await glo_service_ena.reject('请告诉咱目标命令!') else: - state["service_e_g"] = msg - + state['service_e_g'] = msg @glo_service_ena.handle() -async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _glo_ena(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() if msg: - state["service_e_g"] = msg - + state['service_e_g'] = msg -@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!") -async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: - cmd = state["service_e_g"] +@glo_service_ena.got('service_e_g', prompt='请告诉咱目标命令!') +async def _deal_glo_ena(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + cmd = state['service_e_g'] sv.control_service(cmd, True, True) - await glo_service_ena.finish(f"成功!已全局启用:{cmd}") + await glo_service_ena.finish(f'成功!已全局启用:{cmd}') __doc__ = """ @@ -150,30 +169,37 @@ __doc__ = """ 全局禁用 以图搜图 """ -glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER) - +glo_service_dis = sv.on_command( + cmd="全局禁用", + docs=__doc__, + permission=SUPERUSER +) @glo_service_dis.args_parser # type: ignore -async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _glo_dis_load(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] + quit_list = ['算了', '罢了', '取消'] if msg in quit_list: - await glo_service_dis.finish("好吧...") + await glo_service_dis.finish('好吧...') if not msg: - await glo_service_dis.reject("请告诉咱目标命令!") + await glo_service_dis.reject('请告诉咱目标命令!') else: - state["service_d_g"] = msg - + state['service_d_g'] = msg @glo_service_dis.handle() -async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _glo_dis(bot: Bot, + event: MessageEvent, + state: T_State) -> None: msg = str(event.message).strip() if msg: - state["service_d_g"] = msg - + state['service_d_g'] = msg -@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!") -async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: - cmd = state["service_d_g"] +@glo_service_dis.got('service_d_g', prompt='请告诉咱目标命令!') +async def _deal_glo_dis(bot: Bot, + event: MessageEvent, + state: T_State) -> None: + cmd = state['service_d_g'] sv.control_service(cmd, True, False) - await glo_service_dis.finish(f"成功!已全局禁用:{cmd}") + await glo_service_dis.finish(f'成功!已全局禁用:{cmd}') diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py index 78f23e8..11b2b1b 100644 --- a/ATRI/plugins/manage/modules/shutdown.py +++ b/ATRI/plugins/manage/modules/shutdown.py @@ -12,8 +12,11 @@ __doc__ = """ @ 关机 """ -shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER) - +shutdown = sv.on_command( + cmd="关机", + docs=__doc__, + permission=SUPERUSER +) @shutdown.handle() async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: @@ -21,10 +24,9 @@ async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: if msg: state["msg"] = msg - @shutdown.got("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)") async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: - t = ["y", "Y", "是"] + t = ['y', 'Y', '是'] if state["msg"] in t: await bot.send(event, "咱还会醒来的,一定") exit(0) diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py index 076ce03..34ea61d 100644 --- a/ATRI/plugins/nsfw.py +++ b/ATRI/plugins/nsfw.py @@ -5,46 +5,43 @@ from nonebot.adapters.cqhttp import Bot, GroupMessageEvent from nonebot.typing import T_State from ATRI.log import logger as log -from ATRI.config import Config +from ATRI.config import BotSelfConfig, NsfwCheck from ATRI.service import Service as sv -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError from ATRI.rule import is_in_service from ATRI.utils.request import get_bytes from ATRI.utils.cqcode import coolq_code_check -nsfw_url = f"http://{Config.NsfwCheck.host}:" f"{Config.NsfwCheck.port}/?url=" +nsfw_url = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" nsfw_checking = sv.on_message() - @nsfw_checking.handle() async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: - if Config.NsfwCheck.enabled: + if NsfwCheck.enabled: msg = str(event.message) user = event.user_id group = event.group_id check = await coolq_code_check(msg, user, group) - + if check: if "image" not in msg: return - + url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0] try: data = json.loads(await get_bytes(url)) except: - log.warning("检测涩图失败,请查阅文档以获取帮助") + log.warning('检测涩图失败,请查阅文档以获取帮助') return - if round(data["score"], 4) > Config.NsfwCheck.passing_rate: - score = "{:.2%}".format(round(data["score"], 4)) - log.debug(f"截获涩图,得分:{score}") - await bot.send(event, f"好涩哦!涩值:{score}\n不行了咱要发给主人看!") - for sup in Config.BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, message=f"{msg}\n涩值: {score}" - ) + if round(data['score'], 4)*100 >= NsfwCheck.passing_rate: + score = "{:.2%}".format(round(data['score'], 4)) + log.debug(f'截获涩图,得分:{score}') + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}") + await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!') else: pass @@ -60,57 +57,60 @@ __doc__ = """ /nsfw 然后Bot会向你索取图片 """ -nsfw_reading = sv.on_command(cmd="/nsfw", docs=__doc__, rule=is_in_service("nsfw")) - +nsfw_reading = sv.on_command( + cmd="/nsfw", + docs=__doc__, + rule=is_in_service('nsfw') +) @nsfw_reading.args_parser # type: ignore async def _nsfw(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了"] + quit_list = ['算了', '罢了', '不搜了'] if msg in quit_list: - await nsfw_reading.finish("好吧") - + await nsfw_reading.finish('好吧') + if not msg: - await nsfw_reading.reject("图呢?") + await nsfw_reading.reject('图呢?') else: - state["pic_nsfw"] = msg - + state['pic_nsfw'] = msg @nsfw_reading.handle() -async def _nsfw_r(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: +async def _nsfw_r(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: user = event.user_id group = event.group_id msg = str(event.message).strip() check = await coolq_code_check(msg, user, group) if check and msg: - state["pic_nsfw"] = msg - + state['pic_nsfw'] = msg -@nsfw_reading.got("pic_nsfw", prompt="图呢?") -async def _nsfw_reading(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = state["pic_nsfw"] +@nsfw_reading.got('pic_nsfw', prompt='图呢?') +async def _nsfw_reading(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + msg = state['pic_nsfw'] pic = re.findall(r"url=(.*?)]", msg) if not pic: - await nsfw_reading.reject("请发送图片而不是其它东西!!") - + await nsfw_reading.reject('请发送图片而不是其它东西!!') + url = nsfw_url + pic[0] try: data = json.loads(await get_bytes(url)) - except RequestTimeOut: - raise RequestTimeOut("Time out!") - - score = round(data["score"], 4) - result = "{:.2%}".format(round(data["score"], 4)) - if score > 0.9: + except RequestError: + raise RequestError('Time out!') + + score = round(data['score'], 4) + result = "{:.2%}".format(round(data['score'], 4)) + if score >= 0.9: level = "hso! 我要发给主人看!" - for sup in Config.BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}" - ) + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}") elif 0.9 > score >= 0.6: level = "嗯,可冲" else: level = "?能不能换张55完全冲不起来" - + repo = f"涩值:{result}\n{level}" await nsfw_reading.finish(repo) diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index 14b9534..ce5d422 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -8,7 +8,7 @@ from nonebot.adapters.cqhttp.message import MessageSegment from ATRI.service import Service as sv from ATRI.utils.request import get_bytes -from ATRI.utils.list import count_list, del_list_aim +from ATRI.utils.limit import is_too_exciting from .data_source import dec @@ -16,23 +16,22 @@ from .data_source import dec temp_list = [] img_url = [ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png", - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg", + "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg" ] bilibili_rich = sv.on_message() - @bilibili_rich.handle() async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: global temp_list try: msg = str(event.raw_message).replace("\\", "") bv = False - + if "qqdocurl" not in msg: if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace("av", "") + av = re.findall(r"(av\d+)", msg)[0].replace('av', '') else: bv = re.findall(r"(BV\w+)", msg) av = str(dec(bv[0])) @@ -44,29 +43,29 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: async with session.get(url=bv_url) as r: bv = re.findall(r"(BV\w+)", str(r.url)) av = dec(bv[0]) - + if not bv: if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace("av", "") + av = re.findall(r"(av\d+)", msg)[0].replace('av', '') else: return - - if count_list(temp_list, av) == 4: - await bot.send(event, "你是怕别人看不到么发这么多次?") - temp_list = del_list_aim(temp_list, av) + + user = event.user_id + check = is_too_exciting(user, 1, 10) + if not check: return - - temp_list.append(av) - + URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}" - data = json.loads(await get_bytes(URL))["data"] + data = json.loads(await get_bytes(URL))['data'] repo = ( f"{data['bvid']} INFO:\n" f"Title: {data['title']}\n" f"Link: {data['short_link']}\n" "にまねげぴのTencent rich!" ) - await bot.send(event, MessageSegment.image(file=choice(img_url))) + await bot.send( + event, + MessageSegment.image(file=choice(img_url))) await bilibili_rich.finish(repo) except BaseException: return diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py index 59474ff..32ac219 100644 --- a/ATRI/plugins/rich/data_source.py +++ b/ATRI/plugins/rich/data_source.py @@ -1,4 +1,4 @@ -table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF" +table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF' tr = {} for i in range(58): tr[table[i]] = i @@ -10,13 +10,13 @@ add = 8728348608 def dec(x) -> int: r = 0 for i in range(6): - r += tr[x[s[i]]] * 58 ** i + r += tr[x[s[i]]] * 58**i return (r - add) ^ xor def enc(x) -> str: x = (x ^ xor) + add - r = list("BV1 4 1 7 ") + r = list('BV1 4 1 7 ') for i in range(6): - r[s[i]] = table[x // 58 ** i % 58] - return "".join(r) + r[s[i]] = table[x // 58**i % 58] + return ''.join(r) diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index b4b1497..b1ab9ab 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -6,10 +6,10 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent from nonebot.adapters.cqhttp.message import Message, MessageSegment from nonebot.typing import T_State -from ATRI.config import Config +from ATRI.config import SauceNAO from ATRI.service import Service as sv from ATRI.rule import is_in_service -from ATRI.exceptions import RequestTimeOut +from ATRI.exceptions import RequestError from .data_source import SauceNao @@ -21,53 +21,55 @@ __doc__ = """ 以图搜图 (pic) """ -saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图")) - +saucenao = sv.on_command( + cmd='以图搜图', + docs=__doc__, + rule=is_in_service('以图搜图') +) @saucenao.args_parser # type: ignore -async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _load_saucenao(bot: Bot, event: MessageEvent, + state: T_State) -> None: msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了"] + quit_list = ['算了', '罢了', '不搜了'] if msg in quit_list: - await saucenao.finish("好吧...") - + await saucenao.finish('好吧...') + if not msg: - await saucenao.reject("图呢?") + await saucenao.reject('图呢?') else: - state["pic_sau"] = msg - + state['pic_sau'] = msg @saucenao.handle() async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None: msg = str(event.message).strip() if msg: - state["pic_sau"] = msg + state['pic_sau'] = msg - [email protected]("pic_sau", prompt="图呢?") [email protected]('pic_sau', prompt='图呢?') async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["pic_sau"] - img = re.findall(r"url=(.*?)]", msg) + msg = state['pic_sau'] + img = re.findall(r'url=(.*?)]', msg) if not img: - await saucenao.finish("请发送图片而不是其他东西!!") - + await saucenao.finish('请发送图片而不是其他东西!!') + try: - task = SauceNao(api_key=Config.SauceNAO.key) + task = SauceNao(api_key=SauceNAO.key) data = json.loads(await task.search(img[0])) - except RequestTimeOut: - raise RequestTimeOut("Request failed!") - - res = data["results"] + except RequestError: + raise RequestError('Request failed!') + + res = data['results'] result = list() for i in range(0, 3): data = res[i] - + _result = dict() - _result["similarity"] = data["header"]["similarity"] - _result["index_name"] = data["header"]["index_name"] - _result["url"] = choice(data["data"].get("ext_urls", ["None"])) + _result['similarity'] = data['header']['similarity'] + _result['index_name'] = data['header']['index_name'] + _result['url'] = choice(data['data'].get('ext_urls', ['None'])) result.append(_result) - + msg0 = f"> {MessageSegment.at(event.user_id)}" for i in result: msg0 = msg0 + ( @@ -76,5 +78,5 @@ async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: f"Name: {i['index_name']}\n" f"URL: {i['url'].replace('https://', '')}" ) - + await saucenao.finish(Message(msg0)) diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index efe8fe1..cd88554 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -5,19 +5,23 @@ URL = "https://saucenao.com/search.php" class SauceNao: - def __init__( - self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5 - ) -> None: + def __init__(self, + api_key: str, + output_type=2, + testmode=1, + dbmaski=32768, + db=5, + numres=5) -> None: params = dict() - params["api_key"] = api_key - params["output_type"] = output_type - params["testmode"] = testmode - params["dbmaski"] = dbmaski - params["db"] = db - params["numres"] = numres + params['api_key'] = api_key + params['output_type'] = output_type + params['testmode'] = testmode + params['dbmaski'] = dbmaski + params['db'] = db + params['numres'] = numres self.params = params - + async def search(self, url: str): - self.params["url"] = url + self.params['url'] = url res = await post_bytes(url=URL, params=self.params) return res diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py new file mode 100644 index 0000000..d9f1ffe --- /dev/null +++ b/ATRI/plugins/setu/__init__.py @@ -0,0 +1,8 @@ +import nonebot +from pathlib import Path + + +_sub_plugins = set() + +_sub_plugins |= nonebot.load_plugins( + str((Path(__file__).parent / 'modules').resolve())) diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py new file mode 100644 index 0000000..72e2270 --- /dev/null +++ b/ATRI/plugins/setu/modules/data_source.py @@ -0,0 +1,178 @@ +import os +import json +import string +import aiosqlite +from aiosqlite.core import Connection +from pathlib import Path +from random import sample, choice +from aiohttp import ClientSession +from nonebot.adapters.cqhttp.message import MessageSegment, Message + +from ATRI.log import logger as log +from ATRI.config import NsfwCheck +from ATRI.exceptions import RequestError, WriteError +from ATRI.utils.request import get_bytes +from ATRI.utils.img import compress_image + + +TEMP_DIR: Path = Path('.') / 'ATRI' / 'data' / 'temp' / 'setu' +SETU_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'setu' +os.makedirs(TEMP_DIR, exist_ok=True) +os.makedirs(SETU_DIR, exist_ok=True) +NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" +SIZE_REDUCE: bool = True + + +class Hso: + @staticmethod + async def nsfw_check(url: str) -> float: + url = NSFW_URL + url + try: + data = json.loads(await get_bytes(url)) + except RequestError: + raise RequestError('Request failed!') + return round(data['score'], 4) + + @staticmethod + async def _comp_setu(url: str) -> str: + temp_id = ''.join(sample(string.ascii_letters + string.digits, 8)) + file = TEMP_DIR / f'{temp_id}.png' + + try: + async with ClientSession() as session: + async with session.get(url) as r: + data = await r.read() + except RequestError: + raise RequestError('Request img failed!') + + try: + with open(file, 'wb') as r: + r.write(data) + except WriteError: + raise WriteError('Writing img failed!') + + return compress_image(os.path.abspath(file)) + + @classmethod + async def setu(cls, data: dict) -> str: + pid = data['pid'] + title = data['title'] + if SIZE_REDUCE: + img = MessageSegment.image( + 'file:///' + await cls._comp_setu(data['url']), proxy=False) + else: + img = MessageSegment.image(data['url'], proxy=False) + + msg = ( + f"Pid: {pid}\n" + f"Title: {title}\n" + f"{img}" + ) + return msg + + @classmethod + async def acc_setu(cls, d: list) -> str: + data: dict = choice(d) + + for i in data['tags']: + if i['name'] == "R-18": + return "太涩了不方便发w" + + pid = data['id'] + title = data['title'] + try: + pic = data['meta_single_page']['original_image_url'] \ + .replace('pximg.net', 'pixiv.cat') + except Exception: + pic = choice(data['meta_pages'])['original']['image_urls'] \ + .replace('pximg.net', 'pixiv.cat') + if SIZE_REDUCE: + img = MessageSegment.image( + 'file:///' + await cls._comp_setu(pic), proxy=False) + else: + img = MessageSegment.image(pic, proxy=False) + + msg = ( + f"Pid: {pid}\n" + f"Title: {title}\n" + f"{img}" + ) + return msg + + +class SetuData: + SETU_DATA = SETU_DIR / 'setu.db' + + @classmethod + async def _check_database(cls) -> bool: + if not cls.SETU_DATA.exists(): + log.warning(f'未发现数据库\n-> {cls.SETU_DATA}\n将开始创建') + async with aiosqlite.connect(cls.SETU_DATA) as db: + cur = await db.cursor() + await cur.execute( + """ + CREATE TABLE setu( + pid PID, title TITLE, tags TAGS, + user_id USER_ID, user_name USER_NAME, + user_account USER_ACCOUNT, url URL, + UNIQUE( + pid, title, tags, user_id, + user_name, user_account, url + ) + ); + """ + ) + await db.commit() + log.warning(f'...创建数据库\n-> {cls.SETU_DATA}\n完成!') + return True + return True + + @classmethod + async def add_data(cls, d: dict) -> None: + data = ( + d['pid'], d['title'], d['tags'], d['user_id'], + d['user_name'], d['user_account'], d['url'] + ) + + check = await cls._check_database() + if check: + async with aiosqlite.connect(cls.SETU_DATA) as db: + await db.execute( + """ + INSERT INTO setu( + pid, title, tags, user_id, + user_name, user_account, url + ) VALUES( + ?, ?, ?, ?, ?, ?, ? + ); + """, + data + ) + await db.commit() + + @classmethod + async def del_data(cls, pid: int) -> None: + if not isinstance(pid, int): # 防注入 + raise ValueError('Please provide int.') + + check = await cls._check_database() + if check: + async with aiosqlite.connect(cls.SETU_DATA) as db: + await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};") + await db.commit() + + @classmethod + async def count(cls): + check = await cls._check_database() + if check: + async with aiosqlite.connect(cls.SETU_DATA) as db: + async with db.execute("SELECT * FROM setu") as cursor: + return len(await cursor.fetchall()) # type: ignore + + @classmethod + async def get_setu(cls): + check = await cls._check_database() + if check: + async with aiosqlite.connect(cls.SETU_DATA) as db: + async with db.execute("SELECT * FROM setu ORDER BY RANDOM() limit 1;") as cursor: + return await cursor.fetchall() diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py new file mode 100644 index 0000000..649cd6a --- /dev/null +++ b/ATRI/plugins/setu/modules/main_setu.py @@ -0,0 +1,193 @@ +import re +import json +from random import choice, random + +from nonebot.permission import SUPERUSER +from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp.message import Message + +from ATRI.service import Service as sv +from ATRI.rule import is_in_service +from ATRI.utils.request import get_bytes, post_bytes +from ATRI.utils.limit import is_too_exciting +from ATRI.config import Setu, BotSelfConfig +from ATRI.exceptions import RequestError + +from .data_source import Hso, SIZE_REDUCE, SetuData + + +LOLICON_URL: str = "https://api.lolicon.app/setu/" +PIXIV_URL:str = "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word=" +R18_ENABLED: int = 0 +USE_LOCAL_DATA: bool = False +MIX_LOCAL_DATA: bool = False + + +setu = sv.on_regex(r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]", + rule=is_in_service('setu')) + +async def _setu(bot: Bot, event: MessageEvent) -> None: + user = event.user_id + check = is_too_exciting(user, 3, hours=1) + if not check: + return + + await bot.send(event, "别急,在找了!") + params = { + "apikey": Setu.key, + "r18": str(R18_ENABLED), + "size1200": "true" + } + try: + data = json.loads(await post_bytes(LOLICON_URL, params))['data'][0] + except RequestError: + raise RequestError('Request failed!') + + check = await Hso.nsfw_check(data['url']) + score = "{:.2%}".format(check, 4) + + if not MIX_LOCAL_DATA: + if USE_LOCAL_DATA: + data = (await SetuData.get_setu())[0] # type: ignore + data = { + "pid": data[0], + "title": data[1], + "url": data[6] + } + if random() <= 0.1: + await bot.send(event, '我找到图了,但我发给主人了❤') + msg = await Hso.setu(data) + f"\n由用户({user})提供" + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=msg) + else: + await setu.finish(Message(await Hso.setu(data))) + else: + if check >= 0.9: + if random() <= 0.2: + repo = ( + "我找到图了,但我发给主人了❤\n" + f"涩值:{score}" + ) + await bot.send(event, repo) + msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=msg) + else: + await setu.finish(Message(await Hso.setu(data))) + else: + if random() <= 0.1: + await bot.send(event, '我找到图了,但我发给主人了❤') + msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=msg) + else: + await setu.finish(Message(await Hso.setu(data))) + else: + if random() <= 0.5: + if random() <= 0.1: + await bot.send(event, '我找到图了,但我发给主人了❤') + msg = await Hso.setu(data) + f"\n由用户({user})提供" + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=msg) + else: + await setu.finish(Message(await Hso.setu(data))) + else: + data = (await SetuData.get_setu())[0] # type: ignore + data = { + "pid": data[0], + "title": data[1], + "url": data[6] + } + if random() <= 0.1: + await bot.send(event, '我找到图了,但我发给主人了❤') + msg = await Hso.setu(data) + f"\n由用户({user})提供" + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=msg) + else: + await setu.finish(Message(await Hso.setu(data))) + + +key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service('setu')) + +@key_setu.handle() +async def _key_setu(bot: Bot, event: MessageEvent) -> None: + user = event.user_id + check = is_too_exciting(user, 10, hours=1) + if not check: + await setu.finish('休息一下吧❤') + + await bot.send(event, "别急,在找了!") + msg = str(event.message).strip() + tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0] + URL = PIXIV_URL + tag + + try: + data = json.loads(await get_bytes(URL))['illusts'] + except RequestError: + raise RequestError('Request msg failed!') + + if random() <= 0.1: + await bot.send(event, '我找到图了,但我发给主人了❤') + msg = await Hso.acc_setu(data) + f"\n由用户({user})提供" + for sup in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=msg) + else: + await setu.finish(Message(await Hso.acc_setu(data))) + + +setu_config = sv.on_command(cmd='涩图设置', permission=SUPERUSER) + +@setu_config.handle() +async def _setu_config(bot: Bot, event: MessageEvent) -> None: + global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA + msg = str(event.message).split(' ') + if msg[0] == "": + repo = ( + "可用设置如下:\n" + "启用/禁用r18\n" + "启用/禁用压缩\n" + "启用/禁用本地涩图\n" + "启用/禁用混合本地涩图" + ) + await setu_config.finish(repo) + elif msg[0] == "启用r18": + R18_ENABLED = 1 + await setu_config.finish('已启用r18') + elif msg[0] == "禁用r18": + R18_ENABLED = 0 + await setu_config.finish('已禁用r18') + elif msg[0] == "启用压缩": + SIZE_REDUCE = True + await setu_config.finish('已启用图片压缩') + elif msg[0] == "禁用压缩": + SIZE_REDUCE = False + await setu_config.finish('已禁用图片压缩') + elif msg[0] == "启用本地涩图": + USE_LOCAL_DATA = True + await setu_config.finish('已启用本地涩图') + elif msg[0] == "禁用本地涩图": + USE_LOCAL_DATA = False + await setu_config.finish('已禁用本地涩图') + elif msg[0] == "启用混合本地涩图": + MIX_LOCAL_DATA = True + await setu_config.finish('启用混合本地涩图') + elif msg[0] == "禁用混合本地涩图": + MIX_LOCAL_DATA = False + await setu_config.finish('禁用混合本地涩图') + else: + await setu_config.finish('阿!请检查拼写') + + +not_get_se = sv.on_command("不够涩") + +@not_get_se.handle() +async def _not_se(bot: Bot, event: MessageEvent) -> None: + user = event.user_id + check = is_too_exciting(user, 1, 120) + if check: + msg = choice([ + "那你来发", + "那你来发❤" + ]) + await not_get_se.finish(msg) diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py new file mode 100644 index 0000000..3030881 --- /dev/null +++ b/ATRI/plugins/setu/modules/scheduler.py @@ -0,0 +1,19 @@ +import shutil +from ATRI.log import logger as log +from ATRI.utils.apscheduler import scheduler + +from .data_source import TEMP_DIR + + [email protected]_job( + 'interval', + days=7, + misfire_grace_time=10 +) +async def clear_temp(): + log.info('正在清除涩图缓存') + try: + shutil.rmtree(TEMP_DIR) + log.info('清除缓存成功!') + except Exception: + log.warn('清除图片缓存失败!') diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py new file mode 100644 index 0000000..6f383f4 --- /dev/null +++ b/ATRI/plugins/setu/modules/store.py @@ -0,0 +1,138 @@ +import json +from random import choice + +from nonebot.typing import T_State +from nonebot.permission import SUPERUSER +from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp.message import Message, MessageSegment + +from ATRI.service import Service as sv +from ATRI.utils.request import get_bytes +from ATRI.exceptions import RequestError + +from .data_source import SetuData + + +API_URL: str = "https://api.kyomotoi.moe/api/pixiv/illust?id=" + + +__doc__ = """ +为本地添加涩图! +权限组:维护者 +用法: + 添加涩图 (pid) +补充: + pid: Pixiv 作品id +""" + + +add_setu = sv.on_command( + cmd="添加涩图", + docs=__doc__, + permission=SUPERUSER +) + +@add_setu.args_parser # type: ignore +async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + cancel = ['算了', '罢了'] + if msg in cancel: + await add_setu.finish('好吧...') + if not msg: + await add_setu.reject('涩图(pid)速发!') + else: + state['setu_add'] = msg + +@add_setu.handle() +async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + if msg: + state['setu_add'] = msg + +@add_setu.got('setu_add', prompt='涩图(pid)速发!') +async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: + pid = state['setu_add'] + + URL = API_URL + pid + try: + data = json.loads(await get_bytes(URL))['illust'] + except RequestError: + raise RequestError('Request failed!') + + try: + pic = data['meta_single_page']['original_image_url'] \ + .replace('pximg.net', 'pixiv.cat') + except Exception: + pic = choice(data['meta_pages'])['image_urls']['original'] \ + .replace('pximg.net', 'pixiv.cat') + + d = { + "pid": pid, + "title": data['title'], + "tags": str(data['tags']), + "user_id": data['user']['id'], + "user_name": data['user']['name'], + "user_account": data['user']['account'], + "url": pic + } + await SetuData.add_data(d) + + show_img = data['image_urls']['medium'].replace('pximg.net', 'pixiv.cat') + msg = ( + "好欸!是新涩图:\n" + f"Pid: {pid}\n" + f"Title: {data['title']}\n" + f"{MessageSegment.image(show_img)}" + ) + await add_setu.finish(Message(msg)) + + +__doc__ = """ +删除涩图! +权限组:维护者 +用法: + 删除涩图 (pid) +补充: + pid: Pixiv 作品id +""" + + +del_setu = sv.on_command( + cmd="删除涩图", + docs=__doc__, + permission=SUPERUSER +) + +@del_setu.args_parser # type: ignore +async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + cancel = ['算了', '罢了'] + if msg in cancel: + await add_setu.finish('好吧...') + if not msg: + await add_setu.reject('涩图(pid)速发!') + else: + state['setu_del'] = msg + +@del_setu.handle() +async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + if msg: + state['setu_del'] = msg + +@del_setu.got('setu_del', prompt='涩图(pid)速发!') +async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: + pid = int(state['setu_del']) + await SetuData.del_data(pid) + await del_setu.finish(f'涩图({pid})已删除...') + + +count_setu = sv.on_command( + cmd="涩图总量", + permission=SUPERUSER +) + +@count_setu.handle() +async def _count_setu(bot: Bot, event: MessageEvent) -> None: + msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!" + await count_setu.finish(msg) diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py index bd5d9c4..efdd412 100644 --- a/ATRI/plugins/status.py +++ b/ATRI/plugins/status.py @@ -1,4 +1,6 @@ +import time import psutil +from datetime import datetime from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.log import logger as log @@ -6,7 +8,7 @@ from ATRI.service import Service as sv from ATRI.rule import is_in_service from ATRI.exceptions import GetStatusError from ATRI.utils.apscheduler import scheduler -from ATRI.config import Config +from ATRI.config import BotSelfConfig __doc__ = """ @@ -16,8 +18,10 @@ __doc__ = """ /ping """ -ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping")) - +ping = sv.on_command( + cmd="/ping", + docs="测试机器人", + rule=is_in_service('ping')) @ping.handle() async def _ping(bot: Bot, event: MessageEvent) -> None: @@ -31,8 +35,11 @@ __doc__ = """ /status """ -status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status")) - +status = sv.on_command( + cmd="/status", + docs=__doc__, + rule=is_in_service('status') +) @status.handle() async def _status(bot: Bot, event: MessageEvent) -> None: @@ -40,22 +47,28 @@ async def _status(bot: Bot, event: MessageEvent) -> None: cpu = psutil.cpu_percent(interval=1) mem = psutil.virtual_memory().percent disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + now = time.time() + boot = psutil.boot_time() + up_time = str( + datetime.utcfromtimestamp(now).replace(microsecond=0) + - datetime.utcfromtimestamp(boot).replace(microsecond=0) + ) except GetStatusError: raise GetStatusError("Failed to get status.") - + msg = "アトリは、高性能ですから!" - - if cpu > 80: # type: ignore + + if cpu > 90: # type: ignore msg = "咱感觉有些头晕..." - if mem > 80: + if mem > 90: msg = "咱感觉有点头晕并且有点累..." - elif mem > 80: + elif mem > 90: msg = "咱感觉有点累..." - elif disk > 80: + elif disk > 90: msg = "咱感觉身体要被塞满了..." - + msg0 = ( "Self status:\n" f"* CPU: {cpu}%\n" @@ -63,36 +76,41 @@ async def _status(bot: Bot, event: MessageEvent) -> None: f"* DISK: {disk}%\n" f"* netSENT: {inteSENT}MB\n" f"* netRECV: {inteRECV}MB\n" + f"* Runtime: {up_time}\n" ) + msg - + await status.finish(msg0) [email protected]_job("interval", minutes=5, misfire_grace_time=10) [email protected]_job( + 'interval', + minutes=5, + misfire_grace_time=10 +) async def _(): log.info("开始自检") try: cpu = psutil.cpu_percent(interval=1) mem = psutil.virtual_memory().percent disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore except GetStatusError: raise GetStatusError("Failed to get status.") - - msg = "" - if cpu > 80: # type: ignore + + msg = str() + if cpu > 90: # type: ignore msg = "咱感觉有些头晕..." - if mem > 80: + if mem > 90: msg = "咱感觉有点头晕并且有点累..." - elif mem > 80: + elif mem > 90: msg = "咱感觉有点累..." - elif disk > 80: + elif disk > 90: msg = "咱感觉身体要被塞满了..." else: log.info("运作正常") return - + msg0 = ( "Self status:\n" f"* CPU: {cpu}%\n" @@ -101,6 +119,9 @@ async def _(): f"* netSENT: {inteSENT}MB\n" f"* netRECV: {inteRECV}MB\n" ) + msg - - for sup in Config.BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0) + + for sup in BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=sup, + message=msg0 + ) diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py index ff28e59..fa59690 100644 --- a/ATRI/plugins/utils/__init__.py +++ b/ATRI/plugins/utils/__init__.py @@ -1,9 +1,12 @@ import re +from random import random + +from nonebot.typing import T_State from nonebot.adapters.cqhttp import Bot, MessageEvent from ATRI.service import Service as sv from ATRI.rule import is_in_service -from .data_source import roll_dice, Encrypt +from .data_source import roll_dice, Encrypt, Yinglish __doc__ = """ @@ -17,24 +20,37 @@ roll一下 /roll 1d10+10d9+4d5+2d3 """ -roll = sv.on_command(cmd="/roll", docs=__doc__, rule=is_in_service("roll")) - +roll = sv.on_command( + cmd="/roll", + docs=__doc__, + rule=is_in_service('roll') +) + [email protected]_parser # type: ignore +async def _load_roll(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + quit_list = ['算了', '罢了', '取消'] + if msg in quit_list: + await roll.finish('好吧...') + if not msg: + await roll.reject('点呢?(1d10+...)') + else: + state['resu'] = msg @roll.handle() -async def _roll(bot: Bot, event: MessageEvent, state: dict) -> None: +async def _roll(bot: Bot, event: MessageEvent, state: T_State) -> None: args = str(event.message).strip() if args: - state["resu"] = args - + state['resu'] = args @roll.got("resu", prompt="roll 参数不能为空~!\ndemo:1d10 或 2d10+2d10") -async def _(bot: Bot, event: MessageEvent, state: dict) -> None: - resu = state["resu"] - match = re.match(r"^([\dd+\s]+?)$", resu) - +async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State) -> None: + resu = state['resu'] + match = re.match(r'^([\dd+\s]+?)$', resu) + if not match: await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10") - + await roll.finish(roll_dice(resu)) @@ -50,19 +66,60 @@ __doc__ = """ /enc e アトリは高性能ですから! """ -encrypt = sv.on_command(cmd="/enc", docs=__doc__, rule=is_in_service("enc")) - +encrypt = sv.on_command( + cmd="/enc", + docs=__doc__, + rule=is_in_service('enc') +) @encrypt.handle() async def _encrypt(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") + msg = str(event.message).split(' ') _type = msg[0] s = msg[1] e = Encrypt() - + if _type == "e": await encrypt.finish(e.encode(s)) elif _type == "d": await encrypt.finish(e.decode(s)) else: - await encrypt.finish("请检查输入~!") + await encrypt.finish('请检查输入~!') + + +__doc__ = """ +涩批一下! +权限组:所有人 +用法: + 涩批一下 (msg) +""" + +sepi = sv.on_command( + cmd="涩批一下", + docs=__doc__, + rule=is_in_service('涩批一下') +) + +async def _load_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + quit_list = ['算了', '罢了', '取消'] + if msg in quit_list: + await sepi.finish('好吧...') + if not msg: + await sepi.reject('话呢?') + else: + state['sepi_msg'] = msg + +async def _sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + if msg: + state['sepi_msg'] = msg + [email protected]('sepi_msg', prompt='话呢?') +async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = state['sepi_msg'] + if len(msg) < 4: + await sepi.finish('这么短?涩不起来!') + await sepi.finish(Yinglish.deal(msg, random()))
\ No newline at end of file diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py index 4ef46a3..98b2b37 100644 --- a/ATRI/plugins/utils/data_source.py +++ b/ATRI/plugins/utils/data_source.py @@ -1,46 +1,47 @@ import re -import random from math import floor -from typing import Union +import jieba.posseg as pseg +from typing import Union, Optional +from random import random, choice, randint def roll_dice(par: str) -> str: result = 0 - proc = "" + proc = '' proc_list = [] p = par.split("+") - + for i in p: args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i) args = list(args[0]) - + args[0] = args[0] or 1 if int(args[0]) >= 5000 or int(args[2]) >= 5000: return "阿...好大......" - + for a in range(1, int(args[0]) + 1): - rd = random.randint(1, int(args[2])) + rd = randint(1, int(args[2])) result = result + rd - + if len(proc_list) <= 10: proc_list.append(rd) - + if len(proc_list) <= 10: proc += "+".join(map(str, proc_list)) elif len(proc_list) > 10: proc += "太长了不展示了就酱w" else: proc += str(result) - + result = f"{par}=({proc})={result}" return result class Encrypt: - cr = "ĀāĂ㥹ÀÁÂÃÄÅ" - cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ" - cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr" - cb = "ĨĩĪīĬĭĮįİı" + cr = 'ĀāĂ㥹ÀÁÂÃÄÅ' + cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ' + cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr' + cb = 'ĨĩĪīĬĭĮįİı' sr = len(cr) sc = len(cc) @@ -55,7 +56,7 @@ class Encrypt: def _encodeByte(self, i) -> Union[str, None]: if i > 0xFF: - raise ValueError("ERROR! at/ri overflow") + raise ValueError('ERROR! at/ri overflow') if i > 0x7F: i = i & 0x7F @@ -65,7 +66,7 @@ class Encrypt: def _encodeShort(self, i) -> str: if i > 0xFFFF: - raise ValueError("ERROR! atri overflow") + raise ValueError('ERROR! atri overflow') reverse = False if i > 0x7FFF: @@ -75,15 +76,17 @@ class Encrypt: char = [ self._div(i, self.scnb), self._div(i % self.scnb, self.snb), - self._div(i % self.snb, self.sb), - i % self.sb, + self._div(i % self.snb, self.sb), i % self.sb + ] + char = [ + self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], + self.cb[char[3]] ] - char = [self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], self.cb[char[3]]] if reverse: return char[2] + char[3] + char[0] + char[1] - return "".join(char) + return ''.join(char) def _decodeByte(self, c) -> int: nb = False @@ -91,11 +94,12 @@ class Encrypt: if idx[0] < 0 or idx[1] < 0: idx = [self.cn.index(c[0]), self.cb.index(c[1])] nb = True - raise ValueError("ERROR! at/ri overflow") + raise ValueError('ERROR! at/ri overflow') - result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1] + result = idx[0] * self.sb + idx[1] \ + if nb else idx[0] * self.sc + idx[1] if result > 0x7F: - raise ValueError("ERROR! at/ri overflow") + raise ValueError('ERROR! at/ri overflow') return result | 0x80 if nb else 0 @@ -106,22 +110,23 @@ class Encrypt: self.cr.index(c[0]), self.cc.index(c[1]), self.cn.index(c[2]), - self.cb.index(c[3]), + self.cb.index(c[3]) ] else: idx = [ self.cr.index(c[2]), self.cc.index(c[3]), self.cn.index(c[0]), - self.cb.index(c[1]), + self.cb.index(c[1]) ] if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0: - raise ValueError("ERROR! not atri") + raise ValueError('ERROR! not atri') - result = idx[0] * self.scnb + idx[1] * self.snb + idx[2] * self.sb + idx[3] + result = idx[0] * self.scnb + idx[1] * self.snb + idx[ + 2] * self.sb + idx[3] if result > 0x7FFF: - raise ValueError("ERROR! atri overflow") + raise ValueError('ERROR! atri overflow') result |= 0x8000 if reverse else 0 return result @@ -134,36 +139,64 @@ class Encrypt: if len(b) & 1 == 1: result.append(self._encodeByte(b[-1])) - return "".join(result) + return ''.join(result) - def encode(self, s: str, encoding: str = "utf-8"): + def encode(self, s: str, encoding: str = 'utf-8'): if not isinstance(s, str): - raise ValueError("Please enter str instead of other") + raise ValueError('Please enter str instead of other') return self._encodeBytes(s.encode(encoding)) def _decodeBytes(self, s: str): if not isinstance(s, str): - raise ValueError("Please enter str instead of other") + raise ValueError('Please enter str instead of other') if len(s) & 1: - raise ValueError("ERROR length") + raise ValueError('ERROR length') result = [] for i in range(0, (len(s) >> 2)): - result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) >> 8])) - result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) & 0xFF])) + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8])) + result.append(bytes([ + self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF])) if (len(s) & 2) == 2: result.append(bytes([self._decodeByte(s[-2:])])) - return b"".join(result) + return b''.join(result) - def decode(self, s: str, encoding: str = "utf-8") -> str: + def decode(self, s: str, encoding: str = 'utf-8') -> str: if not isinstance(s, str): - raise ValueError("Please enter str instead of other") + raise ValueError('Please enter str instead of other') try: return self._decodeBytes(s).decode(encoding) except UnicodeDecodeError: - raise ValueError("Decoding failed") + raise ValueError('Decoding failed') + + +class Yinglish: + @staticmethod + def _to_ying(x, y, ying) -> str: + if random() > ying: + return x + if x in [',', '。']: + return str(choice(['..', '...', '....', '......'])) + if x in ['!', '!']: + return "❤" + if len(x) > 1 and random() < 0.5: + return str(choice([ + f"{x[0]}..{x}", f"{x[0]}...{x}", + f"{x[0]}....{x}", f"{x[0]}......{x}" + ])) + else: + if y == "n" and random() < 0.5: + x = "〇" * len(x) + return str(choice([ + f"...{x}", f"....{x}", + f".....{x}", f"......{x}" + ])) + + @classmethod + def deal(cls, text, ying: Optional[float] = 0.5) -> str: + return "".join([cls._to_ying(x, y, ying) for x, y in pseg.cut(text)]) diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py new file mode 100644 index 0000000..3f89214 --- /dev/null +++ b/ATRI/plugins/wife/__init__.py @@ -0,0 +1,129 @@ +import re +import asyncio +from random import choice + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import ( + Bot, + MessageEvent, + GroupMessageEvent, + PrivateMessageEvent +) + +from ATRI.service import Service as sv +from ATRI.rule import is_in_service +from ATRI.utils.limit import is_too_exciting + +from .data_source import Tsuma + + +__doc__ = """ +好欸!是老婆! +权限组:所有人 +用法: + 抽老婆 # 获取一位老婆 + 查老婆 # 查询老婆,如+at对象可查询对方 + 我要离婚 # 离婚... +""" + +roll_wife = sv.on_command( + cmd='抽老婆', + docs=__doc__, + rule=is_in_service('抽老婆') +) + +@roll_wife.handle() +async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: + user = event.user_id + gender = event.sender.sex + group = event.group_id + user_name = await bot.get_group_member_info(group_id=group, + user_id=user) + user_name = user_name['nickname'] + run = is_too_exciting(user, 1, seconds=5) + if not run: + return + + check_repo, if_h = Tsuma.check_tsuma(str(user)) + if if_h: + await roll_wife.finish(check_repo) + + msg = ( + "5秒后咱将随机抽取一位群友成为\n" + f"{user_name} 的老婆!究竟是谁呢~?" + ) + await bot.send(event, msg) + await asyncio.sleep(5) + + async def get_luck_user(): + luck_list = await bot.get_group_member_list(group_id=group) + return choice(luck_list) + + while True: + luck_user = await get_luck_user() + luck_qq = luck_user['user_id'] + if user != luck_qq: + break + + luck_gender = luck_user['sex'] + luck_user = luck_user['nickname'] + d = { + "nickname": user_name, + "gender": gender, + "lassie": { + "nickname": luck_user, + "qq": luck_qq, + "gender": luck_gender + } + } + + if str(luck_qq) == str(event.self_id): + Tsuma.got_tsuma(str(user), d) + msg = "老婆竟是我自己~❤" + else: + msg = Tsuma.got_tsuma(str(user), d) + + await roll_wife.finish(msg) + +@roll_wife.handle() +async def _no_pr(bot: Bot, event: PrivateMessageEvent) -> None: + await roll_wife.finish('对8起...该功能只对群聊开放(') + + +inquire_wife = sv.on_command( + cmd="查老婆", + rule=is_in_service('抽老婆') +) + +@inquire_wife.handle() +async def _inq_wife(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + if msg[0] == "": + user = str(event.user_id) + await inquire_wife.finish(Tsuma.inquire_tsuma(user)) + else: + aim = re.findall(r"qq=(.*?)]", msg[0])[0] + await inquire_wife.finish(Tsuma.inquire_tsuma(aim).replace('你', 'ta')) + + +want_divorce = sv.on_command( + cmd="我要离婚", + rule=is_in_service('抽老婆') +) + +@want_divorce.handle() +async def _want_div(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = str(event.message).strip() + if msg: + state['is_d'] = msg + +@want_divorce.got('is_d', prompt="你确定吗?(是/否)") +async def _deal_div(bot: Bot, event: MessageEvent, state: T_State) -> None: + msg = state['is_d'] + user = str(event.user_id) + name = event.sender.nickname + + if msg in ['是', '确定']: + await want_divorce.finish(Tsuma.divorce(user)) + else: + await want_divorce.finish(f'({name})回心转意了!') diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py new file mode 100644 index 0000000..f01985a --- /dev/null +++ b/ATRI/plugins/wife/data_source.py @@ -0,0 +1,90 @@ +import os +import json +from pathlib import Path + + +WIFE_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'wife' +MERRY_LIST_PATH = WIFE_DIR / 'merry_list.json' +os.makedirs(WIFE_DIR, exist_ok=True) + + +class Tsuma: + @staticmethod + def _load_tsuma() -> dict: + try: + return json.loads(MERRY_LIST_PATH.read_bytes()) + except FileNotFoundError: + with open(MERRY_LIST_PATH, 'w') as r: + r.write(json.dumps({}, indent=4)) + return dict() + + @staticmethod + def _store_tsuma(data: dict) -> None: + with open(MERRY_LIST_PATH, 'w') as r: + r.write(json.dumps(data, indent=4)) + + @classmethod + def check_tsuma(cls, user: str): + data = cls._load_tsuma() + if user in data: + msg = ( + "阿,你已经有老婆惹!" + f"ta是:{data[user]['lassie']['nickname']}" + ) + return msg, True + else: + return "悲——你还没老婆...", False + + @classmethod + def inquire_tsuma(cls, user: str) -> str: + data = cls._load_tsuma() + if user in data: + return f"你的老婆是:{data[user]['lassie']['nickname']} 哦~❤" + else: + return "悲——你还没老婆..." + + @classmethod + def got_tsuma(cls, user: str, d: dict) -> str: + check_repo, if_h = cls.check_tsuma(user) # 防止出现多人同时操作导致 NTR 事件 + if if_h: + return check_repo + else: + data = cls._load_tsuma() + data[user] = { + "nickname": d['nickname'], + "gender": d['gender'], + "lassie": { + "nickname": d['lassie']['nickname'], + "qq": d['lassie']['qq'], + "gender": d['lassie']['gender'] + } + } + cls._store_tsuma(data) + + data[d['lassie']['qq']] = { + "nickname": d['lassie']['nickname'], + "gender": d['lassie']['gender'], + "lassie": { + "nickname": d['nickname'], + "qq": user, + "gender": d['gender'] + } + } + cls._store_tsuma(data) + + msg = ( + f"> {d['lassie']['nickname']}({d['lassie']['qq']})\n" + f"恭喜成为 {d['nickname']} 的老婆~⭐" + ) + return msg + + @classmethod + def divorce(cls, user: str) -> str: + data = cls._load_tsuma() + if not user in data: + return "悲——你还没老婆。。" + + msg = f"悲——,({data[user]['nickname']})抛弃了({data[user]['lassie']['nickname']})" + del data[user] + cls._store_tsuma(data) + return msg |