diff options
46 files changed, 1608 insertions, 447 deletions
@@ -137,4 +137,7 @@ dmypy.json # pytype static type analyzer .pytype/ -# End of https://www.toptal.com/developers/gitignore/api/python
\ No newline at end of file +# End of https://www.toptal.com/developers/gitignore/api/python + +/accounts/* +/data/* diff --git a/ATRI/__init__.py b/ATRI/__init__.py index 5e9db56..5346a7a 100644 --- a/ATRI/__init__.py +++ b/ATRI/__init__.py @@ -1,13 +1,11 @@ from time import sleep import nonebot -from nonebot.adapters.cqhttp import Bot as ATRIBot +from nonebot.adapters.onebot.v11 import Adapter from .config import RUNTIME_CONFIG -from .log import logger - -__version__ = "YHN-001-A04" +__version__ = "YHN-001-A05" def asgi(): @@ -20,12 +18,11 @@ def driver(): def init(): nonebot.init(**RUNTIME_CONFIG) - driver().register_adapter("cqhttp", ATRIBot) + driver().register_adapter(Adapter) nonebot.load_plugins("ATRI/plugins") - if RUNTIME_CONFIG["debug"]: - nonebot.load_plugin("nonebot_plugin_test") + nonebot.load_plugin("nonebot_plugin_gocqhttp") sleep(3) -def run(app): - nonebot.run(app=app) +def run(): + nonebot.run() diff --git a/ATRI/config.py b/ATRI/config.py index d1a9bfd..b911a83 100644 --- a/ATRI/config.py +++ b/ATRI/config.py @@ -30,12 +30,26 @@ class BotSelfConfig: proxy: str = config.get("proxy", None) +class InlineGoCQHTTP: + config: dict = config["InlineGoCQHTTP"] + + accounts: list = config.get("accounts", []) + download_version: str = str(config.get("download_version", "latest")) + + class SauceNAO: config: dict = config["SauceNAO"] key: str = config.get("key", "") +class Setu: + config: dict = config["Setu"] + + reverse_proxy: bool = bool(config.get("reverse_proxy", False)) + reverse_proxy_domain: str = config.get("reverse_proxy_domain", str()) + + RUNTIME_CONFIG = { "host": BotSelfConfig.host, "port": BotSelfConfig.port, @@ -45,4 +59,7 @@ RUNTIME_CONFIG = { "command_start": BotSelfConfig.command_start, "command_sep": BotSelfConfig.command_sep, "session_expire_timeout": BotSelfConfig.session_expire_timeout, + + "gocq_accounts": InlineGoCQHTTP.accounts, + "gocq_version": InlineGoCQHTTP.download_version, } diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index c95867d..8ce6a85 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -8,9 +8,7 @@ from typing import Optional from traceback import format_exc from pydantic.main import BaseModel -from nonebot.adapters.cqhttp import Bot, Event -from nonebot.matcher import Matcher -from nonebot.typing import T_State +from nonebot.adapters.onebot.v11 import Bot from nonebot.message import run_postprocessor from .log import logger @@ -92,13 +90,10 @@ class ServiceRegisterError(BaseBotException): prompt = "服务注册错误" -@run_postprocessor # type: ignore +@run_postprocessor async def _track_error( - matcher: Matcher, exception: Optional[Exception], bot: Bot, - event: Event, - state: T_State, ) -> None: if not exception: return diff --git a/ATRI/log.py b/ATRI/log.py index cbdc041..39a4864 100644 --- a/ATRI/log.py +++ b/ATRI/log.py @@ -27,6 +27,9 @@ class LoguruNameDealer: if "nonebot.plugin.manager" in log_handle: plugin_name = log_handle.split(".")[-1] record["name"] = f"plugin.{plugin_name}" + elif "nonebot_plugin_gocqhttp" in log_handle: + plugin_name = log_handle.split("_")[-1] + record["name"] = "gocqhttp" else: record["name"] = record["name"].split(".")[0] diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py index 5b4e212..7436aa2 100644 --- a/ATRI/plugins/anime_search.py +++ b/ATRI/plugins/anime_search.py @@ -1,9 +1,9 @@ import re from random import choice -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message, MessageSegment from ATRI.service import Service from ATRI.rule import is_in_service @@ -80,34 +80,25 @@ class Anime(Service): anime_search = Anime().on_command("以图搜番", "发送一张图以搜索可能的番剧") -@anime_search.args_parser # type: ignore -async def _get_anime(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "不搜了", "取消"] - if msg in quit_list: - await anime_search.finish("好吧...") - if not msg: - await anime_search.reject("图呢?") - else: - state["anime"] = msg - - @anime_search.handle() -async def _ready_sear(bot: Bot, event: MessageEvent, state: T_State): +async def _ready_sear( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _anime_flmt.check(user_id): await anime_search.finish(_anime_flmt_notice) - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["anime"] = msg + matcher.set_arg("anime_pic", args) -@anime_search.got("anime", "图呢?") -async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State): +@anime_search.got("anime_pic", "图呢?") +async def _deal_sear( + bot: Bot, event: MessageEvent, pic: str = ArgPlainText("anime_pic") +): user_id = event.get_user_id() - msg = state["anime"] - img = re.findall(r"url=(.*?)]", msg) + img = re.findall(r"url=(.*?)]", pic) if not img: await anime_search.reject("请发送图片而不是其它东西!!") diff --git a/ATRI/plugins/broadcast.py b/ATRI/plugins/broadcast.py new file mode 100644 index 0000000..0948e31 --- /dev/null +++ b/ATRI/plugins/broadcast.py @@ -0,0 +1,134 @@ +import os +import json +import random +import asyncio +from pathlib import Path + +from nonebot.matcher import Matcher +from nonebot.permission import SUPERUSER +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN +from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent + + +from ATRI.service import Service +from ATRI.rule import to_bot + + +BC_PATH = Path(".") / "data" / "database" / "broadcast" +os.makedirs(BC_PATH, exist_ok=True) + +_BROADCAST_BACK = """ +广播报告: +信息:{msg} +预计推送个数(群):{len_g} +成功:{su_g} +失败:{fl_g} +失败列表:{f_g} +""".strip() + + +class BroadCast(Service): + def __init__(self): + Service.__init__(self, "广播", "向bot所在的所有群发送信息", True, to_bot()) + + @staticmethod + def load_rej_list() -> list: + data = list() + path = BC_PATH / "rej_list.json" + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data)) + return data + + return json.loads(path.read_bytes()) + + @classmethod + def store_rej_list(cls, data: list): + path = BC_PATH/ "rej_list.json" + if not path.is_file(): + cls.load_rej_list() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data)) + + +caster = BroadCast().on_command("广播", "向bot所在的所有群发送信息,有防寄延迟", aliases={"/bc","bc"}, permission=SUPERUSER) + + +async def _(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("bc_msg", args) + + [email protected]("bc_msg", "想要咱群发什么呢?") +async def _(bot: Bot, event: MessageEvent, s_msg: str = ArgPlainText("bc_msg")): + w_group = await bot.get_group_list() + + await bot.send(event, "正在推送...(每个群延迟1~3s)") + + w_msg = f" 来自维护者的信息:\n{s_msg}" + + su_g = list() + fl_g = list() + for i in w_group: + group_id = i["group_id"] + try: + await bot.send_group_msg(group_id=group_id, message=w_msg) + su_g.append(group_id) + except: + fl_g.append(group_id) + + await asyncio.sleep(random.randint(2, 3)) + + repo_msg = _BROADCAST_BACK.format( + msg=s_msg, + len_g=len(w_group), + su_g=su_g, + fl_g=fl_g, + f_g="、".join(map(str, fl_g)) + ) + await caster.finish(repo_msg) + + +rej_broadcast = BroadCast().on_command("拒绝广播", "拒绝来自开发者的广播推送", permission=GROUP_OWNER | GROUP_ADMIN) + + +@rej_broadcast.handle() +async def _(bot: Bot, event: GroupMessageEvent): + group_id = str(event.group_id) + + rej_g = BroadCast().load_rej_list() + if group_id in rej_g: + await rej_broadcast.finish("本群已在推送黑名单内辣!") + else: + rej_g.append(group_id) + BroadCast().store_rej_list(rej_g) + await rej_broadcast.finish("完成~!已将本群列入推送黑名单") + +@rej_broadcast.handle() +async def _(event: PrivateMessageEvent): + await rej_broadcast.finish("该功能仅在群聊中触发...") + + +acc_broadcast = BroadCast().on_command("接受广播", "接受来自开发者的广播推送", permission=GROUP_OWNER | GROUP_ADMIN) + + +@acc_broadcast.handle() +async def _(bot: Bot, event: GroupMessageEvent): + group_id = str(event.group_id) + + rej_g = BroadCast().load_rej_list() + if group_id in rej_g: + rej_g.remove(group_id) + BroadCast().store_rej_list(rej_g) + await rej_broadcast.finish("已将本群移除推送黑名单!") + else: + await rej_broadcast.finish("本群不在推送黑名单里呢...") + +@acc_broadcast.handle() +async def _(event: PrivateMessageEvent): + await rej_broadcast.finish("该功能仅在群聊中触发...") diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py index f401f3f..217c71e 100644 --- a/ATRI/plugins/chat/__init__.py +++ b/ATRI/plugins/chat/__init__.py @@ -1,9 +1,9 @@ from random import choice -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message -from ATRI.utils import CoolqCodeChecker from ATRI.utils.limit import FreqLimiter from ATRI.utils.apscheduler import scheduler from .data_source import Chat @@ -17,7 +17,6 @@ chat = Chat().on_message("文爱", "闲聊(文爱") @chat.handle() async def _chat(bot: Bot, event: MessageEvent): - print(1) user_id = event.get_user_id() if not _chat_flmt.check(user_id): await chat.finish(_chat_flmt_notice) @@ -34,33 +33,20 @@ async def _chat(bot: Bot, event: MessageEvent): my_name_is = Chat().on_command("叫我", "更改闲聊(文爱)时的称呼", aliases={"我是"}, priority=1) -@my_name_is.args_parser # type: ignore -async def _get_name(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await my_name_is.finish("好吧...") - if not msg: - await my_name_is.reject("欧尼酱想让咱如何称呼呢!0w0") - else: - state["name"] = msg - - @my_name_is.handle() -async def _name(bot: Bot, event: MessageEvent, state: T_State): +async def _name(matcher: Matcher, event: MessageEvent, args: Message = CommandArg()): user_id = event.get_user_id() if not _chat_flmt.check(user_id): await my_name_is.finish(_chat_flmt_notice) - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["name"] = msg + matcher.set_arg("name", args) @my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0") -async def _deal_name(bot: Bot, event: MessageEvent, state: T_State): +async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")): user_id = event.get_user_id() - new_name = state["name"] repo = choice( [ f"好~w 那咱以后就称呼你为{new_name}!", @@ -77,37 +63,21 @@ async def _deal_name(bot: Bot, event: MessageEvent, state: T_State): say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) [email protected]_parser # type: ignore -async def _get_say(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了"] - if msg in quit_list: - await say.finish("好吧...") - if not msg: - await say.reject("阿!要咱说啥呢...") - else: - state["say"] = msg - - @say.handle() -async def _ready_say(bot: Bot, event: MessageEvent, state: T_State): +async def _ready_say( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _chat_flmt.check(user_id): await say.finish(_chat_flmt_notice) - msg = str(event.message) + msg = args.extract_plain_text() if msg: - state["say"] = msg - + matcher.set_arg("say", args) [email protected]("say") -async def _deal_say(bot: Bot, event: MessageEvent, state: T_State): - msg = state["say"] - check = CoolqCodeChecker(msg).check - if not check: - repo = choice(["不要...", "这个咱不想复读!", "不可以", "不好!"]) - await say.finish(repo) [email protected]("say", "想要咱复读啥呢...") +async def _deal_say(event: MessageEvent, msg: str = ArgPlainText("say")): user_id = event.get_user_id() _chat_flmt.start_cd(user_id) await say.finish(msg) diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py index 3f5697b..334c09a 100644 --- a/ATRI/plugins/code_runner/__init__.py +++ b/ATRI/plugins/code_runner/__init__.py @@ -1,7 +1,6 @@ from random import choice -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment +from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment, unescape from ATRI.utils.limit import FreqLimiter from .data_source import CodeRunner @@ -15,7 +14,7 @@ code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮� @code_runner.handle() -async def _code_runner(bot: Bot, event: MessageEvent): +async def _code_runner(event: MessageEvent): user_id = event.get_user_id() if not _flmt.check(user_id): await code_runner.finish(_flmt_notice) @@ -23,14 +22,14 @@ async def _code_runner(bot: Bot, event: MessageEvent): msg = str(event.get_message()) args = msg.split("\n") - if not args: + if not args[0]: content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!" elif args[0] == "help": content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().help() elif args[0] == "list": content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang() else: - content = MessageSegment.at(user_id) + await CodeRunner().runner(msg) + content = MessageSegment.at(user_id) + await CodeRunner().runner(unescape(msg)) _flmt.start_cd(user_id) await code_runner.finish(Message(content)) diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index fb493a9..6d981aa 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -8,11 +8,10 @@ from random import choice, randint from pathlib import Path import nonebot -from nonebot.typing import T_State -from nonebot.matcher import Matcher +from nonebot.permission import SUPERUSER from nonebot.message import run_preprocessor from nonebot.exception import IgnoredException -from nonebot.adapters.cqhttp import ( +from nonebot.adapters.onebot.v11 import ( Bot, MessageEvent, GroupMessageEvent, @@ -24,13 +23,15 @@ from nonebot.adapters.cqhttp import ( GroupBanNoticeEvent, GroupRecallNoticeEvent, FriendRecallNoticeEvent, + MessageSegment, + Message ) import ATRI from ATRI.service import Service from ATRI.log import logger as log from ATRI.config import BotSelfConfig -from ATRI.utils import CoolqCodeChecker +from ATRI.utils import MessageChecker from ATRI.utils.apscheduler import scheduler @@ -57,10 +58,8 @@ async def shutdown(): log.info("Thanks for using.") -@run_preprocessor # type: ignore -async def _check_block( - matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State -) -> None: +@run_preprocessor +async def _check_block(event: MessageEvent): user_file = "block_user.json" path = MANEGE_DIR / user_file if not path.is_file(): @@ -261,6 +260,9 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent): await bot.send_private_msg(user_id=int(superuser), message=msg) +_acc_recall = True + + recall_event = Essential().on_notice("撤回事件", "撤回事件检测") @@ -269,6 +271,9 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent): if event.is_tome(): return + if not _acc_recall: + return + try: repo = await bot.get_msg(message_id=event.message_id) except BaseException: @@ -276,14 +281,13 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent): user = event.user_id group = event.group_id - repo = str(repo["message"]) - check = CoolqCodeChecker(repo).check - if not check: - repo = repo.replace("CQ", "QC") + repo: dict = repo["message"] + + m = recall_msg_dealer(repo) - msg = "主人,咱拿到了一条撤回信息!\n" f"{user}@[群:{group}]\n" "撤回了\n" f"{repo}" + msg = f"主人,咱拿到了一条撤回信息!\n{user}@[群:{group}]\n撤回了\n{m}" for superuser in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=Message(msg)) @recall_event.handle() @@ -291,20 +295,40 @@ async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent): if event.is_tome(): return + if not _acc_recall: + return + try: repo = await bot.get_msg(message_id=event.message_id) except BaseException: return - + user = event.user_id - repo = str(repo["message"]) - check = CoolqCodeChecker(repo).check - if not check: - repo = repo.replace("CQ", "QC") + repo: dict = repo["message"] + + m = recall_msg_dealer(repo) - msg = "主人,咱拿到了一条撤回信息!\n" f"{user}@[私聊]" "撤回了\n" f"{repo}" + msg = f"主人,咱拿到了一条撤回信息!\n{user}@[私聊]撤回了\n{m}" for superuser in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=Message(msg)) + + +rej_recall = Essential().on_command("拒绝撤回", "拒绝撤回信息", permission=SUPERUSER) + +@rej_recall.handle() +async def _(): + global _acc_recall + _acc_recall = False + await rej_recall.finish("已拒绝撤回信息...") + + +acc_recall = Essential().on_command("接受撤回", "接受撤回信息", permission=SUPERUSER) + +@acc_recall.handle() +async def _(): + global _acc_recall + _acc_recall = True + await acc_recall.finish("现在可以接受撤回信息啦!") @scheduler.scheduled_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5) @@ -314,3 +338,27 @@ async def _clear_cache(): os.makedirs(TEMP_PATH, exist_ok=True) except Exception: log.warning("清除缓存失败,请手动清除:data/temp") + + +def recall_msg_dealer(msg: dict) -> str: + temp_m = list() + + for i in msg: + _type = i["type"] + _data = i["data"] + if _type == "text": + temp_m.append(_data["text"]) + elif _type == "image": + url = _data["url"] + check = MessageChecker(url).check_image_url + if check: + temp_m.append(MessageSegment.image(url)) + else: + temp_m.append(f"[该图片可能包含非法内容,源url:{url}]") + elif _type == "face": + temp_m.append(MessageSegment.face(_data["id"])) + else: + temp_m.append(f"[未知类型信息:{_data}]") + + repo = str().join(map(str, temp_m)) + return repo diff --git a/ATRI/plugins/funny/__init__.py b/ATRI/plugins/funny/__init__.py index ea60375..8173f3b 100644 --- a/ATRI/plugins/funny/__init__.py +++ b/ATRI/plugins/funny/__init__.py @@ -1,8 +1,8 @@ from random import choice, randint -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from nonebot.adapters.cqhttp.message import Message +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent, Message from ATRI.utils.limit import FreqLimiter, DailyLimiter from .data_source import Funny @@ -39,38 +39,36 @@ _fake_flmt = FreqLimiter(60) _fake_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -@fake_msg.args_parser # type: ignore -async def _perp_fake(bot: Bot, event: GroupMessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了"] - if msg in quit_list: - await fake_msg.finish("好吧...") - if not msg: - await fake_msg.reject("内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开") - else: - state["content"] = msg - - @fake_msg.handle() -async def _ready_fake(bot: Bot, event: GroupMessageEvent, state: T_State): +async def _ready_fake( + matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _fake_daliy_max.check(user_id): await fake_msg.finish(_fake_max_notice) if not _fake_flmt.check(user_id): await fake_msg.finish(_fake_flmt_notice) - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["content"] = msg + matcher.set_arg("content", args) @fake_msg.got("content", "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开") -async def _deal_fake(bot: Bot, event: GroupMessageEvent, state: T_State): - content = state["content"] +async def _deal_fake( + bot: Bot, event: GroupMessageEvent, content: str = ArgPlainText("content") +): group_id = event.group_id user_id = event.get_user_id() - node = Funny().fake_msg(content) - await bot.send_group_forward_msg(group_id=group_id, messages=node) + try: + node = Funny().fake_msg(content) + except Exception: + await fake_msg.finish("内容格式错误,请检查(") + + try: + await bot.send_group_forward_msg(group_id=group_id, messages=node) + except Exception: + await fake_msg.finish("构造失败惹...可能是被制裁了(") _fake_flmt.start_cd(user_id) _fake_daliy_max.increase(user_id) diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py index a1fce64..8edc88a 100644 --- a/ATRI/plugins/funny/data_source.py +++ b/ATRI/plugins/funny/data_source.py @@ -3,7 +3,7 @@ import os from pathlib import Path from random import choice, randint -from nonebot.adapters.cqhttp.utils import unescape +from nonebot.adapters.onebot.v11 import unescape from ATRI.service import Service from ATRI.log import logger @@ -77,9 +77,9 @@ class Funny(Service): EAT_URL = "https://wtf.hiigara.net/api/run/" params = {"event": "ManualRun"} pattern_0 = r"大?[今明后]天(.*?)吃[什啥]么?" - pattern_1 = r"(今|明|后|大后)天" + pattern_1 = r"[今|明|后|大后]天" arg = re.findall(pattern_0, msg)[0] - day = re.match(pattern_1, msg).group(0) # type: ignore + day = re.findall(pattern_1, msg)[0] if arg == "中午": a = f"LdS4K6/{randint(0, 1145141919810)}" diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py index 1d1102e..a8465bc 100644 --- a/ATRI/plugins/help/__init__.py +++ b/ATRI/plugins/help/__init__.py @@ -1,5 +1,4 @@ -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.onebot.v11 import MessageEvent from ATRI.rule import to_bot from .data_source import Helper @@ -9,7 +8,7 @@ main_help = Helper().on_command("菜单", "获取食用bot的方法", rule=to_bo @main_help.handle() -async def _main_help(bot: Bot, event: MessageEvent): +async def _main_help(): repo = Helper().menu() await main_help.finish(repo) @@ -18,7 +17,7 @@ about_me = Helper().on_command("关于", "获取关于bot的信息", rule=to_bot @about_me.handle() -async def _about_me(bot: Bot, event: MessageEvent): +async def _about_me(): repo = Helper().about() await about_me.finish(repo) @@ -27,7 +26,7 @@ service_list = Helper().on_command("服务列表", "查看所有可用服务", r @service_list.handle() -async def _service_list(bot: Bot, event: MessageEvent): +async def _service_list(): repo = Helper().service_list() await service_list.finish(repo) @@ -36,7 +35,7 @@ service_info = Helper().on_command("帮助", "获取服务详细帮助", aliases @service_info.handle() -async def _ready_service_info(bot: Bot, event: MessageEvent, state: T_State): +async def _ready_service_info(event: MessageEvent): msg = str(event.message).split(" ") service = msg[0] try: diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py index 7b2f6f8..cfa0c62 100644 --- a/ATRI/plugins/help/data_source.py +++ b/ATRI/plugins/help/data_source.py @@ -34,7 +34,7 @@ class Helper(Service): "关于 -查看bot基本信息\n" "服务列表 -以查看所有可用服务\n" "帮助 [服务] -以查看对应服务帮助\n" - "Tip: 均需要at触发。菜单 以打开此页面" + "Tip: 均需要at触发。@bot 菜单 以打开此页面" ) @staticmethod @@ -59,10 +59,8 @@ class Helper(Service): service = i.replace(".json", "") temp_list.append(service) - msg0 = "咱搭载了以下服务~\n" services = "、".join(map(str, temp_list)) - msg0 = msg0 + services - repo = msg0 + "\n@ 帮助 [服务] -以查看对应服务帮助" + repo = f"咱搭载了以下服务~\n{services}\n@bot 帮助 [服务] -以查看对应服务帮助" return repo @staticmethod diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py index 77692fd..62ed378 100644 --- a/ATRI/plugins/manage/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -1,25 +1,26 @@ import re -from nonebot.typing import T_State from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN from .data_source import Manage + block_user = Manage().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) @block_user.handle() -async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_block_user(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["block_user"] = msg + matcher.set_arg("block_user", args) @block_user.got("block_user", "哪位?GKD!") -async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State): - user_id = state["block_user"] +async def _deal_block_user(user_id: str = ArgPlainText("block_user")): quit_list = ["算了", "罢了"] if user_id in quit_list: await block_user.finish("...看来有人逃过一劫呢") @@ -35,15 +36,14 @@ unblock_user = Manage().on_command("解封用户", "对目标用户进行解封" @unblock_user.handle() -async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_unblock_user(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["unblock_user"] = msg + matcher.set_arg("unblock_user", args) @unblock_user.got("unblock_user", "哪位?GKD!") -async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State): - user_id = state["unblock_user"] +async def _deal_unblock_user(user_id: str = ArgPlainText("unblock_user")): quit_list = ["算了", "罢了"] if user_id in quit_list: await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") @@ -59,15 +59,14 @@ block_group = Manage().on_command("封禁群", "对目标群进行封禁", permi @block_group.handle() -async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_block_group(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["block_group"] = msg + matcher.set_arg("block_group", args) @block_group.got("block_group", "哪个群?GKD!") -async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State): - group_id = state["block_group"] +async def _deal_block_group(group_id: str = ArgPlainText("block_group")): quit_list = ["算了", "罢了"] if group_id in quit_list: await block_group.finish("...看来有一群逃过一劫呢") @@ -83,15 +82,14 @@ unblock_group = Manage().on_command("解封群", "对目标群进行解封", per @unblock_group.handle() -async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_unblock_group(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["unblock_group"] = msg + matcher.set_arg("unblock_group", args) @unblock_group.got("unblock_group", "哪个群?GKD!") -async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State): - group_id = state["unblock_group"] +async def _deal_unblock_group(group_id: str = ArgPlainText("unblock_group")): quit_list = ["算了", "罢了"] if group_id in quit_list: await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") @@ -107,15 +105,16 @@ global_block_service = Manage().on_command("全局禁用", "全局禁用某服� @global_block_service.handle() -async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_block_service(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["global_block_service"] = msg + matcher.set_arg("global_block_service", args) @global_block_service.got("global_block_service", "阿...是哪个服务呢") -async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State): - block_service = state["global_block_service"] +async def _deal_global_block_service( + block_service: str = ArgPlainText("global_block_service"), +): quit_list = ["算了", "罢了"] if block_service in quit_list: await global_block_service.finish("好吧...") @@ -131,15 +130,18 @@ global_unblock_service = Manage().on_command("全局启用", "全局启用某服 @global_unblock_service.handle() -async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_unblock_service( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): + msg = args.extract_plain_text() if msg: - state["global_unblock_service"] = msg + matcher.set_arg("global_unblock_service", args) @global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") -async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): - unblock_service = state["global_unblock_service"] +async def _deal_global_unblock_service( + unblock_service: str = ArgPlainText("global_unblock_service"), +): quit_list = ["算了", "罢了"] if unblock_service in quit_list: await global_unblock_service.finish("好吧...") @@ -157,7 +159,7 @@ user_block_service = Manage().on_regex( @user_block_service.handle() -async def _user_block_service(bot: Bot, event: MessageEvent): +async def _user_block_service(event: MessageEvent): msg = str(event.message).strip() pattern = r"对用户(.*?)禁用(.*)" reg = re.findall(pattern, msg) @@ -176,7 +178,7 @@ user_unblock_service = Manage().on_regex( @user_unblock_service.handle() -async def _user_unblock_service(bot: Bot, event: MessageEvent): +async def _user_unblock_service(event: MessageEvent): msg = str(event.message).strip() pattern = r"对用户(.*?)启用(.*)" reg = re.findall(pattern, msg) @@ -196,16 +198,17 @@ group_block_service = Manage().on_command( @group_block_service.handle() async def _ready_group_block_service( - bot: Bot, event: GroupMessageEvent, state: T_State + matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() ): msg = str(event.message).strip() if msg: - state["group_block_service"] = msg + matcher.set_arg("group_block_service", args) @group_block_service.got("group_block_service", "阿...是哪个服务呢") -async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): - aim_service = state["group_block_service"] +async def _deal_group_block_service( + event: GroupMessageEvent, aim_service: str = ArgPlainText("group_block_service") +): group_id = str(event.group_id) quit_list = ["算了", "罢了"] if aim_service in quit_list: @@ -224,18 +227,17 @@ group_unblock_service = Manage().on_command( @group_unblock_service.handle() async def _ready_group_unblock_service( - bot: Bot, event: GroupMessageEvent, state: T_State + matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() ): - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["group_unblock_service"] = msg + matcher.set_arg("group_unblock_service", args) @group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") async def _deal_group_unblock_service( - bot: Bot, event: GroupMessageEvent, state: T_State + event: GroupMessageEvent, aim_service: str = ArgPlainText("group_unblock_service") ): - aim_service = state["group_unblock_service"] group_id = str(event.group_id) quit_list = ["算了", "罢了"] if aim_service in quit_list: @@ -251,7 +253,7 @@ get_friend_add_list = Manage().on_command("获取好友申请", "获取好友申 @get_friend_add_list.handle() -async def _get_friend_add_list(bot: Bot, event: MessageEvent): +async def _get_friend_add_list(): data = Manage().load_friend_apply_list() temp_list = list() for i in data: @@ -270,15 +272,18 @@ approve_friend_add = Manage().on_command("同意好友", "同意好友申请", p @approve_friend_add.handle() -async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_approve_friend_add( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): + msg = args.extract_plain_text() if msg: - state["approve_friend_add"] = msg + matcher.set_arg("approve_friend_add", args) @approve_friend_add.got("approve_friend_add", "申请码GKD!") -async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["approve_friend_add"] +async def _deal_approve_friend_add( + bot: Bot, apply_code: str = ArgPlainText("approve_friend_add") +): quit_list = ["算了", "罢了"] if apply_code in quit_list: await approve_friend_add.finish("好吧...") @@ -297,15 +302,18 @@ refuse_friend_add = Manage().on_command("拒绝好友", "拒绝好友申请", pe @refuse_friend_add.handle() -async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_refuse_friend_add( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): + msg = args.extract_plain_text() if msg: - state["refuse_friend_add"] = msg + matcher.set_arg("refuse_friend_add", args) @refuse_friend_add.got("refuse_friend_add", "申请码GKD!") -async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["refuse_friend_add"] +async def _deal_refuse_friend_add( + bot: Bot, apply_code: str = ArgPlainText("refuse_friend_add") +): quit_list = ["算了", "罢了"] if apply_code in quit_list: await refuse_friend_add.finish("好吧...") @@ -324,7 +332,7 @@ get_group_invite_list = Manage().on_command("获取邀请列表", "获取群邀� @get_group_invite_list.handle() -async def _get_group_invite_list(bot: Bot, event: MessageEvent): +async def _get_group_invite_list(): data = Manage().load_invite_apply_list() temp_list = list() for i in data: @@ -343,15 +351,18 @@ approve_group_invite = Manage().on_command("同意邀请", "同意群聊邀请", @approve_group_invite.handle() -async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_approve_group_invite( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): + msg = args.extract_plain_text() if msg: - state["approve_group_invite"] = msg + matcher.set_arg("approve_group_invite", args) @approve_group_invite.got("approve_group_invite", "申请码GKD!") -async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["approve_group_invite"] +async def _deal_approve_group_invite( + bot: Bot, apply_code: str = ArgPlainText("approve_group_invite") +): quit_list = ["算了", "罢了"] if apply_code in quit_list: await approve_group_invite.finish("好吧...") @@ -372,15 +383,18 @@ refuse_group_invite = Manage().on_command("拒绝邀请", "拒绝群聊邀请", @refuse_group_invite.handle() -async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_refuse_group_invite( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): + msg = args.extract_plain_text() if msg: - state["refuse_group_invite"] = msg + matcher.set_arg("refuse_group_invite", args) @refuse_group_invite.got("refuse_group_invite", "申请码GKD!") -async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["refuse_group_invite"] +async def _deal_refuse_group_invite( + bot: Bot, apply_code: str = ArgPlainText("refuse_group_invite") +): quit_list = ["算了", "罢了"] if apply_code in quit_list: await refuse_group_invite.finish("好吧...") @@ -390,7 +404,7 @@ async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_Stat flag=apply_code, sub_type="invite", approve=False ) except BaseException: - await refuse_group_invite.finish("拒绝失败...尝试下手动?") + await refuse_group_invite.finish("拒绝失败...(可能是小群免验证)尝试下手动?") data = Manage().load_invite_apply_list() data.pop(apply_code) Manage().save_invite_apply_list(data) @@ -401,7 +415,7 @@ track_error = Manage().on_command("追踪", "获取报错信息,传入追踪� @track_error.handle() -async def _track_error(bot: Bot, event: MessageEvent): +async def _track_error(event: MessageEvent): track_id = str(event.message).strip() repo = await Manage().track_error(track_id) await track_error.finish(repo) diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py index 9c52610..40869eb 100644 --- a/ATRI/plugins/repo.py +++ b/ATRI/plugins/repo.py @@ -1,7 +1,8 @@ from random import choice -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.matcher import Matcher +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message from ATRI.service import Service from ATRI.config import BotSelfConfig @@ -25,46 +26,39 @@ class Repo(Service): Service.__init__(self, "反馈", "向维护者发送消息") -repo = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"}) +reporter = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"}) [email protected]_parser # type: ignore -async def _get_repo(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await repo.finish("好吧...") - if not msg: - await repo.reject("需要反馈的内容呢?~") - else: - state["repo"] = msg - - -async def _ready_repo(bot: Bot, event: MessageEvent, state: T_State): +async def _ready_repo( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _repo_flmt.check(user_id): - await repo.finish(_repo_flmt_notice) + await reporter.finish(_repo_flmt_notice) if not _repo_dlmt.check(user_id): - await repo.finish(_repo_dlmt_notice) + await reporter.finish(_repo_dlmt_notice) - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["repo"] = msg + matcher.set_arg("repo", args) [email protected]("repo", "需要反馈的内容呢?~") -async def _deal_repo(bot: Bot, event: MessageEvent, state: T_State): - msg = state["repo"] [email protected]("repo", "需要反馈的内容呢?~") +async def _deal_repo( + bot: Bot, + event: MessageEvent, + repo_msg: str = ArgPlainText("repo"), +): user_id = event.get_user_id() - repo_0 = REPO_FORMAT.format(user=user_id, msg=msg) + repo_0 = REPO_FORMAT.format(user=user_id, msg=repo_msg) for superuser in BotSelfConfig.superusers: try: await bot.send_private_msg(user_id=superuser, message=repo_0) except BaseException: - await repo.finish("发送失败了呢...") + await reporter.finish("发送失败了呢...") _repo_flmt.start_cd(user_id) _repo_dlmt.increase(user_id) - await repo.finish("吾辈的心愿已由咱转告维护者!") + await reporter.finish("吾辈的心愿已由咱转告维护者!") diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index bcd58f7..3b49750 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -1,4 +1,4 @@ -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.onebot.v11 import MessageEvent from ATRI.utils.limit import FreqLimiter from ATRI.log import logger as log @@ -10,7 +10,7 @@ bili_rich = Rich().on_message("小程序检测", "小程序爪巴", block=False) @bili_rich.handle() -async def _fk_bili(bot: Bot, event: MessageEvent): +async def _fk_bili(event: MessageEvent): user_id = event.get_user_id() if not _rich_flmt.check(user_id): return diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index fed1096..092db50 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -1,9 +1,9 @@ from re import findall from random import choice -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment from ATRI.config import SauceNAO from ATRI.utils.limit import FreqLimiter @@ -17,33 +17,22 @@ _search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇� saucenao = SaouceNao().on_command("以图搜图", "透过一张图搜索可能的来源") [email protected]_parser # type: ignore -async def _get_img(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "不搜了"] - if msg in quit_list: - await saucenao.finish("好吧...") - if not msg: - await saucenao.reject("图呢?") - else: - state["img"] = msg - - @saucenao.handle() -async def _ready_search(bot: Bot, event: MessageEvent, state: T_State): +async def _ready_search( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _search_flmt.check(user_id): await saucenao.finish(_search_flmt_notice) - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["img"] = msg + matcher.set_arg("saucenao_img", args) [email protected]("img", "图呢?") -async def _deal_search(bot: Bot, event: MessageEvent, state: T_State): [email protected]("saucenao_img", "图呢?") +async def _deal_search(event: MessageEvent, msg: str = ArgPlainText("saucenao_img")): user_id = event.get_user_id() - msg = state["img"] img = findall(r"url=(.*?)]", msg) if not img: await saucenao.reject("请发送图片而不是其他东西!!") diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py index 244e030..6c135aa 100644 --- a/ATRI/plugins/setu/__init__.py +++ b/ATRI/plugins/setu/__init__.py @@ -1,10 +1,11 @@ import re import asyncio from random import choice + from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent, Message -from nonebot.adapters.cqhttp.message import MessageSegment -from nonebot.typing import T_State +from nonebot.matcher import Matcher +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message, MessageSegment from ATRI.config import BotSelfConfig from ATRI.utils.limit import FreqLimiter, DailyLimiter @@ -22,7 +23,7 @@ random_setu = Setu().on_command( @random_setu.handle() -async def _random_setu(bot: Bot, event: MessageEvent): +async def _random_setu(bot: Bot, event: MessageEvent, matcher: Matcher, args: Message = CommandArg()): user_id = event.get_user_id() if not _setu_flmt.check(user_id): await random_setu.finish() @@ -44,12 +45,25 @@ async def _random_setu(bot: Bot, event: MessageEvent): await asyncio.sleep(30) await bot.delete_msg(message_id=event_id) + msg = args.extract_plain_text() + if msg: + matcher.set_arg("r_rush_after_think", args) + +@random_setu.got("r_rush_after_think") +async def _(think: str = ArgPlainText("r_rush_after_think")): + is_repo = will_think(think) + if not is_repo: + await random_setu.finish() + else: + await random_setu.finish(is_repo) + + tag_setu = Setu().on_regex(r"来[张点丶份](.*?)的[涩色🐍]图", "根据提供的tag查找涩图") @tag_setu.handle() -async def _tag_setu(bot: Bot, event: MessageEvent): +async def _tag_setu(bot: Bot, event: MessageEvent, matcher: Matcher, args: Message = CommandArg()): user_id = event.get_user_id() if not _setu_flmt.check(user_id): await random_setu.finish() @@ -77,6 +91,19 @@ async def _tag_setu(bot: Bot, event: MessageEvent): await asyncio.sleep(30) await bot.delete_msg(message_id=event_id) + msg = args.extract_plain_text() + if msg: + matcher.set_arg("r_rush_after_think", args) + + +@tag_setu.got("t_rush_after_think") +async def _(think: str = ArgPlainText("t_rush_after_think")): + is_repo = will_think(think) + if not is_repo: + await random_setu.finish() + else: + await random_setu.finish(is_repo) + _catcher_max_file_size = 128 @@ -129,17 +156,16 @@ nsfw_checker = Setu().on_command("/nsfw", "涩值检测") @nsfw_checker.handle() -async def _nsfw_checker(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _nsfw_checker(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["nsfw_img"] = msg + matcher.set_arg("nsfw_img", args) @nsfw_checker.got("nsfw_img", "图呢?") -async def _deal_check(bot: Bot, event: MessageEvent, state: T_State): - msg = state["nsfw_img"] +async def _deal_check(bot: Bot, img: str = ArgPlainText("nsfw_img")): pattern = r"url=(.*?)]" - args = re.findall(pattern, msg) + args = re.findall(pattern, img) if not args: await nsfw_checker.reject("请发送图片而不是其他东西!!") @@ -167,16 +193,15 @@ catcher_setting = Setu().on_command("嗅探设置", "涩图检测图片文件大 @catcher_setting.handle() -async def _catcher_setting(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _catcher_setting(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["catcher_set"] = msg + matcher.set_arg("catcher_set", args) @catcher_setting.got("catcher_set", "数值呢?(1对应1kb,默认128)") -async def _deal_setting(bot: Bot, event: MessageEvent, state: T_State): +async def _deal_setting(msg: str = ArgPlainText("catcher_set")): global _catcher_max_file_size - msg = state["catcher_set"] try: _catcher_max_file_size = int(msg) except Exception: @@ -205,3 +230,37 @@ async def _scheduler_setu(bot): except Exception: pass + + +_ag_l = ["涩图来", "来点涩图", "来份涩图"] +_ag_patt = r"来[张点丶份](.*?)的[涩色🐍]图" + +_nice_patt = r"[hH好][sS涩色][oO哦]|[嗯恩摁社蛇🐍射]了|(硬|石更)了|[牛🐂][牛🐂]要炸了|[炼恋]起来|开?导" +_nope_patt = r"不够[涩色]|就这|这也[是叫算]|[??]" +_again_patt = r"再来一张|不够" + +_nice_repo = ["w", "好诶!", "ohh", "(///w///)", "🥵", "我也"] +_nope_repo = ["那你来发", "爱看不看", "你看不看吧", "看这种类型的涩图,是一件多么美妙的事情"] +_again_repo = ["没了...", "自己找去"] + + +def will_think(msg: str) -> str: + if msg in _ag_l: + return str() + + ag_jud = re.findall(_ag_patt, msg) + if ag_jud: + return str() + + nice_jud = re.findall(_nice_patt, msg) + nope_jud = re.findall(_nope_patt, msg) + again_jud = re.findall(_again_patt, msg) + + if nice_jud: + return choice(_nice_repo) + elif nope_jud: + return choice(_nope_repo) + elif again_jud: + return choice(_again_repo) + else: + return str() diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py index cca2767..a26bbef 100644 --- a/ATRI/plugins/setu/data_source.py +++ b/ATRI/plugins/setu/data_source.py @@ -1,16 +1,16 @@ -import base64 - -# from pathlib import Path from random import choice -from nonebot.adapters.cqhttp import MessageSegment +from nonebot.adapters.onebot.v11 import MessageSegment from ATRI.service import Service from ATRI.rule import is_in_service from ATRI.utils import request +from ATRI.config import Setu as ST from .tf_dealer import detect_image LOLICON_URL = "https://api.lolicon.app/setu/v2" +DEFAULT_SETU = "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg" + class Setu(Service): @@ -31,9 +31,9 @@ class Setu(Service): data: dict = temp_data[0] title = data.get("title", "木陰のねこ") p_id = data.get("pid", 88124144) - url = data["urls"].get("original", "ignore") + url: str = data["urls"].get("original", "ignore") - setu = MessageSegment.image(url, timeout=114514) + setu = MessageSegment.image(use_proxy(url), timeout=114514) repo = f"Title: {title}\nPid: {p_id}" return repo, setu @@ -55,7 +55,7 @@ class Setu(Service): p_id = data.get("pid", 88124144) url = data["urls"].get( "original", - "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg", + use_proxy(DEFAULT_SETU), ) setu = MessageSegment.image(url, timeout=114514) repo = f"Title: {title}\nPid: {p_id}" @@ -87,8 +87,15 @@ class Setu(Service): url = temp_data[0]["urls"].get( "original", - "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg", + use_proxy(DEFAULT_SETU), ) setu = MessageSegment.image(url, timeout=114514) repo = f"是{tag}哦~❤\n{setu}" return repo + + +def use_proxy(url: str) -> str: + if ST.reverse_proxy: + return url.replace("i.pixiv.cat", ST.reverse_proxy_domain) + else: + return url diff --git a/ATRI/plugins/setu/tf_dealer.py b/ATRI/plugins/setu/tf_dealer.py index bf68020..f966636 100644 --- a/ATRI/plugins/setu/tf_dealer.py +++ b/ATRI/plugins/setu/tf_dealer.py @@ -1,7 +1,6 @@ import io import os import re -import string import asyncio import skimage import skimage.io @@ -9,12 +8,8 @@ import numpy as np from PIL import Image from pathlib import Path from sys import getsizeof -from random import sample -try: - import tflite_runtime.interpreter as tf # type: ignore -except Exception: - import tensorflow as tf +import tensorflow as tf from ATRI.log import logger as log from ATRI.utils import request @@ -116,4 +111,5 @@ async def init_module(): raise WriteError("NSFW TF module init failed!") -asyncio.get_event_loop().run_until_complete(init_module()) +loop = asyncio.get_event_loop() +loop.create_task(init_module()) diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py index 359344b..06afa3b 100644 --- a/ATRI/plugins/status/__init__.py +++ b/ATRI/plugins/status/__init__.py @@ -1,5 +1,3 @@ -from nonebot.adapters.cqhttp import Bot, MessageEvent - from ATRI.utils.apscheduler import scheduler from .data_source import IsSurvive @@ -8,7 +6,7 @@ ping = IsSurvive().on_command("/ping", "检测bot简单信息处理速度") @ping.handle() -async def _ping(bot: Bot, event: MessageEvent): +async def _ping(): await ping.finish(IsSurvive.ping()) @@ -16,7 +14,7 @@ status = IsSurvive().on_command("/status", "查看运行资源占用") @status.handle() -async def _status(bot: Bot, event: MessageEvent): +async def _status(): msg, _ = IsSurvive.get_status() await status.finish(msg) diff --git a/ATRI/plugins/util/__init__.py b/ATRI/plugins/util/__init__.py index 541ca1b..b67a988 100644 --- a/ATRI/plugins/util/__init__.py +++ b/ATRI/plugins/util/__init__.py @@ -1,8 +1,9 @@ import re from random import choice, random -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.matcher import Matcher +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import MessageEvent, Message from ATRI.utils.limit import FreqLimiter from .data_source import Encrypt, Utils, Yinglish @@ -11,62 +12,36 @@ from .data_source import Encrypt, Utils, Yinglish roll = Utils().on_command("/roll", "骰子~用法:1d10 或 2d10+2d10+more") [email protected]_parser # type: ignore -async def _get_roll(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了"] - if msg in quit_list: - await roll.finish("好吧...") - if not msg: - await roll.reject("参数呢?!格式:1d10 或 2d10+2d10+more") - else: - state["roll"] = msg - - @roll.handle() -async def _ready_roll(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_roll(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["roll"] = msg + matcher.set_arg("roll", args) @roll.got("roll", "参数呢?!格式:1d10 或 2d10+2d10+more") -async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State): - text = state["roll"] - match = re.match(r"^([\dd+\s]+?)$", text) +async def _deal_roll(roll_msg: str = ArgPlainText("roll")): + match = re.match(r"^([\dd+\s]+?)$", roll_msg) if not match: await roll.finish("阿——!参数不对!格式:1d10 或 2d10+2d10+more") - msg = Utils().roll_dice(text) + msg = Utils().roll_dice(roll_msg) await roll.finish(msg) -encrypt_en = Utils().on_command("加密", "我们之前的秘密❤") - - -@encrypt_en.args_parser # type: ignore -async def _get_encr_en_text(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了"] - if msg in quit_list: - await roll.finish("好吧...") - if not msg: - await roll.reject("内容呢?!") - else: - state["encr_en_text"] = msg +encrypt_en = Utils().on_command("加密", "我们之间的秘密❤") @encrypt_en.handle() -async def _ready_en(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_en(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["encr_en_text"] = msg + matcher.set_arg("encr_en_text", args) @encrypt_en.got("encr_en_text", "内容呢?!") -async def _deal_en(bot: Bot, event: MessageEvent, state: T_State): - text = state["encr_en_text"] +async def _deal_en(text: str = ArgPlainText("encr_en_text")): is_ok = len(text) if is_ok < 10: await encrypt_en.reject("太短不加密!") @@ -78,28 +53,15 @@ async def _deal_en(bot: Bot, event: MessageEvent, state: T_State): encrypt_de = Utils().on_command("解密", "解开我们的秘密❤") -@encrypt_de.args_parser # type: ignore -async def _get_encr_de_text(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了"] - if msg in quit_list: - await encrypt_de.finish("好吧...") - if not msg: - await encrypt_de.reject("内容呢?!") - else: - state["encr_de_text"] = msg - - @encrypt_de.handle() -async def _ready_de(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() +async def _ready_de(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() if msg: - state["encr_de_text"] = msg + matcher.set_arg("encr_de_text", args) @encrypt_de.got("encr_de_text", "内容呢?!") -async def _deal_de(bot: Bot, event: MessageEvent, state: T_State): - text = state["encr_de_text"] +async def _deal_de(text: str = ArgPlainText("encr_de_text")): en = Encrypt() result = en.decode(text) await encrypt_de.finish(result) @@ -111,33 +73,22 @@ _sepi_flmt = FreqLimiter(3) _sepi_flmt_notice = ["涩批爬", "✌🥵✌"] [email protected]_parser # type: ignore -async def _get_sepi(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await sepi.finish("好吧...") - if not msg: - await sepi.reject("内容呢?!") - else: - state["sepi_text"] = msg - - @sepi.handle() -async def _ready_sepi(bot: Bot, event: MessageEvent, state: T_State): +async def _ready_sepi( + matcher: Matcher, event: MessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _sepi_flmt.check(user_id): await sepi.finish(choice(_sepi_flmt_notice)) - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["sepi_text"] = msg + matcher.set_arg("sepi_text", args) @sepi.got("sepi_text", "内容呢?!") -async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State): +async def _deal_sepi(event: MessageEvent, msg: str = ArgPlainText("sepi_text")): user_id = event.get_user_id() - msg = state["sepi_text"] if len(msg) < 4: await sepi.finish("这么短?涩不起来!") diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py index 57a5af2..b69315d 100644 --- a/ATRI/plugins/wife/__init__.py +++ b/ATRI/plugins/wife/__init__.py @@ -3,9 +3,10 @@ from random import choice from pydantic import BaseModel from nonebot.rule import Rule -from nonebot.typing import T_State +from nonebot.matcher import Matcher +from nonebot.params import CommandArg, ArgPlainText from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent, Message +from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent, Message from ATRI.utils.limit import FreqLimiter from .data_source import Wife @@ -25,7 +26,7 @@ tietie_superuser = Wife().on_message( @tietie_superuser.handle() -async def _tietie_superuser(bot: Bot, event: MessageEvent): +async def _tietie_superuser(event: MessageEvent): if not _is_tietie: await tietie_superuser.finish() @@ -38,25 +39,21 @@ async def _tietie_superuser(bot: Bot, event: MessageEvent): await tietie_superuser.finish(Message(result)) -no_tietie = Wife().on_command( - "不可以贴", docs="拒绝贴贴", rule=Rule(), permission=SUPERUSER, block=False -) +no_tietie = Wife().on_command("不可以贴", docs="拒绝贴贴", rule=Rule(), permission=SUPERUSER) @no_tietie.handle() -async def _no_tietie(bot: Bot, event: MessageEvent): +async def _no_tietie(): global _is_tietie _is_tietie = False await no_tietie.finish("好吧...") -yes_tietie = Wife().on_command( - "来贴贴", docs="继续贴贴", rule=Rule(), permission=SUPERUSER, block=False -) +yes_tietie = Wife().on_command("来贴贴", docs="继续贴贴", rule=Rule(), permission=SUPERUSER) @yes_tietie.handle() -async def _yes_tietie(bot: Bot, event: MessageEvent): +async def _yes_tietie(): global _is_tietie _is_tietie = True await yes_tietie.finish("好欸!") @@ -117,7 +114,7 @@ async def _get_wife(bot: Bot, event: GroupMessageEvent): name=req_user_card, sex=is_nick, wife=user_id ).dict() data[user_id] = MarryInfo( - name=lucky_user_card, sex=lucky_user_sex, wife=lucky_user + name=lucky_user_card, sex=lucky_user_sex, wife=str(lucky_user) ).dict() Wife().save_marry_list(data) @@ -133,7 +130,7 @@ call_wife = Wife().on_command("老婆", "呼唤老婆/老公!", aliases={"老� @call_wife.handle() -async def _call_wife(bot: Bot, event: MessageEvent): +async def _call_wife(event: MessageEvent): user_id = event.get_user_id() if not _wife_flmt.check(user_id): await call_wife.finish() @@ -154,7 +151,9 @@ discard_wife = Wife().on_command("我要离婚", "离婚!") @discard_wife.handle() -async def _discard_wife(bot: Bot, event: GroupMessageEvent, state: T_State): +async def _discard_wife( + matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() +): user_id = event.get_user_id() if not _wife_flmt.check(user_id): await discard_wife.finish() @@ -163,19 +162,20 @@ async def _discard_wife(bot: Bot, event: GroupMessageEvent, state: T_State): if user_id not in data: await discard_wife.finish("你还没对象呐...") - msg = str(event.message).strip() + msg = args.extract_plain_text() if msg: - state["is_disc"] = msg + matcher.set_arg("is_disc", args) @discard_wife.got("is_disc", "真的吗...(y/是)") -async def _deal_discard(bot: Bot, event: GroupMessageEvent, state: T_State): - msg = state["is_disc"] +async def _deal_discard( + bot: Bot, event: GroupMessageEvent, is_disc: str = ArgPlainText("is_disc") +): rd_list = ["y", "Y", "是", "确认", "对"] user_id = event.get_user_id() group_id = event.group_id - if msg not in rd_list: + if is_disc not in rd_list: user_info = await bot.get_group_member_info( group_id=group_id, user_id=int(user_id) ) diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py index 670f862..29d58e7 100644 --- a/ATRI/plugins/wife/data_source.py +++ b/ATRI/plugins/wife/data_source.py @@ -2,7 +2,7 @@ import os import json from random import choice from pathlib import Path -from nonebot.adapters.cqhttp import MessageSegment +from nonebot.adapters.onebot.v11 import MessageSegment from ATRI.service import Service from ATRI.rule import is_in_service diff --git a/ATRI/rule.py b/ATRI/rule.py index e8126e2..925565e 100644 --- a/ATRI/rule.py +++ b/ATRI/rule.py @@ -1,12 +1,12 @@ -from nonebot.adapters.cqhttp.event import PrivateMessageEvent from nonebot.rule import Rule -from nonebot.adapters.cqhttp import MessageEvent, GroupMessageEvent +from nonebot.adapters import Bot, Event +from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent from .service import ServiceTools def is_in_service(service: str) -> Rule: - async def _is_in_service(bot, event, state) -> bool: + async def _is_in_service(bot: Bot, event: Event) -> bool: result = ServiceTools().auth_service(service) if not result: return False @@ -27,7 +27,7 @@ def is_in_service(service: str) -> Rule: def to_bot() -> Rule: - async def _to_bot(bot, event, state) -> bool: + async def _to_bot(bot: Bot, event: Event) -> bool: return event.is_tome() return Rule(_to_bot) diff --git a/ATRI/service.py b/ATRI/service.py index 5379644..9f680c8 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -2,15 +2,22 @@ import os import re import json from pathlib import Path +from types import ModuleType from pydantic import BaseModel from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING from nonebot.matcher import Matcher from nonebot.permission import Permission -from nonebot.typing import T_State, T_Handler, T_RuleChecker +from nonebot.dependencies import Dependent +from nonebot.typing import ( + T_State, + T_Handler, + T_RuleChecker, + T_PermissionChecker, +) from nonebot.rule import Rule, command, keyword, regex -from ATRI.exceptions import ReadFileError, ServiceRegisterError, WriteError +from ATRI.exceptions import ReadFileError, WriteError if TYPE_CHECKING: from nonebot.adapters import Bot, Event @@ -35,7 +42,7 @@ class ServiceInfo(BaseModel): class CommandInfo(BaseModel): type: str docs: str - aliases: list or set + aliases: Union[list, set] class Service: @@ -148,11 +155,10 @@ class Service: def on_message( self, name: str = None, - docs: str = None, - _from: str = str(), # 供类似 on_command 的方法,提供更直观的 log 中 matcher 触发来源 + docs: str = str(), rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - handlers: Optional[List[T_Handler]] = None, + permission: Optional[Union[Permission, T_PermissionChecker]] = None, + handlers: Optional[List[Union[T_Handler, Dependent]]] = None, block: bool = True, priority: int = None, state: Optional[T_State] = None, @@ -181,15 +187,14 @@ class Service: matcher = Matcher.new( "message", Rule() & rule, - permission or Permission(), - module=self.service + "-" + _from, + Permission() | permission, + module=ModuleType(self.service), temp=self.temp, priority=priority, block=block, handlers=handlers, default_state=state, ) - matcher.module = self.service return matcher def on_notice(self, name: str, docs: str, block: bool = True) -> Type[Matcher]: @@ -204,7 +209,7 @@ class Service: "notice", Rule() & self.rule, Permission(), - module=self.service + "-" + name, + module=ModuleType(self.service), temp=self.temp, priority=self.priority, block=block, @@ -225,7 +230,7 @@ class Service: "request", Rule() & self.rule, Permission(), - module=self.service + "-" + name, + module=ModuleType(self.service), temp=self.temp, priority=self.priority, block=block, @@ -253,22 +258,8 @@ class Service: ).dict() self._save_cmds(cmd_list) - async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): - message = event.get_message() - segment = message.pop(0) - new_message = message.__class__( - str(segment).lstrip()[len(state["_prefix"]["raw_command"]) :].lstrip() - ) # type: ignore - for new_segment in reversed(new_message): - message.insert(0, new_segment) - - handlers = kwargs.pop("handlers", []) - handlers.insert(0, _strip_cmd) - commands = set([cmd]) | (aliases or set()) - return self.on_message( - _from=str(cmd), rule=command(*commands) & rule, handlers=handlers, **kwargs - ) + return self.on_message(rule=command(*commands) & rule, block=True, **kwargs) def on_keyword( self, @@ -287,7 +278,7 @@ class Service: cmd_list[name] = CommandInfo(type="keyword", docs=docs, aliases=keywords).dict() self._save_cmds(cmd_list) - return self.on_message(_from=name, rule=keyword(*keywords) & rule, **kwargs) + return self.on_message(rule=keyword(*keywords) & rule, **kwargs) def on_regex( self, @@ -304,9 +295,7 @@ class Service: cmd_list[pattern] = CommandInfo(type="regex", docs=docs, aliases=list()).dict() self._save_cmds(cmd_list) - return self.on_message( - _from=pattern, rule=regex(pattern, flags) & rule, **kwargs - ) + return self.on_message(rule=regex(pattern, flags) & rule, **kwargs) class ServiceTools(object): diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py index 68a69b6..480d00e 100644 --- a/ATRI/utils/__init__.py +++ b/ATRI/utils/__init__.py @@ -55,9 +55,9 @@ class ListDealer: return self.lst -class CoolqCodeChecker: +class MessageChecker: """ - 检查所传回的cq码是否存在被注入可能 + 检查所传回的信息是否存在被注入可能 """ tenc_gchat_url: str = "gchat.qpic.cn" @@ -67,7 +67,7 @@ class CoolqCodeChecker: self.text = text @property - def check(self) -> bool: + def check_cq_code(self) -> bool: _type = re.findall(r"CQ:(.*?),", self.text) for i in _type: if i == "image": @@ -83,6 +83,13 @@ class CoolqCodeChecker: return True else: return True + + @property + def check_image_url(self) -> bool: + if self.tenc_gchat_url not in self.text: + return False + else: + return True class FileDealer: diff --git a/ATRI/utils/apscheduler.py b/ATRI/utils/apscheduler.py index 2680f19..daf37f4 100644 --- a/ATRI/utils/apscheduler.py +++ b/ATRI/utils/apscheduler.py @@ -1,9 +1,10 @@ -# Fork from: https://github.com/nonebot/plugin-apscheduler - +""" +Fork from: https://github.com/nonebot/plugin-apscheduler +""" import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler -from nonebot import get_driver, export +from nonebot import get_driver from nonebot.log import logger, LoguruHandler @@ -13,7 +14,6 @@ apscheduler_config: dict = {"apscheduler.timezone": "Asia/Shanghai"} driver = get_driver() scheduler = AsyncIOScheduler() -export().scheduler = scheduler async def _start_scheduler(): diff --git a/ATRI/utils/request.py b/ATRI/utils/request.py index 2cfce85..da0cdd4 100644 --- a/ATRI/utils/request.py +++ b/ATRI/utils/request.py @@ -2,16 +2,17 @@ import httpx from ATRI.config import BotSelfConfig -proxy = BotSelfConfig.proxy -if not proxy: +if not BotSelfConfig.proxy: proxy = dict() +else: + proxy = {"all://": BotSelfConfig.proxy} async def get(url: str, **kwargs): - async with httpx.AsyncClient(proxies=proxy) as client: + async with httpx.AsyncClient(proxies=proxy) as client: # type: ignore return await client.get(url, **kwargs) async def post(url: str, **kwargs): - async with httpx.AsyncClient(proxies=proxy) as client: + async with httpx.AsyncClient(proxies=proxy) as client: # type: ignore return await client.post(url, **kwargs) diff --git a/changelog.md b/changelog.md index 47e5578..5c6b518 100644 --- a/changelog.md +++ b/changelog.md @@ -1,24 +1,91 @@ -> 此处仅为记录新功能更新,修复 BUG/以及其它 请关注[`GitHub commits`](https://github.com/Kyomotoi/ATRI/commits/main) +> 此处仅为记录重大更新,修复 BUG/以及其它 请关注[`GitHub commits`](https://github.com/Kyomotoi/ATRI/commits/main) + +## Comming soon... +- Next: `YHN-001-A06` + +--- + +## Jan 31, 2022 + +> 新年快乐! + +- 更新版本至: `YHN-001-A05` +- 新增: + - 全面适配 `NoneBot v2.0.0 beta1` + - 插件:[`broadcast`](user/plugin-broadcast.md) + - 现在可以向机器人所在群推送同一消息 + - 插件:[`essential`](user/plugin-essential.md) + - 现在可以接受、拒绝撤回消息推送 + - 内置 [`gocqhttp`](https://github.com/mnixry/nonebot-plugin-gocqhttp) + - 现在可以连同机器人一同启动了 + - 同时包含了可视化gocq控制台 + - 可快捷、快速添加新账号 + - 生产环境资源可视化 + - [单元测试](/test),加快了机器人开发上线前自检的速度 + - `config.yml` 新增以下内容: + ```yaml + InlineGoCQHTTP: + accounts: # 可多个账号,具体请参考文档 + - uin: 1234567890 + password: "" + protocol: 3 + + download_version: "latest" + + Setu: + reverse_proxy: true # 请参考文档 + reverse_proxy_domain: "i.pixiv.re" + ``` + - `config` 中 `proxy` 真正意义上生效 +- 其它: + - 移除插件: `cause` + - 原因:过于无趣,遭大多用户反对,且源API已失效,不想再替换新的第三方API + +--- ## Oct 24, 2021 + +- 更新版本至: `YHN-001-A04` - 新增: - - nsfw检测(主动/被动) + - nsfw检测(主动/被动)又名 `涩图嗅探` + - 可选代理 +- 修复: + - plugin/chat 在 nb2-a14+ 版本 finish 内为空时会报错 +- 其他: + - 对定时任务进行中文命名 +- 移除: + - 前端界面(赶工而导致成品炸裂) + +--- ## Jul 31, 2021 -- 新增: + +- 新增: - 前端(单主页) - 和维护者贴贴w +--- + ## Jul 8, 2021 -- 更新版本至:YHN-001-A03,请关注commit:[传送](https://github.com/Kyomotoi/ATRI/commit/be2747e4d4b820ca0f1f988d3b77a628da26fe7b) + +- 更新版本至: `YHN-001-A03`,请关注commit: [传送](https://github.com/Kyomotoi/ATRI/commit/be2747e4d4b820ca0f1f988d3b77a628da26fe7b) + +--- ## May 4, 2021 -- 新增: + +- 新增: - 涩图 - 群老婆! +--- + ## Apr 11, 2021 + - 正式重构完成 +--- + ## 更早记录 + - 请自行查看[`GitHub commits`](https://github.com/Kyomotoi/ATRI/commits/main) @@ -7,7 +7,19 @@ BotSelfConfig: command_start: ["", "/"] command_sep: ["."] session_expire_timeout: 60 - proxy: "" + proxy: "" # 请参考文档 + +InlineGoCQHTTP: + accounts: # 可多个账号,具体请参考文档 + - uin: 1234567890 + password: "" + protocol: 3 + + download_version: "latest" SauceNAO: key: "" + +Setu: + reverse_proxy: true # 请参考文档 + reverse_proxy_domain: "i.pixiv.re" @@ -6,4 +6,4 @@ ATRI.init() app = ATRI.asgi() if __name__ == "__main__": - ATRI.run("main:app") + ATRI.run() diff --git a/pyproject.toml b/pyproject.toml index 56dbf61..3f573e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,9 @@ python = "^3.8" [tool.poetry.dev-dependencies] black = "^21.4-beta.0" +[tool.pytest.ini_options] +addopts = '-p no:warnings' + [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" diff --git a/requirements.txt b/requirements.txt index bef0a64..af70f71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ -aiohttp>=3.6.2 aiofiles>=0.6.0 APScheduler>=3.7.0 Pillow>=8.1.1 -nonebot-adapter-cqhttp>=2.0.0a14 -nonebot-plugin-test>=0.2.0 -nonebot2>=2.0.0a13.post1 +nonebot2>=2.0.0b1 +nonebot-adapter-onebot>=2.0.0b1 +nonebug>=0.2.0 +nonebot-plugin-gocqhttp>=0.3.2 psutil>=5.7.2 pathlib>=1.0.1 pytz>=2020.1 diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/__init__.py diff --git a/test/test_plugin_anime_search.py b/test/test_plugin_anime_search.py new file mode 100644 index 0000000..2782ba9 --- /dev/null +++ b/test/test_plugin_anime_search.py @@ -0,0 +1,32 @@ +import pytest +from nonebug import App + +from nonebot.adapters.onebot.v11 import MessageSegment + +from .utils import make_fake_message, make_fake_event + + +async def test_saucenao(app: App): + from ATRI.plugins.saucenao import saucenao + + Message = make_fake_message() + + async with app.test_matcher(saucenao) as ctx: + bot = ctx.create_bot() + + msg = Message("以图搜图") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "图呢?", True) + + msg = Message( + MessageSegment.image( + "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png" + ) + ) + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "失败了...", False) diff --git a/test/test_plugin_chat.py b/test/test_plugin_chat.py new file mode 100644 index 0000000..e04f1d3 --- /dev/null +++ b/test/test_plugin_chat.py @@ -0,0 +1,84 @@ +import pytest +from nonebug import App +from nonebot.adapters.onebot.v11 import MessageSegment + +from .utils import make_fake_message, make_fake_event + + +async def test_chat(app: App): + from ATRI.plugins.chat import chat + + Message = make_fake_message() + + async with app.test_matcher(chat) as ctx: + bot = ctx.create_bot() + + msg = Message("爱你") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True) + + +async def test_my_name_is(app: App): + from ATRI.plugins.chat import my_name_is + + Message = make_fake_message() + + async with app.test_matcher(my_name_is) as ctx: + bot = ctx.create_bot() + + msg = Message("叫我") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True) + + msg = Message("欧尼酱") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True) + + +async def test_say(app: App): + from ATRI.plugins.chat import say + + Message = make_fake_message() + + async with app.test_matcher(say) as ctx: + bot = ctx.create_bot() + + msg = Message("说") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "想要咱复读啥呢...", True) + + msg = Message("nya~") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "nya~", True) + + async with app.test_matcher(say) as ctx: + bot = ctx.create_bot() + + msg = Message("说") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "想要咱复读啥呢...", True) + + msg = Message( + MessageSegment.image( + "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png" + ) + ) + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "不要...", True) diff --git a/test/test_plugin_code_runner.py b/test/test_plugin_code_runner.py new file mode 100644 index 0000000..584ebb0 --- /dev/null +++ b/test/test_plugin_code_runner.py @@ -0,0 +1,75 @@ +import pytest +from nonebug import App + +from .utils import make_fake_message, make_fake_event + + +async def test_code_runner(app: App): + from ATRI.plugins.code_runner import code_runner + + Message = make_fake_message() + + async with app.test_matcher(code_runner) as ctx: + bot = ctx.create_bot() + + msg = Message("/code") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "请键入 /code help 以获取帮助~!", True) + + async with app.test_matcher(code_runner) as ctx: + bot = ctx.create_bot() + + msg = Message("/code help") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + /code {语言} + {代码} + For example: + /code python + print('hello world') + """, + True, + ) + + async with app.test_matcher(code_runner) as ctx: + bot = ctx.create_bot() + + msg = Message("/code list") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + 咱现在支持的语言如下: + assembly, bash, c, clojure, + coffeescript, cpp, csharp, + erlang, fsharp, go, groovy, + haskell, java, javascript, + julia, kotlin, lua, perl, + php, python, ruby, rust, + scala, swift, typescript + """, + True, + ) + + async with app.test_matcher(code_runner) as ctx: + bot = ctx.create_bot() + + msg = Message( + """ + /code python + print("hello world") + """ + ) + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "stdout:\nhello world", True) diff --git a/test/test_plugin_funny.py b/test/test_plugin_funny.py new file mode 100644 index 0000000..851a922 --- /dev/null +++ b/test/test_plugin_funny.py @@ -0,0 +1,117 @@ +import pytest +from nonebug import App + +from .utils import make_fake_message, make_fake_event + + +async def test_get_laugh(app: App): + from ATRI.plugins.funny import get_laugh + + Message = make_fake_message() + + async with app.test_matcher(get_laugh) as ctx: + bot = ctx.create_bot() + + msg = Message("来句笑话") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "哈 哈 哈", True) + + +async def test_me_re_you(app: App): + from ATRI.plugins.funny import me_re_you + + Message = make_fake_message() + + async with app.test_matcher(me_re_you) as ctx: + bot = ctx.create_bot() + + msg = Message("超市我") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "超市你", True) + + +async def test_fake_msg(app: App): + from ATRI.plugins.funny import fake_msg + + Message = make_fake_message() + + async with app.test_matcher(fake_msg) as ctx: + bot = ctx.create_bot() + + msg = Message("/fakemsg") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开", True) + + msg = Message("114514") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "内容格式错误,请检查(", True) + + async with app.test_matcher(fake_msg) as ctx: + bot = ctx.create_bot() + + msg = Message("/fakemsg") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开", True) + + msg = Message("114514-0w0-testing") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + "[{'type': 'node', 'data': {'name': '0w0', 'uin': '114514', 'content': 'testing'}}]", + True, + ) + + async with app.test_matcher(fake_msg) as ctx: + bot = ctx.create_bot() + + msg = Message("/fakemsg") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开", True) + + msg = Message("114514-0w0-testing") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "构造失败惹...可能是被制裁了(", True) + + async with app.test_matcher(fake_msg) as ctx: + bot = ctx.create_bot() + + msg = Message("/fakemsg") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "慢...慢一..点❤", True) + + +async def test_eat_what(app: App): + from ATRI.plugins.funny import eat_what + + Message = make_fake_message() + + async with app.test_matcher(eat_what) as ctx: + bot = ctx.create_bot() + + msg = Message("今天吃什么") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "欧尼酱的智商低下想不到今天要吃甚么,所以由我来选择,我给的答案是(串烧)。选我正解!!", True) diff --git a/test/test_plugin_help.py b/test/test_plugin_help.py new file mode 100644 index 0000000..e107150 --- /dev/null +++ b/test/test_plugin_help.py @@ -0,0 +1,154 @@ +import pytest +from nonebug import App + +from .utils import make_fake_message, make_fake_event + + +async def test_main_help(app: App): + from ATRI.plugins.help import main_help + + Message = make_fake_message() + + async with app.test_matcher(main_help) as ctx: + bot = ctx.create_bot() + + msg = Message("菜单") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + 哦呀?~需要帮助? + 关于 -查看bot基本信息 + 服务列表 -以查看所有可用服务 + 帮助 [服务] -以查看对应服务帮助 + Tip: 均需要at触发。@bot 菜单 以打开此页面 + """, + True, + ) + + +async def test_about_me(app: App): + from ATRI import __version__ + from ATRI.config import BotSelfConfig + from ATRI.plugins.help import about_me + + temp_list = list() + for i in BotSelfConfig.nickname: + temp_list.append(i) + nickname = "、".join(map(str, temp_list)) + + Message = make_fake_message() + + async with app.test_matcher(about_me) as ctx: + bot = ctx.create_bot() + + msg = Message("关于") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + f""" + 唔...是来认识咱的么 + 可以称呼咱:{nickname} + 咱的型号是:{__version__} + 想进一步了解: + https://github.com/Kyomotoi/ATRI + """, + True, + ) + + +async def test_service_list(app: App): + import os + + from ATRI.service import SERVICES_DIR + from ATRI.plugins.help import service_list + + files = os.listdir(SERVICES_DIR) + temp_list = list() + for i in files: + service = i.replace(".json", "") + temp_list.append(service) + + services = "、".join(map(str, temp_list)) + + Message = make_fake_message() + + async with app.test_matcher(service_list) as ctx: + bot = ctx.create_bot() + + msg = Message("服务列表") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + f""" + 咱搭载了以下服务~ + {services} + @bot 帮助 [服务] -以查看对应服务帮助 + """, + True, + ) + + +async def test_service_info(app: App): + from ATRI.plugins.help import service_info + + Message = make_fake_message() + + async with app.test_matcher(service_info) as ctx: + bot = ctx.create_bot() + + msg = Message("帮助") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "请检查是否输入错误...", True) + + async with app.test_matcher(service_info) as ctx: + bot = ctx.create_bot() + + msg = Message("帮助 状态") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + 服务名:状态 + 说明: + 检查咱自身状态 + + 可用命令: + /ping、/status + 是否全局启用:True + Tip: 帮助 [服务] [命令] 以查看对应命令详细信息 + """, + True, + ) + + async with app.test_matcher(service_info) as ctx: + bot = ctx.create_bot() + + msg = Message("帮助 状态 /ping") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + 命令:/ping + 类型:command + 说明:检测bot简单信息处理速度 + 更多触发方式:[] + """, + True, + ) diff --git a/test/test_plugin_manage.py b/test/test_plugin_manage.py new file mode 100644 index 0000000..8f3042a --- /dev/null +++ b/test/test_plugin_manage.py @@ -0,0 +1,293 @@ +import pytest +from nonebug import App + +from ATRI.config import RUNTIME_CONFIG +from .utils import make_fake_message, make_fake_event + + +async def test_block_user(app: App): + from ATRI.plugins.manage import block_user + + Message = make_fake_message() + + async with app.test_matcher(block_user) as ctx: + bot = ctx.create_bot() + + msg = Message("封禁用户") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "哪位?GKD!", True) + + msg = Message("114514") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "用户 114514 危!", True) + + +async def test_unblock_user(app: App): + from ATRI.plugins.manage import unblock_user + + Message = make_fake_message() + + async with app.test_matcher(unblock_user) as ctx: + bot = ctx.create_bot() + + msg = Message("解封用户") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "哪位?GKD!", True) + + msg = Message("114514") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "好欸! 114514 重获新生!", True) + + +async def test_block_group(app: App): + from ATRI.plugins.manage import block_group + + Message = make_fake_message() + + async with app.test_matcher(block_group) as ctx: + bot = ctx.create_bot() + + msg = Message("封禁群") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "哪位?GKD!", True) + + msg = Message("114514") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "群 114514 危!", True) + + +async def test_unblock_group(app: App): + from ATRI.plugins.manage import unblock_group + + Message = make_fake_message() + + async with app.test_matcher(unblock_group) as ctx: + bot = ctx.create_bot() + + msg = Message("解封群") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "哪个群?GKD!", True) + + msg = Message("114514") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "好欸! 114514 重获新生!", True) + + +async def test_global_block_service(app: App): + from ATRI.plugins.manage import global_block_service + + Message = make_fake_message() + + async with app.test_matcher(global_block_service) as ctx: + bot = ctx.create_bot() + + msg = Message("全局封禁") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "阿...是哪个服务呢", True) + + msg = Message("状态") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "服务 状态 已被禁用", True) + + +async def test_global_unblock_service(app: App): + from ATRI.plugins.manage import global_unblock_service + + Message = make_fake_message() + + async with app.test_matcher(global_unblock_service) as ctx: + bot = ctx.create_bot() + + msg = Message("全局启用") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "阿...是哪个服务呢", True) + + msg = Message("状态") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "服务 状态 已启用", True) + + +async def test_user_block_service(app: App): + from ATRI.plugins.manage import user_block_service + + Message = make_fake_message() + + async with app.test_matcher(user_block_service) as ctx: + bot = ctx.create_bot() + + msg = Message("对用户114514禁用状态") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "完成~已禁止用户 114514 使用 状态", True) + + +async def test_user_unblock_service(app: App): + from ATRI.plugins.manage import user_unblock_service + + Message = make_fake_message() + + async with app.test_matcher(user_unblock_service) as ctx: + bot = ctx.create_bot() + + msg = Message("对用户114514启用状态") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "完成~已允许用户 114514 使用 状态", True) + + +async def test_group_block_service(app: App): + from ATRI.plugins.manage import group_block_service + + Message = make_fake_message() + + async with app.test_matcher(group_block_service) as ctx: + bot = ctx.create_bot() + + msg = Message("禁用") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "阿...是哪个服务呢", True) + + msg = Message("状态") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "完成!~已禁止本群使用服务:状态", True) + + +async def test_group_unblock_service(app: App): + from ATRI.plugins.manage import group_unblock_service + + Message = make_fake_message() + + async with app.test_matcher(group_unblock_service) as ctx: + bot = ctx.create_bot() + + msg = Message("启用") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "阿...是哪个服务呢", True) + + msg = Message("状态") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "完成!~已允许本群使用服务:状态", True) + + +async def test_get_friend_add_list(app: App): + from ATRI.plugins.manage import get_friend_add_list + + Message = make_fake_message() + + async with app.test_matcher(get_friend_add_list) as ctx: + bot = ctx.create_bot() + + msg = Message("获取好友申请") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + 申请人ID | 申请信息 | 申请码 + Tip: 使用 同意/拒绝好友 [申请码] 以决定 + """, + True, + ) + + +# @pytest.mark.asyncio +# async def test_approve_friend_add(app: App): +# from ATRI.plugins.manage import approve_friend_add + +# Message = make_fake_message() + +# async with app.test_matcher(approve_friend_add) as ctx: +# bot = ctx.create_bot() + +# msg = Message("同意好友") +# event = make_fake_event(_message=msg)() + +# ctx.receive_event(bot, event) +# ctx.should_call_send(event, "申请码GKD!", True) + +# msg = Message() + + +async def test_get_group_invite_list(app: App): + from ATRI.plugins.manage import get_group_invite_list + + Message = make_fake_message() + + async with app.test_matcher(get_group_invite_list) as ctx: + bot = ctx.create_bot() + + msg = Message("获取邀请列表") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + 申请人ID | 申请信息 | 申请码 + Tip: 使用 同意/拒绝邀请 [申请码] 以决定 + """, + True, + ) + + +async def test_track_error(app: App): + from ATRI.plugins.manage import track_error + + Message = make_fake_message() + + async with app.test_matcher(track_error) as ctx: + bot = ctx.create_bot() + + msg = Message("/track") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "请检查ID是否正确...", True) diff --git a/test/test_plugin_rich.py b/test/test_plugin_rich.py new file mode 100644 index 0000000..3e63f4a --- /dev/null +++ b/test/test_plugin_rich.py @@ -0,0 +1,28 @@ +import pytest +from nonebug import App + +from .utils import make_fake_message, make_fake_event + + +async def test_bili_rich(app: App): + from ATRI.plugins.rich import bili_rich + + Message = make_fake_message() + + async with app.test_matcher(bili_rich) as ctx: + bot = ctx.create_bot() + + msg = Message("BV1Ff4y1C7YR") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send( + event, + """ + BV1Ff4y1C7YR INFO: + Title: 【8K30fps】这可能是画质最高的Rick Roll (doge) + Link: https://b23.tv/BV1Ff4y1C7YR + """, + True, + ) diff --git a/test/test_plugin_saucenao.py b/test/test_plugin_saucenao.py new file mode 100644 index 0000000..2782ba9 --- /dev/null +++ b/test/test_plugin_saucenao.py @@ -0,0 +1,32 @@ +import pytest +from nonebug import App + +from nonebot.adapters.onebot.v11 import MessageSegment + +from .utils import make_fake_message, make_fake_event + + +async def test_saucenao(app: App): + from ATRI.plugins.saucenao import saucenao + + Message = make_fake_message() + + async with app.test_matcher(saucenao) as ctx: + bot = ctx.create_bot() + + msg = Message("以图搜图") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "图呢?", True) + + msg = Message( + MessageSegment.image( + "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png" + ) + ) + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "失败了...", False) diff --git a/test/test_plugin_setu.py b/test/test_plugin_setu.py new file mode 100644 index 0000000..b5a7ff2 --- /dev/null +++ b/test/test_plugin_setu.py @@ -0,0 +1,20 @@ +import pytest +from nonebug import App + +from .utils import make_fake_message, make_fake_event + + +async def test_random_setu(app: App): + from ATRI.plugins.setu import random_setu + + Message = make_fake_message() + + async with app.test_matcher(random_setu) as ctx: + bot = ctx.create_bot() + + msg = Message("来张涩图") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "hso(发不出", False) diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 0000000..70d1a27 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,87 @@ +""" +Fork from: https://github.com/nonebot/nonebot2/blob/master/tests/utils.py +""" +from typing import TYPE_CHECKING, Type, Optional + +from pydantic import create_model + +if TYPE_CHECKING: + from nonebot.adapters import Event, Message + + +def make_fake_message() -> Type["Message"]: + from nonebot.adapters import Message, MessageSegment + + class FakeMessageSegment(MessageSegment): + @classmethod + def get_message_class(cls): + return FakeMessage + + def __str__(self) -> str: + return self.data["text"] if self.type == "text" else f"[fake:{self.type}]" + + @classmethod + def text(cls, text: str): + return cls("text", {"text": text}) + + @classmethod + def image(cls, url: str): + return cls("image", {"url": url}) + + def is_text(self) -> bool: + return self.type == "text" + + class FakeMessage(Message): + @classmethod + def get_segment_class(cls): + return FakeMessageSegment + + @staticmethod + def _construct(msg: str): + yield FakeMessageSegment.text(msg) + + return FakeMessage + + +def make_fake_event( + _type: str = "message", + _name: str = "test", + _description: str = "test", + _user_id: str = "test", + _session_id: str = "test", + _message: Optional["Message"] = None, + _to_me: bool = True, + **fields, +) -> Type["Event"]: + from nonebot.adapters import Event + + _Fake = create_model("_Fake", __base__=Event, **fields) + + class FakeEvent(_Fake): + def get_type(self) -> str: + return _type + + def get_event_name(self) -> str: + return _name + + def get_event_description(self) -> str: + return _description + + def get_user_id(self) -> str: + return _user_id + + def get_session_id(self) -> str: + return _session_id + + def get_message(self) -> "Message": + if _message is not None: + return _message + raise NotImplementedError + + def is_tome(self) -> bool: + return _to_me + + class Config: + extra = "forbid" + + return FakeEvent |