diff options
Diffstat (limited to 'ATRI/plugins/manage/__init__.py')
-rw-r--r-- | ATRI/plugins/manage/__init__.py | 545 |
1 files changed, 176 insertions, 369 deletions
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py index 30d37c5..d32cb40 100644 --- a/ATRI/plugins/manage/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -1,435 +1,242 @@ -import re +from datetime import datetime +from typing import Type, Callable +from asyncio import iscoroutinefunction from nonebot.matcher import Matcher from nonebot.params import ArgPlainText, CommandArg -from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent +from nonebot.adapters.onebot.v11 import ( + Bot, + Message, + MessageEvent, + FriendRequestEvent, + GroupRequestEvent, +) from ATRI.rule import to_bot from ATRI.service import Service from ATRI.message import MessageBuilder from ATRI.permission import MASTER, ADMIN -from .data_source import Manage +from .models import RequestInfo +from .data_source import BotManager from .plugin import NonebotPluginManager -plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER) - - -block_user = plugin.on_command("封禁用户", "对目标用户进行封禁") - - -@block_user.handle() -async def _ready_block_user(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("block_user", args) - - -@block_user.got("block_user", "哪位?GKD!") -async def _deal_block_user(user_id: str = ArgPlainText("block_user")): - quit_list = ["算了", "罢了"] - if user_id in quit_list: - await block_user.finish("...看来有人逃过一劫呢") - - is_ok = Manage().block_user(user_id) - if not is_ok: - await block_user.finish("kuso!封禁失败了...") - - await block_user.finish(f"用户 {user_id} 危!") - - -unblock_user = plugin.on_command("解封用户", "对目标用户进行解封") - - -@unblock_user.handle() -async def _ready_unblock_user(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("unblock_user", args) - - -@unblock_user.got("unblock_user", "哪位?GKD!") -async def _deal_unblock_user(user_id: str = ArgPlainText("unblock_user")): - quit_list = ["算了", "罢了"] - if user_id in quit_list: - await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") - - is_ok = Manage().unblock_user(user_id) - if not is_ok: - await unblock_user.finish("kuso!解封失败了...") - - await unblock_user.finish(f"好欸!{user_id} 重获新生!") - - -block_group = plugin.on_command("封禁群", "对目标群进行封禁") - - -@block_group.handle() -async def _ready_block_group(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("block_group", args) - - -@block_group.got("block_group", "哪个群?GKD!") -async def _deal_block_group(group_id: str = ArgPlainText("block_group")): - quit_list = ["算了", "罢了"] - if group_id in quit_list: - await block_group.finish("...看来有一群逃过一劫呢") - - is_ok = Manage().block_group(group_id) - if not is_ok: - await block_group.finish("kuso!封禁失败了...") - - await block_group.finish(f"群 {group_id} 危!") - +_QUIT_ARGS = ["算了", "罢了"] -unblock_group = plugin.on_command("解封群", "对目标群进行解封") - -@unblock_group.handle() -async def _ready_unblock_group(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("unblock_group", args) - - -@unblock_group.got("unblock_group", "哪个群?GKD!") -async def _deal_unblock_group(group_id: str = ArgPlainText("unblock_group")): - quit_list = ["算了", "罢了"] - if group_id in quit_list: - await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") - - is_ok = Manage().unblock_group(group_id) - if not is_ok: - await unblock_group.finish("kuso!解封失败了...") - - await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") - - -global_block_service = plugin.on_command("全局禁用", "全局禁用某服务") +def handle_command( + plugin: Type[Matcher], + func: Callable, + success_msg: str, + fail_msg: str = "操作 {} 失败...原因:\n{}", +): + @plugin.handle() + async def handle_command(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("target", args) + + @plugin.got("target", "要操作的目标是?") + async def handle_target(event: MessageEvent, target: str = ArgPlainText("target")): + if target in _QUIT_ARGS: + await plugin.finish("好吧") + + try: + func_argcount = func.__code__.co_argcount + if iscoroutinefunction(func): + if func_argcount == 3: + result = await func(target, event) + else: + result = await func(target) + else: + if func_argcount == 3: + result = func(target, event) + else: + result = func(target) + msg = success_msg.format(target) + if type(result) == bool: + msg += "启用" if result else "禁用" + await plugin.send(msg.format(target)) + except Exception as e: + error_msg = str(e) + await plugin.send(fail_msg.format(target, error_msg)) -@global_block_service.handle() -async def _ready_block_service(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("global_block_service", args) +plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER) -@global_block_service.got("global_block_service", "阿...是哪个服务呢") -async def _deal_global_block_service( - block_service: str = ArgPlainText("global_block_service"), -): - quit_list = ["算了", "罢了"] - if block_service in quit_list: - await global_block_service.finish("好吧...") +block_user = plugin.on_command("封禁用户", "阻止目标用户使用bot") +handle_command(block_user, BotManager().block_user, "用户 {} 危!") - is_ok = Manage().control_global_service(block_service, False) - if not is_ok: - await global_block_service.finish("kuso!禁用失败了...") - await global_block_service.finish(f"服务 {block_service} 已被禁用") +unblock_user = plugin.on_command("解封用户", "对被阻止的用户解封") +handle_command(unblock_user, BotManager().unblock_user, "用户 {} 已解封") -global_unblock_service = plugin.on_command("全局启用", "全局启用某服务") +block_group = plugin.on_command("封禁群", "阻止目标群所有人使用bot") +handle_command(block_group, BotManager().block_group, "群 {} 危!") -@global_unblock_service.handle() -async def _ready_unblock_service( - matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("global_unblock_service", args) +unblock_group = plugin.on_command("解封群", "对被阻止的群解封") +handle_command(unblock_group, BotManager().unblock_group, "群 {} 已解封") -@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") -async def _deal_global_unblock_service( - unblock_service: str = ArgPlainText("global_unblock_service"), -): - quit_list = ["算了", "罢了"] - if unblock_service in quit_list: - await global_unblock_service.finish("好吧...") +toggle_global_service = plugin.on_command("全局控制", "全局禁用/启用某一服务") +handle_command( + toggle_global_service, + BotManager().toggle_global_service, + "服务 {} 已全局", +) - is_ok = Manage().control_global_service(unblock_service, True) - if not is_ok: - await global_unblock_service.finish("kuso!启用服务失败了...") - await global_unblock_service.finish(f"服务 {unblock_service} 已启用") +toggle_user_service = plugin.on_command("用户控制", "针对单一用户禁用/启用某一服务") +handle_command( + toggle_user_service, + BotManager().toggle_user_service, + "服务 {} 已针对该用户", +) -user_block_service = plugin.on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户(qid)禁用服务") +toggle_group_service = plugin.on_command("控制", "针对所在群禁用/启用某一服务", permission=ADMIN) +handle_command( + toggle_group_service, + BotManager().toggle_group_service, + "服务 {} 已针对本群", +) -@user_block_service.handle() -async def _user_block_service(event: MessageEvent): - msg = str(event.message).strip() - pattern = r"对用户(.*?)禁用(.*)" - reg = re.findall(pattern, msg)[0] - aim_user = reg[0] - aim_service = reg[1] +track = plugin.on_command("追踪", "根据ID获取对应报错信息", aliases={"/track"}) +handle_command( + track, + BotManager().track_error, + "{}", +) - is_ok = Manage().control_user_service(aim_service, aim_user, False) - if not is_ok: - await user_block_service.finish("禁用失败...请检查服务名是否正确") - await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") +apply_friend_req = plugin.on_command("同意好友", "根据申请码同意对应好友申请") +handle_command(apply_friend_req, BotManager().apply_friend_req, "已同意该申请") -user_unblock_service = plugin.on_regex(r"对用户(.*?)启用(.*)", "针对某一用户(qid)启用服务") +reject_friend_req = plugin.on_command("拒绝好友", "根据申请码拒绝对应好友申请") +handle_command(reject_friend_req, BotManager().reject_friend_req, "已拒绝该申请") -@user_unblock_service.handle() -async def _user_unblock_service(event: MessageEvent): - msg = str(event.message).strip() - pattern = r"对用户(.*?)启用(.*)" - reg = re.findall(pattern, msg)[0] - aim_user = reg[0] - aim_service = reg[1] - is_ok = Manage().control_user_service(aim_service, aim_user, True) - if not is_ok: - await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") - await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") +apply_group_req = plugin.on_command("同意邀请", "根据申请码同意对应群邀请") +handle_command(apply_group_req, BotManager().apply_group_req, "已同意该邀请") -group_block_service = plugin.on_command("禁用", "针对所在群禁用某服务", permission=ADMIN) +reject_group_req = plugin.on_command("拒绝邀请", "根据申请码拒绝对应群邀请") +handle_command(reject_group_req, BotManager().reject_group_req, "已拒绝该邀请") -@group_block_service.handle() -async def _ready_group_block_service( - matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() -): - msg = str(event.message).strip() - if msg: - matcher.set_arg("group_block_service", args) +friend_req = plugin.on_request("好友申请", "好友申请检测") -@group_block_service.got("group_block_service", "阿...是哪个服务呢") -async def _deal_group_block_service( - event: GroupMessageEvent, aim_service: str = ArgPlainText("group_block_service") -): - group_id = str(event.group_id) - quit_list = ["算了", "罢了"] - if aim_service in quit_list: - await group_block_service.finish("好吧...") +@friend_req.handle() +async def _(event: FriendRequestEvent): + apply_code = event.flag + user_id = event.get_user_id() + apply_comment = event.comment + now_time = str(datetime.now().timestamp()) - is_ok = Manage().control_group_service(aim_service, group_id, False) - if not is_ok: - await group_block_service.finish("禁用失败...请检查服务名是否输入正确") - await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") + data = await BotManager().load_friend_req() + data[apply_code] = RequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + ) + await BotManager().store_friend_req(data) + + result = ( + MessageBuilder("咱收到一条好友请求!") + .text(f"请求人:{user_id}") + .text(f"申请信息:{apply_comment}") + .text(f"申请码:{apply_code}") + .text("Tip:好友申请列表") + ) + await plugin.send_to_master(result) -group_unblock_service = plugin.on_command("启用", "针对所在群启用某服务", permission=ADMIN) +group_req = plugin.on_request("应邀入群", "应邀入群检测") -@group_unblock_service.handle() -async def _ready_group_unblock_service( - matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() -): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("group_unblock_service", args) +@group_req.handle() +async def _(event: GroupRequestEvent): + if event.sub_type != "invite": + return + apply_code = event.flag + target_group = event.group_id + user_id = event.get_user_id() + apply_comment = event.comment + now_time = str(datetime.now().timestamp()) -@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") -async def _deal_group_unblock_service( - event: GroupMessageEvent, aim_service: str = ArgPlainText("group_unblock_service") -): - group_id = str(event.group_id) - quit_list = ["算了", "罢了"] - if aim_service in quit_list: - await group_unblock_service.finish("好吧...") + data = await BotManager().load_group_req() + data[apply_code] = RequestInfo( + user_id=user_id, + comment=apply_comment + f"(目标群{target_group})", + time=now_time, + ) + await BotManager().store_group_req(data) + + result = ( + MessageBuilder("咱收到一条应邀入群请求!") + .text(f"申请人:{user_id}") + .text(f"申请信息:{apply_comment}") + .text(f"申请码:{apply_code}") + .text(f"目标群:{target_group}") + .text("Tip:群邀请列表") + ) + await plugin.send_to_master(result) - is_ok = Manage().control_group_service(aim_service, group_id, True) - if not is_ok: - await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") - await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") +get_friend_req_list = plugin.on_command("好友申请列表", "获取好友申请列表") -get_friend_add_list = plugin.on_command("好友申请列表", "获取好友申请列表") +@get_friend_req_list.handle() +async def _(): + data = await BotManager().load_friend_req() + if not data: + await get_friend_req_list.finish("当前没有申请") -@get_friend_add_list.handle() -async def _get_friend_add_list(): - data = Manage().load_friend_apply_list() - temp_list = list() + cache_list = list() for i in data: apply_code = i - apply_user = data[i]["user_id"] - apply_comment = data[i]["comment"] - temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" - temp_list.append(temp_msg) - - msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) - msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" - await get_friend_add_list.finish(msg1) - - -approve_friend_add = plugin.on_command("同意好友", "同意好友申请") - - -@approve_friend_add.handle() -async def _ready_approve_friend_add( - matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("approve_friend_add", args) - - -@approve_friend_add.got("approve_friend_add", "申请码GKD!") -async def _deal_approve_friend_add( - bot: Bot, apply_code: str = ArgPlainText("approve_friend_add") -): - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await approve_friend_add.finish("好吧...") - - try: - await bot.set_friend_add_request(flag=apply_code, approve=True) - except Exception: - await approve_friend_add.finish("同意失败...尝试下手动?") - data = Manage().load_friend_apply_list() - if apply_code not in data: - await approve_friend_add.reject("申请码不存在...请检查是否输入正确") - data.pop(apply_code) - Manage().save_friend_apply_list(data) - await approve_friend_add.finish("好欸!申请已通过!") - - -refuse_friend_add = plugin.on_command("拒绝好友", "拒绝好友申请") - - -@refuse_friend_add.handle() -async def _ready_refuse_friend_add( - matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("refuse_friend_add", args) + apply_user = data[i].user_id + apply_comment = data[i].comment + cache_list.append(f"{apply_user} | {apply_comment} | {apply_code}") + + result = ( + "申请人ID | 申请信息 | 申请码\n" + + "\n".join(map(str, cache_list)) + + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" + ) + await get_friend_req_list.finish(result) -@refuse_friend_add.got("refuse_friend_add", "申请码GKD!") -async def _deal_refuse_friend_add( - bot: Bot, apply_code: str = ArgPlainText("refuse_friend_add") -): - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await refuse_friend_add.finish("好吧...") - - try: - await bot.set_friend_add_request(flag=apply_code, approve=False) - except Exception: - await refuse_friend_add.finish("拒绝失败...尝试下手动?") - data = Manage().load_friend_apply_list() - data.pop(apply_code) - Manage().save_friend_apply_list(data) - await refuse_friend_add.finish("已拒绝!") +get_group_req_list = plugin.on_command("群邀请列表", "获取群邀请列表") -get_group_invite_list = plugin.on_command("应邀入群列表", "获取群邀请列表") +@get_group_req_list.handle() +async def _(): + data = await BotManager().load_group_req() + if not data: + await get_group_req_list.finish("当前没有申请") - -@get_group_invite_list.handle() -async def _get_group_invite_list(): - data = Manage().load_invite_apply_list() - temp_list = list() + cache_list = list() for i in data: apply_code = i - apply_user = data[i]["user_id"] - apply_comment = data[i]["comment"] - temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" - temp_list.append(temp_msg) - - msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) - msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" - await get_friend_add_list.finish(msg1) - - -approve_group_invite = plugin.on_command("同意邀请", "同意群聊邀请") - - -@approve_group_invite.handle() -async def _ready_approve_group_invite( - matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("approve_group_invite", args) - - -@approve_group_invite.got("approve_group_invite", "申请码GKD!") -async def _deal_approve_group_invite( - bot: Bot, apply_code: str = ArgPlainText("approve_group_invite") -): - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await approve_group_invite.finish("好吧...") - - try: - await bot.set_group_add_request( - flag=apply_code, sub_type="invite", approve=True - ) - except Exception: - await approve_group_invite.finish("同意失败...尝试下手动?") - data = Manage().load_invite_apply_list() - data.pop(apply_code) - Manage().save_invite_apply_list(data) - await approve_group_invite.finish("好欸!申请已通过!") - - -refuse_group_invite = plugin.on_command("拒绝邀请", "拒绝群聊邀请") - - -@refuse_group_invite.handle() -async def _ready_refuse_group_invite( - matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("refuse_group_invite", args) - - -@refuse_group_invite.got("refuse_group_invite", "申请码GKD!") -async def _deal_refuse_group_invite( - bot: Bot, apply_code: str = ArgPlainText("refuse_group_invite") -): - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await refuse_group_invite.finish("好吧...") - - try: - await bot.set_group_add_request( - flag=apply_code, sub_type="invite", approve=False - ) - except Exception: - await refuse_group_invite.finish("拒绝失败...(可能是小群免验证)尝试下手动?") - data = Manage().load_invite_apply_list() - data.pop(apply_code) - Manage().save_invite_apply_list(data) - await refuse_group_invite.finish("已拒绝!") - - -track_error = plugin.on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) - - -@track_error.handle() -async def _track_error(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("track_code", args) - - -@track_error.got("track_code", "报错码 速速") -async def _(track_code: str = ArgPlainText("track_code")): - quit_list = ["算了", "罢了"] - if track_code in quit_list: - await track_error.finish("好吧...") - - repo = await Manage().track_error(track_code) - await track_error.finish(repo) + apply_user = data[i].user_id + apply_comment = data[i].comment + cache_list.append(f"{apply_user} | {apply_comment} | {apply_code}") + + result = ( + "申请人ID | 申请信息 | 申请码\n" + + "\n".join(map(str, cache_list)) + + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" + ) + await get_group_req_list.finish(result) recall_msg = plugin.on_command("撤回", "撤回bot已发送的信息", to_bot()) @@ -506,7 +313,7 @@ upgrade_nonebot_plugin = plugin.on_command( @upgrade_nonebot_plugin.handle() async def _(event: MessageEvent): - result = NonebotPluginManager.upgrade_plugin() + result = NonebotPluginManager().upgrade_plugin() if not result: await upgrade_nonebot_plugin.finish("当前没有插件可更新...") @@ -521,7 +328,7 @@ from .listener import init_listener driver().on_startup(init_listener) driver().on_startup(NonebotPluginManager().get_store_list) -driver().on_startup(NonebotPluginManager.load_plugin) +driver().on_startup(NonebotPluginManager().load_plugin) scheduler.scheduled_job( "interval", name="Nonebot 商店刷新", |