diff options
Diffstat (limited to 'ATRI/plugins')
| -rw-r--r-- | ATRI/plugins/essential/__init__.py | 71 | ||||
| -rw-r--r-- | ATRI/plugins/essential/models.py | 7 | ||||
| -rw-r--r-- | ATRI/plugins/manage/__init__.py | 545 | ||||
| -rw-r--r-- | ATRI/plugins/manage/data_source.py | 371 | ||||
| -rw-r--r-- | ATRI/plugins/manage/models.py | 6 | ||||
| -rw-r--r-- | ATRI/plugins/manage/plugin.py | 14 | 
6 files changed, 350 insertions, 664 deletions
| diff --git a/ATRI/plugins/essential/__init__.py b/ATRI/plugins/essential/__init__.py index 883c191..096eb54 100644 --- a/ATRI/plugins/essential/__init__.py +++ b/ATRI/plugins/essential/__init__.py @@ -22,11 +22,10 @@ from ATRI import conf  from ATRI.log import log  from ATRI.service import Service  from ATRI.utils.apscheduler import scheduler -from ATRI.utils import FileDealer, MessageChecker +from ATRI.utils import MessageChecker  from ATRI.permission import MASTER  from ATRI.message import MessageBuilder -from .models import RequestInfo  from .data_source import recall_msg_dealer @@ -41,74 +40,6 @@ __TEMP_DIR.mkdir(parents=True, exist_ok=True)  plugin = Service("基础部件").document("对基础请求进行处理") -friend_add_event = plugin.on_request("好友申请", "好友申请检测") - - -@friend_add_event.handle() -async def _(event: FriendRequestEvent): -    path = __ESSENTIAL_DIR / "friend_add.json" -    file = FileDealer(path) -    if not path.is_file(): -        await file.write_json(dict()) -        data = dict() - -    apply_code = event.flag -    user_id = event.get_user_id() -    apply_comment = event.comment -    now_time = str(datetime.now().timestamp()) - -    data = file.json() -    data[apply_code] = RequestInfo( -        user_id=user_id, -        comment=apply_comment, -        time=now_time, -    ).dict() -    await file.write_json(data) - -    result = ( -        MessageBuilder("咱收到一条好友请求!") -        .text(f"请求人: {user_id}") -        .text(f"申请信息: {apply_comment}") -        .text(f"申请码: {apply_code}") -        .text("Tip: 好友申请列表") -    ) -    await plugin.send_to_master(result) - - -group_invite_request = plugin.on_request("应邀入群", "应邀入群检测") - - -@group_invite_request.handle() -async def _(event: GroupRequestEvent): -    path = __ESSENTIAL_DIR / "group_invite.json" -    file = FileDealer(path) -    if not path.is_file(): -        await file.write_json(dict()) -        data = dict() - -    apply_code = event.flag -    user_id = event.get_user_id() -    apply_comment = event.comment -    now_time = str(datetime.now().timestamp()) - -    data = file.json() -    data[apply_code] = RequestInfo( -        user_id=user_id, -        comment=apply_comment, -        time=now_time, -    ).dict() -    await file.write_json(data) - -    result = ( -        MessageBuilder("咱收到一条应邀入群请求!") -        .text(f"申请人: {user_id}") -        .text(f"申请信息: {apply_comment}") -        .text(f"申请码: {apply_code}") -        .text("Tip: 应邀入群列表") -    ) -    await plugin.send_to_master(result) - -  group_member_event = plugin.on_notice("群成员变动", "群成员变动检测") diff --git a/ATRI/plugins/essential/models.py b/ATRI/plugins/essential/models.py deleted file mode 100644 index dd2a801..0000000 --- a/ATRI/plugins/essential/models.py +++ /dev/null @@ -1,7 +0,0 @@ -from pydantic import BaseModel - - -class RequestInfo(BaseModel): -    user_id: str -    comment: str -    time: str diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py index 30d37c5..d32cb40 100644 --- a/ATRI/plugins/manage/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -1,435 +1,242 @@ -import re +from datetime import datetime +from typing import Type, Callable +from asyncio import iscoroutinefunction  from nonebot.matcher import Matcher  from nonebot.params import ArgPlainText, CommandArg -from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent +from nonebot.adapters.onebot.v11 import ( +    Bot, +    Message, +    MessageEvent, +    FriendRequestEvent, +    GroupRequestEvent, +)  from ATRI.rule import to_bot  from ATRI.service import Service  from ATRI.message import MessageBuilder  from ATRI.permission import MASTER, ADMIN -from .data_source import Manage +from .models import RequestInfo +from .data_source import BotManager  from .plugin import NonebotPluginManager -plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER) - - -block_user = plugin.on_command("封禁用户", "对目标用户进行封禁") - - -@block_user.handle() -async def _ready_block_user(matcher: Matcher, args: Message = CommandArg()): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("block_user", args) - - -@block_user.got("block_user", "哪位?GKD!") -async def _deal_block_user(user_id: str = ArgPlainText("block_user")): -    quit_list = ["算了", "罢了"] -    if user_id in quit_list: -        await block_user.finish("...看来有人逃过一劫呢") - -    is_ok = Manage().block_user(user_id) -    if not is_ok: -        await block_user.finish("kuso!封禁失败了...") - -    await block_user.finish(f"用户 {user_id} 危!") - - -unblock_user = plugin.on_command("解封用户", "对目标用户进行解封") - - -@unblock_user.handle() -async def _ready_unblock_user(matcher: Matcher, args: Message = CommandArg()): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("unblock_user", args) - - -@unblock_user.got("unblock_user", "哪位?GKD!") -async def _deal_unblock_user(user_id: str = ArgPlainText("unblock_user")): -    quit_list = ["算了", "罢了"] -    if user_id in quit_list: -        await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") - -    is_ok = Manage().unblock_user(user_id) -    if not is_ok: -        await unblock_user.finish("kuso!解封失败了...") - -    await unblock_user.finish(f"好欸!{user_id} 重获新生!") - - -block_group = plugin.on_command("封禁群", "对目标群进行封禁") - - -@block_group.handle() -async def _ready_block_group(matcher: Matcher, args: Message = CommandArg()): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("block_group", args) - - -@block_group.got("block_group", "哪个群?GKD!") -async def _deal_block_group(group_id: str = ArgPlainText("block_group")): -    quit_list = ["算了", "罢了"] -    if group_id in quit_list: -        await block_group.finish("...看来有一群逃过一劫呢") - -    is_ok = Manage().block_group(group_id) -    if not is_ok: -        await block_group.finish("kuso!封禁失败了...") - -    await block_group.finish(f"群 {group_id} 危!") - +_QUIT_ARGS = ["算了", "罢了"] -unblock_group = plugin.on_command("解封群", "对目标群进行解封") - -@unblock_group.handle() -async def _ready_unblock_group(matcher: Matcher, args: Message = CommandArg()): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("unblock_group", args) - - -@unblock_group.got("unblock_group", "哪个群?GKD!") -async def _deal_unblock_group(group_id: str = ArgPlainText("unblock_group")): -    quit_list = ["算了", "罢了"] -    if group_id in quit_list: -        await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") - -    is_ok = Manage().unblock_group(group_id) -    if not is_ok: -        await unblock_group.finish("kuso!解封失败了...") - -    await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") - - -global_block_service = plugin.on_command("全局禁用", "全局禁用某服务") +def handle_command( +    plugin: Type[Matcher], +    func: Callable, +    success_msg: str, +    fail_msg: str = "操作 {} 失败...原因:\n{}", +): +    @plugin.handle() +    async def handle_command(matcher: Matcher, args: Message = CommandArg()): +        msg = args.extract_plain_text() +        if msg: +            matcher.set_arg("target", args) + +    @plugin.got("target", "要操作的目标是?") +    async def handle_target(event: MessageEvent, target: str = ArgPlainText("target")): +        if target in _QUIT_ARGS: +            await plugin.finish("好吧") + +        try: +            func_argcount = func.__code__.co_argcount +            if iscoroutinefunction(func): +                if func_argcount == 3: +                    result = await func(target, event) +                else: +                    result = await func(target) +            else: +                if func_argcount == 3: +                    result = func(target, event) +                else: +                    result = func(target) +            msg = success_msg.format(target) +            if type(result) == bool: +                msg += "启用" if result else "禁用" +            await plugin.send(msg.format(target)) +        except Exception as e: +            error_msg = str(e) +            await plugin.send(fail_msg.format(target, error_msg)) -@global_block_service.handle() -async def _ready_block_service(matcher: Matcher, args: Message = CommandArg()): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("global_block_service", args) +plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER) -@global_block_service.got("global_block_service", "阿...是哪个服务呢") -async def _deal_global_block_service( -    block_service: str = ArgPlainText("global_block_service"), -): -    quit_list = ["算了", "罢了"] -    if block_service in quit_list: -        await global_block_service.finish("好吧...") +block_user = plugin.on_command("封禁用户", "阻止目标用户使用bot") +handle_command(block_user, BotManager().block_user, "用户 {} 危!") -    is_ok = Manage().control_global_service(block_service, False) -    if not is_ok: -        await global_block_service.finish("kuso!禁用失败了...") -    await global_block_service.finish(f"服务 {block_service} 已被禁用") +unblock_user = plugin.on_command("解封用户", "对被阻止的用户解封") +handle_command(unblock_user, BotManager().unblock_user, "用户 {} 已解封") -global_unblock_service = plugin.on_command("全局启用", "全局启用某服务") +block_group = plugin.on_command("封禁群", "阻止目标群所有人使用bot") +handle_command(block_group, BotManager().block_group, "群 {} 危!") -@global_unblock_service.handle() -async def _ready_unblock_service( -    matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("global_unblock_service", args) +unblock_group = plugin.on_command("解封群", "对被阻止的群解封") +handle_command(unblock_group, BotManager().unblock_group, "群 {} 已解封") -@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") -async def _deal_global_unblock_service( -    unblock_service: str = ArgPlainText("global_unblock_service"), -): -    quit_list = ["算了", "罢了"] -    if unblock_service in quit_list: -        await global_unblock_service.finish("好吧...") +toggle_global_service = plugin.on_command("全局控制", "全局禁用/启用某一服务") +handle_command( +    toggle_global_service, +    BotManager().toggle_global_service, +    "服务 {} 已全局", +) -    is_ok = Manage().control_global_service(unblock_service, True) -    if not is_ok: -        await global_unblock_service.finish("kuso!启用服务失败了...") -    await global_unblock_service.finish(f"服务 {unblock_service} 已启用") +toggle_user_service = plugin.on_command("用户控制", "针对单一用户禁用/启用某一服务") +handle_command( +    toggle_user_service, +    BotManager().toggle_user_service, +    "服务 {} 已针对该用户", +) -user_block_service = plugin.on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户(qid)禁用服务") +toggle_group_service = plugin.on_command("控制", "针对所在群禁用/启用某一服务", permission=ADMIN) +handle_command( +    toggle_group_service, +    BotManager().toggle_group_service, +    "服务 {} 已针对本群", +) -@user_block_service.handle() -async def _user_block_service(event: MessageEvent): -    msg = str(event.message).strip() -    pattern = r"对用户(.*?)禁用(.*)" -    reg = re.findall(pattern, msg)[0] -    aim_user = reg[0] -    aim_service = reg[1] +track = plugin.on_command("追踪", "根据ID获取对应报错信息", aliases={"/track"}) +handle_command( +    track, +    BotManager().track_error, +    "{}", +) -    is_ok = Manage().control_user_service(aim_service, aim_user, False) -    if not is_ok: -        await user_block_service.finish("禁用失败...请检查服务名是否正确") -    await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") +apply_friend_req = plugin.on_command("同意好友", "根据申请码同意对应好友申请") +handle_command(apply_friend_req, BotManager().apply_friend_req, "已同意该申请") -user_unblock_service = plugin.on_regex(r"对用户(.*?)启用(.*)", "针对某一用户(qid)启用服务") +reject_friend_req = plugin.on_command("拒绝好友", "根据申请码拒绝对应好友申请") +handle_command(reject_friend_req, BotManager().reject_friend_req, "已拒绝该申请") -@user_unblock_service.handle() -async def _user_unblock_service(event: MessageEvent): -    msg = str(event.message).strip() -    pattern = r"对用户(.*?)启用(.*)" -    reg = re.findall(pattern, msg)[0] -    aim_user = reg[0] -    aim_service = reg[1] -    is_ok = Manage().control_user_service(aim_service, aim_user, True) -    if not is_ok: -        await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") -    await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") +apply_group_req = plugin.on_command("同意邀请", "根据申请码同意对应群邀请") +handle_command(apply_group_req, BotManager().apply_group_req, "已同意该邀请") -group_block_service = plugin.on_command("禁用", "针对所在群禁用某服务", permission=ADMIN) +reject_group_req = plugin.on_command("拒绝邀请", "根据申请码拒绝对应群邀请") +handle_command(reject_group_req, BotManager().reject_group_req, "已拒绝该邀请") -@group_block_service.handle() -async def _ready_group_block_service( -    matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() -): -    msg = str(event.message).strip() -    if msg: -        matcher.set_arg("group_block_service", args) +friend_req = plugin.on_request("好友申请", "好友申请检测") -@group_block_service.got("group_block_service", "阿...是哪个服务呢") -async def _deal_group_block_service( -    event: GroupMessageEvent, aim_service: str = ArgPlainText("group_block_service") -): -    group_id = str(event.group_id) -    quit_list = ["算了", "罢了"] -    if aim_service in quit_list: -        await group_block_service.finish("好吧...") +@friend_req.handle() +async def _(event: FriendRequestEvent): +    apply_code = event.flag +    user_id = event.get_user_id() +    apply_comment = event.comment +    now_time = str(datetime.now().timestamp()) -    is_ok = Manage().control_group_service(aim_service, group_id, False) -    if not is_ok: -        await group_block_service.finish("禁用失败...请检查服务名是否输入正确") -    await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") +    data = await BotManager().load_friend_req() +    data[apply_code] = RequestInfo( +        user_id=user_id, +        comment=apply_comment, +        time=now_time, +    ) +    await BotManager().store_friend_req(data) + +    result = ( +        MessageBuilder("咱收到一条好友请求!") +        .text(f"请求人:{user_id}") +        .text(f"申请信息:{apply_comment}") +        .text(f"申请码:{apply_code}") +        .text("Tip:好友申请列表") +    ) +    await plugin.send_to_master(result) -group_unblock_service = plugin.on_command("启用", "针对所在群启用某服务", permission=ADMIN) +group_req = plugin.on_request("应邀入群", "应邀入群检测") -@group_unblock_service.handle() -async def _ready_group_unblock_service( -    matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg() -): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("group_unblock_service", args) +@group_req.handle() +async def _(event: GroupRequestEvent): +    if event.sub_type != "invite": +        return +    apply_code = event.flag +    target_group = event.group_id +    user_id = event.get_user_id() +    apply_comment = event.comment +    now_time = str(datetime.now().timestamp()) -@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") -async def _deal_group_unblock_service( -    event: GroupMessageEvent, aim_service: str = ArgPlainText("group_unblock_service") -): -    group_id = str(event.group_id) -    quit_list = ["算了", "罢了"] -    if aim_service in quit_list: -        await group_unblock_service.finish("好吧...") +    data = await BotManager().load_group_req() +    data[apply_code] = RequestInfo( +        user_id=user_id, +        comment=apply_comment + f"(目标群{target_group})", +        time=now_time, +    ) +    await BotManager().store_group_req(data) + +    result = ( +        MessageBuilder("咱收到一条应邀入群请求!") +        .text(f"申请人:{user_id}") +        .text(f"申请信息:{apply_comment}") +        .text(f"申请码:{apply_code}") +        .text(f"目标群:{target_group}") +        .text("Tip:群邀请列表") +    ) +    await plugin.send_to_master(result) -    is_ok = Manage().control_group_service(aim_service, group_id, True) -    if not is_ok: -        await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") -    await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") +get_friend_req_list = plugin.on_command("好友申请列表", "获取好友申请列表") -get_friend_add_list = plugin.on_command("好友申请列表", "获取好友申请列表") +@get_friend_req_list.handle() +async def _(): +    data = await BotManager().load_friend_req() +    if not data: +        await get_friend_req_list.finish("当前没有申请") -@get_friend_add_list.handle() -async def _get_friend_add_list(): -    data = Manage().load_friend_apply_list() -    temp_list = list() +    cache_list = list()      for i in data:          apply_code = i -        apply_user = data[i]["user_id"] -        apply_comment = data[i]["comment"] -        temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" -        temp_list.append(temp_msg) - -    msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) -    msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" -    await get_friend_add_list.finish(msg1) - - -approve_friend_add = plugin.on_command("同意好友", "同意好友申请") - - -@approve_friend_add.handle() -async def _ready_approve_friend_add( -    matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("approve_friend_add", args) - - -@approve_friend_add.got("approve_friend_add", "申请码GKD!") -async def _deal_approve_friend_add( -    bot: Bot, apply_code: str = ArgPlainText("approve_friend_add") -): -    quit_list = ["算了", "罢了"] -    if apply_code in quit_list: -        await approve_friend_add.finish("好吧...") - -    try: -        await bot.set_friend_add_request(flag=apply_code, approve=True) -    except Exception: -        await approve_friend_add.finish("同意失败...尝试下手动?") -    data = Manage().load_friend_apply_list() -    if apply_code not in data: -        await approve_friend_add.reject("申请码不存在...请检查是否输入正确") -    data.pop(apply_code) -    Manage().save_friend_apply_list(data) -    await approve_friend_add.finish("好欸!申请已通过!") - - -refuse_friend_add = plugin.on_command("拒绝好友", "拒绝好友申请") - - -@refuse_friend_add.handle() -async def _ready_refuse_friend_add( -    matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("refuse_friend_add", args) +        apply_user = data[i].user_id +        apply_comment = data[i].comment +        cache_list.append(f"{apply_user} | {apply_comment} | {apply_code}") + +    result = ( +        "申请人ID | 申请信息 | 申请码\n" +        + "\n".join(map(str, cache_list)) +        + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" +    ) +    await get_friend_req_list.finish(result) -@refuse_friend_add.got("refuse_friend_add", "申请码GKD!") -async def _deal_refuse_friend_add( -    bot: Bot, apply_code: str = ArgPlainText("refuse_friend_add") -): -    quit_list = ["算了", "罢了"] -    if apply_code in quit_list: -        await refuse_friend_add.finish("好吧...") - -    try: -        await bot.set_friend_add_request(flag=apply_code, approve=False) -    except Exception: -        await refuse_friend_add.finish("拒绝失败...尝试下手动?") -    data = Manage().load_friend_apply_list() -    data.pop(apply_code) -    Manage().save_friend_apply_list(data) -    await refuse_friend_add.finish("已拒绝!") +get_group_req_list = plugin.on_command("群邀请列表", "获取群邀请列表") -get_group_invite_list = plugin.on_command("应邀入群列表", "获取群邀请列表") +@get_group_req_list.handle() +async def _(): +    data = await BotManager().load_group_req() +    if not data: +        await get_group_req_list.finish("当前没有申请") - -@get_group_invite_list.handle() -async def _get_group_invite_list(): -    data = Manage().load_invite_apply_list() -    temp_list = list() +    cache_list = list()      for i in data:          apply_code = i -        apply_user = data[i]["user_id"] -        apply_comment = data[i]["comment"] -        temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" -        temp_list.append(temp_msg) - -    msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) -    msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" -    await get_friend_add_list.finish(msg1) - - -approve_group_invite = plugin.on_command("同意邀请", "同意群聊邀请") - - -@approve_group_invite.handle() -async def _ready_approve_group_invite( -    matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("approve_group_invite", args) - - -@approve_group_invite.got("approve_group_invite", "申请码GKD!") -async def _deal_approve_group_invite( -    bot: Bot, apply_code: str = ArgPlainText("approve_group_invite") -): -    quit_list = ["算了", "罢了"] -    if apply_code in quit_list: -        await approve_group_invite.finish("好吧...") - -    try: -        await bot.set_group_add_request( -            flag=apply_code, sub_type="invite", approve=True -        ) -    except Exception: -        await approve_group_invite.finish("同意失败...尝试下手动?") -    data = Manage().load_invite_apply_list() -    data.pop(apply_code) -    Manage().save_invite_apply_list(data) -    await approve_group_invite.finish("好欸!申请已通过!") - - -refuse_group_invite = plugin.on_command("拒绝邀请", "拒绝群聊邀请") - - -@refuse_group_invite.handle() -async def _ready_refuse_group_invite( -    matcher: Matcher, event: MessageEvent, args: Message = CommandArg() -): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("refuse_group_invite", args) - - -@refuse_group_invite.got("refuse_group_invite", "申请码GKD!") -async def _deal_refuse_group_invite( -    bot: Bot, apply_code: str = ArgPlainText("refuse_group_invite") -): -    quit_list = ["算了", "罢了"] -    if apply_code in quit_list: -        await refuse_group_invite.finish("好吧...") - -    try: -        await bot.set_group_add_request( -            flag=apply_code, sub_type="invite", approve=False -        ) -    except Exception: -        await refuse_group_invite.finish("拒绝失败...(可能是小群免验证)尝试下手动?") -    data = Manage().load_invite_apply_list() -    data.pop(apply_code) -    Manage().save_invite_apply_list(data) -    await refuse_group_invite.finish("已拒绝!") - - -track_error = plugin.on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) - - -@track_error.handle() -async def _track_error(matcher: Matcher, args: Message = CommandArg()): -    msg = args.extract_plain_text() -    if msg: -        matcher.set_arg("track_code", args) - - -@track_error.got("track_code", "报错码 速速") -async def _(track_code: str = ArgPlainText("track_code")): -    quit_list = ["算了", "罢了"] -    if track_code in quit_list: -        await track_error.finish("好吧...") - -    repo = await Manage().track_error(track_code) -    await track_error.finish(repo) +        apply_user = data[i].user_id +        apply_comment = data[i].comment +        cache_list.append(f"{apply_user} | {apply_comment} | {apply_code}") + +    result = ( +        "申请人ID | 申请信息 | 申请码\n" +        + "\n".join(map(str, cache_list)) +        + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" +    ) +    await get_group_req_list.finish(result)  recall_msg = plugin.on_command("撤回", "撤回bot已发送的信息", to_bot()) @@ -506,7 +313,7 @@ upgrade_nonebot_plugin = plugin.on_command(  @upgrade_nonebot_plugin.handle()  async def _(event: MessageEvent): -    result = NonebotPluginManager.upgrade_plugin() +    result = NonebotPluginManager().upgrade_plugin()      if not result:          await upgrade_nonebot_plugin.finish("当前没有插件可更新...") @@ -521,7 +328,7 @@ from .listener import init_listener  driver().on_startup(init_listener)  driver().on_startup(NonebotPluginManager().get_store_list) -driver().on_startup(NonebotPluginManager.load_plugin) +driver().on_startup(NonebotPluginManager().load_plugin)  scheduler.scheduled_job(      "interval",      name="Nonebot 商店刷新", diff --git a/ATRI/plugins/manage/data_source.py b/ATRI/plugins/manage/data_source.py index ff344e7..37edefc 100644 --- a/ATRI/plugins/manage/data_source.py +++ b/ATRI/plugins/manage/data_source.py @@ -1,273 +1,224 @@ -import json +from typing import Dict  from pathlib import Path  from datetime import datetime +from nonebot import get_bot +from nonebot.adapters import Bot +from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent + +from ATRI.utils import FileDealer  from ATRI.service import ServiceTools  from ATRI.message import MessageBuilder  from ATRI.exceptions import load_error +from .models import RequestInfo + -MANAGE_DIR = Path(".") / "data" / "plugins" / "manege" -ESSENTIAL_DIR = Path(".") / "data" / "plugins" / "essential" +MANAGE_DIR = Path(".") / "data" / "plugins" / "manage"  MANAGE_DIR.mkdir(parents=True, exist_ok=True) -ESSENTIAL_DIR.mkdir(parents=True, exist_ok=True) -_TRACK_BACK_FORMAT = ( -    MessageBuilder("Track ID: {track_id}") -    .text("Prompt: {prompt}") -    .text("Time: {time}") +_TRACEBACK_FORMAT = ( +    MessageBuilder("追踪ID:{trace_id}") +    .text("关键词:{prompt}") +    .text("时间:{time}")      .text("{content}")      .done()  ) -class Manage: -    @staticmethod -    def _load_block_user_list() -> dict: -        """ -        文件结构: -        { -            "Block user ID": { -                "time": "Block time" -            } -        } -        """ -        file_name = "block_user.json" +class BotManager: +    async def __load_data(self, file_name: str) -> dict:          path = MANAGE_DIR / file_name +        dealer = FileDealer(path)          if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) -            return dict() +            await dealer.write_json(dict()) +          try: -            data = json.loads(path.read_bytes()) +            data = dealer.json()          except Exception:              data = dict()          return data -    @staticmethod -    def _save_block_user_list(data: dict) -> None: -        file_name = "block_user.json" +    async def __store_data(self, file_name: str, data: dict) -> None:          path = MANAGE_DIR / file_name +        dealer = FileDealer(path)          if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) - -        with open(path, "w", encoding="utf-8") as w: -            w.write(json.dumps(data, indent=4)) - -    @staticmethod -    def _load_block_group_list() -> dict: -        """ -        文件结构: -        { -            "Block group ID": { -                "time": "Block time" -            } -        } -        """ -        file_name = "block_group.json" -        path = MANAGE_DIR / file_name -        if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) -            return dict() +            await dealer.write_json(dict()) -        try: -            data = json.loads(path.read_bytes()) -        except Exception: -            data = dict() -        return data +        await dealer.write_json(data) -    @staticmethod -    def _save_block_group_list(data: dict) -> None: -        file_name = "block_group.json" -        path = MANAGE_DIR / file_name -        if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) +    async def __load_block_group(self) -> dict: +        return await self.__load_data("block_group.json") -        with open(path, "w", encoding="utf-8") as w: -            w.write(json.dumps(data, indent=4)) +    async def __store_block_group(self, data: dict) -> None: +        await self.__store_data("block_group.json", data) -    @classmethod -    def block_user(cls, user_id: str) -> bool: -        data = cls._load_block_user_list() -        now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") -        data[user_id] = {"time": now_time} -        try: -            cls._save_block_user_list(data) -            return True -        except Exception: -            return False +    async def __load_block_user(self) -> dict: +        return await self.__load_data("block_user.json") -    @classmethod -    def unblock_user(cls, user_id: str) -> bool: -        data: dict = cls._load_block_user_list() -        if user_id not in data: -            return False +    async def __store_block_user(self, data: dict) -> None: +        await self.__store_data("block_user.json", data) -        try: -            data.pop(user_id) -            cls._save_block_user_list(data) -            return True -        except Exception: -            return False +    async def load_friend_req(self) -> Dict[str, RequestInfo]: +        return await self.__load_data("friend_add.json") + +    async def store_friend_req(self, data: dict) -> None: +        await self.__store_data("friend_add.json", data) + +    async def load_group_req(self) -> Dict[str, RequestInfo]: +        return await self.__load_data("group_invite.json") + +    async def store_group_req(self, data: dict) -> None: +        await self.__store_data("group_invite.json", data) -    @classmethod -    def block_group(cls, group_id: str) -> bool: -        data = cls._load_block_group_list() -        now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") -        data[group_id] = {"time": now_time} +    async def block_group(self, group_id: str) -> None: +        data = await self.__load_block_group() +        data[group_id] = {"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}          try: -            cls._save_block_group_list(data) -            return True +            await self.__store_block_group(data)          except Exception: -            return False +            raise Exception("写入文件时失败") -    @classmethod -    def unblock_group(cls, group_id: str) -> bool: -        data: dict = cls._load_block_group_list() +    async def unblock_group(self, group_id: str) -> None: +        data = await self.__load_block_group()          if group_id not in data: -            return False +            raise Exception("群不存在于封禁名单")          try:              data.pop(group_id) -            cls._save_block_group_list(data) -            return True +            await self.__store_block_group(data)          except Exception: -            return False +            raise Exception("写入文件时失败") -    @staticmethod -    def control_global_service(service: str, is_enabled: bool) -> bool: -        """ -        Only SUPERUSER. -        """ +    async def block_user(self, user_id: str) -> None: +        data = await self.__load_block_user() +        data[user_id] = {"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}          try: -            data = ServiceTools(service).load_service() +            await self.__store_block_user(data)          except Exception: -            return False -        data.enabled = is_enabled -        ServiceTools(service).save_service(data.dict()) -        return True - -    @staticmethod -    def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: -        """ -        Only SUPERUSER. -        """ -        try: -            data = ServiceTools(service).load_service() -        except Exception: -            return False -        temp_list: list = data.disable_user - -        if is_enabled: -            try: -                temp_list.remove(user_id) -            except Exception: -                return False -        else: -            if user_id in temp_list: -                return True - -            temp_list.append(user_id) +            raise Exception("写入文件时失败") -        data.disable_user = temp_list -        ServiceTools(service).save_service(data.dict()) -        return True +    async def unblock_user(self, user_id: str) -> None: +        data = await self.__load_block_user() +        if user_id not in data: +            raise Exception("用户不存在于封禁名单") -    @staticmethod -    def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: -        """ -        SUPERUSER and GROUPADMIN or GROUPOWNER. -        Only current group. -        """          try: -            data = ServiceTools(service).load_service() +            data.pop(user_id) +            await self.__store_block_user(data)          except Exception: -            return False -        temp_list: list = data.disable_group +            raise Exception("写入文件时失败") -        if is_enabled: +    def toggle_global_service(self, service: str) -> bool: +        serv = ServiceTools(service) +        try: +            data = serv.load_service() +        except Exception as e: +            error_msg = str(e) +            raise Exception(error_msg) + +        data.enabled = not data.enabled +        serv.save_service(data) +        return data.enabled + +    def toggle_group_service(self, service: str, event) -> bool: +        if isinstance(event, GroupMessageEvent): +            group_id = str(event.group_id) +            serv = ServiceTools(service)              try: -                temp_list.remove(group_id) -            except Exception: -                return False +                data = serv.load_service() +            except Exception as e: +                error_msg = str(e) +                raise Exception(error_msg) + +            if group_id in data.disable_group: +                data.disable_group.remove(group_id) +                result = True +            else: +                data.disable_group.append(group_id) +                result = False +            serv.save_service(data) +            return result +        raise Exception("该功能只能在群聊中使用") + +    def toggle_user_service(self, service: str, event: MessageEvent) -> bool: +        user_id = event.get_user_id() +        serv = ServiceTools(service) +        try: +            data = serv.load_service() +        except Exception as e: +            error_msg = str(e) +            raise Exception(error_msg) + +        if user_id in data.disable_user: +            data.disable_user.remove(user_id) +            result = True          else: -            if group_id in temp_list: -                return True - -            temp_list.append(group_id) - -        data.disable_group = temp_list -        ServiceTools(service).save_service(data.dict()) -        return True - -    @staticmethod -    def load_friend_apply_list() -> dict: -        file_name = "friend_add.json" -        path = ESSENTIAL_DIR / file_name -        if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) -            return dict() +            data.disable_user.append(user_id) +            result = False +        serv.save_service(data) +        return result +    async def track_error(self, trace_id: str) -> str:          try: -            data = json.loads(path.read_bytes()) +            data = load_error(trace_id)          except Exception: -            data = dict() -        return data - -    @staticmethod -    def save_friend_apply_list(data: dict) -> None: -        file_name = "friend_add.json" -        path = ESSENTIAL_DIR / file_name -        if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) - -        with open(path, "w", encoding="utf-8") as w: -            w.write(json.dumps(data, indent=4)) +            raise Exception("未找到对应ID的信息") -    @staticmethod -    def load_invite_apply_list() -> dict: -        file_name = "group_invite.json" -        path = ESSENTIAL_DIR / file_name -        if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) -            return dict() +        return _TRACEBACK_FORMAT.format( +            trace_id=data.track_id, +            prompt=data.prompt, +            time=data.time, +            content=data.content, +        ) +    def __get_bot(self) -> Bot:          try: -            data = json.loads(path.read_bytes()) +            return get_bot()          except Exception: -            data = dict() -        return data +            raise Exception("无法获取 bot 实例") -    @staticmethod -    def save_invite_apply_list(data: dict) -> None: -        file_name = "group_invite.json" -        path = ESSENTIAL_DIR / file_name -        if not path.is_file(): -            with open(path, "w", encoding="utf-8") as w: -                w.write(json.dumps({})) - -        with open(path, "w", encoding="utf-8") as w: -            w.write(json.dumps(data, indent=4)) +    async def apply_friend_req(self, code: str) -> None: +        bot = self.__get_bot() +        try: +            await bot.call_api("set_friend_add_request", flag=code, approve=True) +        except Exception: +            raise Exception("同意失败,请尝试手动同意") +        data = await self.load_friend_req() +        data.pop(code) +        await self.store_friend_req(data) -    @staticmethod -    async def track_error(track_id: str) -> str: +    async def reject_friend_req(self, code: str) -> None: +        bot = self.__get_bot()          try: -            data = load_error(track_id) +            await bot.call_api("set_friend_add_request", flag=code, approve=False)          except Exception: -            return "请检查ID是否正确..." +            raise Exception("拒绝失败,请尝试手动拒绝") +        data = await self.load_friend_req() +        data.pop(code) +        await self.store_friend_req(data) -        prompt = data.get("prompt", "ignore") -        time = data.get("time", "ignore") -        content = data.get("content", "ignore") +    async def apply_group_req(self, code: str) -> None: +        bot = self.__get_bot() +        try: +            await bot.call_api( +                "set_group_add_request", flag=code, sub_type="invite", approve=True +            ) +        except Exception: +            raise Exception("同意失败,请尝试手动同意") +        data = await self.load_group_req() +        data.pop(code) +        await self.store_group_req(data) -        repo = _TRACK_BACK_FORMAT.format( -            track_id=track_id, prompt=prompt, time=time, content=content -        ) -        return repo +    async def reject_group_req(self, code: str) -> None: +        bot = self.__get_bot() +        try: +            await bot.call_api( +                "set_group_add_request", flag=code, sub_type="invite", approve=False +            ) +        except Exception: +            raise Exception("拒绝失败,请尝试手动拒绝") +        data = await self.load_group_req() +        data.pop(code) +        await self.store_group_req(data) diff --git a/ATRI/plugins/manage/models.py b/ATRI/plugins/manage/models.py index 90f8059..1ad8eaf 100644 --- a/ATRI/plugins/manage/models.py +++ b/ATRI/plugins/manage/models.py @@ -1,6 +1,12 @@  from pydantic import BaseModel +class RequestInfo(BaseModel): +    user_id: str +    comment: str +    time: str + +  class NonebotPluginInfo(BaseModel):      module_name: str      project_link: str diff --git a/ATRI/plugins/manage/plugin.py b/ATRI/plugins/manage/plugin.py index d46d1ef..f7d718b 100644 --- a/ATRI/plugins/manage/plugin.py +++ b/ATRI/plugins/manage/plugin.py @@ -33,7 +33,7 @@ class NonebotPluginManager:          return json.loads(self._conf_path.read_bytes()) -    def revise_list(self, is_del: bool): +    def revise_list(self, is_del: bool) -> None:          data = self.get_list()          if is_del:              if self._plugin_name in data: @@ -49,7 +49,7 @@ class NonebotPluginManager:          self._plugin_name = plugin_name          return self -    async def get_store_list(self): +    async def get_store_list(self) -> None:          global _plugin_list          if not _plugin_list: @@ -105,9 +105,8 @@ class NonebotPluginManager:              return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json"          return "完成~! 将在下次重启生效" -    @classmethod -    def upgrade_plugin(cls) -> list: -        if not (plugin_list := cls.get_list(cls)): +    def upgrade_plugin(self) -> list: +        if not (plugin_list := self.get_list()):              return list()          succ_list = list() @@ -121,8 +120,7 @@ class NonebotPluginManager:          return succ_list -    @classmethod -    def load_plugin(cls): -        plugin_list = cls.get_list(cls) +    def load_plugin(self) -> None: +        plugin_list = self.get_list()          for plugin in plugin_list:              nonebot.load_plugin(plugin) | 
