From 24644daeb153e781d162c638a79b3727df242802 Mon Sep 17 00:00:00 2001 From: Kyomotoi Date: Fri, 13 Aug 2021 17:52:10 +0800 Subject: =?UTF-8?q?=E2=9C=8F=EF=B8=8F=EF=BC=9ATypo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ATRI/plugins/manage/__init__.py | 362 ++++++++++++++++++++++++++++++++ ATRI/plugins/manage/data_source.py | 283 +++++++++++++++++++++++++ ATRI/plugins/manege/__init__.py | 408 ------------------------------------- ATRI/plugins/manege/data_source.py | 274 ------------------------- 4 files changed, 645 insertions(+), 682 deletions(-) create mode 100644 ATRI/plugins/manage/__init__.py create mode 100644 ATRI/plugins/manage/data_source.py delete mode 100644 ATRI/plugins/manege/__init__.py delete mode 100644 ATRI/plugins/manege/data_source.py diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py new file mode 100644 index 0000000..c1dfb3b --- /dev/null +++ b/ATRI/plugins/manage/__init__.py @@ -0,0 +1,362 @@ +import re + +from nonebot.typing import T_State +from nonebot.permission import SUPERUSER +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN + +from .data_source import Manage + + +block_user = Manage().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) + +@block_user.handle() +async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["block_user"] = msg + +@block_user.got("block_user", "哪位?GKD!") +async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State): + user_id = state["block_user"] + quit_list = ["算了", "罢了"] + if user_id in quit_list: + await block_user.finish("...看来有人逃过一劫呢") + + is_ok = Manage().block_user(user_id) + if not is_ok: + await block_user.finish("kuso!封禁失败了...") + + await block_user.finish(f"用户 {user_id} 危!") + + +unblock_user = Manage().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER) + +@unblock_user.handle() +async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["unblock_user"] = msg + +@unblock_user.got("unblock_user", "哪位?GKD!") +async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + user_id = state["unblock_user"] + quit_list = ["算了", "罢了"] + if user_id in quit_list: + await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") + + is_ok = Manage().unblock_user(user_id) + if not is_ok: + await unblock_user.finish("kuso!解封失败了...") + + await unblock_user.finish(f"好欸!{user_id} 重获新生!") + + +block_group = Manage().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER) + +@block_group.handle() +async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["block_group"] = msg + +@block_group.got("block_group", "哪个群?GKD!") +async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State): + group_id = state["block_group"] + quit_list = ["算了", "罢了"] + if group_id in quit_list: + await block_group.finish("...看来有一群逃过一劫呢") + + is_ok = Manage().block_group(group_id) + if not is_ok: + await block_group.finish("kuso!封禁失败了...") + + await block_group.finish(f"群 {group_id} 危!") + + +unblock_group = Manage().on_command("解封群", "对目标群进行解封", permission=SUPERUSER) + +@unblock_group.handle() +async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["unblock_group"] = msg + +@unblock_group.got("unblock_group", "哪个群?GKD!") +async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + group_id = state["unblock_group"] + quit_list = ["算了", "罢了"] + if group_id in quit_list: + await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") + + is_ok = Manage().unblock_group(group_id) + if not is_ok: + await unblock_group.finish("kuso!解封失败了...") + + await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") + + +global_block_service = Manage().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER) + +@global_block_service.handle() +async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["global_block_service"] = msg + +@global_block_service.got("global_block_service", "阿...是哪个服务呢") +async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State): + block_service = state["global_block_service"] + quit_list = ["算了", "罢了"] + if block_service in quit_list: + await global_block_service.finish("好吧...") + + is_ok = Manage().control_global_service(block_service, False) + if not is_ok: + await global_block_service.finish("kuso!禁用失败了...") + + await global_block_service.finish(f"服务 {block_service} 已被禁用") + + +global_unblock_service = Manage().on_command("全局启用", "全局启用某服务", permission=SUPERUSER) + +@global_unblock_service.handle() +async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["global_unblock_service"] = msg + +@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") +async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + unblock_service = state["global_unblock_service"] + quit_list = ["算了", "罢了"] + if unblock_service in quit_list: + await global_unblock_service.finish("好吧...") + + is_ok = Manage().control_global_service(unblock_service, True) + if not is_ok: + await global_unblock_service.finish("kuso!启用服务失败了...") + + await global_unblock_service.finish(f"服务 {unblock_service} 已启用") + + +user_block_service = Manage().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER) + +@user_block_service.handle() +async def _user_block_service(bot: Bot, event: MessageEvent): + msg = str(event.message).strip() + pattern = r"对用户(.*?)禁用(.*)" + reg = re.findall(pattern, msg) + aim_user = reg[0] + aim_service = reg[1] + + is_ok = Manage().control_user_service(aim_service, aim_user, False) + if not is_ok: + await user_block_service.finish("禁用失败...请检查服务名是否正确") + await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") + + + +user_unblock_service = Manage().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER) + +@user_unblock_service.handle() +async def _user_unblock_service(bot: Bot, event: MessageEvent): + msg = str(event.message).strip() + pattern = r"对用户(.*?)启用(.*)" + reg = re.findall(pattern, msg) + aim_user = reg[0] + aim_service = reg[1] + + is_ok = Manage().control_user_service(aim_service, aim_user, True) + if not is_ok: + await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") + await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") + + +group_block_service = Manage().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) + +@group_block_service.handle() +async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["group_block_service"] = msg + +@group_block_service.got("group_block_service", "阿...是哪个服务呢") +async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + aim_service = state["group_block_service"] + group_id = str(event.group_id) + quit_list = ["算了", "罢了"] + if aim_service in quit_list: + await group_block_service.finish("好吧...") + + is_ok = Manage().control_group_service(aim_service, group_id, False) + if not is_ok: + await group_block_service.finish("禁用失败...请检查服务名是否输入正确") + await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") + + +group_unblock_service = Manage().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) + +@group_unblock_service.handle() +async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["group_unblock_service"] = msg + +@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") +async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + aim_service = state["group_unblock_service"] + group_id = str(event.group_id) + quit_list = ["算了", "罢了"] + if aim_service in quit_list: + await group_unblock_service.finish("好吧...") + + is_ok = Manage().control_group_service(aim_service, group_id, True) + if not is_ok: + await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") + await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") + + +get_friend_add_list = Manage().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER) + +@get_friend_add_list.handle() +async def _get_friend_add_list(bot: Bot, event: MessageEvent): + data = Manage().load_friend_apply_list() + temp_list = list() + for i in data: + apply_code = i + apply_user = data[i]["user_id"] + apply_comment = data[i]["comment"] + temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" + temp_list.append(temp_msg) + + msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) + msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" + await get_friend_add_list.finish(msg1) + + +approve_friend_add = Manage().on_command("同意好友", "同意好友申请", permission=SUPERUSER) + +@approve_friend_add.handle() +async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["approve_friend_add"] + +@approve_friend_add.got("approve_friend_add", "申请码GKD!") +async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["approve_friend_add"] + quit_list = ["算了", "罢了"] + if apply_code in quit_list: + await approve_friend_add.finish("好吧...") + + try: + await bot.set_friend_add_request(flag=apply_code, approve=True) + except BaseException: + await approve_friend_add.finish("同意失败...尝试下手动?") + data = Manage().load_friend_apply_list() + data.pop(apply_code) + Manage().save_friend_apply_list(data) + await approve_friend_add.finish("好欸!申请已通过!") + + +refuse_friend_add = Manage().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER) + +@refuse_friend_add.handle() +async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["refuse_friend_add"] + +@refuse_friend_add.got("refuse_friend_add", "申请码GKD!") +async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["refuse_friend_add"] + quit_list = ["算了", "罢了"] + if apply_code in quit_list: + await refuse_friend_add.finish("好吧...") + + try: + await bot.set_friend_add_request(flag=apply_code, approve=False) + except BaseException: + await refuse_friend_add.finish("拒绝失败...尝试下手动?") + data = Manage().load_friend_apply_list() + data.pop(apply_code) + Manage().save_friend_apply_list(data) + await refuse_friend_add.finish("已拒绝!") + + +get_group_invite_list = Manage().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER) + +@get_group_invite_list.handle() +async def _get_group_invite_list(bot: Bot, event: MessageEvent): + data = Manage().load_invite_apply_list() + temp_list = list() + for i in data: + apply_code = i + apply_user = data[i]["user_id"] + apply_comment = data[i]["comment"] + temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" + temp_list.append(temp_msg) + + msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) + msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" + await get_friend_add_list.finish(msg1) + + +approve_group_invite = Manage().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER) + +@approve_group_invite.handle() +async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["approve_group_invite"] + +@approve_group_invite.got("approve_group_invite", "申请码GKD!") +async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["approve_group_invite"] + quit_list = ["算了", "罢了"] + if apply_code in quit_list: + await approve_group_invite.finish("好吧...") + + try: + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True) + except BaseException: + await approve_group_invite.finish("同意失败...尝试下手动?") + data = Manage().load_invite_apply_list() + data.pop(apply_code) + Manage().save_invite_apply_list(data) + await approve_group_invite.finish("好欸!申请已通过!") + + +refuse_group_invite = Manage().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER) + +@refuse_group_invite.handle() +async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["refuse_group_invite"] + +@refuse_group_invite.got("refuse_group_invite", "申请码GKD!") +async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["refuse_group_invite"] + quit_list = ["算了", "罢了"] + if apply_code in quit_list: + await refuse_group_invite.finish("好吧...") + + try: + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False) + except BaseException: + await refuse_group_invite.finish("拒绝失败...尝试下手动?") + data = Manage().load_invite_apply_list() + data.pop(apply_code) + Manage().save_invite_apply_list(data) + await refuse_group_invite.finish("已拒绝!") + + +track_error = Manage().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) + +@track_error.handle() +async def _track_error(bot: Bot, event: MessageEvent): + track_id = str(event.message).strip() + repo = await Manage().track_error(track_id) + await track_error.finish(repo) diff --git a/ATRI/plugins/manage/data_source.py b/ATRI/plugins/manage/data_source.py new file mode 100644 index 0000000..74d81d4 --- /dev/null +++ b/ATRI/plugins/manage/data_source.py @@ -0,0 +1,283 @@ +import os +import json +from pathlib import Path +from datetime import datetime + +from ATRI.service import Service, ServiceTools +from ATRI.utils import UbuntuPaste +from ATRI.exceptions import ReadFileError, load_error + + +MANAGE_DIR = Path(".") / "data" / "database" / "manege" +ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential" +os.makedirs(MANAGE_DIR, exist_ok=True) +os.makedirs(ESSENTIAL_DIR, exist_ok=True) + + +TRACK_BACK_FORMAT = """ +Track ID:{track_id} +Prompt: {prompt} +Time: {time} +{content} +""".strip() + + +__doc__ = """ +控制bot的各项服务 +""" + + +class Manage(Service): + + def __init__(self): + Service.__init__(self, "管理", __doc__, True) + + @staticmethod + def _load_block_user_list() -> dict: + """ + 文件结构: + { + "Block user ID": { + "time": "Block time" + } + } + """ + file_name = "block_user.json" + path = MANAGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + try: + data = json.loads(path.read_bytes()) + except BaseException: + data = dict() + return data + + + @staticmethod + def _save_block_user_list(data: dict) -> None: + file_name = "block_user.json" + path = MANAGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + def _load_block_group_list() -> dict: + """ + 文件结构: + { + "Block group ID": { + "time": "Block time" + } + } + """ + file_name = "block_group.json" + path = MANAGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + try: + data = json.loads(path.read_bytes()) + except BaseException: + data = dict() + return data + + @staticmethod + def _save_block_group_list(data: dict) -> None: + file_name = "block_group.json" + path = MANAGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @classmethod + def block_user(cls, user_id: str) -> bool: + data = cls._load_block_user_list() + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + data[user_id] = { + "time": now_time + } + try: + cls._save_block_user_list(data) + return True + except BaseException: + return False + + @classmethod + def unblock_user(cls, user_id: str) -> bool: + data: dict = cls._load_block_user_list() + if user_id not in data: + return False + + try: + data.pop(user_id) + cls._save_block_user_list(data) + return True + except BaseException: + return False + + @classmethod + def block_group(cls, group_id: str) -> bool: + data = cls._load_block_group_list() + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + data[group_id] = { + "time": now_time + } + try: + cls._save_block_group_list(data) + return True + except BaseException: + return False + + @classmethod + def unblock_group(cls, group_id: str) -> bool: + data: dict = cls._load_block_group_list() + if group_id not in data: + return False + + try: + data.pop(group_id) + cls._save_block_group_list(data) + return True + except BaseException: + return False + + @staticmethod + def control_global_service(service: str, is_enabled: bool) -> bool: + """ + Only SUPERUSER. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + data["enabled"] = is_enabled + ServiceTools().save_service(data, service) + return True + + @staticmethod + def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: + """ + Only SUPERUSER. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + temp_list: list = data.get("disable_user", list()) + + if is_enabled: + try: + temp_list.remove(user_id) + except BaseException: + return False + else: + temp_list.append(user_id) + data["disable_user"] = temp_list + ServiceTools().save_service(data, service) + return True + + @staticmethod + def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: + """ + SUPERUSER and GROUPADMIN or GROUPOWNER. + Only current group. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + temp_list: list = data.get("disable_group", list()) + + if is_enabled: + try: + temp_list.remove(group_id) + except BaseException: + return False + else: + temp_list.append(group_id) + data["disable_group"] = temp_list + ServiceTools().save_service(data, service) + return True + + @staticmethod + def load_friend_apply_list() -> dict: + file_name = "friend_add.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + try: + data = json.loads(path.read_bytes()) + except BaseException: + data = dict() + return data + + @staticmethod + def save_friend_apply_list(data: dict) -> None: + file_name = "friend_add.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + def load_invite_apply_list() -> dict: + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + try: + data = json.loads(path.read_bytes()) + except BaseException: + data = dict() + return data + + @staticmethod + def save_invite_apply_list(data: dict) -> None: + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + async def track_error(track_id: str) -> str: + try: + data = load_error(track_id) + except ReadFileError: + return "请检查ID是否正确..." + + prompt = data.get("prompt", "ignore") + time = data.get("time", "ignore") + content = data.get("content", "ignore") + + msg0 = TRACK_BACK_FORMAT.format( + track_id=track_id, + prompt=prompt, + time=time, + content=content + ) + repo = f"详细请移步此处~\n{await UbuntuPaste(content=msg0).paste()}" + return repo diff --git a/ATRI/plugins/manege/__init__.py b/ATRI/plugins/manege/__init__.py deleted file mode 100644 index 7a1d45c..0000000 --- a/ATRI/plugins/manege/__init__.py +++ /dev/null @@ -1,408 +0,0 @@ -import re - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN - -from .data_source import Manege - - -block_user = Manege().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) - - -@block_user.handle() -async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["block_user"] = msg - - -@block_user.got("block_user", "哪位?GKD!") -async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State): - user_id = state["block_user"] - quit_list = ["算了", "罢了"] - if user_id in quit_list: - await block_user.finish("...看来有人逃过一劫呢") - - is_ok = Manege().block_user(user_id) - if not is_ok: - await block_user.finish("kuso!封禁失败了...") - - await block_user.finish(f"用户 {user_id} 危!") - - -unblock_user = Manege().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER) - - -@unblock_user.handle() -async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["unblock_user"] = msg - - -@unblock_user.got("unblock_user", "哪位?GKD!") -async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State): - user_id = state["unblock_user"] - quit_list = ["算了", "罢了"] - if user_id in quit_list: - await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") - - is_ok = Manege().unblock_user(user_id) - if not is_ok: - await unblock_user.finish("kuso!解封失败了...") - - await unblock_user.finish(f"好欸!{user_id} 重获新生!") - - -block_group = Manege().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER) - - -@block_group.handle() -async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["block_group"] = msg - - -@block_group.got("block_group", "哪个群?GKD!") -async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State): - group_id = state["block_group"] - quit_list = ["算了", "罢了"] - if group_id in quit_list: - await block_group.finish("...看来有一群逃过一劫呢") - - is_ok = Manege().block_group(group_id) - if not is_ok: - await block_group.finish("kuso!封禁失败了...") - - await block_group.finish(f"群 {group_id} 危!") - - -unblock_group = Manege().on_command("解封群", "对目标群进行解封", permission=SUPERUSER) - - -@unblock_group.handle() -async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["unblock_group"] = msg - - -@unblock_group.got("unblock_group", "哪个群?GKD!") -async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State): - group_id = state["unblock_group"] - quit_list = ["算了", "罢了"] - if group_id in quit_list: - await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") - - is_ok = Manege().unblock_group(group_id) - if not is_ok: - await unblock_group.finish("kuso!解封失败了...") - - await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") - - -global_block_service = Manege().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER) - - -@global_block_service.handle() -async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["global_block_service"] = msg - - -@global_block_service.got("global_block_service", "阿...是哪个服务呢") -async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State): - block_service = state["global_block_service"] - quit_list = ["算了", "罢了"] - if block_service in quit_list: - await global_block_service.finish("好吧...") - - is_ok = Manege().control_global_service(block_service, False) - if not is_ok: - await global_block_service.finish("kuso!禁用失败了...") - - await global_block_service.finish(f"服务 {block_service} 已被禁用") - - -global_unblock_service = Manege().on_command("全局启用", "全局启用某服务", permission=SUPERUSER) - - -@global_unblock_service.handle() -async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["global_unblock_service"] = msg - - -@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") -async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): - unblock_service = state["global_unblock_service"] - quit_list = ["算了", "罢了"] - if unblock_service in quit_list: - await global_unblock_service.finish("好吧...") - - is_ok = Manege().control_global_service(unblock_service, True) - if not is_ok: - await global_unblock_service.finish("kuso!启用服务失败了...") - - await global_unblock_service.finish(f"服务 {unblock_service} 已启用") - - -user_block_service = Manege().on_regex( - r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER -) - - -@user_block_service.handle() -async def _user_block_service(bot: Bot, event: MessageEvent): - msg = str(event.message).strip() - pattern = r"对用户(.*?)禁用(.*)" - reg = re.findall(pattern, msg) - aim_user = reg[0] - aim_service = reg[1] - - is_ok = Manege().control_user_service(aim_service, aim_user, False) - if not is_ok: - await user_block_service.finish("禁用失败...请检查服务名是否正确") - await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") - - -user_unblock_service = Manege().on_regex( - r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER -) - - -@user_unblock_service.handle() -async def _user_unblock_service(bot: Bot, event: MessageEvent): - msg = str(event.message).strip() - pattern = r"对用户(.*?)启用(.*)" - reg = re.findall(pattern, msg) - aim_user = reg[0] - aim_service = reg[1] - - is_ok = Manege().control_user_service(aim_service, aim_user, True) - if not is_ok: - await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") - await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") - - -group_block_service = Manege().on_command( - "禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN -) - - -@group_block_service.handle() -async def _ready_group_block_service( - bot: Bot, event: GroupMessageEvent, state: T_State -): - msg = str(event.message).strip() - if msg: - state["group_block_service"] = msg - - -@group_block_service.got("group_block_service", "阿...是哪个服务呢") -async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): - aim_service = state["group_block_service"] - group_id = str(event.group_id) - quit_list = ["算了", "罢了"] - if aim_service in quit_list: - await group_block_service.finish("好吧...") - - is_ok = Manege().control_group_service(aim_service, group_id, False) - if not is_ok: - await group_block_service.finish("禁用失败...请检查服务名是否输入正确") - await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") - - -group_unblock_service = Manege().on_command( - "启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN -) - - -@group_unblock_service.handle() -async def _ready_group_unblock_service( - bot: Bot, event: GroupMessageEvent, state: T_State -): - msg = str(event.message).strip() - if msg: - state["group_unblock_service"] = msg - - -@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") -async def _deal_group_unblock_service( - bot: Bot, event: GroupMessageEvent, state: T_State -): - aim_service = state["group_unblock_service"] - group_id = str(event.group_id) - quit_list = ["算了", "罢了"] - if aim_service in quit_list: - await group_unblock_service.finish("好吧...") - - is_ok = Manege().control_group_service(aim_service, group_id, True) - if not is_ok: - await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") - await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") - - -get_friend_add_list = Manege().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER) - - -@get_friend_add_list.handle() -async def _get_friend_add_list(bot: Bot, event: MessageEvent): - data = Manege().load_friend_apply_list() - temp_list = list() - for i in data: - apply_code = i - apply_user = data[i]["user_id"] - apply_comment = data[i]["comment"] - temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" - temp_list.append(temp_msg) - - msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) - msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" - await get_friend_add_list.finish(msg1) - - -approve_friend_add = Manege().on_command("同意好友", "同意好友申请", permission=SUPERUSER) - - -@approve_friend_add.handle() -async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["approve_friend_add"] - - -@approve_friend_add.got("approve_friend_add", "申请码GKD!") -async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["approve_friend_add"] - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await approve_friend_add.finish("好吧...") - - try: - await bot.set_friend_add_request(flag=apply_code, approve=True) - except BaseException: - await approve_friend_add.finish("同意失败...尝试下手动?") - data = Manege().load_friend_apply_list() - data.pop(apply_code) - Manege().save_friend_apply_list(data) - await approve_friend_add.finish("好欸!申请已通过!") - - -refuse_friend_add = Manege().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER) - - -@refuse_friend_add.handle() -async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["refuse_friend_add"] - - -@refuse_friend_add.got("refuse_friend_add", "申请码GKD!") -async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["refuse_friend_add"] - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await refuse_friend_add.finish("好吧...") - - try: - await bot.set_friend_add_request(flag=apply_code, approve=False) - except BaseException: - await refuse_friend_add.finish("拒绝失败...尝试下手动?") - data = Manege().load_friend_apply_list() - data.pop(apply_code) - Manege().save_friend_apply_list(data) - await refuse_friend_add.finish("已拒绝!") - - -get_group_invite_list = Manege().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER) - - -@get_group_invite_list.handle() -async def _get_group_invite_list(bot: Bot, event: MessageEvent): - data = Manege().load_invite_apply_list() - temp_list = list() - for i in data: - apply_code = i - apply_user = data[i]["user_id"] - apply_comment = data[i]["comment"] - temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" - temp_list.append(temp_msg) - - msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) - msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" - await get_friend_add_list.finish(msg1) - - -approve_group_invite = Manege().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER) - - -@approve_group_invite.handle() -async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["approve_group_invite"] - - -@approve_group_invite.got("approve_group_invite", "申请码GKD!") -async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["approve_group_invite"] - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await approve_group_invite.finish("好吧...") - - try: - await bot.set_group_add_request( - flag=apply_code, sub_type="invite", approve=True - ) - except BaseException: - await approve_group_invite.finish("同意失败...尝试下手动?") - data = Manege().load_invite_apply_list() - data.pop(apply_code) - Manege().save_invite_apply_list(data) - await approve_group_invite.finish("好欸!申请已通过!") - - -refuse_group_invite = Manege().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER) - - -@refuse_group_invite.handle() -async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): - msg = str(event.message).strip() - if msg: - state["refuse_group_invite"] - - -@refuse_group_invite.got("refuse_group_invite", "申请码GKD!") -async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): - apply_code = state["refuse_group_invite"] - quit_list = ["算了", "罢了"] - if apply_code in quit_list: - await refuse_group_invite.finish("好吧...") - - try: - await bot.set_group_add_request( - flag=apply_code, sub_type="invite", approve=False - ) - except BaseException: - await refuse_group_invite.finish("拒绝失败...尝试下手动?") - data = Manege().load_invite_apply_list() - data.pop(apply_code) - Manege().save_invite_apply_list(data) - await refuse_group_invite.finish("已拒绝!") - - -track_error = Manege().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) - - -@track_error.handle() -async def _track_error(bot: Bot, event: MessageEvent): - track_id = str(event.message).strip() - repo = await Manege().track_error(track_id) - await track_error.finish(repo) diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manege/data_source.py deleted file mode 100644 index 8e61492..0000000 --- a/ATRI/plugins/manege/data_source.py +++ /dev/null @@ -1,274 +0,0 @@ -import os -import json -from pathlib import Path -from datetime import datetime - -from ATRI.service import Service, ServiceTools -from ATRI.utils import UbuntuPaste -from ATRI.exceptions import ReadFileError, load_error - - -MANEGE_DIR = Path(".") / "data" / "database" / "manege" -ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential" -os.makedirs(MANEGE_DIR, exist_ok=True) -os.makedirs(ESSENTIAL_DIR, exist_ok=True) - - -TRACK_BACK_FORMAT = """ -Track ID:{track_id} -Prompt: {prompt} -Time: {time} -{content} -""".strip() - - -__doc__ = """ -控制bot的各项服务 -""" - - -class Manege(Service): - def __init__(self): - Service.__init__(self, "管理", __doc__, True) - - @staticmethod - def _load_block_user_list() -> dict: - """ - 文件结构: - { - "Block user ID": { - "time": "Block time" - } - } - """ - file_name = "block_user.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def _save_block_user_list(data: dict) -> None: - file_name = "block_user.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @staticmethod - def _load_block_group_list() -> dict: - """ - 文件结构: - { - "Block group ID": { - "time": "Block time" - } - } - """ - file_name = "block_group.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def _save_block_group_list(data: dict) -> None: - file_name = "block_group.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @classmethod - def block_user(cls, user_id: str) -> bool: - data = cls._load_block_user_list() - now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data[user_id] = {"time": now_time} - try: - cls._save_block_user_list(data) - return True - except BaseException: - return False - - @classmethod - def unblock_user(cls, user_id: str) -> bool: - data: dict = cls._load_block_user_list() - if user_id not in data: - return False - - try: - data.pop(user_id) - cls._save_block_user_list(data) - return True - except BaseException: - return False - - @classmethod - def block_group(cls, group_id: str) -> bool: - data = cls._load_block_group_list() - now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data[group_id] = {"time": now_time} - try: - cls._save_block_group_list(data) - return True - except BaseException: - return False - - @classmethod - def unblock_group(cls, group_id: str) -> bool: - data: dict = cls._load_block_group_list() - if group_id not in data: - return False - - try: - data.pop(group_id) - cls._save_block_group_list(data) - return True - except BaseException: - return False - - @staticmethod - def control_global_service(service: str, is_enabled: bool) -> bool: - """ - Only SUPERUSER. - """ - try: - data = ServiceTools().load_service(service) - except BaseException: - return False - data["enabled"] = is_enabled - ServiceTools().save_service(data, service) - return True - - @staticmethod - def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: - """ - Only SUPERUSER. - """ - try: - data = ServiceTools().load_service(service) - except BaseException: - return False - temp_list: list = data.get("disable_user", list()) - - if is_enabled: - try: - temp_list.remove(user_id) - except BaseException: - return False - else: - temp_list.append(user_id) - data["disable_user"] = temp_list - ServiceTools().save_service(data, service) - return True - - @staticmethod - def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: - """ - SUPERUSER and GROUPADMIN or GROUPOWNER. - Only current group. - """ - try: - data = ServiceTools().load_service(service) - except BaseException: - return False - temp_list: list = data.get("disable_group", list()) - - if is_enabled: - try: - temp_list.remove(group_id) - except BaseException: - return False - else: - temp_list.append(group_id) - data["disable_group"] = temp_list - ServiceTools().save_service(data, service) - return True - - @staticmethod - def load_friend_apply_list() -> dict: - file_name = "friend_add.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def save_friend_apply_list(data: dict) -> None: - file_name = "friend_add.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @staticmethod - def load_invite_apply_list() -> dict: - file_name = "group_invite.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def save_invite_apply_list(data: dict) -> None: - file_name = "group_invite.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @staticmethod - async def track_error(track_id: str) -> str: - try: - data = load_error(track_id) - except ReadFileError: - return "请检查ID是否正确..." - - prompt = data.get("prompt", "ignore") - time = data.get("time", "ignore") - content = data.get("content", "ignore") - - msg0 = TRACK_BACK_FORMAT.format( - track_id=track_id, prompt=prompt, time=time, content=content - ) - repo = f"详细请移步此处~\n{await UbuntuPaste(content=msg0).paste()}" - return repo -- cgit v1.2.3