summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2023-05-09 17:04:55 +0800
committerKyomotoi <[email protected]>2023-05-09 17:04:55 +0800
commitc3cddcf2ce87631824d31739f1cc9d2c52efacc8 (patch)
tree01cac456cd36dc2325bffbc09cdb3268d713c7b9
parent6b3766c65af44154eae26ecc0a0b3a7bca9b2ea3 (diff)
downloadATRI-c3cddcf2ce87631824d31739f1cc9d2c52efacc8.tar.gz
ATRI-c3cddcf2ce87631824d31739f1cc9d2c52efacc8.tar.bz2
ATRI-c3cddcf2ce87631824d31739f1cc9d2c52efacc8.zip
♻️ 重构代码, 迁移功能
-rw-r--r--ATRI/plugins/essential/__init__.py71
-rw-r--r--ATRI/plugins/essential/models.py7
-rw-r--r--ATRI/plugins/manage/__init__.py545
-rw-r--r--ATRI/plugins/manage/data_source.py371
-rw-r--r--ATRI/plugins/manage/models.py6
-rw-r--r--ATRI/plugins/manage/plugin.py14
6 files changed, 350 insertions, 664 deletions
diff --git a/ATRI/plugins/essential/__init__.py b/ATRI/plugins/essential/__init__.py
index 883c191..096eb54 100644
--- a/ATRI/plugins/essential/__init__.py
+++ b/ATRI/plugins/essential/__init__.py
@@ -22,11 +22,10 @@ from ATRI import conf
from ATRI.log import log
from ATRI.service import Service
from ATRI.utils.apscheduler import scheduler
-from ATRI.utils import FileDealer, MessageChecker
+from ATRI.utils import MessageChecker
from ATRI.permission import MASTER
from ATRI.message import MessageBuilder
-from .models import RequestInfo
from .data_source import recall_msg_dealer
@@ -41,74 +40,6 @@ __TEMP_DIR.mkdir(parents=True, exist_ok=True)
plugin = Service("基础部件").document("对基础请求进行处理")
-friend_add_event = plugin.on_request("好友申请", "好友申请检测")
-
-
-@friend_add_event.handle()
-async def _(event: FriendRequestEvent):
- path = __ESSENTIAL_DIR / "friend_add.json"
- file = FileDealer(path)
- if not path.is_file():
- await file.write_json(dict())
- data = dict()
-
- apply_code = event.flag
- user_id = event.get_user_id()
- apply_comment = event.comment
- now_time = str(datetime.now().timestamp())
-
- data = file.json()
- data[apply_code] = RequestInfo(
- user_id=user_id,
- comment=apply_comment,
- time=now_time,
- ).dict()
- await file.write_json(data)
-
- result = (
- MessageBuilder("咱收到一条好友请求!")
- .text(f"请求人: {user_id}")
- .text(f"申请信息: {apply_comment}")
- .text(f"申请码: {apply_code}")
- .text("Tip: 好友申请列表")
- )
- await plugin.send_to_master(result)
-
-
-group_invite_request = plugin.on_request("应邀入群", "应邀入群检测")
-
-
-@group_invite_request.handle()
-async def _(event: GroupRequestEvent):
- path = __ESSENTIAL_DIR / "group_invite.json"
- file = FileDealer(path)
- if not path.is_file():
- await file.write_json(dict())
- data = dict()
-
- apply_code = event.flag
- user_id = event.get_user_id()
- apply_comment = event.comment
- now_time = str(datetime.now().timestamp())
-
- data = file.json()
- data[apply_code] = RequestInfo(
- user_id=user_id,
- comment=apply_comment,
- time=now_time,
- ).dict()
- await file.write_json(data)
-
- result = (
- MessageBuilder("咱收到一条应邀入群请求!")
- .text(f"申请人: {user_id}")
- .text(f"申请信息: {apply_comment}")
- .text(f"申请码: {apply_code}")
- .text("Tip: 应邀入群列表")
- )
- await plugin.send_to_master(result)
-
-
group_member_event = plugin.on_notice("群成员变动", "群成员变动检测")
diff --git a/ATRI/plugins/essential/models.py b/ATRI/plugins/essential/models.py
deleted file mode 100644
index dd2a801..0000000
--- a/ATRI/plugins/essential/models.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from pydantic import BaseModel
-
-
-class RequestInfo(BaseModel):
- user_id: str
- comment: str
- time: str
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
index 30d37c5..d32cb40 100644
--- a/ATRI/plugins/manage/__init__.py
+++ b/ATRI/plugins/manage/__init__.py
@@ -1,435 +1,242 @@
-import re
+from datetime import datetime
+from typing import Type, Callable
+from asyncio import iscoroutinefunction
from nonebot.matcher import Matcher
from nonebot.params import ArgPlainText, CommandArg
-from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent
+from nonebot.adapters.onebot.v11 import (
+ Bot,
+ Message,
+ MessageEvent,
+ FriendRequestEvent,
+ GroupRequestEvent,
+)
from ATRI.rule import to_bot
from ATRI.service import Service
from ATRI.message import MessageBuilder
from ATRI.permission import MASTER, ADMIN
-from .data_source import Manage
+from .models import RequestInfo
+from .data_source import BotManager
from .plugin import NonebotPluginManager
-plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER)
-
-
-block_user = plugin.on_command("封禁用户", "对目标用户进行封禁")
-
-
-@block_user.handle()
-async def _ready_block_user(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("block_user", args)
-
-
-@block_user.got("block_user", "哪位?GKD!")
-async def _deal_block_user(user_id: str = ArgPlainText("block_user")):
- quit_list = ["算了", "罢了"]
- if user_id in quit_list:
- await block_user.finish("...看来有人逃过一劫呢")
-
- is_ok = Manage().block_user(user_id)
- if not is_ok:
- await block_user.finish("kuso!封禁失败了...")
-
- await block_user.finish(f"用户 {user_id} 危!")
-
-
-unblock_user = plugin.on_command("解封用户", "对目标用户进行解封")
-
-
-@unblock_user.handle()
-async def _ready_unblock_user(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("unblock_user", args)
-
-
-@unblock_user.got("unblock_user", "哪位?GKD!")
-async def _deal_unblock_user(user_id: str = ArgPlainText("unblock_user")):
- quit_list = ["算了", "罢了"]
- if user_id in quit_list:
- await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了")
-
- is_ok = Manage().unblock_user(user_id)
- if not is_ok:
- await unblock_user.finish("kuso!解封失败了...")
-
- await unblock_user.finish(f"好欸!{user_id} 重获新生!")
-
-
-block_group = plugin.on_command("封禁群", "对目标群进行封禁")
-
-
-@block_group.handle()
-async def _ready_block_group(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("block_group", args)
-
-
-@block_group.got("block_group", "哪个群?GKD!")
-async def _deal_block_group(group_id: str = ArgPlainText("block_group")):
- quit_list = ["算了", "罢了"]
- if group_id in quit_list:
- await block_group.finish("...看来有一群逃过一劫呢")
-
- is_ok = Manage().block_group(group_id)
- if not is_ok:
- await block_group.finish("kuso!封禁失败了...")
-
- await block_group.finish(f"群 {group_id} 危!")
-
+_QUIT_ARGS = ["算了", "罢了"]
-unblock_group = plugin.on_command("解封群", "对目标群进行解封")
-
-@unblock_group.handle()
-async def _ready_unblock_group(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("unblock_group", args)
-
-
-@unblock_group.got("unblock_group", "哪个群?GKD!")
-async def _deal_unblock_group(group_id: str = ArgPlainText("unblock_group")):
- quit_list = ["算了", "罢了"]
- if group_id in quit_list:
- await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了")
-
- is_ok = Manage().unblock_group(group_id)
- if not is_ok:
- await unblock_group.finish("kuso!解封失败了...")
-
- await unblock_group.finish(f"好欸!群 {group_id} 重获新生!")
-
-
-global_block_service = plugin.on_command("全局禁用", "全局禁用某服务")
+def handle_command(
+ plugin: Type[Matcher],
+ func: Callable,
+ success_msg: str,
+ fail_msg: str = "操作 {} 失败...原因:\n{}",
+):
+ @plugin.handle()
+ async def handle_command(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("target", args)
+
+ @plugin.got("target", "要操作的目标是?")
+ async def handle_target(event: MessageEvent, target: str = ArgPlainText("target")):
+ if target in _QUIT_ARGS:
+ await plugin.finish("好吧")
+
+ try:
+ func_argcount = func.__code__.co_argcount
+ if iscoroutinefunction(func):
+ if func_argcount == 3:
+ result = await func(target, event)
+ else:
+ result = await func(target)
+ else:
+ if func_argcount == 3:
+ result = func(target, event)
+ else:
+ result = func(target)
+ msg = success_msg.format(target)
+ if type(result) == bool:
+ msg += "启用" if result else "禁用"
+ await plugin.send(msg.format(target))
+ except Exception as e:
+ error_msg = str(e)
+ await plugin.send(fail_msg.format(target, error_msg))
-@global_block_service.handle()
-async def _ready_block_service(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("global_block_service", args)
+plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER)
-@global_block_service.got("global_block_service", "阿...是哪个服务呢")
-async def _deal_global_block_service(
- block_service: str = ArgPlainText("global_block_service"),
-):
- quit_list = ["算了", "罢了"]
- if block_service in quit_list:
- await global_block_service.finish("好吧...")
+block_user = plugin.on_command("封禁用户", "阻止目标用户使用bot")
+handle_command(block_user, BotManager().block_user, "用户 {} 危!")
- is_ok = Manage().control_global_service(block_service, False)
- if not is_ok:
- await global_block_service.finish("kuso!禁用失败了...")
- await global_block_service.finish(f"服务 {block_service} 已被禁用")
+unblock_user = plugin.on_command("解封用户", "对被阻止的用户解封")
+handle_command(unblock_user, BotManager().unblock_user, "用户 {} 已解封")
-global_unblock_service = plugin.on_command("全局启用", "全局启用某服务")
+block_group = plugin.on_command("封禁群", "阻止目标群所有人使用bot")
+handle_command(block_group, BotManager().block_group, "群 {} 危!")
-@global_unblock_service.handle()
-async def _ready_unblock_service(
- matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
-):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("global_unblock_service", args)
+unblock_group = plugin.on_command("解封群", "对被阻止的群解封")
+handle_command(unblock_group, BotManager().unblock_group, "群 {} 已解封")
-@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢")
-async def _deal_global_unblock_service(
- unblock_service: str = ArgPlainText("global_unblock_service"),
-):
- quit_list = ["算了", "罢了"]
- if unblock_service in quit_list:
- await global_unblock_service.finish("好吧...")
+toggle_global_service = plugin.on_command("全局控制", "全局禁用/启用某一服务")
+handle_command(
+ toggle_global_service,
+ BotManager().toggle_global_service,
+ "服务 {} 已全局",
+)
- is_ok = Manage().control_global_service(unblock_service, True)
- if not is_ok:
- await global_unblock_service.finish("kuso!启用服务失败了...")
- await global_unblock_service.finish(f"服务 {unblock_service} 已启用")
+toggle_user_service = plugin.on_command("用户控制", "针对单一用户禁用/启用某一服务")
+handle_command(
+ toggle_user_service,
+ BotManager().toggle_user_service,
+ "服务 {} 已针对该用户",
+)
-user_block_service = plugin.on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户(qid)禁用服务")
+toggle_group_service = plugin.on_command("控制", "针对所在群禁用/启用某一服务", permission=ADMIN)
+handle_command(
+ toggle_group_service,
+ BotManager().toggle_group_service,
+ "服务 {} 已针对本群",
+)
-@user_block_service.handle()
-async def _user_block_service(event: MessageEvent):
- msg = str(event.message).strip()
- pattern = r"对用户(.*?)禁用(.*)"
- reg = re.findall(pattern, msg)[0]
- aim_user = reg[0]
- aim_service = reg[1]
+track = plugin.on_command("追踪", "根据ID获取对应报错信息", aliases={"/track"})
+handle_command(
+ track,
+ BotManager().track_error,
+ "{}",
+)
- is_ok = Manage().control_user_service(aim_service, aim_user, False)
- if not is_ok:
- await user_block_service.finish("禁用失败...请检查服务名是否正确")
- await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}")
+apply_friend_req = plugin.on_command("同意好友", "根据申请码同意对应好友申请")
+handle_command(apply_friend_req, BotManager().apply_friend_req, "已同意该申请")
-user_unblock_service = plugin.on_regex(r"对用户(.*?)启用(.*)", "针对某一用户(qid)启用服务")
+reject_friend_req = plugin.on_command("拒绝好友", "根据申请码拒绝对应好友申请")
+handle_command(reject_friend_req, BotManager().reject_friend_req, "已拒绝该申请")
-@user_unblock_service.handle()
-async def _user_unblock_service(event: MessageEvent):
- msg = str(event.message).strip()
- pattern = r"对用户(.*?)启用(.*)"
- reg = re.findall(pattern, msg)[0]
- aim_user = reg[0]
- aim_service = reg[1]
- is_ok = Manage().control_user_service(aim_service, aim_user, True)
- if not is_ok:
- await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中")
- await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}")
+apply_group_req = plugin.on_command("同意邀请", "根据申请码同意对应群邀请")
+handle_command(apply_group_req, BotManager().apply_group_req, "已同意该邀请")
-group_block_service = plugin.on_command("禁用", "针对所在群禁用某服务", permission=ADMIN)
+reject_group_req = plugin.on_command("拒绝邀请", "根据申请码拒绝对应群邀请")
+handle_command(reject_group_req, BotManager().reject_group_req, "已拒绝该邀请")
-@group_block_service.handle()
-async def _ready_group_block_service(
- matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()
-):
- msg = str(event.message).strip()
- if msg:
- matcher.set_arg("group_block_service", args)
+friend_req = plugin.on_request("好友申请", "好友申请检测")
-@group_block_service.got("group_block_service", "阿...是哪个服务呢")
-async def _deal_group_block_service(
- event: GroupMessageEvent, aim_service: str = ArgPlainText("group_block_service")
-):
- group_id = str(event.group_id)
- quit_list = ["算了", "罢了"]
- if aim_service in quit_list:
- await group_block_service.finish("好吧...")
+@friend_req.handle()
+async def _(event: FriendRequestEvent):
+ apply_code = event.flag
+ user_id = event.get_user_id()
+ apply_comment = event.comment
+ now_time = str(datetime.now().timestamp())
- is_ok = Manage().control_group_service(aim_service, group_id, False)
- if not is_ok:
- await group_block_service.finish("禁用失败...请检查服务名是否输入正确")
- await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}")
+ data = await BotManager().load_friend_req()
+ data[apply_code] = RequestInfo(
+ user_id=user_id,
+ comment=apply_comment,
+ time=now_time,
+ )
+ await BotManager().store_friend_req(data)
+
+ result = (
+ MessageBuilder("咱收到一条好友请求!")
+ .text(f"请求人:{user_id}")
+ .text(f"申请信息:{apply_comment}")
+ .text(f"申请码:{apply_code}")
+ .text("Tip:好友申请列表")
+ )
+ await plugin.send_to_master(result)
-group_unblock_service = plugin.on_command("启用", "针对所在群启用某服务", permission=ADMIN)
+group_req = plugin.on_request("应邀入群", "应邀入群检测")
-@group_unblock_service.handle()
-async def _ready_group_unblock_service(
- matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()
-):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("group_unblock_service", args)
+@group_req.handle()
+async def _(event: GroupRequestEvent):
+ if event.sub_type != "invite":
+ return
+ apply_code = event.flag
+ target_group = event.group_id
+ user_id = event.get_user_id()
+ apply_comment = event.comment
+ now_time = str(datetime.now().timestamp())
-@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢")
-async def _deal_group_unblock_service(
- event: GroupMessageEvent, aim_service: str = ArgPlainText("group_unblock_service")
-):
- group_id = str(event.group_id)
- quit_list = ["算了", "罢了"]
- if aim_service in quit_list:
- await group_unblock_service.finish("好吧...")
+ data = await BotManager().load_group_req()
+ data[apply_code] = RequestInfo(
+ user_id=user_id,
+ comment=apply_comment + f"(目标群{target_group})",
+ time=now_time,
+ )
+ await BotManager().store_group_req(data)
+
+ result = (
+ MessageBuilder("咱收到一条应邀入群请求!")
+ .text(f"申请人:{user_id}")
+ .text(f"申请信息:{apply_comment}")
+ .text(f"申请码:{apply_code}")
+ .text(f"目标群:{target_group}")
+ .text("Tip:群邀请列表")
+ )
+ await plugin.send_to_master(result)
- is_ok = Manage().control_group_service(aim_service, group_id, True)
- if not is_ok:
- await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中")
- await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}")
+get_friend_req_list = plugin.on_command("好友申请列表", "获取好友申请列表")
-get_friend_add_list = plugin.on_command("好友申请列表", "获取好友申请列表")
+@get_friend_req_list.handle()
+async def _():
+ data = await BotManager().load_friend_req()
+ if not data:
+ await get_friend_req_list.finish("当前没有申请")
-@get_friend_add_list.handle()
-async def _get_friend_add_list():
- data = Manage().load_friend_apply_list()
- temp_list = list()
+ cache_list = list()
for i in data:
apply_code = i
- apply_user = data[i]["user_id"]
- apply_comment = data[i]["comment"]
- temp_msg = f"{apply_user} | {apply_comment} | {apply_code}"
- temp_list.append(temp_msg)
-
- msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list))
- msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定"
- await get_friend_add_list.finish(msg1)
-
-
-approve_friend_add = plugin.on_command("同意好友", "同意好友申请")
-
-
-@approve_friend_add.handle()
-async def _ready_approve_friend_add(
- matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
-):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("approve_friend_add", args)
-
-
-@approve_friend_add.got("approve_friend_add", "申请码GKD!")
-async def _deal_approve_friend_add(
- bot: Bot, apply_code: str = ArgPlainText("approve_friend_add")
-):
- quit_list = ["算了", "罢了"]
- if apply_code in quit_list:
- await approve_friend_add.finish("好吧...")
-
- try:
- await bot.set_friend_add_request(flag=apply_code, approve=True)
- except Exception:
- await approve_friend_add.finish("同意失败...尝试下手动?")
- data = Manage().load_friend_apply_list()
- if apply_code not in data:
- await approve_friend_add.reject("申请码不存在...请检查是否输入正确")
- data.pop(apply_code)
- Manage().save_friend_apply_list(data)
- await approve_friend_add.finish("好欸!申请已通过!")
-
-
-refuse_friend_add = plugin.on_command("拒绝好友", "拒绝好友申请")
-
-
-@refuse_friend_add.handle()
-async def _ready_refuse_friend_add(
- matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
-):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("refuse_friend_add", args)
+ apply_user = data[i].user_id
+ apply_comment = data[i].comment
+ cache_list.append(f"{apply_user} | {apply_comment} | {apply_code}")
+
+ result = (
+ "申请人ID | 申请信息 | 申请码\n"
+ + "\n".join(map(str, cache_list))
+ + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定"
+ )
+ await get_friend_req_list.finish(result)
-@refuse_friend_add.got("refuse_friend_add", "申请码GKD!")
-async def _deal_refuse_friend_add(
- bot: Bot, apply_code: str = ArgPlainText("refuse_friend_add")
-):
- quit_list = ["算了", "罢了"]
- if apply_code in quit_list:
- await refuse_friend_add.finish("好吧...")
-
- try:
- await bot.set_friend_add_request(flag=apply_code, approve=False)
- except Exception:
- await refuse_friend_add.finish("拒绝失败...尝试下手动?")
- data = Manage().load_friend_apply_list()
- data.pop(apply_code)
- Manage().save_friend_apply_list(data)
- await refuse_friend_add.finish("已拒绝!")
+get_group_req_list = plugin.on_command("群邀请列表", "获取群邀请列表")
-get_group_invite_list = plugin.on_command("应邀入群列表", "获取群邀请列表")
+@get_group_req_list.handle()
+async def _():
+ data = await BotManager().load_group_req()
+ if not data:
+ await get_group_req_list.finish("当前没有申请")
-
-@get_group_invite_list.handle()
-async def _get_group_invite_list():
- data = Manage().load_invite_apply_list()
- temp_list = list()
+ cache_list = list()
for i in data:
apply_code = i
- apply_user = data[i]["user_id"]
- apply_comment = data[i]["comment"]
- temp_msg = f"{apply_user} | {apply_comment} | {apply_code}"
- temp_list.append(temp_msg)
-
- msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list))
- msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定"
- await get_friend_add_list.finish(msg1)
-
-
-approve_group_invite = plugin.on_command("同意邀请", "同意群聊邀请")
-
-
-@approve_group_invite.handle()
-async def _ready_approve_group_invite(
- matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
-):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("approve_group_invite", args)
-
-
-@approve_group_invite.got("approve_group_invite", "申请码GKD!")
-async def _deal_approve_group_invite(
- bot: Bot, apply_code: str = ArgPlainText("approve_group_invite")
-):
- quit_list = ["算了", "罢了"]
- if apply_code in quit_list:
- await approve_group_invite.finish("好吧...")
-
- try:
- await bot.set_group_add_request(
- flag=apply_code, sub_type="invite", approve=True
- )
- except Exception:
- await approve_group_invite.finish("同意失败...尝试下手动?")
- data = Manage().load_invite_apply_list()
- data.pop(apply_code)
- Manage().save_invite_apply_list(data)
- await approve_group_invite.finish("好欸!申请已通过!")
-
-
-refuse_group_invite = plugin.on_command("拒绝邀请", "拒绝群聊邀请")
-
-
-@refuse_group_invite.handle()
-async def _ready_refuse_group_invite(
- matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
-):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("refuse_group_invite", args)
-
-
-@refuse_group_invite.got("refuse_group_invite", "申请码GKD!")
-async def _deal_refuse_group_invite(
- bot: Bot, apply_code: str = ArgPlainText("refuse_group_invite")
-):
- quit_list = ["算了", "罢了"]
- if apply_code in quit_list:
- await refuse_group_invite.finish("好吧...")
-
- try:
- await bot.set_group_add_request(
- flag=apply_code, sub_type="invite", approve=False
- )
- except Exception:
- await refuse_group_invite.finish("拒绝失败...(可能是小群免验证)尝试下手动?")
- data = Manage().load_invite_apply_list()
- data.pop(apply_code)
- Manage().save_invite_apply_list(data)
- await refuse_group_invite.finish("已拒绝!")
-
-
-track_error = plugin.on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"})
-
-
-@track_error.handle()
-async def _track_error(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("track_code", args)
-
-
-@track_error.got("track_code", "报错码 速速")
-async def _(track_code: str = ArgPlainText("track_code")):
- quit_list = ["算了", "罢了"]
- if track_code in quit_list:
- await track_error.finish("好吧...")
-
- repo = await Manage().track_error(track_code)
- await track_error.finish(repo)
+ apply_user = data[i].user_id
+ apply_comment = data[i].comment
+ cache_list.append(f"{apply_user} | {apply_comment} | {apply_code}")
+
+ result = (
+ "申请人ID | 申请信息 | 申请码\n"
+ + "\n".join(map(str, cache_list))
+ + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定"
+ )
+ await get_group_req_list.finish(result)
recall_msg = plugin.on_command("撤回", "撤回bot已发送的信息", to_bot())
@@ -506,7 +313,7 @@ upgrade_nonebot_plugin = plugin.on_command(
@upgrade_nonebot_plugin.handle()
async def _(event: MessageEvent):
- result = NonebotPluginManager.upgrade_plugin()
+ result = NonebotPluginManager().upgrade_plugin()
if not result:
await upgrade_nonebot_plugin.finish("当前没有插件可更新...")
@@ -521,7 +328,7 @@ from .listener import init_listener
driver().on_startup(init_listener)
driver().on_startup(NonebotPluginManager().get_store_list)
-driver().on_startup(NonebotPluginManager.load_plugin)
+driver().on_startup(NonebotPluginManager().load_plugin)
scheduler.scheduled_job(
"interval",
name="Nonebot 商店刷新",
diff --git a/ATRI/plugins/manage/data_source.py b/ATRI/plugins/manage/data_source.py
index ff344e7..37edefc 100644
--- a/ATRI/plugins/manage/data_source.py
+++ b/ATRI/plugins/manage/data_source.py
@@ -1,273 +1,224 @@
-import json
+from typing import Dict
from pathlib import Path
from datetime import datetime
+from nonebot import get_bot
+from nonebot.adapters import Bot
+from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
+
+from ATRI.utils import FileDealer
from ATRI.service import ServiceTools
from ATRI.message import MessageBuilder
from ATRI.exceptions import load_error
+from .models import RequestInfo
+
-MANAGE_DIR = Path(".") / "data" / "plugins" / "manege"
-ESSENTIAL_DIR = Path(".") / "data" / "plugins" / "essential"
+MANAGE_DIR = Path(".") / "data" / "plugins" / "manage"
MANAGE_DIR.mkdir(parents=True, exist_ok=True)
-ESSENTIAL_DIR.mkdir(parents=True, exist_ok=True)
-_TRACK_BACK_FORMAT = (
- MessageBuilder("Track ID: {track_id}")
- .text("Prompt: {prompt}")
- .text("Time: {time}")
+_TRACEBACK_FORMAT = (
+ MessageBuilder("追踪ID:{trace_id}")
+ .text("关键词:{prompt}")
+ .text("时间:{time}")
.text("{content}")
.done()
)
-class Manage:
- @staticmethod
- def _load_block_user_list() -> dict:
- """
- 文件结构:
- {
- "Block user ID": {
- "time": "Block time"
- }
- }
- """
- file_name = "block_user.json"
+class BotManager:
+ async def __load_data(self, file_name: str) -> dict:
path = MANAGE_DIR / file_name
+ dealer = FileDealer(path)
if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- return dict()
+ await dealer.write_json(dict())
+
try:
- data = json.loads(path.read_bytes())
+ data = dealer.json()
except Exception:
data = dict()
return data
- @staticmethod
- def _save_block_user_list(data: dict) -> None:
- file_name = "block_user.json"
+ async def __store_data(self, file_name: str, data: dict) -> None:
path = MANAGE_DIR / file_name
+ dealer = FileDealer(path)
if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
-
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(data, indent=4))
-
- @staticmethod
- def _load_block_group_list() -> dict:
- """
- 文件结构:
- {
- "Block group ID": {
- "time": "Block time"
- }
- }
- """
- file_name = "block_group.json"
- path = MANAGE_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- return dict()
+ await dealer.write_json(dict())
- try:
- data = json.loads(path.read_bytes())
- except Exception:
- data = dict()
- return data
+ await dealer.write_json(data)
- @staticmethod
- def _save_block_group_list(data: dict) -> None:
- file_name = "block_group.json"
- path = MANAGE_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
+ async def __load_block_group(self) -> dict:
+ return await self.__load_data("block_group.json")
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(data, indent=4))
+ async def __store_block_group(self, data: dict) -> None:
+ await self.__store_data("block_group.json", data)
- @classmethod
- def block_user(cls, user_id: str) -> bool:
- data = cls._load_block_user_list()
- now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- data[user_id] = {"time": now_time}
- try:
- cls._save_block_user_list(data)
- return True
- except Exception:
- return False
+ async def __load_block_user(self) -> dict:
+ return await self.__load_data("block_user.json")
- @classmethod
- def unblock_user(cls, user_id: str) -> bool:
- data: dict = cls._load_block_user_list()
- if user_id not in data:
- return False
+ async def __store_block_user(self, data: dict) -> None:
+ await self.__store_data("block_user.json", data)
- try:
- data.pop(user_id)
- cls._save_block_user_list(data)
- return True
- except Exception:
- return False
+ async def load_friend_req(self) -> Dict[str, RequestInfo]:
+ return await self.__load_data("friend_add.json")
+
+ async def store_friend_req(self, data: dict) -> None:
+ await self.__store_data("friend_add.json", data)
+
+ async def load_group_req(self) -> Dict[str, RequestInfo]:
+ return await self.__load_data("group_invite.json")
+
+ async def store_group_req(self, data: dict) -> None:
+ await self.__store_data("group_invite.json", data)
- @classmethod
- def block_group(cls, group_id: str) -> bool:
- data = cls._load_block_group_list()
- now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- data[group_id] = {"time": now_time}
+ async def block_group(self, group_id: str) -> None:
+ data = await self.__load_block_group()
+ data[group_id] = {"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
try:
- cls._save_block_group_list(data)
- return True
+ await self.__store_block_group(data)
except Exception:
- return False
+ raise Exception("写入文件时失败")
- @classmethod
- def unblock_group(cls, group_id: str) -> bool:
- data: dict = cls._load_block_group_list()
+ async def unblock_group(self, group_id: str) -> None:
+ data = await self.__load_block_group()
if group_id not in data:
- return False
+ raise Exception("群不存在于封禁名单")
try:
data.pop(group_id)
- cls._save_block_group_list(data)
- return True
+ await self.__store_block_group(data)
except Exception:
- return False
+ raise Exception("写入文件时失败")
- @staticmethod
- def control_global_service(service: str, is_enabled: bool) -> bool:
- """
- Only SUPERUSER.
- """
+ async def block_user(self, user_id: str) -> None:
+ data = await self.__load_block_user()
+ data[user_id] = {"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
try:
- data = ServiceTools(service).load_service()
+ await self.__store_block_user(data)
except Exception:
- return False
- data.enabled = is_enabled
- ServiceTools(service).save_service(data.dict())
- return True
-
- @staticmethod
- def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool:
- """
- Only SUPERUSER.
- """
- try:
- data = ServiceTools(service).load_service()
- except Exception:
- return False
- temp_list: list = data.disable_user
-
- if is_enabled:
- try:
- temp_list.remove(user_id)
- except Exception:
- return False
- else:
- if user_id in temp_list:
- return True
-
- temp_list.append(user_id)
+ raise Exception("写入文件时失败")
- data.disable_user = temp_list
- ServiceTools(service).save_service(data.dict())
- return True
+ async def unblock_user(self, user_id: str) -> None:
+ data = await self.__load_block_user()
+ if user_id not in data:
+ raise Exception("用户不存在于封禁名单")
- @staticmethod
- def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool:
- """
- SUPERUSER and GROUPADMIN or GROUPOWNER.
- Only current group.
- """
try:
- data = ServiceTools(service).load_service()
+ data.pop(user_id)
+ await self.__store_block_user(data)
except Exception:
- return False
- temp_list: list = data.disable_group
+ raise Exception("写入文件时失败")
- if is_enabled:
+ def toggle_global_service(self, service: str) -> bool:
+ serv = ServiceTools(service)
+ try:
+ data = serv.load_service()
+ except Exception as e:
+ error_msg = str(e)
+ raise Exception(error_msg)
+
+ data.enabled = not data.enabled
+ serv.save_service(data)
+ return data.enabled
+
+ def toggle_group_service(self, service: str, event) -> bool:
+ if isinstance(event, GroupMessageEvent):
+ group_id = str(event.group_id)
+ serv = ServiceTools(service)
try:
- temp_list.remove(group_id)
- except Exception:
- return False
+ data = serv.load_service()
+ except Exception as e:
+ error_msg = str(e)
+ raise Exception(error_msg)
+
+ if group_id in data.disable_group:
+ data.disable_group.remove(group_id)
+ result = True
+ else:
+ data.disable_group.append(group_id)
+ result = False
+ serv.save_service(data)
+ return result
+ raise Exception("该功能只能在群聊中使用")
+
+ def toggle_user_service(self, service: str, event: MessageEvent) -> bool:
+ user_id = event.get_user_id()
+ serv = ServiceTools(service)
+ try:
+ data = serv.load_service()
+ except Exception as e:
+ error_msg = str(e)
+ raise Exception(error_msg)
+
+ if user_id in data.disable_user:
+ data.disable_user.remove(user_id)
+ result = True
else:
- if group_id in temp_list:
- return True
-
- temp_list.append(group_id)
-
- data.disable_group = temp_list
- ServiceTools(service).save_service(data.dict())
- return True
-
- @staticmethod
- def load_friend_apply_list() -> dict:
- file_name = "friend_add.json"
- path = ESSENTIAL_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- return dict()
+ data.disable_user.append(user_id)
+ result = False
+ serv.save_service(data)
+ return result
+ async def track_error(self, trace_id: str) -> str:
try:
- data = json.loads(path.read_bytes())
+ data = load_error(trace_id)
except Exception:
- data = dict()
- return data
-
- @staticmethod
- def save_friend_apply_list(data: dict) -> None:
- file_name = "friend_add.json"
- path = ESSENTIAL_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
-
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(data, indent=4))
+ raise Exception("未找到对应ID的信息")
- @staticmethod
- def load_invite_apply_list() -> dict:
- file_name = "group_invite.json"
- path = ESSENTIAL_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- return dict()
+ return _TRACEBACK_FORMAT.format(
+ trace_id=data.track_id,
+ prompt=data.prompt,
+ time=data.time,
+ content=data.content,
+ )
+ def __get_bot(self) -> Bot:
try:
- data = json.loads(path.read_bytes())
+ return get_bot()
except Exception:
- data = dict()
- return data
+ raise Exception("无法获取 bot 实例")
- @staticmethod
- def save_invite_apply_list(data: dict) -> None:
- file_name = "group_invite.json"
- path = ESSENTIAL_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
-
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(data, indent=4))
+ async def apply_friend_req(self, code: str) -> None:
+ bot = self.__get_bot()
+ try:
+ await bot.call_api("set_friend_add_request", flag=code, approve=True)
+ except Exception:
+ raise Exception("同意失败,请尝试手动同意")
+ data = await self.load_friend_req()
+ data.pop(code)
+ await self.store_friend_req(data)
- @staticmethod
- async def track_error(track_id: str) -> str:
+ async def reject_friend_req(self, code: str) -> None:
+ bot = self.__get_bot()
try:
- data = load_error(track_id)
+ await bot.call_api("set_friend_add_request", flag=code, approve=False)
except Exception:
- return "请检查ID是否正确..."
+ raise Exception("拒绝失败,请尝试手动拒绝")
+ data = await self.load_friend_req()
+ data.pop(code)
+ await self.store_friend_req(data)
- prompt = data.get("prompt", "ignore")
- time = data.get("time", "ignore")
- content = data.get("content", "ignore")
+ async def apply_group_req(self, code: str) -> None:
+ bot = self.__get_bot()
+ try:
+ await bot.call_api(
+ "set_group_add_request", flag=code, sub_type="invite", approve=True
+ )
+ except Exception:
+ raise Exception("同意失败,请尝试手动同意")
+ data = await self.load_group_req()
+ data.pop(code)
+ await self.store_group_req(data)
- repo = _TRACK_BACK_FORMAT.format(
- track_id=track_id, prompt=prompt, time=time, content=content
- )
- return repo
+ async def reject_group_req(self, code: str) -> None:
+ bot = self.__get_bot()
+ try:
+ await bot.call_api(
+ "set_group_add_request", flag=code, sub_type="invite", approve=False
+ )
+ except Exception:
+ raise Exception("拒绝失败,请尝试手动拒绝")
+ data = await self.load_group_req()
+ data.pop(code)
+ await self.store_group_req(data)
diff --git a/ATRI/plugins/manage/models.py b/ATRI/plugins/manage/models.py
index 90f8059..1ad8eaf 100644
--- a/ATRI/plugins/manage/models.py
+++ b/ATRI/plugins/manage/models.py
@@ -1,6 +1,12 @@
from pydantic import BaseModel
+class RequestInfo(BaseModel):
+ user_id: str
+ comment: str
+ time: str
+
+
class NonebotPluginInfo(BaseModel):
module_name: str
project_link: str
diff --git a/ATRI/plugins/manage/plugin.py b/ATRI/plugins/manage/plugin.py
index d46d1ef..f7d718b 100644
--- a/ATRI/plugins/manage/plugin.py
+++ b/ATRI/plugins/manage/plugin.py
@@ -33,7 +33,7 @@ class NonebotPluginManager:
return json.loads(self._conf_path.read_bytes())
- def revise_list(self, is_del: bool):
+ def revise_list(self, is_del: bool) -> None:
data = self.get_list()
if is_del:
if self._plugin_name in data:
@@ -49,7 +49,7 @@ class NonebotPluginManager:
self._plugin_name = plugin_name
return self
- async def get_store_list(self):
+ async def get_store_list(self) -> None:
global _plugin_list
if not _plugin_list:
@@ -105,9 +105,8 @@ class NonebotPluginManager:
return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json"
return "完成~! 将在下次重启生效"
- @classmethod
- def upgrade_plugin(cls) -> list:
- if not (plugin_list := cls.get_list(cls)):
+ def upgrade_plugin(self) -> list:
+ if not (plugin_list := self.get_list()):
return list()
succ_list = list()
@@ -121,8 +120,7 @@ class NonebotPluginManager:
return succ_list
- @classmethod
- def load_plugin(cls):
- plugin_list = cls.get_list(cls)
+ def load_plugin(self) -> None:
+ plugin_list = self.get_list()
for plugin in plugin_list:
nonebot.load_plugin(plugin)