summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-02-03 14:36:24 +0800
committerKyomotoi <[email protected]>2022-02-03 14:36:24 +0800
commit3e32ca3964ff8f40e0b491e87f153040f2348fd0 (patch)
treed04d19ba5a25f6f1f6a9c4f8c398d49eb252df3c
parentf5a020d45f7294214bbcd488955b9c391d651a6d (diff)
downloadATRI-3e32ca3964ff8f40e0b491e87f153040f2348fd0.tar.gz
ATRI-3e32ca3964ff8f40e0b491e87f153040f2348fd0.tar.bz2
ATRI-3e32ca3964ff8f40e0b491e87f153040f2348fd0.zip
🔖 更新版本:
更新记录请参考文档: atri.kyomotoi.moe/changelog/overview/
-rw-r--r--.gitignore5
-rw-r--r--ATRI/__init__.py15
-rw-r--r--ATRI/config.py17
-rw-r--r--ATRI/exceptions.py9
-rw-r--r--ATRI/log.py3
-rw-r--r--ATRI/plugins/anime_search.py35
-rw-r--r--ATRI/plugins/broadcast.py134
-rw-r--r--ATRI/plugins/chat/__init__.py58
-rw-r--r--ATRI/plugins/code_runner/__init__.py9
-rw-r--r--ATRI/plugins/essential.py90
-rw-r--r--ATRI/plugins/funny/__init__.py42
-rw-r--r--ATRI/plugins/funny/data_source.py6
-rw-r--r--ATRI/plugins/help/__init__.py11
-rw-r--r--ATRI/plugins/help/data_source.py6
-rw-r--r--ATRI/plugins/manage/__init__.py150
-rw-r--r--ATRI/plugins/repo.py48
-rw-r--r--ATRI/plugins/rich/__init__.py4
-rw-r--r--ATRI/plugins/saucenao/__init__.py31
-rw-r--r--ATRI/plugins/setu/__init__.py91
-rw-r--r--ATRI/plugins/setu/data_source.py23
-rw-r--r--ATRI/plugins/setu/tf_dealer.py10
-rw-r--r--ATRI/plugins/status/__init__.py6
-rw-r--r--ATRI/plugins/util/__init__.py97
-rw-r--r--ATRI/plugins/wife/__init__.py38
-rw-r--r--ATRI/plugins/wife/data_source.py2
-rw-r--r--ATRI/rule.py8
-rw-r--r--ATRI/service.py51
-rw-r--r--ATRI/utils/__init__.py13
-rw-r--r--ATRI/utils/apscheduler.py8
-rw-r--r--ATRI/utils/request.py9
-rw-r--r--changelog.md77
-rw-r--r--config.yml14
-rw-r--r--main.py2
-rw-r--r--pyproject.toml3
-rw-r--r--requirements.txt8
-rw-r--r--test/__init__.py0
-rw-r--r--test/test_plugin_anime_search.py32
-rw-r--r--test/test_plugin_chat.py84
-rw-r--r--test/test_plugin_code_runner.py75
-rw-r--r--test/test_plugin_funny.py117
-rw-r--r--test/test_plugin_help.py154
-rw-r--r--test/test_plugin_manage.py293
-rw-r--r--test/test_plugin_rich.py28
-rw-r--r--test/test_plugin_saucenao.py32
-rw-r--r--test/test_plugin_setu.py20
-rw-r--r--test/utils.py87
46 files changed, 1608 insertions, 447 deletions
diff --git a/.gitignore b/.gitignore
index 6ca4481..e2a2045 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,4 +137,7 @@ dmypy.json
# pytype static type analyzer
.pytype/
-# End of https://www.toptal.com/developers/gitignore/api/python \ No newline at end of file
+# End of https://www.toptal.com/developers/gitignore/api/python
+
+/accounts/*
+/data/*
diff --git a/ATRI/__init__.py b/ATRI/__init__.py
index 5e9db56..5346a7a 100644
--- a/ATRI/__init__.py
+++ b/ATRI/__init__.py
@@ -1,13 +1,11 @@
from time import sleep
import nonebot
-from nonebot.adapters.cqhttp import Bot as ATRIBot
+from nonebot.adapters.onebot.v11 import Adapter
from .config import RUNTIME_CONFIG
-from .log import logger
-
-__version__ = "YHN-001-A04"
+__version__ = "YHN-001-A05"
def asgi():
@@ -20,12 +18,11 @@ def driver():
def init():
nonebot.init(**RUNTIME_CONFIG)
- driver().register_adapter("cqhttp", ATRIBot)
+ driver().register_adapter(Adapter)
nonebot.load_plugins("ATRI/plugins")
- if RUNTIME_CONFIG["debug"]:
- nonebot.load_plugin("nonebot_plugin_test")
+ nonebot.load_plugin("nonebot_plugin_gocqhttp")
sleep(3)
-def run(app):
- nonebot.run(app=app)
+def run():
+ nonebot.run()
diff --git a/ATRI/config.py b/ATRI/config.py
index d1a9bfd..b911a83 100644
--- a/ATRI/config.py
+++ b/ATRI/config.py
@@ -30,12 +30,26 @@ class BotSelfConfig:
proxy: str = config.get("proxy", None)
+class InlineGoCQHTTP:
+ config: dict = config["InlineGoCQHTTP"]
+
+ accounts: list = config.get("accounts", [])
+ download_version: str = str(config.get("download_version", "latest"))
+
+
class SauceNAO:
config: dict = config["SauceNAO"]
key: str = config.get("key", "")
+class Setu:
+ config: dict = config["Setu"]
+
+ reverse_proxy: bool = bool(config.get("reverse_proxy", False))
+ reverse_proxy_domain: str = config.get("reverse_proxy_domain", str())
+
+
RUNTIME_CONFIG = {
"host": BotSelfConfig.host,
"port": BotSelfConfig.port,
@@ -45,4 +59,7 @@ RUNTIME_CONFIG = {
"command_start": BotSelfConfig.command_start,
"command_sep": BotSelfConfig.command_sep,
"session_expire_timeout": BotSelfConfig.session_expire_timeout,
+
+ "gocq_accounts": InlineGoCQHTTP.accounts,
+ "gocq_version": InlineGoCQHTTP.download_version,
}
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py
index c95867d..8ce6a85 100644
--- a/ATRI/exceptions.py
+++ b/ATRI/exceptions.py
@@ -8,9 +8,7 @@ from typing import Optional
from traceback import format_exc
from pydantic.main import BaseModel
-from nonebot.adapters.cqhttp import Bot, Event
-from nonebot.matcher import Matcher
-from nonebot.typing import T_State
+from nonebot.adapters.onebot.v11 import Bot
from nonebot.message import run_postprocessor
from .log import logger
@@ -92,13 +90,10 @@ class ServiceRegisterError(BaseBotException):
prompt = "服务注册错误"
-@run_postprocessor # type: ignore
+@run_postprocessor
async def _track_error(
- matcher: Matcher,
exception: Optional[Exception],
bot: Bot,
- event: Event,
- state: T_State,
) -> None:
if not exception:
return
diff --git a/ATRI/log.py b/ATRI/log.py
index cbdc041..39a4864 100644
--- a/ATRI/log.py
+++ b/ATRI/log.py
@@ -27,6 +27,9 @@ class LoguruNameDealer:
if "nonebot.plugin.manager" in log_handle:
plugin_name = log_handle.split(".")[-1]
record["name"] = f"plugin.{plugin_name}"
+ elif "nonebot_plugin_gocqhttp" in log_handle:
+ plugin_name = log_handle.split("_")[-1]
+ record["name"] = "gocqhttp"
else:
record["name"] = record["name"].split(".")[0]
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py
index 5b4e212..7436aa2 100644
--- a/ATRI/plugins/anime_search.py
+++ b/ATRI/plugins/anime_search.py
@@ -1,9 +1,9 @@
import re
from random import choice
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
-from nonebot.adapters.cqhttp.message import Message, MessageSegment
+from nonebot.matcher import Matcher
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message, MessageSegment
from ATRI.service import Service
from ATRI.rule import is_in_service
@@ -80,34 +80,25 @@ class Anime(Service):
anime_search = Anime().on_command("以图搜番", "发送一张图以搜索可能的番剧")
-@anime_search.args_parser # type: ignore
-async def _get_anime(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "不搜了", "取消"]
- if msg in quit_list:
- await anime_search.finish("好吧...")
- if not msg:
- await anime_search.reject("图呢?")
- else:
- state["anime"] = msg
-
-
@anime_search.handle()
-async def _ready_sear(bot: Bot, event: MessageEvent, state: T_State):
+async def _ready_sear(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _anime_flmt.check(user_id):
await anime_search.finish(_anime_flmt_notice)
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["anime"] = msg
+ matcher.set_arg("anime_pic", args)
-@anime_search.got("anime", "图呢?")
-async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State):
+@anime_search.got("anime_pic", "图呢?")
+async def _deal_sear(
+ bot: Bot, event: MessageEvent, pic: str = ArgPlainText("anime_pic")
+):
user_id = event.get_user_id()
- msg = state["anime"]
- img = re.findall(r"url=(.*?)]", msg)
+ img = re.findall(r"url=(.*?)]", pic)
if not img:
await anime_search.reject("请发送图片而不是其它东西!!")
diff --git a/ATRI/plugins/broadcast.py b/ATRI/plugins/broadcast.py
new file mode 100644
index 0000000..0948e31
--- /dev/null
+++ b/ATRI/plugins/broadcast.py
@@ -0,0 +1,134 @@
+import os
+import json
+import random
+import asyncio
+from pathlib import Path
+
+from nonebot.matcher import Matcher
+from nonebot.permission import SUPERUSER
+from nonebot.params import CommandArg, ArgPlainText
+from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent
+from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
+from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent
+
+
+from ATRI.service import Service
+from ATRI.rule import to_bot
+
+
+BC_PATH = Path(".") / "data" / "database" / "broadcast"
+os.makedirs(BC_PATH, exist_ok=True)
+
+_BROADCAST_BACK = """
+广播报告:
+信息:{msg}
+预计推送个数(群):{len_g}
+成功:{su_g}
+失败:{fl_g}
+失败列表:{f_g}
+""".strip()
+
+
+class BroadCast(Service):
+ def __init__(self):
+ Service.__init__(self, "广播", "向bot所在的所有群发送信息", True, to_bot())
+
+ @staticmethod
+ def load_rej_list() -> list:
+ data = list()
+ path = BC_PATH / "rej_list.json"
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data))
+ return data
+
+ return json.loads(path.read_bytes())
+
+ @classmethod
+ def store_rej_list(cls, data: list):
+ path = BC_PATH/ "rej_list.json"
+ if not path.is_file():
+ cls.load_rej_list()
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data))
+
+
+caster = BroadCast().on_command("广播", "向bot所在的所有群发送信息,有防寄延迟", aliases={"/bc","bc"}, permission=SUPERUSER)
+
+
+async def _(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("bc_msg", args)
+
+
[email protected]("bc_msg", "想要咱群发什么呢?")
+async def _(bot: Bot, event: MessageEvent, s_msg: str = ArgPlainText("bc_msg")):
+ w_group = await bot.get_group_list()
+
+ await bot.send(event, "正在推送...(每个群延迟1~3s)")
+
+ w_msg = f" 来自维护者的信息:\n{s_msg}"
+
+ su_g = list()
+ fl_g = list()
+ for i in w_group:
+ group_id = i["group_id"]
+ try:
+ await bot.send_group_msg(group_id=group_id, message=w_msg)
+ su_g.append(group_id)
+ except:
+ fl_g.append(group_id)
+
+ await asyncio.sleep(random.randint(2, 3))
+
+ repo_msg = _BROADCAST_BACK.format(
+ msg=s_msg,
+ len_g=len(w_group),
+ su_g=su_g,
+ fl_g=fl_g,
+ f_g="、".join(map(str, fl_g))
+ )
+ await caster.finish(repo_msg)
+
+
+rej_broadcast = BroadCast().on_command("拒绝广播", "拒绝来自开发者的广播推送", permission=GROUP_OWNER | GROUP_ADMIN)
+
+
+@rej_broadcast.handle()
+async def _(bot: Bot, event: GroupMessageEvent):
+ group_id = str(event.group_id)
+
+ rej_g = BroadCast().load_rej_list()
+ if group_id in rej_g:
+ await rej_broadcast.finish("本群已在推送黑名单内辣!")
+ else:
+ rej_g.append(group_id)
+ BroadCast().store_rej_list(rej_g)
+ await rej_broadcast.finish("完成~!已将本群列入推送黑名单")
+
+@rej_broadcast.handle()
+async def _(event: PrivateMessageEvent):
+ await rej_broadcast.finish("该功能仅在群聊中触发...")
+
+
+acc_broadcast = BroadCast().on_command("接受广播", "接受来自开发者的广播推送", permission=GROUP_OWNER | GROUP_ADMIN)
+
+
+@acc_broadcast.handle()
+async def _(bot: Bot, event: GroupMessageEvent):
+ group_id = str(event.group_id)
+
+ rej_g = BroadCast().load_rej_list()
+ if group_id in rej_g:
+ rej_g.remove(group_id)
+ BroadCast().store_rej_list(rej_g)
+ await rej_broadcast.finish("已将本群移除推送黑名单!")
+ else:
+ await rej_broadcast.finish("本群不在推送黑名单里呢...")
+
+@acc_broadcast.handle()
+async def _(event: PrivateMessageEvent):
+ await rej_broadcast.finish("该功能仅在群聊中触发...")
diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py
index f401f3f..217c71e 100644
--- a/ATRI/plugins/chat/__init__.py
+++ b/ATRI/plugins/chat/__init__.py
@@ -1,9 +1,9 @@
from random import choice
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.matcher import Matcher
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message
-from ATRI.utils import CoolqCodeChecker
from ATRI.utils.limit import FreqLimiter
from ATRI.utils.apscheduler import scheduler
from .data_source import Chat
@@ -17,7 +17,6 @@ chat = Chat().on_message("文爱", "闲聊(文爱")
@chat.handle()
async def _chat(bot: Bot, event: MessageEvent):
- print(1)
user_id = event.get_user_id()
if not _chat_flmt.check(user_id):
await chat.finish(_chat_flmt_notice)
@@ -34,33 +33,20 @@ async def _chat(bot: Bot, event: MessageEvent):
my_name_is = Chat().on_command("叫我", "更改闲聊(文爱)时的称呼", aliases={"我是"}, priority=1)
-@my_name_is.args_parser # type: ignore
-async def _get_name(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
- if msg in quit_list:
- await my_name_is.finish("好吧...")
- if not msg:
- await my_name_is.reject("欧尼酱想让咱如何称呼呢!0w0")
- else:
- state["name"] = msg
-
-
@my_name_is.handle()
-async def _name(bot: Bot, event: MessageEvent, state: T_State):
+async def _name(matcher: Matcher, event: MessageEvent, args: Message = CommandArg()):
user_id = event.get_user_id()
if not _chat_flmt.check(user_id):
await my_name_is.finish(_chat_flmt_notice)
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["name"] = msg
+ matcher.set_arg("name", args)
@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0")
-async def _deal_name(bot: Bot, event: MessageEvent, state: T_State):
+async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")):
user_id = event.get_user_id()
- new_name = state["name"]
repo = choice(
[
f"好~w 那咱以后就称呼你为{new_name}!",
@@ -77,37 +63,21 @@ async def _deal_name(bot: Bot, event: MessageEvent, state: T_State):
say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1)
[email protected]_parser # type: ignore
-async def _get_say(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了"]
- if msg in quit_list:
- await say.finish("好吧...")
- if not msg:
- await say.reject("阿!要咱说啥呢...")
- else:
- state["say"] = msg
-
-
@say.handle()
-async def _ready_say(bot: Bot, event: MessageEvent, state: T_State):
+async def _ready_say(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _chat_flmt.check(user_id):
await say.finish(_chat_flmt_notice)
- msg = str(event.message)
+ msg = args.extract_plain_text()
if msg:
- state["say"] = msg
-
+ matcher.set_arg("say", args)
-async def _deal_say(bot: Bot, event: MessageEvent, state: T_State):
- msg = state["say"]
- check = CoolqCodeChecker(msg).check
- if not check:
- repo = choice(["不要...", "这个咱不想复读!", "不可以", "不好!"])
- await say.finish(repo)
[email protected]("say", "想要咱复读啥呢...")
+async def _deal_say(event: MessageEvent, msg: str = ArgPlainText("say")):
user_id = event.get_user_id()
_chat_flmt.start_cd(user_id)
await say.finish(msg)
diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py
index 3f5697b..334c09a 100644
--- a/ATRI/plugins/code_runner/__init__.py
+++ b/ATRI/plugins/code_runner/__init__.py
@@ -1,7 +1,6 @@
from random import choice
-from nonebot.adapters.cqhttp import Bot, MessageEvent
-from nonebot.adapters.cqhttp.message import Message, MessageSegment
+from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment, unescape
from ATRI.utils.limit import FreqLimiter
from .data_source import CodeRunner
@@ -15,7 +14,7 @@ code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮�
@code_runner.handle()
-async def _code_runner(bot: Bot, event: MessageEvent):
+async def _code_runner(event: MessageEvent):
user_id = event.get_user_id()
if not _flmt.check(user_id):
await code_runner.finish(_flmt_notice)
@@ -23,14 +22,14 @@ async def _code_runner(bot: Bot, event: MessageEvent):
msg = str(event.get_message())
args = msg.split("\n")
- if not args:
+ if not args[0]:
content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!"
elif args[0] == "help":
content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().help()
elif args[0] == "list":
content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang()
else:
- content = MessageSegment.at(user_id) + await CodeRunner().runner(msg)
+ content = MessageSegment.at(user_id) + await CodeRunner().runner(unescape(msg))
_flmt.start_cd(user_id)
await code_runner.finish(Message(content))
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index fb493a9..6d981aa 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -8,11 +8,10 @@ from random import choice, randint
from pathlib import Path
import nonebot
-from nonebot.typing import T_State
-from nonebot.matcher import Matcher
+from nonebot.permission import SUPERUSER
from nonebot.message import run_preprocessor
from nonebot.exception import IgnoredException
-from nonebot.adapters.cqhttp import (
+from nonebot.adapters.onebot.v11 import (
Bot,
MessageEvent,
GroupMessageEvent,
@@ -24,13 +23,15 @@ from nonebot.adapters.cqhttp import (
GroupBanNoticeEvent,
GroupRecallNoticeEvent,
FriendRecallNoticeEvent,
+ MessageSegment,
+ Message
)
import ATRI
from ATRI.service import Service
from ATRI.log import logger as log
from ATRI.config import BotSelfConfig
-from ATRI.utils import CoolqCodeChecker
+from ATRI.utils import MessageChecker
from ATRI.utils.apscheduler import scheduler
@@ -57,10 +58,8 @@ async def shutdown():
log.info("Thanks for using.")
-@run_preprocessor # type: ignore
-async def _check_block(
- matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State
-) -> None:
+@run_preprocessor
+async def _check_block(event: MessageEvent):
user_file = "block_user.json"
path = MANEGE_DIR / user_file
if not path.is_file():
@@ -261,6 +260,9 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent):
await bot.send_private_msg(user_id=int(superuser), message=msg)
+_acc_recall = True
+
+
recall_event = Essential().on_notice("撤回事件", "撤回事件检测")
@@ -269,6 +271,9 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
if event.is_tome():
return
+ if not _acc_recall:
+ return
+
try:
repo = await bot.get_msg(message_id=event.message_id)
except BaseException:
@@ -276,14 +281,13 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
user = event.user_id
group = event.group_id
- repo = str(repo["message"])
- check = CoolqCodeChecker(repo).check
- if not check:
- repo = repo.replace("CQ", "QC")
+ repo: dict = repo["message"]
+
+ m = recall_msg_dealer(repo)
- msg = "主人,咱拿到了一条撤回信息!\n" f"{user}@[群:{group}]\n" "撤回了\n" f"{repo}"
+ msg = f"主人,咱拿到了一条撤回信息!\n{user}@[群:{group}]\n撤回了\n{m}"
for superuser in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=int(superuser), message=msg)
+ await bot.send_private_msg(user_id=int(superuser), message=Message(msg))
@recall_event.handle()
@@ -291,20 +295,40 @@ async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent):
if event.is_tome():
return
+ if not _acc_recall:
+ return
+
try:
repo = await bot.get_msg(message_id=event.message_id)
except BaseException:
return
-
+
user = event.user_id
- repo = str(repo["message"])
- check = CoolqCodeChecker(repo).check
- if not check:
- repo = repo.replace("CQ", "QC")
+ repo: dict = repo["message"]
+
+ m = recall_msg_dealer(repo)
- msg = "主人,咱拿到了一条撤回信息!\n" f"{user}@[私聊]" "撤回了\n" f"{repo}"
+ msg = f"主人,咱拿到了一条撤回信息!\n{user}@[私聊]撤回了\n{m}"
for superuser in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=int(superuser), message=msg)
+ await bot.send_private_msg(user_id=int(superuser), message=Message(msg))
+
+
+rej_recall = Essential().on_command("拒绝撤回", "拒绝撤回信息", permission=SUPERUSER)
+
+@rej_recall.handle()
+async def _():
+ global _acc_recall
+ _acc_recall = False
+ await rej_recall.finish("已拒绝撤回信息...")
+
+
+acc_recall = Essential().on_command("接受撤回", "接受撤回信息", permission=SUPERUSER)
+
+@acc_recall.handle()
+async def _():
+ global _acc_recall
+ _acc_recall = True
+ await acc_recall.finish("现在可以接受撤回信息啦!")
@scheduler.scheduled_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5)
@@ -314,3 +338,27 @@ async def _clear_cache():
os.makedirs(TEMP_PATH, exist_ok=True)
except Exception:
log.warning("清除缓存失败,请手动清除:data/temp")
+
+
+def recall_msg_dealer(msg: dict) -> str:
+ temp_m = list()
+
+ for i in msg:
+ _type = i["type"]
+ _data = i["data"]
+ if _type == "text":
+ temp_m.append(_data["text"])
+ elif _type == "image":
+ url = _data["url"]
+ check = MessageChecker(url).check_image_url
+ if check:
+ temp_m.append(MessageSegment.image(url))
+ else:
+ temp_m.append(f"[该图片可能包含非法内容,源url:{url}]")
+ elif _type == "face":
+ temp_m.append(MessageSegment.face(_data["id"]))
+ else:
+ temp_m.append(f"[未知类型信息:{_data}]")
+
+ repo = str().join(map(str, temp_m))
+ return repo
diff --git a/ATRI/plugins/funny/__init__.py b/ATRI/plugins/funny/__init__.py
index ea60375..8173f3b 100644
--- a/ATRI/plugins/funny/__init__.py
+++ b/ATRI/plugins/funny/__init__.py
@@ -1,8 +1,8 @@
from random import choice, randint
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
-from nonebot.adapters.cqhttp.message import Message
+from nonebot.matcher import Matcher
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent, Message
from ATRI.utils.limit import FreqLimiter, DailyLimiter
from .data_source import Funny
@@ -39,38 +39,36 @@ _fake_flmt = FreqLimiter(60)
_fake_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
-@fake_msg.args_parser # type: ignore
-async def _perp_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了"]
- if msg in quit_list:
- await fake_msg.finish("好吧...")
- if not msg:
- await fake_msg.reject("内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开")
- else:
- state["content"] = msg
-
-
@fake_msg.handle()
-async def _ready_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _ready_fake(
+ matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _fake_daliy_max.check(user_id):
await fake_msg.finish(_fake_max_notice)
if not _fake_flmt.check(user_id):
await fake_msg.finish(_fake_flmt_notice)
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["content"] = msg
+ matcher.set_arg("content", args)
@fake_msg.got("content", "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开")
-async def _deal_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
- content = state["content"]
+async def _deal_fake(
+ bot: Bot, event: GroupMessageEvent, content: str = ArgPlainText("content")
+):
group_id = event.group_id
user_id = event.get_user_id()
- node = Funny().fake_msg(content)
- await bot.send_group_forward_msg(group_id=group_id, messages=node)
+ try:
+ node = Funny().fake_msg(content)
+ except Exception:
+ await fake_msg.finish("内容格式错误,请检查(")
+
+ try:
+ await bot.send_group_forward_msg(group_id=group_id, messages=node)
+ except Exception:
+ await fake_msg.finish("构造失败惹...可能是被制裁了(")
_fake_flmt.start_cd(user_id)
_fake_daliy_max.increase(user_id)
diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py
index a1fce64..8edc88a 100644
--- a/ATRI/plugins/funny/data_source.py
+++ b/ATRI/plugins/funny/data_source.py
@@ -3,7 +3,7 @@ import os
from pathlib import Path
from random import choice, randint
-from nonebot.adapters.cqhttp.utils import unescape
+from nonebot.adapters.onebot.v11 import unescape
from ATRI.service import Service
from ATRI.log import logger
@@ -77,9 +77,9 @@ class Funny(Service):
EAT_URL = "https://wtf.hiigara.net/api/run/"
params = {"event": "ManualRun"}
pattern_0 = r"大?[今明后]天(.*?)吃[什啥]么?"
- pattern_1 = r"(今|明|后|大后)天"
+ pattern_1 = r"[今|明|后|大后]天"
arg = re.findall(pattern_0, msg)[0]
- day = re.match(pattern_1, msg).group(0) # type: ignore
+ day = re.findall(pattern_1, msg)[0]
if arg == "中午":
a = f"LdS4K6/{randint(0, 1145141919810)}"
diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py
index 1d1102e..a8465bc 100644
--- a/ATRI/plugins/help/__init__.py
+++ b/ATRI/plugins/help/__init__.py
@@ -1,5 +1,4 @@
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.onebot.v11 import MessageEvent
from ATRI.rule import to_bot
from .data_source import Helper
@@ -9,7 +8,7 @@ main_help = Helper().on_command("菜单", "获取食用bot的方法", rule=to_bo
@main_help.handle()
-async def _main_help(bot: Bot, event: MessageEvent):
+async def _main_help():
repo = Helper().menu()
await main_help.finish(repo)
@@ -18,7 +17,7 @@ about_me = Helper().on_command("关于", "获取关于bot的信息", rule=to_bot
@about_me.handle()
-async def _about_me(bot: Bot, event: MessageEvent):
+async def _about_me():
repo = Helper().about()
await about_me.finish(repo)
@@ -27,7 +26,7 @@ service_list = Helper().on_command("服务列表", "查看所有可用服务", r
@service_list.handle()
-async def _service_list(bot: Bot, event: MessageEvent):
+async def _service_list():
repo = Helper().service_list()
await service_list.finish(repo)
@@ -36,7 +35,7 @@ service_info = Helper().on_command("帮助", "获取服务详细帮助", aliases
@service_info.handle()
-async def _ready_service_info(bot: Bot, event: MessageEvent, state: T_State):
+async def _ready_service_info(event: MessageEvent):
msg = str(event.message).split(" ")
service = msg[0]
try:
diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py
index 7b2f6f8..cfa0c62 100644
--- a/ATRI/plugins/help/data_source.py
+++ b/ATRI/plugins/help/data_source.py
@@ -34,7 +34,7 @@ class Helper(Service):
"关于 -查看bot基本信息\n"
"服务列表 -以查看所有可用服务\n"
"帮助 [服务] -以查看对应服务帮助\n"
- "Tip: 均需要at触发。菜单 以打开此页面"
+ "Tip: 均需要at触发。@bot 菜单 以打开此页面"
)
@staticmethod
@@ -59,10 +59,8 @@ class Helper(Service):
service = i.replace(".json", "")
temp_list.append(service)
- msg0 = "咱搭载了以下服务~\n"
services = "、".join(map(str, temp_list))
- msg0 = msg0 + services
- repo = msg0 + "\n@ 帮助 [服务] -以查看对应服务帮助"
+ repo = f"咱搭载了以下服务~\n{services}\n@bot 帮助 [服务] -以查看对应服务帮助"
return repo
@staticmethod
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
index 77692fd..62ed378 100644
--- a/ATRI/plugins/manage/__init__.py
+++ b/ATRI/plugins/manage/__init__.py
@@ -1,25 +1,26 @@
import re
-from nonebot.typing import T_State
from nonebot.permission import SUPERUSER
-from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
-from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN
+from nonebot.matcher import Matcher
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent
+from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
from .data_source import Manage
+
block_user = Manage().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER)
@block_user.handle()
-async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_block_user(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["block_user"] = msg
+ matcher.set_arg("block_user", args)
@block_user.got("block_user", "哪位?GKD!")
-async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State):
- user_id = state["block_user"]
+async def _deal_block_user(user_id: str = ArgPlainText("block_user")):
quit_list = ["算了", "罢了"]
if user_id in quit_list:
await block_user.finish("...看来有人逃过一劫呢")
@@ -35,15 +36,14 @@ unblock_user = Manage().on_command("解封用户", "对目标用户进行解封"
@unblock_user.handle()
-async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_unblock_user(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["unblock_user"] = msg
+ matcher.set_arg("unblock_user", args)
@unblock_user.got("unblock_user", "哪位?GKD!")
-async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
- user_id = state["unblock_user"]
+async def _deal_unblock_user(user_id: str = ArgPlainText("unblock_user")):
quit_list = ["算了", "罢了"]
if user_id in quit_list:
await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了")
@@ -59,15 +59,14 @@ block_group = Manage().on_command("封禁群", "对目标群进行封禁", permi
@block_group.handle()
-async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_block_group(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["block_group"] = msg
+ matcher.set_arg("block_group", args)
@block_group.got("block_group", "哪个群?GKD!")
-async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State):
- group_id = state["block_group"]
+async def _deal_block_group(group_id: str = ArgPlainText("block_group")):
quit_list = ["算了", "罢了"]
if group_id in quit_list:
await block_group.finish("...看来有一群逃过一劫呢")
@@ -83,15 +82,14 @@ unblock_group = Manage().on_command("解封群", "对目标群进行解封", per
@unblock_group.handle()
-async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_unblock_group(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["unblock_group"] = msg
+ matcher.set_arg("unblock_group", args)
@unblock_group.got("unblock_group", "哪个群?GKD!")
-async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
- group_id = state["unblock_group"]
+async def _deal_unblock_group(group_id: str = ArgPlainText("unblock_group")):
quit_list = ["算了", "罢了"]
if group_id in quit_list:
await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了")
@@ -107,15 +105,16 @@ global_block_service = Manage().on_command("全局禁用", "全局禁用某服�
@global_block_service.handle()
-async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_block_service(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["global_block_service"] = msg
+ matcher.set_arg("global_block_service", args)
@global_block_service.got("global_block_service", "阿...是哪个服务呢")
-async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State):
- block_service = state["global_block_service"]
+async def _deal_global_block_service(
+ block_service: str = ArgPlainText("global_block_service"),
+):
quit_list = ["算了", "罢了"]
if block_service in quit_list:
await global_block_service.finish("好吧...")
@@ -131,15 +130,18 @@ global_unblock_service = Manage().on_command("全局启用", "全局启用某服
@global_unblock_service.handle()
-async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_unblock_service(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
+ msg = args.extract_plain_text()
if msg:
- state["global_unblock_service"] = msg
+ matcher.set_arg("global_unblock_service", args)
@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢")
-async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
- unblock_service = state["global_unblock_service"]
+async def _deal_global_unblock_service(
+ unblock_service: str = ArgPlainText("global_unblock_service"),
+):
quit_list = ["算了", "罢了"]
if unblock_service in quit_list:
await global_unblock_service.finish("好吧...")
@@ -157,7 +159,7 @@ user_block_service = Manage().on_regex(
@user_block_service.handle()
-async def _user_block_service(bot: Bot, event: MessageEvent):
+async def _user_block_service(event: MessageEvent):
msg = str(event.message).strip()
pattern = r"对用户(.*?)禁用(.*)"
reg = re.findall(pattern, msg)
@@ -176,7 +178,7 @@ user_unblock_service = Manage().on_regex(
@user_unblock_service.handle()
-async def _user_unblock_service(bot: Bot, event: MessageEvent):
+async def _user_unblock_service(event: MessageEvent):
msg = str(event.message).strip()
pattern = r"对用户(.*?)启用(.*)"
reg = re.findall(pattern, msg)
@@ -196,16 +198,17 @@ group_block_service = Manage().on_command(
@group_block_service.handle()
async def _ready_group_block_service(
- bot: Bot, event: GroupMessageEvent, state: T_State
+ matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()
):
msg = str(event.message).strip()
if msg:
- state["group_block_service"] = msg
+ matcher.set_arg("group_block_service", args)
@group_block_service.got("group_block_service", "阿...是哪个服务呢")
-async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
- aim_service = state["group_block_service"]
+async def _deal_group_block_service(
+ event: GroupMessageEvent, aim_service: str = ArgPlainText("group_block_service")
+):
group_id = str(event.group_id)
quit_list = ["算了", "罢了"]
if aim_service in quit_list:
@@ -224,18 +227,17 @@ group_unblock_service = Manage().on_command(
@group_unblock_service.handle()
async def _ready_group_unblock_service(
- bot: Bot, event: GroupMessageEvent, state: T_State
+ matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()
):
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["group_unblock_service"] = msg
+ matcher.set_arg("group_unblock_service", args)
@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢")
async def _deal_group_unblock_service(
- bot: Bot, event: GroupMessageEvent, state: T_State
+ event: GroupMessageEvent, aim_service: str = ArgPlainText("group_unblock_service")
):
- aim_service = state["group_unblock_service"]
group_id = str(event.group_id)
quit_list = ["算了", "罢了"]
if aim_service in quit_list:
@@ -251,7 +253,7 @@ get_friend_add_list = Manage().on_command("获取好友申请", "获取好友申
@get_friend_add_list.handle()
-async def _get_friend_add_list(bot: Bot, event: MessageEvent):
+async def _get_friend_add_list():
data = Manage().load_friend_apply_list()
temp_list = list()
for i in data:
@@ -270,15 +272,18 @@ approve_friend_add = Manage().on_command("同意好友", "同意好友申请", p
@approve_friend_add.handle()
-async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_approve_friend_add(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
+ msg = args.extract_plain_text()
if msg:
- state["approve_friend_add"] = msg
+ matcher.set_arg("approve_friend_add", args)
@approve_friend_add.got("approve_friend_add", "申请码GKD!")
-async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
- apply_code = state["approve_friend_add"]
+async def _deal_approve_friend_add(
+ bot: Bot, apply_code: str = ArgPlainText("approve_friend_add")
+):
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await approve_friend_add.finish("好吧...")
@@ -297,15 +302,18 @@ refuse_friend_add = Manage().on_command("拒绝好友", "拒绝好友申请", pe
@refuse_friend_add.handle()
-async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_refuse_friend_add(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
+ msg = args.extract_plain_text()
if msg:
- state["refuse_friend_add"] = msg
+ matcher.set_arg("refuse_friend_add", args)
@refuse_friend_add.got("refuse_friend_add", "申请码GKD!")
-async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
- apply_code = state["refuse_friend_add"]
+async def _deal_refuse_friend_add(
+ bot: Bot, apply_code: str = ArgPlainText("refuse_friend_add")
+):
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await refuse_friend_add.finish("好吧...")
@@ -324,7 +332,7 @@ get_group_invite_list = Manage().on_command("获取邀请列表", "获取群邀�
@get_group_invite_list.handle()
-async def _get_group_invite_list(bot: Bot, event: MessageEvent):
+async def _get_group_invite_list():
data = Manage().load_invite_apply_list()
temp_list = list()
for i in data:
@@ -343,15 +351,18 @@ approve_group_invite = Manage().on_command("同意邀请", "同意群聊邀请",
@approve_group_invite.handle()
-async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_approve_group_invite(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
+ msg = args.extract_plain_text()
if msg:
- state["approve_group_invite"] = msg
+ matcher.set_arg("approve_group_invite", args)
@approve_group_invite.got("approve_group_invite", "申请码GKD!")
-async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
- apply_code = state["approve_group_invite"]
+async def _deal_approve_group_invite(
+ bot: Bot, apply_code: str = ArgPlainText("approve_group_invite")
+):
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await approve_group_invite.finish("好吧...")
@@ -372,15 +383,18 @@ refuse_group_invite = Manage().on_command("拒绝邀请", "拒绝群聊邀请",
@refuse_group_invite.handle()
-async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_refuse_group_invite(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
+ msg = args.extract_plain_text()
if msg:
- state["refuse_group_invite"] = msg
+ matcher.set_arg("refuse_group_invite", args)
@refuse_group_invite.got("refuse_group_invite", "申请码GKD!")
-async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
- apply_code = state["refuse_group_invite"]
+async def _deal_refuse_group_invite(
+ bot: Bot, apply_code: str = ArgPlainText("refuse_group_invite")
+):
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await refuse_group_invite.finish("好吧...")
@@ -390,7 +404,7 @@ async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_Stat
flag=apply_code, sub_type="invite", approve=False
)
except BaseException:
- await refuse_group_invite.finish("拒绝失败...尝试下手动?")
+ await refuse_group_invite.finish("拒绝失败...(可能是小群免验证)尝试下手动?")
data = Manage().load_invite_apply_list()
data.pop(apply_code)
Manage().save_invite_apply_list(data)
@@ -401,7 +415,7 @@ track_error = Manage().on_command("追踪", "获取报错信息,传入追踪�
@track_error.handle()
-async def _track_error(bot: Bot, event: MessageEvent):
+async def _track_error(event: MessageEvent):
track_id = str(event.message).strip()
repo = await Manage().track_error(track_id)
await track_error.finish(repo)
diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py
index 9c52610..40869eb 100644
--- a/ATRI/plugins/repo.py
+++ b/ATRI/plugins/repo.py
@@ -1,7 +1,8 @@
from random import choice
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.matcher import Matcher
+from nonebot.params import CommandArg, ArgPlainText
+from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message
from ATRI.service import Service
from ATRI.config import BotSelfConfig
@@ -25,46 +26,39 @@ class Repo(Service):
Service.__init__(self, "反馈", "向维护者发送消息")
-repo = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"})
+reporter = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"})
[email protected]_parser # type: ignore
-async def _get_repo(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
- if msg in quit_list:
- await repo.finish("好吧...")
- if not msg:
- await repo.reject("需要反馈的内容呢?~")
- else:
- state["repo"] = msg
-
-
-async def _ready_repo(bot: Bot, event: MessageEvent, state: T_State):
+async def _ready_repo(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _repo_flmt.check(user_id):
- await repo.finish(_repo_flmt_notice)
+ await reporter.finish(_repo_flmt_notice)
if not _repo_dlmt.check(user_id):
- await repo.finish(_repo_dlmt_notice)
+ await reporter.finish(_repo_dlmt_notice)
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["repo"] = msg
+ matcher.set_arg("repo", args)
[email protected]("repo", "需要反馈的内容呢?~")
-async def _deal_repo(bot: Bot, event: MessageEvent, state: T_State):
- msg = state["repo"]
[email protected]("repo", "需要反馈的内容呢?~")
+async def _deal_repo(
+ bot: Bot,
+ event: MessageEvent,
+ repo_msg: str = ArgPlainText("repo"),
+):
user_id = event.get_user_id()
- repo_0 = REPO_FORMAT.format(user=user_id, msg=msg)
+ repo_0 = REPO_FORMAT.format(user=user_id, msg=repo_msg)
for superuser in BotSelfConfig.superusers:
try:
await bot.send_private_msg(user_id=superuser, message=repo_0)
except BaseException:
- await repo.finish("发送失败了呢...")
+ await reporter.finish("发送失败了呢...")
_repo_flmt.start_cd(user_id)
_repo_dlmt.increase(user_id)
- await repo.finish("吾辈的心愿已由咱转告维护者!")
+ await reporter.finish("吾辈的心愿已由咱转告维护者!")
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index bcd58f7..3b49750 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -1,4 +1,4 @@
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.onebot.v11 import MessageEvent
from ATRI.utils.limit import FreqLimiter
from ATRI.log import logger as log
@@ -10,7 +10,7 @@ bili_rich = Rich().on_message("小程序检测", "小程序爪巴", block=False)
@bili_rich.handle()
-async def _fk_bili(bot: Bot, event: MessageEvent):
+async def _fk_bili(event: MessageEvent):
user_id = event.get_user_id()
if not _rich_flmt.check(user_id):
return
diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py
index fed1096..092db50 100644
--- a/ATRI/plugins/saucenao/__init__.py
+++ b/ATRI/plugins/saucenao/__init__.py
@@ -1,9 +1,9 @@
from re import findall
from random import choice
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
-from nonebot.adapters.cqhttp.message import Message, MessageSegment
+from nonebot.matcher import Matcher
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment
from ATRI.config import SauceNAO
from ATRI.utils.limit import FreqLimiter
@@ -17,33 +17,22 @@ _search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇�
saucenao = SaouceNao().on_command("以图搜图", "透过一张图搜索可能的来源")
[email protected]_parser # type: ignore
-async def _get_img(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "不搜了"]
- if msg in quit_list:
- await saucenao.finish("好吧...")
- if not msg:
- await saucenao.reject("图呢?")
- else:
- state["img"] = msg
-
-
@saucenao.handle()
-async def _ready_search(bot: Bot, event: MessageEvent, state: T_State):
+async def _ready_search(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _search_flmt.check(user_id):
await saucenao.finish(_search_flmt_notice)
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["img"] = msg
+ matcher.set_arg("saucenao_img", args)
[email protected]("img", "图呢?")
-async def _deal_search(bot: Bot, event: MessageEvent, state: T_State):
[email protected]("saucenao_img", "图呢?")
+async def _deal_search(event: MessageEvent, msg: str = ArgPlainText("saucenao_img")):
user_id = event.get_user_id()
- msg = state["img"]
img = findall(r"url=(.*?)]", msg)
if not img:
await saucenao.reject("请发送图片而不是其他东西!!")
diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py
index 244e030..6c135aa 100644
--- a/ATRI/plugins/setu/__init__.py
+++ b/ATRI/plugins/setu/__init__.py
@@ -1,10 +1,11 @@
import re
import asyncio
from random import choice
+
from nonebot.permission import SUPERUSER
-from nonebot.adapters.cqhttp import Bot, MessageEvent, Message
-from nonebot.adapters.cqhttp.message import MessageSegment
-from nonebot.typing import T_State
+from nonebot.matcher import Matcher
+from nonebot.params import CommandArg, ArgPlainText
+from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message, MessageSegment
from ATRI.config import BotSelfConfig
from ATRI.utils.limit import FreqLimiter, DailyLimiter
@@ -22,7 +23,7 @@ random_setu = Setu().on_command(
@random_setu.handle()
-async def _random_setu(bot: Bot, event: MessageEvent):
+async def _random_setu(bot: Bot, event: MessageEvent, matcher: Matcher, args: Message = CommandArg()):
user_id = event.get_user_id()
if not _setu_flmt.check(user_id):
await random_setu.finish()
@@ -44,12 +45,25 @@ async def _random_setu(bot: Bot, event: MessageEvent):
await asyncio.sleep(30)
await bot.delete_msg(message_id=event_id)
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("r_rush_after_think", args)
+
+@random_setu.got("r_rush_after_think")
+async def _(think: str = ArgPlainText("r_rush_after_think")):
+ is_repo = will_think(think)
+ if not is_repo:
+ await random_setu.finish()
+ else:
+ await random_setu.finish(is_repo)
+
+
tag_setu = Setu().on_regex(r"来[张点丶份](.*?)的[涩色🐍]图", "根据提供的tag查找涩图")
@tag_setu.handle()
-async def _tag_setu(bot: Bot, event: MessageEvent):
+async def _tag_setu(bot: Bot, event: MessageEvent, matcher: Matcher, args: Message = CommandArg()):
user_id = event.get_user_id()
if not _setu_flmt.check(user_id):
await random_setu.finish()
@@ -77,6 +91,19 @@ async def _tag_setu(bot: Bot, event: MessageEvent):
await asyncio.sleep(30)
await bot.delete_msg(message_id=event_id)
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("r_rush_after_think", args)
+
+
+@tag_setu.got("t_rush_after_think")
+async def _(think: str = ArgPlainText("t_rush_after_think")):
+ is_repo = will_think(think)
+ if not is_repo:
+ await random_setu.finish()
+ else:
+ await random_setu.finish(is_repo)
+
_catcher_max_file_size = 128
@@ -129,17 +156,16 @@ nsfw_checker = Setu().on_command("/nsfw", "涩值检测")
@nsfw_checker.handle()
-async def _nsfw_checker(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _nsfw_checker(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["nsfw_img"] = msg
+ matcher.set_arg("nsfw_img", args)
@nsfw_checker.got("nsfw_img", "图呢?")
-async def _deal_check(bot: Bot, event: MessageEvent, state: T_State):
- msg = state["nsfw_img"]
+async def _deal_check(bot: Bot, img: str = ArgPlainText("nsfw_img")):
pattern = r"url=(.*?)]"
- args = re.findall(pattern, msg)
+ args = re.findall(pattern, img)
if not args:
await nsfw_checker.reject("请发送图片而不是其他东西!!")
@@ -167,16 +193,15 @@ catcher_setting = Setu().on_command("嗅探设置", "涩图检测图片文件大
@catcher_setting.handle()
-async def _catcher_setting(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _catcher_setting(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["catcher_set"] = msg
+ matcher.set_arg("catcher_set", args)
@catcher_setting.got("catcher_set", "数值呢?(1对应1kb,默认128)")
-async def _deal_setting(bot: Bot, event: MessageEvent, state: T_State):
+async def _deal_setting(msg: str = ArgPlainText("catcher_set")):
global _catcher_max_file_size
- msg = state["catcher_set"]
try:
_catcher_max_file_size = int(msg)
except Exception:
@@ -205,3 +230,37 @@ async def _scheduler_setu(bot):
except Exception:
pass
+
+
+_ag_l = ["涩图来", "来点涩图", "来份涩图"]
+_ag_patt = r"来[张点丶份](.*?)的[涩色🐍]图"
+
+_nice_patt = r"[hH好][sS涩色][oO哦]|[嗯恩摁社蛇🐍射]了|(硬|石更)了|[牛🐂][牛🐂]要炸了|[炼恋]起来|开?导"
+_nope_patt = r"不够[涩色]|就这|这也[是叫算]|[??]"
+_again_patt = r"再来一张|不够"
+
+_nice_repo = ["w", "好诶!", "ohh", "(///w///)", "🥵", "我也"]
+_nope_repo = ["那你来发", "爱看不看", "你看不看吧", "看这种类型的涩图,是一件多么美妙的事情"]
+_again_repo = ["没了...", "自己找去"]
+
+
+def will_think(msg: str) -> str:
+ if msg in _ag_l:
+ return str()
+
+ ag_jud = re.findall(_ag_patt, msg)
+ if ag_jud:
+ return str()
+
+ nice_jud = re.findall(_nice_patt, msg)
+ nope_jud = re.findall(_nope_patt, msg)
+ again_jud = re.findall(_again_patt, msg)
+
+ if nice_jud:
+ return choice(_nice_repo)
+ elif nope_jud:
+ return choice(_nope_repo)
+ elif again_jud:
+ return choice(_again_repo)
+ else:
+ return str()
diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py
index cca2767..a26bbef 100644
--- a/ATRI/plugins/setu/data_source.py
+++ b/ATRI/plugins/setu/data_source.py
@@ -1,16 +1,16 @@
-import base64
-
-# from pathlib import Path
from random import choice
-from nonebot.adapters.cqhttp import MessageSegment
+from nonebot.adapters.onebot.v11 import MessageSegment
from ATRI.service import Service
from ATRI.rule import is_in_service
from ATRI.utils import request
+from ATRI.config import Setu as ST
from .tf_dealer import detect_image
LOLICON_URL = "https://api.lolicon.app/setu/v2"
+DEFAULT_SETU = "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg"
+
class Setu(Service):
@@ -31,9 +31,9 @@ class Setu(Service):
data: dict = temp_data[0]
title = data.get("title", "木陰のねこ")
p_id = data.get("pid", 88124144)
- url = data["urls"].get("original", "ignore")
+ url: str = data["urls"].get("original", "ignore")
- setu = MessageSegment.image(url, timeout=114514)
+ setu = MessageSegment.image(use_proxy(url), timeout=114514)
repo = f"Title: {title}\nPid: {p_id}"
return repo, setu
@@ -55,7 +55,7 @@ class Setu(Service):
p_id = data.get("pid", 88124144)
url = data["urls"].get(
"original",
- "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg",
+ use_proxy(DEFAULT_SETU),
)
setu = MessageSegment.image(url, timeout=114514)
repo = f"Title: {title}\nPid: {p_id}"
@@ -87,8 +87,15 @@ class Setu(Service):
url = temp_data[0]["urls"].get(
"original",
- "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg",
+ use_proxy(DEFAULT_SETU),
)
setu = MessageSegment.image(url, timeout=114514)
repo = f"是{tag}哦~❤\n{setu}"
return repo
+
+
+def use_proxy(url: str) -> str:
+ if ST.reverse_proxy:
+ return url.replace("i.pixiv.cat", ST.reverse_proxy_domain)
+ else:
+ return url
diff --git a/ATRI/plugins/setu/tf_dealer.py b/ATRI/plugins/setu/tf_dealer.py
index bf68020..f966636 100644
--- a/ATRI/plugins/setu/tf_dealer.py
+++ b/ATRI/plugins/setu/tf_dealer.py
@@ -1,7 +1,6 @@
import io
import os
import re
-import string
import asyncio
import skimage
import skimage.io
@@ -9,12 +8,8 @@ import numpy as np
from PIL import Image
from pathlib import Path
from sys import getsizeof
-from random import sample
-try:
- import tflite_runtime.interpreter as tf # type: ignore
-except Exception:
- import tensorflow as tf
+import tensorflow as tf
from ATRI.log import logger as log
from ATRI.utils import request
@@ -116,4 +111,5 @@ async def init_module():
raise WriteError("NSFW TF module init failed!")
-asyncio.get_event_loop().run_until_complete(init_module())
+loop = asyncio.get_event_loop()
+loop.create_task(init_module())
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py
index 359344b..06afa3b 100644
--- a/ATRI/plugins/status/__init__.py
+++ b/ATRI/plugins/status/__init__.py
@@ -1,5 +1,3 @@
-from nonebot.adapters.cqhttp import Bot, MessageEvent
-
from ATRI.utils.apscheduler import scheduler
from .data_source import IsSurvive
@@ -8,7 +6,7 @@ ping = IsSurvive().on_command("/ping", "检测bot简单信息处理速度")
@ping.handle()
-async def _ping(bot: Bot, event: MessageEvent):
+async def _ping():
await ping.finish(IsSurvive.ping())
@@ -16,7 +14,7 @@ status = IsSurvive().on_command("/status", "查看运行资源占用")
@status.handle()
-async def _status(bot: Bot, event: MessageEvent):
+async def _status():
msg, _ = IsSurvive.get_status()
await status.finish(msg)
diff --git a/ATRI/plugins/util/__init__.py b/ATRI/plugins/util/__init__.py
index 541ca1b..b67a988 100644
--- a/ATRI/plugins/util/__init__.py
+++ b/ATRI/plugins/util/__init__.py
@@ -1,8 +1,9 @@
import re
from random import choice, random
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.matcher import Matcher
+from nonebot.params import CommandArg, ArgPlainText
+from nonebot.adapters.onebot.v11 import MessageEvent, Message
from ATRI.utils.limit import FreqLimiter
from .data_source import Encrypt, Utils, Yinglish
@@ -11,62 +12,36 @@ from .data_source import Encrypt, Utils, Yinglish
roll = Utils().on_command("/roll", "骰子~用法:1d10 或 2d10+2d10+more")
[email protected]_parser # type: ignore
-async def _get_roll(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了"]
- if msg in quit_list:
- await roll.finish("好吧...")
- if not msg:
- await roll.reject("参数呢?!格式:1d10 或 2d10+2d10+more")
- else:
- state["roll"] = msg
-
-
@roll.handle()
-async def _ready_roll(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_roll(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["roll"] = msg
+ matcher.set_arg("roll", args)
@roll.got("roll", "参数呢?!格式:1d10 或 2d10+2d10+more")
-async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State):
- text = state["roll"]
- match = re.match(r"^([\dd+\s]+?)$", text)
+async def _deal_roll(roll_msg: str = ArgPlainText("roll")):
+ match = re.match(r"^([\dd+\s]+?)$", roll_msg)
if not match:
await roll.finish("阿——!参数不对!格式:1d10 或 2d10+2d10+more")
- msg = Utils().roll_dice(text)
+ msg = Utils().roll_dice(roll_msg)
await roll.finish(msg)
-encrypt_en = Utils().on_command("加密", "我们之前的秘密❤")
-
-
-@encrypt_en.args_parser # type: ignore
-async def _get_encr_en_text(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了"]
- if msg in quit_list:
- await roll.finish("好吧...")
- if not msg:
- await roll.reject("内容呢?!")
- else:
- state["encr_en_text"] = msg
+encrypt_en = Utils().on_command("加密", "我们之间的秘密❤")
@encrypt_en.handle()
-async def _ready_en(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_en(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["encr_en_text"] = msg
+ matcher.set_arg("encr_en_text", args)
@encrypt_en.got("encr_en_text", "内容呢?!")
-async def _deal_en(bot: Bot, event: MessageEvent, state: T_State):
- text = state["encr_en_text"]
+async def _deal_en(text: str = ArgPlainText("encr_en_text")):
is_ok = len(text)
if is_ok < 10:
await encrypt_en.reject("太短不加密!")
@@ -78,28 +53,15 @@ async def _deal_en(bot: Bot, event: MessageEvent, state: T_State):
encrypt_de = Utils().on_command("解密", "解开我们的秘密❤")
-@encrypt_de.args_parser # type: ignore
-async def _get_encr_de_text(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了"]
- if msg in quit_list:
- await encrypt_de.finish("好吧...")
- if not msg:
- await encrypt_de.reject("内容呢?!")
- else:
- state["encr_de_text"] = msg
-
-
@encrypt_de.handle()
-async def _ready_de(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
+async def _ready_de(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
if msg:
- state["encr_de_text"] = msg
+ matcher.set_arg("encr_de_text", args)
@encrypt_de.got("encr_de_text", "内容呢?!")
-async def _deal_de(bot: Bot, event: MessageEvent, state: T_State):
- text = state["encr_de_text"]
+async def _deal_de(text: str = ArgPlainText("encr_de_text")):
en = Encrypt()
result = en.decode(text)
await encrypt_de.finish(result)
@@ -111,33 +73,22 @@ _sepi_flmt = FreqLimiter(3)
_sepi_flmt_notice = ["涩批爬", "✌🥵✌"]
[email protected]_parser # type: ignore
-async def _get_sepi(bot: Bot, event: MessageEvent, state: T_State):
- msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
- if msg in quit_list:
- await sepi.finish("好吧...")
- if not msg:
- await sepi.reject("内容呢?!")
- else:
- state["sepi_text"] = msg
-
-
@sepi.handle()
-async def _ready_sepi(bot: Bot, event: MessageEvent, state: T_State):
+async def _ready_sepi(
+ matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _sepi_flmt.check(user_id):
await sepi.finish(choice(_sepi_flmt_notice))
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["sepi_text"] = msg
+ matcher.set_arg("sepi_text", args)
@sepi.got("sepi_text", "内容呢?!")
-async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State):
+async def _deal_sepi(event: MessageEvent, msg: str = ArgPlainText("sepi_text")):
user_id = event.get_user_id()
- msg = state["sepi_text"]
if len(msg) < 4:
await sepi.finish("这么短?涩不起来!")
diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py
index 57a5af2..b69315d 100644
--- a/ATRI/plugins/wife/__init__.py
+++ b/ATRI/plugins/wife/__init__.py
@@ -3,9 +3,10 @@ from random import choice
from pydantic import BaseModel
from nonebot.rule import Rule
-from nonebot.typing import T_State
+from nonebot.matcher import Matcher
+from nonebot.params import CommandArg, ArgPlainText
from nonebot.permission import SUPERUSER
-from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent, Message
+from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent, Message
from ATRI.utils.limit import FreqLimiter
from .data_source import Wife
@@ -25,7 +26,7 @@ tietie_superuser = Wife().on_message(
@tietie_superuser.handle()
-async def _tietie_superuser(bot: Bot, event: MessageEvent):
+async def _tietie_superuser(event: MessageEvent):
if not _is_tietie:
await tietie_superuser.finish()
@@ -38,25 +39,21 @@ async def _tietie_superuser(bot: Bot, event: MessageEvent):
await tietie_superuser.finish(Message(result))
-no_tietie = Wife().on_command(
- "不可以贴", docs="拒绝贴贴", rule=Rule(), permission=SUPERUSER, block=False
-)
+no_tietie = Wife().on_command("不可以贴", docs="拒绝贴贴", rule=Rule(), permission=SUPERUSER)
@no_tietie.handle()
-async def _no_tietie(bot: Bot, event: MessageEvent):
+async def _no_tietie():
global _is_tietie
_is_tietie = False
await no_tietie.finish("好吧...")
-yes_tietie = Wife().on_command(
- "来贴贴", docs="继续贴贴", rule=Rule(), permission=SUPERUSER, block=False
-)
+yes_tietie = Wife().on_command("来贴贴", docs="继续贴贴", rule=Rule(), permission=SUPERUSER)
@yes_tietie.handle()
-async def _yes_tietie(bot: Bot, event: MessageEvent):
+async def _yes_tietie():
global _is_tietie
_is_tietie = True
await yes_tietie.finish("好欸!")
@@ -117,7 +114,7 @@ async def _get_wife(bot: Bot, event: GroupMessageEvent):
name=req_user_card, sex=is_nick, wife=user_id
).dict()
data[user_id] = MarryInfo(
- name=lucky_user_card, sex=lucky_user_sex, wife=lucky_user
+ name=lucky_user_card, sex=lucky_user_sex, wife=str(lucky_user)
).dict()
Wife().save_marry_list(data)
@@ -133,7 +130,7 @@ call_wife = Wife().on_command("老婆", "呼唤老婆/老公!", aliases={"老�
@call_wife.handle()
-async def _call_wife(bot: Bot, event: MessageEvent):
+async def _call_wife(event: MessageEvent):
user_id = event.get_user_id()
if not _wife_flmt.check(user_id):
await call_wife.finish()
@@ -154,7 +151,9 @@ discard_wife = Wife().on_command("我要离婚", "离婚!")
@discard_wife.handle()
-async def _discard_wife(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _discard_wife(
+ matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()
+):
user_id = event.get_user_id()
if not _wife_flmt.check(user_id):
await discard_wife.finish()
@@ -163,19 +162,20 @@ async def _discard_wife(bot: Bot, event: GroupMessageEvent, state: T_State):
if user_id not in data:
await discard_wife.finish("你还没对象呐...")
- msg = str(event.message).strip()
+ msg = args.extract_plain_text()
if msg:
- state["is_disc"] = msg
+ matcher.set_arg("is_disc", args)
@discard_wife.got("is_disc", "真的吗...(y/是)")
-async def _deal_discard(bot: Bot, event: GroupMessageEvent, state: T_State):
- msg = state["is_disc"]
+async def _deal_discard(
+ bot: Bot, event: GroupMessageEvent, is_disc: str = ArgPlainText("is_disc")
+):
rd_list = ["y", "Y", "是", "确认", "对"]
user_id = event.get_user_id()
group_id = event.group_id
- if msg not in rd_list:
+ if is_disc not in rd_list:
user_info = await bot.get_group_member_info(
group_id=group_id, user_id=int(user_id)
)
diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py
index 670f862..29d58e7 100644
--- a/ATRI/plugins/wife/data_source.py
+++ b/ATRI/plugins/wife/data_source.py
@@ -2,7 +2,7 @@ import os
import json
from random import choice
from pathlib import Path
-from nonebot.adapters.cqhttp import MessageSegment
+from nonebot.adapters.onebot.v11 import MessageSegment
from ATRI.service import Service
from ATRI.rule import is_in_service
diff --git a/ATRI/rule.py b/ATRI/rule.py
index e8126e2..925565e 100644
--- a/ATRI/rule.py
+++ b/ATRI/rule.py
@@ -1,12 +1,12 @@
-from nonebot.adapters.cqhttp.event import PrivateMessageEvent
from nonebot.rule import Rule
-from nonebot.adapters.cqhttp import MessageEvent, GroupMessageEvent
+from nonebot.adapters import Bot, Event
+from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent
from .service import ServiceTools
def is_in_service(service: str) -> Rule:
- async def _is_in_service(bot, event, state) -> bool:
+ async def _is_in_service(bot: Bot, event: Event) -> bool:
result = ServiceTools().auth_service(service)
if not result:
return False
@@ -27,7 +27,7 @@ def is_in_service(service: str) -> Rule:
def to_bot() -> Rule:
- async def _to_bot(bot, event, state) -> bool:
+ async def _to_bot(bot: Bot, event: Event) -> bool:
return event.is_tome()
return Rule(_to_bot)
diff --git a/ATRI/service.py b/ATRI/service.py
index 5379644..9f680c8 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -2,15 +2,22 @@ import os
import re
import json
from pathlib import Path
+from types import ModuleType
from pydantic import BaseModel
from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING
from nonebot.matcher import Matcher
from nonebot.permission import Permission
-from nonebot.typing import T_State, T_Handler, T_RuleChecker
+from nonebot.dependencies import Dependent
+from nonebot.typing import (
+ T_State,
+ T_Handler,
+ T_RuleChecker,
+ T_PermissionChecker,
+)
from nonebot.rule import Rule, command, keyword, regex
-from ATRI.exceptions import ReadFileError, ServiceRegisterError, WriteError
+from ATRI.exceptions import ReadFileError, WriteError
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
@@ -35,7 +42,7 @@ class ServiceInfo(BaseModel):
class CommandInfo(BaseModel):
type: str
docs: str
- aliases: list or set
+ aliases: Union[list, set]
class Service:
@@ -148,11 +155,10 @@ class Service:
def on_message(
self,
name: str = None,
- docs: str = None,
- _from: str = str(), # 供类似 on_command 的方法,提供更直观的 log 中 matcher 触发来源
+ docs: str = str(),
rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- handlers: Optional[List[T_Handler]] = None,
+ permission: Optional[Union[Permission, T_PermissionChecker]] = None,
+ handlers: Optional[List[Union[T_Handler, Dependent]]] = None,
block: bool = True,
priority: int = None,
state: Optional[T_State] = None,
@@ -181,15 +187,14 @@ class Service:
matcher = Matcher.new(
"message",
Rule() & rule,
- permission or Permission(),
- module=self.service + "-" + _from,
+ Permission() | permission,
+ module=ModuleType(self.service),
temp=self.temp,
priority=priority,
block=block,
handlers=handlers,
default_state=state,
)
- matcher.module = self.service
return matcher
def on_notice(self, name: str, docs: str, block: bool = True) -> Type[Matcher]:
@@ -204,7 +209,7 @@ class Service:
"notice",
Rule() & self.rule,
Permission(),
- module=self.service + "-" + name,
+ module=ModuleType(self.service),
temp=self.temp,
priority=self.priority,
block=block,
@@ -225,7 +230,7 @@ class Service:
"request",
Rule() & self.rule,
Permission(),
- module=self.service + "-" + name,
+ module=ModuleType(self.service),
temp=self.temp,
priority=self.priority,
block=block,
@@ -253,22 +258,8 @@ class Service:
).dict()
self._save_cmds(cmd_list)
- async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
- message = event.get_message()
- segment = message.pop(0)
- new_message = message.__class__(
- str(segment).lstrip()[len(state["_prefix"]["raw_command"]) :].lstrip()
- ) # type: ignore
- for new_segment in reversed(new_message):
- message.insert(0, new_segment)
-
- handlers = kwargs.pop("handlers", [])
- handlers.insert(0, _strip_cmd)
-
commands = set([cmd]) | (aliases or set())
- return self.on_message(
- _from=str(cmd), rule=command(*commands) & rule, handlers=handlers, **kwargs
- )
+ return self.on_message(rule=command(*commands) & rule, block=True, **kwargs)
def on_keyword(
self,
@@ -287,7 +278,7 @@ class Service:
cmd_list[name] = CommandInfo(type="keyword", docs=docs, aliases=keywords).dict()
self._save_cmds(cmd_list)
- return self.on_message(_from=name, rule=keyword(*keywords) & rule, **kwargs)
+ return self.on_message(rule=keyword(*keywords) & rule, **kwargs)
def on_regex(
self,
@@ -304,9 +295,7 @@ class Service:
cmd_list[pattern] = CommandInfo(type="regex", docs=docs, aliases=list()).dict()
self._save_cmds(cmd_list)
- return self.on_message(
- _from=pattern, rule=regex(pattern, flags) & rule, **kwargs
- )
+ return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)
class ServiceTools(object):
diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py
index 68a69b6..480d00e 100644
--- a/ATRI/utils/__init__.py
+++ b/ATRI/utils/__init__.py
@@ -55,9 +55,9 @@ class ListDealer:
return self.lst
-class CoolqCodeChecker:
+class MessageChecker:
"""
- 检查所传回的cq码是否存在被注入可能
+ 检查所传回的信息是否存在被注入可能
"""
tenc_gchat_url: str = "gchat.qpic.cn"
@@ -67,7 +67,7 @@ class CoolqCodeChecker:
self.text = text
@property
- def check(self) -> bool:
+ def check_cq_code(self) -> bool:
_type = re.findall(r"CQ:(.*?),", self.text)
for i in _type:
if i == "image":
@@ -83,6 +83,13 @@ class CoolqCodeChecker:
return True
else:
return True
+
+ @property
+ def check_image_url(self) -> bool:
+ if self.tenc_gchat_url not in self.text:
+ return False
+ else:
+ return True
class FileDealer:
diff --git a/ATRI/utils/apscheduler.py b/ATRI/utils/apscheduler.py
index 2680f19..daf37f4 100644
--- a/ATRI/utils/apscheduler.py
+++ b/ATRI/utils/apscheduler.py
@@ -1,9 +1,10 @@
-# Fork from: https://github.com/nonebot/plugin-apscheduler
-
+"""
+Fork from: https://github.com/nonebot/plugin-apscheduler
+"""
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from nonebot import get_driver, export
+from nonebot import get_driver
from nonebot.log import logger, LoguruHandler
@@ -13,7 +14,6 @@ apscheduler_config: dict = {"apscheduler.timezone": "Asia/Shanghai"}
driver = get_driver()
scheduler = AsyncIOScheduler()
-export().scheduler = scheduler
async def _start_scheduler():
diff --git a/ATRI/utils/request.py b/ATRI/utils/request.py
index 2cfce85..da0cdd4 100644
--- a/ATRI/utils/request.py
+++ b/ATRI/utils/request.py
@@ -2,16 +2,17 @@ import httpx
from ATRI.config import BotSelfConfig
-proxy = BotSelfConfig.proxy
-if not proxy:
+if not BotSelfConfig.proxy:
proxy = dict()
+else:
+ proxy = {"all://": BotSelfConfig.proxy}
async def get(url: str, **kwargs):
- async with httpx.AsyncClient(proxies=proxy) as client:
+ async with httpx.AsyncClient(proxies=proxy) as client: # type: ignore
return await client.get(url, **kwargs)
async def post(url: str, **kwargs):
- async with httpx.AsyncClient(proxies=proxy) as client:
+ async with httpx.AsyncClient(proxies=proxy) as client: # type: ignore
return await client.post(url, **kwargs)
diff --git a/changelog.md b/changelog.md
index 47e5578..5c6b518 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,24 +1,91 @@
-> 此处仅为记录新功能更新,修复 BUG/以及其它 请关注[`GitHub commits`](https://github.com/Kyomotoi/ATRI/commits/main)
+> 此处仅为记录重大更新,修复 BUG/以及其它 请关注[`GitHub commits`](https://github.com/Kyomotoi/ATRI/commits/main)
+
+## Comming soon...
+- Next: `YHN-001-A06`
+
+---
+
+## Jan 31, 2022
+
+> 新年快乐!
+
+- 更新版本至: `YHN-001-A05`
+- 新增:
+ - 全面适配 `NoneBot v2.0.0 beta1`
+ - 插件:[`broadcast`](user/plugin-broadcast.md)
+ - 现在可以向机器人所在群推送同一消息
+ - 插件:[`essential`](user/plugin-essential.md)
+ - 现在可以接受、拒绝撤回消息推送
+ - 内置 [`gocqhttp`](https://github.com/mnixry/nonebot-plugin-gocqhttp)
+ - 现在可以连同机器人一同启动了
+ - 同时包含了可视化gocq控制台
+ - 可快捷、快速添加新账号
+ - 生产环境资源可视化
+ - [单元测试](/test),加快了机器人开发上线前自检的速度
+ - `config.yml` 新增以下内容:
+ ```yaml
+ InlineGoCQHTTP:
+ accounts: # 可多个账号,具体请参考文档
+ - uin: 1234567890
+ password: ""
+ protocol: 3
+
+ download_version: "latest"
+
+ Setu:
+ reverse_proxy: true # 请参考文档
+ reverse_proxy_domain: "i.pixiv.re"
+ ```
+ - `config` 中 `proxy` 真正意义上生效
+- 其它:
+ - 移除插件: `cause`
+ - 原因:过于无趣,遭大多用户反对,且源API已失效,不想再替换新的第三方API
+
+---
## Oct 24, 2021
+
+- 更新版本至: `YHN-001-A04`
- 新增:
- - nsfw检测(主动/被动)
+ - nsfw检测(主动/被动)又名 `涩图嗅探`
+ - 可选代理
+- 修复:
+ - plugin/chat 在 nb2-a14+ 版本 finish 内为空时会报错
+- 其他:
+ - 对定时任务进行中文命名
+- 移除:
+ - 前端界面(赶工而导致成品炸裂)
+
+---
## Jul 31, 2021
-- 新增:
+
+- 新增:
- 前端(单主页)
- 和维护者贴贴w
+---
+
## Jul 8, 2021
-- 更新版本至:YHN-001-A03,请关注commit:[传送](https://github.com/Kyomotoi/ATRI/commit/be2747e4d4b820ca0f1f988d3b77a628da26fe7b)
+
+- 更新版本至: `YHN-001-A03`,请关注commit: [传送](https://github.com/Kyomotoi/ATRI/commit/be2747e4d4b820ca0f1f988d3b77a628da26fe7b)
+
+---
## May 4, 2021
-- 新增:
+
+- 新增:
- 涩图
- 群老婆!
+---
+
## Apr 11, 2021
+
- 正式重构完成
+---
+
## 更早记录
+
- 请自行查看[`GitHub commits`](https://github.com/Kyomotoi/ATRI/commits/main)
diff --git a/config.yml b/config.yml
index afe9fde..0ff3f14 100644
--- a/config.yml
+++ b/config.yml
@@ -7,7 +7,19 @@ BotSelfConfig:
command_start: ["", "/"]
command_sep: ["."]
session_expire_timeout: 60
- proxy: ""
+ proxy: "" # 请参考文档
+
+InlineGoCQHTTP:
+ accounts: # 可多个账号,具体请参考文档
+ - uin: 1234567890
+ password: ""
+ protocol: 3
+
+ download_version: "latest"
SauceNAO:
key: ""
+
+Setu:
+ reverse_proxy: true # 请参考文档
+ reverse_proxy_domain: "i.pixiv.re"
diff --git a/main.py b/main.py
index a1dfbd2..d06cd84 100644
--- a/main.py
+++ b/main.py
@@ -6,4 +6,4 @@ ATRI.init()
app = ATRI.asgi()
if __name__ == "__main__":
- ATRI.run("main:app")
+ ATRI.run()
diff --git a/pyproject.toml b/pyproject.toml
index 56dbf61..3f573e2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,6 +15,9 @@ python = "^3.8"
[tool.poetry.dev-dependencies]
black = "^21.4-beta.0"
+[tool.pytest.ini_options]
+addopts = '-p no:warnings'
+
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
diff --git a/requirements.txt b/requirements.txt
index bef0a64..af70f71 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,10 @@
-aiohttp>=3.6.2
aiofiles>=0.6.0
APScheduler>=3.7.0
Pillow>=8.1.1
-nonebot-adapter-cqhttp>=2.0.0a14
-nonebot-plugin-test>=0.2.0
-nonebot2>=2.0.0a13.post1
+nonebot2>=2.0.0b1
+nonebot-adapter-onebot>=2.0.0b1
+nonebug>=0.2.0
+nonebot-plugin-gocqhttp>=0.3.2
psutil>=5.7.2
pathlib>=1.0.1
pytz>=2020.1
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/__init__.py
diff --git a/test/test_plugin_anime_search.py b/test/test_plugin_anime_search.py
new file mode 100644
index 0000000..2782ba9
--- /dev/null
+++ b/test/test_plugin_anime_search.py
@@ -0,0 +1,32 @@
+import pytest
+from nonebug import App
+
+from nonebot.adapters.onebot.v11 import MessageSegment
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_saucenao(app: App):
+ from ATRI.plugins.saucenao import saucenao
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(saucenao) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("以图搜图")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "图呢?", True)
+
+ msg = Message(
+ MessageSegment.image(
+ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png"
+ )
+ )
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "失败了...", False)
diff --git a/test/test_plugin_chat.py b/test/test_plugin_chat.py
new file mode 100644
index 0000000..e04f1d3
--- /dev/null
+++ b/test/test_plugin_chat.py
@@ -0,0 +1,84 @@
+import pytest
+from nonebug import App
+from nonebot.adapters.onebot.v11 import MessageSegment
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_chat(app: App):
+ from ATRI.plugins.chat import chat
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(chat) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("爱你")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True)
+
+
+async def test_my_name_is(app: App):
+ from ATRI.plugins.chat import my_name_is
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(my_name_is) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("叫我")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True)
+
+ msg = Message("欧尼酱")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True)
+
+
+async def test_say(app: App):
+ from ATRI.plugins.chat import say
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(say) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("说")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "想要咱复读啥呢...", True)
+
+ msg = Message("nya~")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "nya~", True)
+
+ async with app.test_matcher(say) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("说")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "想要咱复读啥呢...", True)
+
+ msg = Message(
+ MessageSegment.image(
+ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png"
+ )
+ )
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "不要...", True)
diff --git a/test/test_plugin_code_runner.py b/test/test_plugin_code_runner.py
new file mode 100644
index 0000000..584ebb0
--- /dev/null
+++ b/test/test_plugin_code_runner.py
@@ -0,0 +1,75 @@
+import pytest
+from nonebug import App
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_code_runner(app: App):
+ from ATRI.plugins.code_runner import code_runner
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(code_runner) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/code")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "请键入 /code help 以获取帮助~!", True)
+
+ async with app.test_matcher(code_runner) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/code help")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ /code {语言}
+ {代码}
+ For example:
+ /code python
+ print('hello world')
+ """,
+ True,
+ )
+
+ async with app.test_matcher(code_runner) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/code list")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ 咱现在支持的语言如下:
+ assembly, bash, c, clojure,
+ coffeescript, cpp, csharp,
+ erlang, fsharp, go, groovy,
+ haskell, java, javascript,
+ julia, kotlin, lua, perl,
+ php, python, ruby, rust,
+ scala, swift, typescript
+ """,
+ True,
+ )
+
+ async with app.test_matcher(code_runner) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message(
+ """
+ /code python
+ print("hello world")
+ """
+ )
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "stdout:\nhello world", True)
diff --git a/test/test_plugin_funny.py b/test/test_plugin_funny.py
new file mode 100644
index 0000000..851a922
--- /dev/null
+++ b/test/test_plugin_funny.py
@@ -0,0 +1,117 @@
+import pytest
+from nonebug import App
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_get_laugh(app: App):
+ from ATRI.plugins.funny import get_laugh
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(get_laugh) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("来句笑话")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "哈 哈 哈", True)
+
+
+async def test_me_re_you(app: App):
+ from ATRI.plugins.funny import me_re_you
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(me_re_you) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("超市我")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "超市你", True)
+
+
+async def test_fake_msg(app: App):
+ from ATRI.plugins.funny import fake_msg
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(fake_msg) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/fakemsg")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开", True)
+
+ msg = Message("114514")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "内容格式错误,请检查(", True)
+
+ async with app.test_matcher(fake_msg) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/fakemsg")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开", True)
+
+ msg = Message("114514-0w0-testing")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ "[{'type': 'node', 'data': {'name': '0w0', 'uin': '114514', 'content': 'testing'}}]",
+ True,
+ )
+
+ async with app.test_matcher(fake_msg) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/fakemsg")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开", True)
+
+ msg = Message("114514-0w0-testing")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "构造失败惹...可能是被制裁了(", True)
+
+ async with app.test_matcher(fake_msg) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/fakemsg")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "慢...慢一..点❤", True)
+
+
+async def test_eat_what(app: App):
+ from ATRI.plugins.funny import eat_what
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(eat_what) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("今天吃什么")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "欧尼酱的智商低下想不到今天要吃甚么,所以由我来选择,我给的答案是(串烧)。选我正解!!", True)
diff --git a/test/test_plugin_help.py b/test/test_plugin_help.py
new file mode 100644
index 0000000..e107150
--- /dev/null
+++ b/test/test_plugin_help.py
@@ -0,0 +1,154 @@
+import pytest
+from nonebug import App
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_main_help(app: App):
+ from ATRI.plugins.help import main_help
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(main_help) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("菜单")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ 哦呀?~需要帮助?
+ 关于 -查看bot基本信息
+ 服务列表 -以查看所有可用服务
+ 帮助 [服务] -以查看对应服务帮助
+ Tip: 均需要at触发。@bot 菜单 以打开此页面
+ """,
+ True,
+ )
+
+
+async def test_about_me(app: App):
+ from ATRI import __version__
+ from ATRI.config import BotSelfConfig
+ from ATRI.plugins.help import about_me
+
+ temp_list = list()
+ for i in BotSelfConfig.nickname:
+ temp_list.append(i)
+ nickname = "、".join(map(str, temp_list))
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(about_me) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("关于")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ f"""
+ 唔...是来认识咱的么
+ 可以称呼咱:{nickname}
+ 咱的型号是:{__version__}
+ 想进一步了解:
+ https://github.com/Kyomotoi/ATRI
+ """,
+ True,
+ )
+
+
+async def test_service_list(app: App):
+ import os
+
+ from ATRI.service import SERVICES_DIR
+ from ATRI.plugins.help import service_list
+
+ files = os.listdir(SERVICES_DIR)
+ temp_list = list()
+ for i in files:
+ service = i.replace(".json", "")
+ temp_list.append(service)
+
+ services = "、".join(map(str, temp_list))
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(service_list) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("服务列表")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ f"""
+ 咱搭载了以下服务~
+ {services}
+ @bot 帮助 [服务] -以查看对应服务帮助
+ """,
+ True,
+ )
+
+
+async def test_service_info(app: App):
+ from ATRI.plugins.help import service_info
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(service_info) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("帮助")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "请检查是否输入错误...", True)
+
+ async with app.test_matcher(service_info) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("帮助 状态")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ 服务名:状态
+ 说明:
+ 检查咱自身状态
+
+ 可用命令:
+ /ping、/status
+ 是否全局启用:True
+ Tip: 帮助 [服务] [命令] 以查看对应命令详细信息
+ """,
+ True,
+ )
+
+ async with app.test_matcher(service_info) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("帮助 状态 /ping")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ 命令:/ping
+ 类型:command
+ 说明:检测bot简单信息处理速度
+ 更多触发方式:[]
+ """,
+ True,
+ )
diff --git a/test/test_plugin_manage.py b/test/test_plugin_manage.py
new file mode 100644
index 0000000..8f3042a
--- /dev/null
+++ b/test/test_plugin_manage.py
@@ -0,0 +1,293 @@
+import pytest
+from nonebug import App
+
+from ATRI.config import RUNTIME_CONFIG
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_block_user(app: App):
+ from ATRI.plugins.manage import block_user
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(block_user) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("封禁用户")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "哪位?GKD!", True)
+
+ msg = Message("114514")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "用户 114514 危!", True)
+
+
+async def test_unblock_user(app: App):
+ from ATRI.plugins.manage import unblock_user
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(unblock_user) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("解封用户")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "哪位?GKD!", True)
+
+ msg = Message("114514")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "好欸! 114514 重获新生!", True)
+
+
+async def test_block_group(app: App):
+ from ATRI.plugins.manage import block_group
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(block_group) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("封禁群")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "哪位?GKD!", True)
+
+ msg = Message("114514")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "群 114514 危!", True)
+
+
+async def test_unblock_group(app: App):
+ from ATRI.plugins.manage import unblock_group
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(unblock_group) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("解封群")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "哪个群?GKD!", True)
+
+ msg = Message("114514")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "好欸! 114514 重获新生!", True)
+
+
+async def test_global_block_service(app: App):
+ from ATRI.plugins.manage import global_block_service
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(global_block_service) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("全局封禁")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "阿...是哪个服务呢", True)
+
+ msg = Message("状态")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "服务 状态 已被禁用", True)
+
+
+async def test_global_unblock_service(app: App):
+ from ATRI.plugins.manage import global_unblock_service
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(global_unblock_service) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("全局启用")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "阿...是哪个服务呢", True)
+
+ msg = Message("状态")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "服务 状态 已启用", True)
+
+
+async def test_user_block_service(app: App):
+ from ATRI.plugins.manage import user_block_service
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(user_block_service) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("对用户114514禁用状态")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "完成~已禁止用户 114514 使用 状态", True)
+
+
+async def test_user_unblock_service(app: App):
+ from ATRI.plugins.manage import user_unblock_service
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(user_unblock_service) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("对用户114514启用状态")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "完成~已允许用户 114514 使用 状态", True)
+
+
+async def test_group_block_service(app: App):
+ from ATRI.plugins.manage import group_block_service
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(group_block_service) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("禁用")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "阿...是哪个服务呢", True)
+
+ msg = Message("状态")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "完成!~已禁止本群使用服务:状态", True)
+
+
+async def test_group_unblock_service(app: App):
+ from ATRI.plugins.manage import group_unblock_service
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(group_unblock_service) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("启用")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "阿...是哪个服务呢", True)
+
+ msg = Message("状态")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "完成!~已允许本群使用服务:状态", True)
+
+
+async def test_get_friend_add_list(app: App):
+ from ATRI.plugins.manage import get_friend_add_list
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(get_friend_add_list) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("获取好友申请")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ 申请人ID | 申请信息 | 申请码
+ Tip: 使用 同意/拒绝好友 [申请码] 以决定
+ """,
+ True,
+ )
+
+
+# @pytest.mark.asyncio
+# async def test_approve_friend_add(app: App):
+# from ATRI.plugins.manage import approve_friend_add
+
+# Message = make_fake_message()
+
+# async with app.test_matcher(approve_friend_add) as ctx:
+# bot = ctx.create_bot()
+
+# msg = Message("同意好友")
+# event = make_fake_event(_message=msg)()
+
+# ctx.receive_event(bot, event)
+# ctx.should_call_send(event, "申请码GKD!", True)
+
+# msg = Message()
+
+
+async def test_get_group_invite_list(app: App):
+ from ATRI.plugins.manage import get_group_invite_list
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(get_group_invite_list) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("获取邀请列表")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ 申请人ID | 申请信息 | 申请码
+ Tip: 使用 同意/拒绝邀请 [申请码] 以决定
+ """,
+ True,
+ )
+
+
+async def test_track_error(app: App):
+ from ATRI.plugins.manage import track_error
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(track_error) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("/track")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "请检查ID是否正确...", True)
diff --git a/test/test_plugin_rich.py b/test/test_plugin_rich.py
new file mode 100644
index 0000000..3e63f4a
--- /dev/null
+++ b/test/test_plugin_rich.py
@@ -0,0 +1,28 @@
+import pytest
+from nonebug import App
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_bili_rich(app: App):
+ from ATRI.plugins.rich import bili_rich
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(bili_rich) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("BV1Ff4y1C7YR")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(
+ event,
+ """
+ BV1Ff4y1C7YR INFO:
+ Title: 【8K30fps】这可能是画质最高的Rick Roll (doge)
+ Link: https://b23.tv/BV1Ff4y1C7YR
+ """,
+ True,
+ )
diff --git a/test/test_plugin_saucenao.py b/test/test_plugin_saucenao.py
new file mode 100644
index 0000000..2782ba9
--- /dev/null
+++ b/test/test_plugin_saucenao.py
@@ -0,0 +1,32 @@
+import pytest
+from nonebug import App
+
+from nonebot.adapters.onebot.v11 import MessageSegment
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_saucenao(app: App):
+ from ATRI.plugins.saucenao import saucenao
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(saucenao) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("以图搜图")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "图呢?", True)
+
+ msg = Message(
+ MessageSegment.image(
+ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png"
+ )
+ )
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "失败了...", False)
diff --git a/test/test_plugin_setu.py b/test/test_plugin_setu.py
new file mode 100644
index 0000000..b5a7ff2
--- /dev/null
+++ b/test/test_plugin_setu.py
@@ -0,0 +1,20 @@
+import pytest
+from nonebug import App
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_random_setu(app: App):
+ from ATRI.plugins.setu import random_setu
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(random_setu) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("来张涩图")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "hso(发不出", False)
diff --git a/test/utils.py b/test/utils.py
new file mode 100644
index 0000000..70d1a27
--- /dev/null
+++ b/test/utils.py
@@ -0,0 +1,87 @@
+"""
+Fork from: https://github.com/nonebot/nonebot2/blob/master/tests/utils.py
+"""
+from typing import TYPE_CHECKING, Type, Optional
+
+from pydantic import create_model
+
+if TYPE_CHECKING:
+ from nonebot.adapters import Event, Message
+
+
+def make_fake_message() -> Type["Message"]:
+ from nonebot.adapters import Message, MessageSegment
+
+ class FakeMessageSegment(MessageSegment):
+ @classmethod
+ def get_message_class(cls):
+ return FakeMessage
+
+ def __str__(self) -> str:
+ return self.data["text"] if self.type == "text" else f"[fake:{self.type}]"
+
+ @classmethod
+ def text(cls, text: str):
+ return cls("text", {"text": text})
+
+ @classmethod
+ def image(cls, url: str):
+ return cls("image", {"url": url})
+
+ def is_text(self) -> bool:
+ return self.type == "text"
+
+ class FakeMessage(Message):
+ @classmethod
+ def get_segment_class(cls):
+ return FakeMessageSegment
+
+ @staticmethod
+ def _construct(msg: str):
+ yield FakeMessageSegment.text(msg)
+
+ return FakeMessage
+
+
+def make_fake_event(
+ _type: str = "message",
+ _name: str = "test",
+ _description: str = "test",
+ _user_id: str = "test",
+ _session_id: str = "test",
+ _message: Optional["Message"] = None,
+ _to_me: bool = True,
+ **fields,
+) -> Type["Event"]:
+ from nonebot.adapters import Event
+
+ _Fake = create_model("_Fake", __base__=Event, **fields)
+
+ class FakeEvent(_Fake):
+ def get_type(self) -> str:
+ return _type
+
+ def get_event_name(self) -> str:
+ return _name
+
+ def get_event_description(self) -> str:
+ return _description
+
+ def get_user_id(self) -> str:
+ return _user_id
+
+ def get_session_id(self) -> str:
+ return _session_id
+
+ def get_message(self) -> "Message":
+ if _message is not None:
+ return _message
+ raise NotImplementedError
+
+ def is_tome(self) -> bool:
+ return _to_me
+
+ class Config:
+ extra = "forbid"
+
+ return FakeEvent