diff options
35 files changed, 453 insertions, 363 deletions
@@ -144,3 +144,6 @@ dmypy.json /accounts/* /data/* + +# for MacOS +.DS_Store diff --git a/ATRI/database/db.py b/ATRI/database/db.py index c2aa015..68db8cf 100644 --- a/ATRI/database/db.py +++ b/ATRI/database/db.py @@ -1,7 +1,12 @@ +from pathlib import Path from tortoise import Tortoise +from nonebot import get_driver from ATRI.database import models -from nonebot import get_driver + + +DB_DIR = Path(".") / "data" / "database" / "sql" +DB_DIR.mkdir(exist_ok=True) # 关于数据库的操作类,只实现与数据库有关的CRUD @@ -17,7 +22,7 @@ class DB: from ATRI.database import models await Tortoise.init( - db_url="sqlite://ATRI/database/db.sqlite3", + db_url=f"sqlite://{DB_DIR}/db.sqlite3", modules={"models": [locals()["models"]]}, ) # Generate the schema diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py index 3bfeedb..773748d 100644 --- a/ATRI/plugins/anime_search.py +++ b/ATRI/plugins/anime_search.py @@ -8,17 +8,16 @@ from ATRI.rule import is_in_service from ATRI.utils import request, Translate from ATRI.exceptions import RequestError + URL = "https://api.trace.moe/search?anilistInfo=true&url=" _anime_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -__doc__ = """ -通过一张图片搜索你需要的番!据说里*也可以 -""" - class Anime(Service): def __init__(self): - Service.__init__(self, "以图搜番", __doc__, rule=is_in_service("以图搜番")) + Service.__init__( + self, "以图搜番", "通过一张图片搜索你需要的番!据说里*也可以", rule=is_in_service("以图搜番") + ) @staticmethod async def _request(url: str) -> dict: diff --git a/ATRI/plugins/applet/data_source.py b/ATRI/plugins/applet/data_source.py index 3fc1bc5..60ca581 100644 --- a/ATRI/plugins/applet/data_source.py +++ b/ATRI/plugins/applet/data_source.py @@ -15,12 +15,12 @@ s = [11, 10, 3, 8, 4, 6] xor = 177451812 add = 8728348608 -__doc__ = "啥b腾讯小程序给👴爪巴\n目前只整了b站的" - class Applet(Service): def __init__(self): - Service.__init__(self, "小程序处理", __doc__, rule=is_in_service("小程序处理")) + Service.__init__( + self, "小程序处理", "啥b腾讯小程序给👴爪巴\n目前只整了b站的", rule=is_in_service("小程序处理") + ) @staticmethod def _bv_dec(x) -> str: diff --git a/ATRI/plugins/bilibili_dynamic/__init__.py b/ATRI/plugins/bilibili_dynamic/__init__.py index 82fbb4d..a42a0e3 100644 --- a/ATRI/plugins/bilibili_dynamic/__init__.py +++ b/ATRI/plugins/bilibili_dynamic/__init__.py @@ -1,35 +1,37 @@ +import re +from tabulate import tabulate +from datetime import datetime, timedelta + import pytz from apscheduler.triggers.base import BaseTrigger from apscheduler.triggers.combining import AndTrigger from apscheduler.triggers.interval import IntervalTrigger -from ATRI.utils.apscheduler import scheduler -from ATRI.utils import timestamp2datetime - from nonebot.params import State from nonebot.adapters.onebot.v11 import MessageSegment, GroupMessageEvent, Message from nonebot.typing import T_State from nonebot import get_bot -from .data_source import BilibiliDynamicSubscriptor -import re -from tabulate import tabulate -from datetime import datetime, timedelta +from ATRI.utils.apscheduler import scheduler +from ATRI.utils import timestamp2datetime from ATRI.log import logger +from .data_source import BilibiliDynamicSubscriptor + + bilibili_dynamic = BilibiliDynamicSubscriptor().on_command( - "/bilibili_dynamic", "b站动态订阅助手", aliases={"b站动态"} + "/bilibili_dynamic", "b站动态订阅助手", aliases={"/bd", "b站动态"} ) -__help__ = """欢迎使用【b站动态订阅助手】~ -目前支持的功能如下...请选择: +__help__ = """好哦!是b站动态订阅诶~ +目前支持的功能如下...请键入对应关键词: 1.添加订阅 2.取消订阅 3.订阅列表 ----------------------------------- -用法示例1:/bilibili_dynamic 添加订阅 -用法示例2:/bilibili_dynamic 取消订阅 401742377(数字uid) -用法示例3:/bilibili_dynamic 订阅列表""" +用法示例1:/bd 添加订阅 +用法示例2:/bd 取消订阅 401742377(数字uid) +用法示例3:/bd 订阅列表""" def help() -> str: @@ -39,7 +41,6 @@ def help() -> str: @bilibili_dynamic.handle() async def _menu(event: GroupMessageEvent, state: T_State = State()): args = str(event.get_plaintext()).strip().lower().split()[1:] - # print(args) if not args: await bilibili_dynamic.finish(help()) elif args and len(args) == 1: @@ -58,7 +59,6 @@ async def handle_subcommand(event: GroupMessageEvent, state: T_State = State()): if state["sub_command"] == "订阅列表": subscriptor = BilibiliDynamicSubscriptor() - # print(event.group_id) r = await subscriptor.get_subscriptions(query_map={"groupid": event.group_id}) subs = [] for s in r: @@ -80,8 +80,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()): if uid == "-1": await bilibili_dynamic.finish("已经成功退出订阅~") - # print(state) - # print(uid) if not re.match(r"^\d+$", uid): await bilibili_dynamic.reject("这似乎不是UID呢, 请重新输入:") uid = int(uid) @@ -107,7 +105,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()): ) ) success = await subscriptor.add_subscription(uid, event.group_id) - print(success) success = success and ( await subscriptor.update_subscription_by_uid( uid=uid, @@ -143,7 +140,7 @@ class BilibiliDynamicCheckEnabledTrigger(BaseTrigger): # 实现abstract方法 <get_next_fire_time> def get_next_fire_time(self, previous_fire_time, now): subscriptor = BilibiliDynamicSubscriptor() - config = subscriptor.load_service() + config = subscriptor.load_service("b站动态订阅") if config["enabled"] == False: return None else: @@ -156,8 +153,8 @@ class BilibiliDynamicCheckEnabledTrigger(BaseTrigger): @scheduler.scheduled_job( AndTrigger([IntervalTrigger(seconds=10), BilibiliDynamicCheckEnabledTrigger()]), name="b站动态检查", - max_instances=3, - misfire_grace_time=60, + max_instances=3, # type: ignore + misfire_grace_time=60, # type: ignore ) async def _check_dynamic(): from ATRI.database.models import Subscription @@ -171,7 +168,6 @@ async def _check_dynamic(): d: Subscription = tq.get() logger.info("准备查询UP【{up}】的动态 队列剩余{size}".format(up=d.nickname, size=tq.qsize())) ts = int(d.last_update.timestamp()) - # logger.info("上一次更新的时间戳{time}".format(time=ts)) info: dict = await subscriptor.get_recent_dynamic_by_uid(d.uid) res = [] if info: @@ -181,11 +177,9 @@ async def _check_dynamic(): if len(res) == 0: logger.warning("获取UP【{up}】的动态为空".format(up=d.nickname)) for i in res: - # logger.info("获取UP【{up}】的动态,时间{time},内容{content}".format(up=d.nickname, time=i["timestamp"],content=i["content"][:20])) i["name"] = d.nickname if ts < i["timestamp"]: text, pic_url = subscriptor.generate_output(pattern=i) - # print(text,pic_url) output = Message( [MessageSegment.text(text), MessageSegment.image(pic_url)] ) diff --git a/ATRI/plugins/bilibili_dynamic/data_source.py b/ATRI/plugins/bilibili_dynamic/data_source.py index 49ffce7..a419b45 100644 --- a/ATRI/plugins/bilibili_dynamic/data_source.py +++ b/ATRI/plugins/bilibili_dynamic/data_source.py @@ -11,13 +11,11 @@ import asyncio from typing import Any from operator import itemgetter -__doc__ = """b站订阅动态助手 -""" __session_pool = {} -def get_api(field: str): +def get_api(field: str) -> dict: """ 获取 API。 @@ -33,9 +31,11 @@ def get_api(field: str): if os.path.exists(path): with open(path, encoding="utf8") as f: return json.loads(f.read()) + else: + return dict() -API = get_api("user") +API: dict = get_api("user") def get_session(): @@ -57,12 +57,12 @@ def get_session(): async def bilibili_request( method: str, url: str, - params: dict = None, + params: dict = dict(), data: Any = None, no_csrf: bool = False, json_body: bool = False, **kwargs, -): +) -> dict: """ 向接口发送请求。 @@ -130,21 +130,21 @@ async def bilibili_request( # 检查响应头 Content-Length content_length = resp.headers.get("content-length") if content_length and int(content_length) == 0: - return None + return dict() # 检查响应头 Content-Type content_type = resp.headers.get("content-type") # 不是 application/json - if content_type.lower().index("application/json") == -1: + if content_type.lower().index("application/json") == -1: # type: ignore raise Exception("响应不是 application/json 类型") raw_data = await resp.text() - resp_data: dict + resp_data: dict = dict() if "callback" in params: # JSONP 请求 - resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) + resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) # type: ignore else: # JSON resp_data = json.loads(raw_data) @@ -181,9 +181,9 @@ class User: """ self.uid = uid - self.__self_info = None + self.__self_info = None # 暂时无用 - async def get_user_info(self): + async def get_user_info(self) -> dict: """ 获取用户信息(昵称,性别,生日,签名,头像 URL,空间横幅 URL 等) @@ -216,7 +216,7 @@ class User: "offset_dynamic_id": offset, "need_top": 1 if need_top else 0, } - data = await bilibili_request("GET", url=api["url"], params=params) + data: dict = await bilibili_request("GET", url=api["url"], params=params) # card 字段自动转换成 JSON。 if "cards" in data: for card in data["cards"]: @@ -227,7 +227,7 @@ class User: class BilibiliDynamicSubscriptor(Service): def __init__(self): - Service.__init__(self, "b站动态订阅", __doc__, rule=is_in_service("b站动态订阅")) + Service.__init__(self, "b站动态订阅", "b站订阅动态助手", rule=is_in_service("b站动态订阅")) async def add_subscription(self, uid: int, groupid: int) -> bool: async with DB() as db: @@ -261,7 +261,7 @@ class BilibiliDynamicSubscriptor(Service): async def get_upname_by_uid(self, uid: int) -> str: try: u = User(uid) - info = await u.get_user_info() + info: dict = await u.get_user_info() return info.get("name") except: return "" @@ -334,7 +334,7 @@ class BilibiliDynamicSubscriptor(Service): ret = sorted(ret, key=itemgetter("timestamp")) return ret - def generate_output(self, pattern: dict) -> (str, str): + def generate_output(self, pattern: dict) -> tuple: # 限制摘要的字数 abstractLimit = 40 text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【时间】{time}\n【内容摘要】{content}\n""".format( diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py index adf6407..77240ec 100644 --- a/ATRI/plugins/code_runner/__init__.py +++ b/ATRI/plugins/code_runner/__init__.py @@ -11,33 +11,43 @@ from .data_source import CodeRunner _flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮助:/code help") +code_runner = CodeRunner().on_command("/code", "在线运行一段代码,获取帮助:/code.help") @code_runner.handle([Cooldown(5, prompt=_flmt_notice)]) -async def _code_runner( - matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): - user_id = event.get_user_id() +async def _code_runner(matcher: Matcher, args: Message = CommandArg()): msg = args.extract_plain_text() if msg: matcher.set_arg("opt", args) else: - content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!" + content = "请键入 /code.help 以获取帮助~!" await code_runner.finish(Message(content)) -@code_runner.got("opt") +@code_runner.got("opt", prompt="需要运行的语言及代码?\n获取帮助:/code.help") async def _(event: MessageEvent, opt: str = ArgPlainText("opt")): user_id = event.get_user_id() - msg = opt.split("\n") - if msg[0] == "help": - content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!" - elif msg[0] == "list": - content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang() - else: - content = MessageSegment.at(user_id) + await CodeRunner().runner(unescape(opt)) + # 拯救傻瓜用户 + if opt == "/code.help": + await code_runner.finish(CodeRunner().help()) + content = MessageSegment.at(user_id) + str(await CodeRunner().runner(unescape(opt))) await code_runner.finish(Message(content)) + + +code_runner_helper = CodeRunner().cmd_as_group("help", "使用说明") + + +@code_runner_helper.handle() +async def _(): + await code_runner_helper.finish(CodeRunner().help()) + + +code_supp_list = CodeRunner().cmd_as_group("list", "查看支持的语言") + + +@code_supp_list.handle() +async def _(): + await code_supp_list.finish(CodeRunner().list_supp_lang()) diff --git a/ATRI/plugins/code_runner/data_source.py b/ATRI/plugins/code_runner/data_source.py index bf3d8b2..87ae792 100644 --- a/ATRI/plugins/code_runner/data_source.py +++ b/ATRI/plugins/code_runner/data_source.py @@ -34,13 +34,11 @@ SUPPORTED_LANGUAGES = { } -__doc__ = """在线跑代码 -""" - - class CodeRunner(Service): def __init__(self): - Service.__init__(self, "在线跑代码", __doc__, rule=is_in_service("在线跑代码")) + Service.__init__( + self, "在线跑代码", "在线跑代码", rule=is_in_service("在线跑代码"), main_cmd="/code" + ) @staticmethod def help() -> str: diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 6ec5401..37c18a4 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -2,6 +2,7 @@ import os import json import shutil import asyncio +from time import sleep from datetime import datetime from pydantic.main import BaseModel from random import choice, randint @@ -33,6 +34,7 @@ from ATRI.log import logger as log from ATRI.config import BotSelfConfig from ATRI.utils import MessageChecker from ATRI.utils.apscheduler import scheduler +from ATRI.utils.check_update import CheckUpdate driver = ATRI.driver() @@ -50,6 +52,24 @@ os.makedirs(TEMP_PATH, exist_ok=True) @driver.on_startup async def startup(): log.info(f"Now running: {ATRI.__version__}") + + try: + log.info("Starting to check update...") + log.info(await CheckUpdate.show_latest_commit_info()) + sleep(1) + + l_v, l_v_t = await CheckUpdate.show_latest_version() + if l_v != ATRI.__version__: + log.warning("New version has been released, please update.") + log.warning(f"Latest version: {l_v} Update time: {l_v_t}") + sleep(3) + except Exception: + log.error("检查 更新/最新推送 失败...") + + if not scheduler.running: + scheduler.start() + log.info("Scheduler Started.") + log.info("アトリは、高性能ですから!") @@ -106,14 +126,9 @@ class GroupRequestInfo(BaseModel): is_approve: bool -__doc__ = """ -对bot基础/必须请求进行处理 -""" - - class Essential(Service): def __init__(self): - Service.__init__(self, "基础部件", __doc__) + Service.__init__(self, "基础部件", "对bot基础/必须请求进行处理") friend_add_event = Essential().on_request("好友添加", "好友添加检测") @@ -279,6 +294,7 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent): except BaseException: return + log.debug(f"Recall raw msg:\n{repo}") user = event.user_id group = event.group_id repo = repo["message"] @@ -310,6 +326,7 @@ async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent): except BaseException: return + log.debug(f"Recall raw msg:\n{repo}") user = event.user_id repo = repo["message"] @@ -347,7 +364,7 @@ async def _(): await acc_recall.finish("现在可以接受撤回信息啦!") [email protected]_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5) [email protected]_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5) # type: ignore async def _clear_cache(): try: shutil.rmtree(TEMP_PATH) diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py index 8edc88a..75fa4ee 100644 --- a/ATRI/plugins/funny/data_source.py +++ b/ATRI/plugins/funny/data_source.py @@ -17,14 +17,9 @@ FUNNY_DIR = Path(".") / "data" os.makedirs(FUNNY_DIR, exist_ok=True) -__doc__ = """ -乐1乐,莫当真 -""" - - class Funny(Service): def __init__(self): - Service.__init__(self, "乐", __doc__, rule=is_in_service("乐")) + Service.__init__(self, "乐", "乐1乐,莫当真", rule=is_in_service("乐")) @staticmethod async def idk_laugh(name: str) -> str: diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py index d4b8b36..c6f915e 100644 --- a/ATRI/plugins/help/__init__.py +++ b/ATRI/plugins/help/__init__.py @@ -3,34 +3,31 @@ from nonebot.adapters.onebot.v11 import MessageEvent from .data_source import Helper -main_help = Helper().on_command("菜单", "获取食用bot的方法", aliases={"menu"}) +menu = Helper().on_command("菜单", "获取食用bot的方法", aliases={"menu"}) -@main_help.handle() -async def _main_help(): - repo = Helper().menu() - await main_help.finish(repo) +async def _(): + await menu.finish(Helper().menu()) -about_me = Helper().on_command("关于", "获取关于bot的信息", aliases={"about"}) +about = Helper().on_command("关于", "获取关于bot的信息", aliases={"about"}) -@about_me.handle() -async def _about_me(): - repo = Helper().about() - await about_me.finish(repo) +async def _(): + await about.finish(Helper().about()) -service_list = Helper().on_command("服务列表", "查看所有可用服务", aliases={"功能列表"}) +service_list = Helper().on_command("服务列表", "获取服务列表", aliases={"功能列表"}) @service_list.handle() -async def _service_list(): - repo = Helper().service_list() - await service_list.finish(repo) +async def _(): + await service_list.finish(Helper().service_list()) -service_info = Helper().on_command("帮助", "获取服务详细帮助", aliases={"help"}) +service_info = Helper().on_command("帮助", "获取对应服务详细信息", aliases={"help"}) @service_info.handle() diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py index f96d59e..e3dd1ff 100644 --- a/ATRI/plugins/help/data_source.py +++ b/ATRI/plugins/help/data_source.py @@ -72,7 +72,7 @@ class Helper(Service): ) table = tabulate( services, - headers=["服务名称", "开启状态", "仅支持管理员"], + headers=["服务名称", "开启状态(全局)", "仅支持管理员"], tablefmt="plain", showindex=True, ) @@ -91,7 +91,7 @@ class Helper(Service): service_enabled = data.get("enabled", True) _service_cmd_list = list(data.get("cmd_list", {"error"})) - service_cmd_list = "、".join(map(str, _service_cmd_list)) + service_cmd_list = "\n".join(map(str, _service_cmd_list)) repo = SERVICE_INFO_FORMAT.format( service=service_name, diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/kimo/__init__.py index 4219d56..b1c5698 100644 --- a/ATRI/plugins/chat/__init__.py +++ b/ATRI/plugins/kimo/__init__.py @@ -6,27 +6,27 @@ from nonebot.adapters.onebot.v11 import MessageEvent, Message from nonebot.adapters.onebot.v11.helpers import Cooldown from ATRI.utils.apscheduler import scheduler -from .data_source import Chat +from .data_source import Kimo _chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) -chat = Chat().on_message("文爱", "闲聊(文爱") +kimo = Kimo().on_message("文爱", "闲聊(文爱") [email protected]([Cooldown(3, prompt=_chat_flmt_notice)]) [email protected]([Cooldown(3, prompt=_chat_flmt_notice)]) async def _chat(event: MessageEvent): user_id = event.get_user_id() msg = str(event.message) - repo = await Chat().deal(msg, user_id) + repo = await Kimo().deal(msg, user_id) try: - await chat.finish(repo) + await kimo.finish(repo) except Exception: return -my_name_is = Chat().on_command("叫我", "更改闲聊(文爱)时的称呼", aliases={"我是"}, priority=1) +my_name_is = Kimo().on_command("叫我", "更改kimo时的称呼", aliases={"我是"}, priority=1) @my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)]) @@ -47,28 +47,13 @@ async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")): "很不错的称呼呢w", ] ) - Chat().name_is(user_id, new_name) + Kimo().name_is(user_id, new_name) await my_name_is.finish(repo) -say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) - - [email protected]([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _ready_say(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("say", args) - - [email protected]("say", "想要咱复读啥呢...") -async def _deal_say(msg: str = ArgPlainText("say")): - await say.finish(msg) - - [email protected]_job("interval", name="闲聊词库检查更新", hours=3, misfire_grace_time=60) [email protected]_job("interval", name="kimo词库检查更新", hours=3, misfire_grace_time=60) # type: ignore async def _check_kimo(): try: - await Chat().update_data() + await Kimo().update_data() except BaseException: pass diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/kimo/data_source.py index 576db03..6dd448a 100644 --- a/ATRI/plugins/chat/data_source.py +++ b/ATRI/plugins/kimo/data_source.py @@ -11,19 +11,15 @@ from ATRI.utils import request from ATRI.exceptions import ReadFileError, WriteError -__doc__ = """ -好像有点涩? -""" - -CHAT_PATH = Path(".") / "data" / "database" / "chat" +CHAT_PATH = Path(".") / "data" / "database" / "kimo" os.makedirs(CHAT_PATH, exist_ok=True) KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" -class Chat(Service): +class Kimo(Service): def __init__(self): Service.__init__( - self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5 + self, "kimo", "好像有点涩?", rule=to_bot() & is_in_service("kimo"), priority=5 ) @staticmethod @@ -73,7 +69,7 @@ class Chat(Service): with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(data, indent=4)) - log.info("闲聊词库更新完成") + log.info("kimo词库更新完成") @staticmethod def name_is(user_id: str, new_name: str): diff --git a/ATRI/plugins/manage/data_source.py b/ATRI/plugins/manage/data_source.py index bc9f801..395a165 100644 --- a/ATRI/plugins/manage/data_source.py +++ b/ATRI/plugins/manage/data_source.py @@ -6,6 +6,7 @@ from datetime import datetime from ATRI.service import Service, ServiceTools from ATRI.exceptions import ReadFileError, load_error + MANAGE_DIR = Path(".") / "data" / "database" / "manege" ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential" os.makedirs(MANAGE_DIR, exist_ok=True) @@ -17,12 +18,10 @@ Time: {time} {content} """.strip() -__doc__ = """控制bot的各项服务""" - class Manage(Service): def __init__(self): - Service.__init__(self, "管理", __doc__, True) + Service.__init__(self, "管理", "控制bot的各项服务", True) @staticmethod def _load_block_user_list() -> dict: diff --git a/ATRI/plugins/polaroid/__init__.py b/ATRI/plugins/polaroid/__init__.py new file mode 100644 index 0000000..a3d2d61 --- /dev/null +++ b/ATRI/plugins/polaroid/__init__.py @@ -0,0 +1,27 @@ +from random import choice + +from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment, Message +from nonebot.adapters.onebot.v11.helpers import Cooldown + +from ATRI.rule import to_bot +from .data_source import Polaroid, TEMP_PATH + + +_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) + + +polaroid = Polaroid().on_command("拍立得", "获取一张以自己头像的拍立得图片!需at", rule=to_bot()) + + [email protected]([Cooldown(15, prompt=_flmt_notice)]) +async def _(event: MessageEvent): + user_id = event.get_user_id() + + temp_p = TEMP_PATH / f"{user_id}.png" + if not temp_p.is_file(): + pol = await Polaroid().generate(user_id) + result = MessageSegment.image(pol) + else: + result = MessageSegment.image(temp_p) + + await polaroid.finish(Message(result)) diff --git a/ATRI/plugins/polaroid/data_source.py b/ATRI/plugins/polaroid/data_source.py new file mode 100644 index 0000000..82e0337 --- /dev/null +++ b/ATRI/plugins/polaroid/data_source.py @@ -0,0 +1,60 @@ +import asyncio + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.utils import request +from ATRI.log import logger as log +from ATRI.exceptions import RequestError, WriteError +from .image_dealer import image_dealer + + +TENCENT_AVATER_URL = "https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640" +SOURCE_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/" + + +class Polaroid(Service): + def __init__(self): + Service.__init__(self, "拍立得", "根据头像生成拍立得风格照片!", rule=is_in_service("拍立得")) + + @classmethod + async def _request(cls, user_id: str) -> bytes: + try: + res = await request.get(TENCENT_AVATER_URL.format(user_id=user_id)) + except RequestError: + raise RequestError("Request failed!") + data = res.read() + return data + + @classmethod + async def generate(cls, user_id: str): + await init_source() + + user_avater = await cls._request(user_id) + + result = image_dealer(user_avater, user_id) + return f"file:///{result}" + + +from .image_dealer import TEMP_PATH, POLAROID_DIR + + +async def init_source(): + files = ["frame-0.PNG", "frame-1.PNG", "font-0.ttf"] + + for i in files: + path = POLAROID_DIR / i + if not path.is_file(): + log.warning("插件 polaroid 缺少所需资源,装载中") + + url = SOURCE_URL + i + data = await request.get(url) + try: + with open(path, "wb") as w: + w.write(data.read()) + log.info("所需资源装载完成") + except WriteError: + raise WriteError("装载资源失败") + + +loop = asyncio.get_event_loop() +loop.create_task(init_source()) diff --git a/ATRI/plugins/polaroid/image_dealer.py b/ATRI/plugins/polaroid/image_dealer.py new file mode 100644 index 0000000..c1ab50b --- /dev/null +++ b/ATRI/plugins/polaroid/image_dealer.py @@ -0,0 +1,46 @@ +from random import choice +from pathlib import Path +from datetime import datetime + +from PIL import Image, ImageFont, ImageDraw + + +POLAROID_DIR = Path(".") / "data" / "database" / "polaroid" +TEMP_PATH = Path(".") / "data" / "temp" +POLAROID_DIR.mkdir(exist_ok=True) + + +def image_dealer(user_img: bytes, user_id): + user_p = str((TEMP_PATH / f"{user_id}.png").absolute()) + + with open(user_p, "wb") as w: + w.write(user_img) + + bg = Image.new("RGBA", (689, 1097), color=(0, 0, 0, 0)) + + pol_frame = Image.open(POLAROID_DIR / f"frame-{choice([0, 1])}.PNG").convert("RGBA") + user = Image.open(user_p).convert("RGBA").resize((800, 800), Image.ANTIALIAS) + + _, _, _, a = pol_frame.split() + + f_path = str(POLAROID_DIR / "font-0.ttf") + f_date = ImageFont.truetype(f_path, 100) # type: ignore + f_msg = ImageFont.truetype(f_path, 110) # type: ignore + + bg.paste(user, (0, 53)) + bg.paste(pol_frame, (0, 0), a) + + msg = f"今天辛苦了{choice(['!', '❤️', '✨'])}" + now = datetime.now().strftime("%m / %d") + + img = ImageDraw.Draw(bg) + img.text((25, 805), now, font=f_date, fill=(99, 184, 255)) + img.text( + (15, 915), + msg, + font=f_msg, + fill=(255, 100, 97), + ) + + bg.save(user_p) + return user_p diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py index 9c5818a..7087afa 100644 --- a/ATRI/plugins/repo.py +++ b/ATRI/plugins/repo.py @@ -15,7 +15,7 @@ _repo_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~ REPO_FORMAT = """ 来自用户{user}反馈: {msg} -""" +""".strip() class Repo(Service): diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index d3ad38f..071af98 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -1,4 +1,3 @@ -from re import findall from random import choice from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index 80b6b52..8350839 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -1,28 +1,33 @@ from random import choice -from ATRI.service import Service +from ATRI.service import Service, ServiceTools from ATRI.rule import is_in_service from ATRI.exceptions import RequestError from ATRI.utils import request +from ATRI.config import SauceNAO as sa +from ATRI.log import logger as log -URL = "https://saucenao.com/search.php" -__doc__ = """ -以图搜图,仅限二刺螈 -""" +URL = "https://saucenao.com/search.php" class SaouceNao(Service): def __init__( self, - api_key: str = None, + api_key: str = str(), output_type=2, testmode=1, dbmaski=32768, db=5, numres=5, ): - Service.__init__(self, "以图搜图", __doc__, rule=is_in_service("以图搜图")) + Service.__init__(self, "以图搜图", "以图搜图,仅限二刺螈", rule=is_in_service("以图搜图")) + + if not sa.key: + data = ServiceTools.load_service("以图搜图") + data["enabled"] = False + ServiceTools.save_service(data, "以图搜图") + log.warning("插件 SauceNao 所需的 key 未配置,将被全局禁用,后续填写请手动启用") params = dict() params["api_key"] = api_key diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py index cf38fd6..00eba07 100644 --- a/ATRI/plugins/setu/__init__.py +++ b/ATRI/plugins/setu/__init__.py @@ -9,7 +9,6 @@ from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message, MessageSegme from nonebot.adapters.onebot.v11.helpers import extract_image_urls, Cooldown from ATRI.config import BotSelfConfig -from ATRI.utils.apscheduler import scheduler from .data_source import Setu @@ -171,27 +170,6 @@ async def _deal_setting(msg: str = ArgPlainText("catcher_set")): await catcher_setting.finish(repo) [email protected]_job( - "interval", name="涩批诱捕器", hours=1, misfire_grace_time=60, args=[Bot] -) -async def _scheduler_setu(bot): - try: - group_list = await bot.get_group_list() - lucky_group = choice(group_list) - group_id = lucky_group["group_id"] - setu = await Setu().scheduler() - if not setu: - return - - msg_0 = await bot.send_group_msg(group_id=int(group_id), message=Message(setu)) - message_id = msg_0["message_id"] - await asyncio.sleep(60) - await bot.delete_msg(message_id=message_id) - - except Exception: - pass - - _ag_l = ["涩图来", "来点涩图", "来份涩图"] _ag_patt = r"来[张点丶份](.*?)的[涩色🐍]图" diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py index b7c5162..e7d769d 100644 --- a/ATRI/plugins/setu/data_source.py +++ b/ATRI/plugins/setu/data_source.py @@ -1,5 +1,4 @@ import asyncio -from random import choice from nonebot.adapters.onebot.v11 import Bot, MessageSegment from ATRI.service import Service @@ -78,30 +77,6 @@ class Setu(Service): data = await detect_image(url, file_size) return data - @classmethod - async def scheduler(cls) -> str: - """ - 每隔指定时间随机抽取一个群发送涩图. - 格式: - 是{tag}哦~❤ - {setu} - """ - res = await request.get(LOLICON_URL) - data: dict = res.json() - temp_data: dict = data.get("data", list()) - if not temp_data: - return "" - - tag = choice(temp_data.get("tags", ["女孩子"])) - - url = temp_data[0]["urls"].get( - "original", - cls._use_proxy(DEFAULT_SETU), - ) - setu = MessageSegment.image(url, timeout=114514) - repo = f"是{tag}哦~❤\n{setu}" - return repo - @staticmethod async def async_recall(bot: Bot, event_id): await asyncio.sleep(30) diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py index 3e18e1d..05ece79 100644 --- a/ATRI/plugins/status/__init__.py +++ b/ATRI/plugins/status/__init__.py @@ -22,8 +22,8 @@ async def _status(): info_msg = "アトリは高性能ですから!" [email protected]_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) -async def _status_checking(): [email protected]_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) # type: ignore +async def _check_runtime(): msg, stat = IsSurvive().get_status() if not stat: await status.finish(msg) diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py index 74882de..3595c0c 100644 --- a/ATRI/plugins/status/data_source.py +++ b/ATRI/plugins/status/data_source.py @@ -8,12 +8,24 @@ from ATRI.rule import is_in_service from ATRI.exceptions import GetStatusError -__doc__ = "检查咱自身状态" +_status_msg = """ +> Status Overview + +[CPU: {cpu}%] +[Memory: {mem}%] +[Disk usage: {disk}%] + +[Net sent: {inteSENT}MB] +[Net recv: {inteRECV}MB] + +[Runtime: {up_time}] +{msg} +""".strip() class IsSurvive(Service): def __init__(self): - Service.__init__(self, "状态", __doc__, rule=is_in_service("状态")) + Service.__init__(self, "状态", "检查自身状态", rule=is_in_service("状态")) @staticmethod def ping() -> str: @@ -55,14 +67,14 @@ class IsSurvive(Service): log.info("资源占用正常") is_ok = True - msg0 = ( - "Self status:\n" - f"* CPU: {cpu}%\n" - f"* MEM: {mem}%\n" - f"* DISK: {disk}%\n" - f"* netSENT: {inteSENT}MB\n" - f"* netRECV: {inteRECV}MB\n" - f"* Runtime: {up_time}\n" - ) + msg + msg0 = _status_msg.format( + cpu=cpu, + mem=mem, + disk=disk, + inteSENT=inteSENT, + inteRECV=inteRECV, + up_time=up_time, + msg=msg, + ) return msg0, is_ok diff --git a/ATRI/plugins/util/data_source.py b/ATRI/plugins/util/data_source.py index 7b36ebe..4a73c26 100644 --- a/ATRI/plugins/util/data_source.py +++ b/ATRI/plugins/util/data_source.py @@ -8,14 +8,9 @@ from ATRI.service import Service from ATRI.rule import is_in_service -__doc__ = """ -非常实用(?)的工具们! -""" - - class Utils(Service): def __init__(self): - Service.__init__(self, "小工具", __doc__, rule=is_in_service("小工具")) + Service.__init__(self, "小工具", "非常实用(?)的工具们!", rule=is_in_service("小工具")) @staticmethod def roll_dice(par: str) -> str: diff --git a/ATRI/service.py b/ATRI/service.py index 9f680c8..c9509df 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -4,7 +4,7 @@ import json from pathlib import Path from types import ModuleType from pydantic import BaseModel -from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING +from typing import List, Set, Tuple, Type, Union, Optional from nonebot.matcher import Matcher from nonebot.permission import Permission @@ -19,9 +19,6 @@ from nonebot.rule import Rule, command, keyword, regex from ATRI.exceptions import ReadFileError, WriteError -if TYPE_CHECKING: - from nonebot.adapters import Bot, Event - SERVICE_DIR = Path(".") / "data" / "service" SERVICES_DIR = SERVICE_DIR / "services" @@ -69,7 +66,7 @@ class Service: def __init__( self, service: str, - docs: str = None, + docs: str, only_admin: bool = False, rule: Optional[Union[Rule, T_RuleChecker]] = None, permission: Optional[Permission] = None, @@ -77,6 +74,7 @@ class Service: temp: bool = False, priority: int = 1, state: Optional[T_State] = None, + main_cmd: str = str(), ): self.service = service self.docs = docs @@ -87,13 +85,9 @@ class Service: self.temp = temp self.priority = priority self.state = state + self.main_cmd = (main_cmd,) - def _generate_service_config(self, service: str = None, docs: str = None) -> None: - if not service: - service = self.service - if not docs: - docs = self.docs or str() - + def _generate_service_config(self, service: str, docs: str = str()) -> None: path = SERVICES_DIR / f"{service}.json" data = ServiceInfo( service=service, @@ -110,31 +104,28 @@ class Service: except WriteError: raise WriteError("Write service info failed!") - def save_service(self, service_data: dict, service: str = None) -> None: + def save_service(self, service_data: dict, service: str) -> None: if not service: service = self.service path = SERVICES_DIR / f"{service}.json" if not path.is_file(): - self._generate_service_config() + self._generate_service_config(service, self.docs) with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(service_data, indent=4)) - def load_service(self, service: str = None) -> dict: - if not service: - service = self.service - + def load_service(self, service: str) -> dict: path = SERVICES_DIR / f"{service}.json" if not path.is_file(): - self._generate_service_config() + self._generate_service_config(service, self.docs) try: data = json.loads(path.read_bytes()) except ReadFileError: with open(path, "w", encoding="utf-8") as w: w.write(json.dumps({})) - self._generate_service_config() + self._generate_service_config(service, self.docs) data = json.loads(path.read_bytes()) return data @@ -142,25 +133,25 @@ class Service: data = self.load_service(self.service) temp_data: dict = data["cmd_list"] temp_data.update(cmds) - self.save_service(data) + self.save_service(data, self.service) def _load_cmds(self) -> dict: path = SERVICES_DIR / f"{self.service}.json" if not path.is_file(): - self._generate_service_config() + self._generate_service_config(self.service, self.docs) data = json.loads(path.read_bytes()) return data["cmd_list"] def on_message( self, - name: str = None, + name: str = str(), docs: str = str(), rule: Optional[Union[Rule, T_RuleChecker]] = None, permission: Optional[Union[Permission, T_PermissionChecker]] = None, handlers: Optional[List[Union[T_Handler, Dependent]]] = None, block: bool = True, - priority: int = None, + priority: int = 1, state: Optional[T_State] = None, ) -> Type[Matcher]: if not rule: @@ -169,8 +160,6 @@ class Service: permission = self.permission if not handlers: handlers = self.handlers - if not priority: - priority = self.priority if not state: state = self.state @@ -253,11 +242,13 @@ class Service: if not aliases: aliases = set() + if isinstance(cmd, tuple): + cmd = ".".join(map(str, cmd)) + cmd_list[cmd] = CommandInfo( type="command", docs=docs, aliases=list(aliases) ).dict() self._save_cmds(cmd_list) - commands = set([cmd]) | (aliases or set()) return self.on_message(rule=command(*commands) & rule, block=True, **kwargs) @@ -297,6 +288,15 @@ class Service: return self.on_message(rule=regex(pattern, flags) & rule, **kwargs) + def cmd_as_group(self, cmd: str, docs: str, **kwargs) -> Type[Matcher]: + sub_cmd = (cmd,) if isinstance(cmd, str) else cmd + _cmd = self.main_cmd + sub_cmd + + if "aliases" in kwargs: + del kwargs["aliases"] + + return self.on_command(_cmd, docs, **kwargs) + class ServiceTools(object): @staticmethod @@ -327,7 +327,7 @@ class ServiceTools(object): return data @classmethod - def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool: + def auth_service(cls, service, user_id: str = str(), group_id: str = str()) -> bool: data = cls.load_service(service) auth_global = data.get("enabled", True) diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py index 565de97..29c9f05 100644 --- a/ATRI/utils/__init__.py +++ b/ATRI/utils/__init__.py @@ -5,18 +5,15 @@ import yaml import aiofiles import time from pathlib import Path -from aiohttp import FormData from datetime import datetime from PIL import Image, ImageFile from aiofiles.threadpool.text import AsyncTextIOWrapper -from . import request - def timestamp2datetimestr(timestamp: int) -> str: format = "%Y-%m-%d %H:%M:%S" - timestamp = time.localtime(timestamp) - dt = time.strftime(format, timestamp) + tt = time.localtime(timestamp) + dt = time.strftime(format, tt) return dt diff --git a/ATRI/utils/apscheduler.py b/ATRI/utils/apscheduler.py index 186ff8e..e1959c1 100644 --- a/ATRI/utils/apscheduler.py +++ b/ATRI/utils/apscheduler.py @@ -1,34 +1,13 @@ -""" -Fork from: https://github.com/nonebot/plugin-apscheduler -""" import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler -from nonebot import get_driver -from nonebot.log import logger, LoguruHandler +from nonebot.log import LoguruHandler -apscheduler_autostart: bool = True -# apscheduler_config: dict = {"apscheduler.timezone": "Asia/Shanghai"} - - -driver = get_driver() scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") -async def _start_scheduler(): - if not scheduler.running: - # scheduler.configure(apscheduler_config) - scheduler.start() - logger.info("Scheduler Started.") - - -if apscheduler_autostart: - driver.on_startup(_start_scheduler) - aps_logger = logging.getLogger("apscheduler") aps_logger.setLevel(logging.DEBUG) aps_logger.handlers.clear() aps_logger.addHandler(LoguruHandler()) - -from apscheduler.triggers.date import DateTrigger diff --git a/ATRI/utils/check_update.py b/ATRI/utils/check_update.py new file mode 100644 index 0000000..a8575ab --- /dev/null +++ b/ATRI/utils/check_update.py @@ -0,0 +1,54 @@ +from ATRI.exceptions import RequestError + +from . import request + + +REPO_COMMITS_URL = "https://api.github.com/repos/Kyomotoi/ATRI/commits" +REPO_RELEASE_URL = "https://api.github.com/repos/Kyomotoi/ATRI/releases" + + +class CheckUpdate: + @staticmethod + async def _get_commits_info() -> dict: + req = await request.get(REPO_COMMITS_URL) + return req.json() + + @staticmethod + async def _get_release_info() -> dict: + req = await request.get(REPO_RELEASE_URL) + return req.json() + + @classmethod + async def show_latest_commit_info(cls) -> str: + try: + data = await cls._get_commits_info() + except RequestError: + raise RequestError("Getting commit info timeout...") + + try: + commit_data: dict = data[0] + except Exception: + raise Exception("GitHub has been error!!!") + + c_info = commit_data["commit"] + c_msg = c_info["message"] + c_sha = commit_data["sha"][0:5] + c_time = c_info["author"]["date"] + + return f"Latest commit {c_msg} | sha: {c_sha} | time: {c_time}" + + @classmethod + async def show_latest_version(cls) -> tuple: + try: + data = await cls._get_release_info() + except RequestError: + raise RequestError("Getting release list timeout...") + + try: + release_data: dict = data[0] + except Exception: + raise Exception("GitHub has been error!!!") + + l_v = release_data["tag_name"] + l_v_t = release_data["published_at"] + return l_v, l_v_t @@ -22,7 +22,7 @@ 为QQ群中复现一个优秀的功能性机器人是本项目的目标 -## 声明(Attaction) +## 声明(Warning) **一切开发旨在学习,请勿用于非法用途** @@ -55,6 +55,7 @@ - 以图搜番 - ATRI语(加密、解密,改自[`rcnb`](https://github.com/rcnbapp/RCNB.js)) - 简单骰子 + - b站动态订阅 - 娱乐: - 看不懂的笑话 @@ -69,7 +70,7 @@ - [ ] 网页控制台 - [ ] RSS订阅 - - [ ] B站动态订阅 + - [x] B站动态订阅 - [ ] 冷重启 - [ ] 进裙验证(问题可自定义) - [ ] 好感度系统(目前优先在[`go-ATRI`](https://github.com/Kyomotoi/go-ATRI)上实现) diff --git a/test/test_plugin_chat.py b/test/test_plugin_chat.py deleted file mode 100644 index e04f1d3..0000000 --- a/test/test_plugin_chat.py +++ /dev/null @@ -1,84 +0,0 @@ -import pytest -from nonebug import App -from nonebot.adapters.onebot.v11 import MessageSegment - -from .utils import make_fake_message, make_fake_event - - -async def test_chat(app: App): - from ATRI.plugins.chat import chat - - Message = make_fake_message() - - async with app.test_matcher(chat) as ctx: - bot = ctx.create_bot() - - msg = Message("爱你") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True) - - -async def test_my_name_is(app: App): - from ATRI.plugins.chat import my_name_is - - Message = make_fake_message() - - async with app.test_matcher(my_name_is) as ctx: - bot = ctx.create_bot() - - msg = Message("叫我") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True) - - msg = Message("欧尼酱") - event = make_fake_event(_message=msg)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True) - - -async def test_say(app: App): - from ATRI.plugins.chat import say - - Message = make_fake_message() - - async with app.test_matcher(say) as ctx: - bot = ctx.create_bot() - - msg = Message("说") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "想要咱复读啥呢...", True) - - msg = Message("nya~") - event = make_fake_event(_message=msg)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "nya~", True) - - async with app.test_matcher(say) as ctx: - bot = ctx.create_bot() - - msg = Message("说") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "想要咱复读啥呢...", True) - - msg = Message( - MessageSegment.image( - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png" - ) - ) - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "不要...", True) diff --git a/test/test_plugin_code_runner.py b/test/test_plugin_code_runner.py index 584ebb0..ec9eb2d 100644 --- a/test/test_plugin_code_runner.py +++ b/test/test_plugin_code_runner.py @@ -17,12 +17,12 @@ async def test_code_runner(app: App): event = make_fake_event(_message=msg)() ctx.receive_event(bot, event) - ctx.should_call_send(event, "请键入 /code help 以获取帮助~!", True) + ctx.should_call_send(event, "请键入 /code.help 以获取帮助~!", True) async with app.test_matcher(code_runner) as ctx: bot = ctx.create_bot() - msg = Message("/code help") + msg = Message("/code.help") event = make_fake_event(_message=msg)() ctx.receive_event(bot, event) @@ -34,14 +34,14 @@ async def test_code_runner(app: App): For example: /code python print('hello world') - """, + """.strip(), True, ) async with app.test_matcher(code_runner) as ctx: bot = ctx.create_bot() - msg = Message("/code list") + msg = Message("/code.list") event = make_fake_event(_message=msg)() ctx.receive_event(bot, event) @@ -56,7 +56,7 @@ async def test_code_runner(app: App): julia, kotlin, lua, perl, php, python, ruby, rust, scala, swift, typescript - """, + """.strip(), True, ) diff --git a/test/test_plugin_help.py b/test/test_plugin_help.py index 82a0467..3d975ac 100644 --- a/test/test_plugin_help.py +++ b/test/test_plugin_help.py @@ -6,11 +6,11 @@ from .utils import make_fake_message, make_fake_event @pytest.mark.asyncio async def test_main_help(app: App): - from ATRI.plugins.help import main_help + from ATRI.plugins.help import menu Message = make_fake_message() - async with app.test_matcher(main_help) as ctx: + async with app.test_matcher(menu) as ctx: bot = ctx.create_bot() msg = Message("菜单") @@ -25,7 +25,7 @@ async def test_main_help(app: App): 服务列表 -以查看所有可用服务 帮助 [服务] -以查看对应服务帮助 Tip: 均需要at触发。@bot 菜单 以打开此页面 - """, + """.strip(), True, ) @@ -34,7 +34,7 @@ async def test_main_help(app: App): async def test_about_me(app: App): from ATRI import __version__ from ATRI.config import BotSelfConfig - from ATRI.plugins.help import about_me + from ATRI.plugins.help import about temp_list = list() for i in BotSelfConfig.nickname: @@ -43,7 +43,7 @@ async def test_about_me(app: App): Message = make_fake_message() - async with app.test_matcher(about_me) as ctx: + async with app.test_matcher(about) as ctx: bot = ctx.create_bot() msg = Message("关于") @@ -57,8 +57,9 @@ async def test_about_me(app: App): 可以称呼咱:{nickname} 咱的型号是:{__version__} 想进一步了解: - https://github.com/Kyomotoi/ATRI - """, + atri.kyomotoi.moe + 进不去: project-atri-docs.vercel.app + """.strip(), True, ) @@ -87,7 +88,10 @@ async def test_service_list(app: App): ] ) table = tabulate( - services, headers=["服务名称", "开启状态", "仅支持管理员"], tablefmt="plain", showindex=True + services, + headers=["服务名称", "开启状态(全局)", "仅支持管理员"], + tablefmt="plain", + showindex=True, ) output = f"咱搭载了以下服务~\n{table}\n@bot 帮助 [服务] -以查看对应服务帮助" @@ -139,7 +143,8 @@ async def test_service_info(app: App): 服务名:状态 说明:检查咱自身状态 可用命令: - /ping、/status + /ping + /status 是否全局启用:True Tip: @bot 帮助 [服务] [命令] 以查看对应命令详细信息 """, diff --git a/test/test_plugin_kimo.py b/test/test_plugin_kimo.py new file mode 100644 index 0000000..ad6ff0b --- /dev/null +++ b/test/test_plugin_kimo.py @@ -0,0 +1,43 @@ +import pytest +from nonebug import App +from nonebot.adapters.onebot.v11 import MessageSegment + +from .utils import make_fake_message, make_fake_event + + +async def test_chat(app: App): + from ATRI.plugins.kimo import kimo + + Message = make_fake_message() + + async with app.test_matcher(kimo) as ctx: + bot = ctx.create_bot() + + msg = Message("爱你") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True) + + +async def test_my_name_is(app: App): + from ATRI.plugins.kimo import my_name_is + + Message = make_fake_message() + + async with app.test_matcher(my_name_is) as ctx: + bot = ctx.create_bot() + + msg = Message("叫我") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True) + + msg = Message("欧尼酱") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True) |