diff options
author | Kyomotoi <[email protected]> | 2021-08-13 17:52:10 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2021-08-13 17:52:10 +0800 |
commit | 24644daeb153e781d162c638a79b3727df242802 (patch) | |
tree | 6e6439b46dace352b1e1b87d0e0415dd75c2ad97 | |
parent | 7bc3f22086d3361deb8cc5d383fca48d4d16213b (diff) | |
download | ATRI-24644daeb153e781d162c638a79b3727df242802.tar.gz ATRI-24644daeb153e781d162c638a79b3727df242802.tar.bz2 ATRI-24644daeb153e781d162c638a79b3727df242802.zip |
✏️:Typo
-rw-r--r-- | ATRI/plugins/manage/__init__.py (renamed from ATRI/plugins/manege/__init__.py) | 178 | ||||
-rw-r--r-- | ATRI/plugins/manage/data_source.py (renamed from ATRI/plugins/manege/data_source.py) | 71 |
2 files changed, 106 insertions, 143 deletions
diff --git a/ATRI/plugins/manege/__init__.py b/ATRI/plugins/manage/__init__.py index 7a1d45c..c1dfb3b 100644 --- a/ATRI/plugins/manege/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -5,11 +5,10 @@ from nonebot.permission import SUPERUSER from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN -from .data_source import Manege +from .data_source import Manage -block_user = Manege().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) - +block_user = Manage().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) @block_user.handle() async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): @@ -17,23 +16,21 @@ async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): if msg: state["block_user"] = msg - @block_user.got("block_user", "哪位?GKD!") async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State): user_id = state["block_user"] quit_list = ["算了", "罢了"] if user_id in quit_list: await block_user.finish("...看来有人逃过一劫呢") - - is_ok = Manege().block_user(user_id) + + is_ok = Manage().block_user(user_id) if not is_ok: await block_user.finish("kuso!封禁失败了...") - + await block_user.finish(f"用户 {user_id} 危!") -unblock_user = Manege().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER) - +unblock_user = Manage().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER) @unblock_user.handle() async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): @@ -41,23 +38,21 @@ async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): if msg: state["unblock_user"] = msg - @unblock_user.got("unblock_user", "哪位?GKD!") async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State): user_id = state["unblock_user"] quit_list = ["算了", "罢了"] if user_id in quit_list: await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") - - is_ok = Manege().unblock_user(user_id) + + is_ok = Manage().unblock_user(user_id) if not is_ok: await unblock_user.finish("kuso!解封失败了...") - + await unblock_user.finish(f"好欸!{user_id} 重获新生!") -block_group = Manege().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER) - +block_group = Manage().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER) @block_group.handle() async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): @@ -65,23 +60,21 @@ async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): if msg: state["block_group"] = msg - @block_group.got("block_group", "哪个群?GKD!") async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State): group_id = state["block_group"] quit_list = ["算了", "罢了"] if group_id in quit_list: await block_group.finish("...看来有一群逃过一劫呢") - - is_ok = Manege().block_group(group_id) + + is_ok = Manage().block_group(group_id) if not is_ok: await block_group.finish("kuso!封禁失败了...") - + await block_group.finish(f"群 {group_id} 危!") -unblock_group = Manege().on_command("解封群", "对目标群进行解封", permission=SUPERUSER) - +unblock_group = Manage().on_command("解封群", "对目标群进行解封", permission=SUPERUSER) @unblock_group.handle() async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): @@ -89,23 +82,21 @@ async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): if msg: state["unblock_group"] = msg - @unblock_group.got("unblock_group", "哪个群?GKD!") async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State): group_id = state["unblock_group"] quit_list = ["算了", "罢了"] if group_id in quit_list: await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") - - is_ok = Manege().unblock_group(group_id) + + is_ok = Manage().unblock_group(group_id) if not is_ok: await unblock_group.finish("kuso!解封失败了...") - + await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") -global_block_service = Manege().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER) - +global_block_service = Manage().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER) @global_block_service.handle() async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): @@ -113,23 +104,21 @@ async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): if msg: state["global_block_service"] = msg - @global_block_service.got("global_block_service", "阿...是哪个服务呢") async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State): block_service = state["global_block_service"] quit_list = ["算了", "罢了"] if block_service in quit_list: await global_block_service.finish("好吧...") - - is_ok = Manege().control_global_service(block_service, False) + + is_ok = Manage().control_global_service(block_service, False) if not is_ok: await global_block_service.finish("kuso!禁用失败了...") - + await global_block_service.finish(f"服务 {block_service} 已被禁用") -global_unblock_service = Manege().on_command("全局启用", "全局启用某服务", permission=SUPERUSER) - +global_unblock_service = Manage().on_command("全局启用", "全局启用某服务", permission=SUPERUSER) @global_unblock_service.handle() async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): @@ -137,25 +126,21 @@ async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): if msg: state["global_unblock_service"] = msg - @global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): unblock_service = state["global_unblock_service"] quit_list = ["算了", "罢了"] if unblock_service in quit_list: await global_unblock_service.finish("好吧...") - - is_ok = Manege().control_global_service(unblock_service, True) + + is_ok = Manage().control_global_service(unblock_service, True) if not is_ok: await global_unblock_service.finish("kuso!启用服务失败了...") - + await global_unblock_service.finish(f"服务 {unblock_service} 已启用") -user_block_service = Manege().on_regex( - r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER -) - +user_block_service = Manage().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER) @user_block_service.handle() async def _user_block_service(bot: Bot, event: MessageEvent): @@ -164,17 +149,15 @@ async def _user_block_service(bot: Bot, event: MessageEvent): reg = re.findall(pattern, msg) aim_user = reg[0] aim_service = reg[1] - - is_ok = Manege().control_user_service(aim_service, aim_user, False) + + is_ok = Manage().control_user_service(aim_service, aim_user, False) if not is_ok: await user_block_service.finish("禁用失败...请检查服务名是否正确") await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") + -user_unblock_service = Manege().on_regex( - r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER -) - +user_unblock_service = Manage().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER) @user_unblock_service.handle() async def _user_unblock_service(bot: Bot, event: MessageEvent): @@ -183,27 +166,21 @@ async def _user_unblock_service(bot: Bot, event: MessageEvent): reg = re.findall(pattern, msg) aim_user = reg[0] aim_service = reg[1] - - is_ok = Manege().control_user_service(aim_service, aim_user, True) + + is_ok = Manage().control_user_service(aim_service, aim_user, True) if not is_ok: await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") -group_block_service = Manege().on_command( - "禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN -) - +group_block_service = Manage().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) @group_block_service.handle() -async def _ready_group_block_service( - bot: Bot, event: GroupMessageEvent, state: T_State -): +async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): msg = str(event.message).strip() if msg: state["group_block_service"] = msg - @group_block_service.got("group_block_service", "阿...是哪个服务呢") async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): aim_service = state["group_block_service"] @@ -211,49 +188,40 @@ async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T quit_list = ["算了", "罢了"] if aim_service in quit_list: await group_block_service.finish("好吧...") - - is_ok = Manege().control_group_service(aim_service, group_id, False) + + is_ok = Manage().control_group_service(aim_service, group_id, False) if not is_ok: await group_block_service.finish("禁用失败...请检查服务名是否输入正确") await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") -group_unblock_service = Manege().on_command( - "启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN -) - +group_unblock_service = Manage().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) @group_unblock_service.handle() -async def _ready_group_unblock_service( - bot: Bot, event: GroupMessageEvent, state: T_State -): +async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): msg = str(event.message).strip() if msg: state["group_unblock_service"] = msg - @group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") -async def _deal_group_unblock_service( - bot: Bot, event: GroupMessageEvent, state: T_State -): +async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): aim_service = state["group_unblock_service"] group_id = str(event.group_id) quit_list = ["算了", "罢了"] if aim_service in quit_list: await group_unblock_service.finish("好吧...") - - is_ok = Manege().control_group_service(aim_service, group_id, True) + + is_ok = Manage().control_group_service(aim_service, group_id, True) if not is_ok: await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") -get_friend_add_list = Manege().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER) - +get_friend_add_list = Manage().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER) @get_friend_add_list.handle() async def _get_friend_add_list(bot: Bot, event: MessageEvent): - data = Manege().load_friend_apply_list() + data = Manage().load_friend_apply_list() temp_list = list() for i in data: apply_code = i @@ -265,10 +233,9 @@ async def _get_friend_add_list(bot: Bot, event: MessageEvent): msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" await get_friend_add_list.finish(msg1) + - -approve_friend_add = Manege().on_command("同意好友", "同意好友申请", permission=SUPERUSER) - +approve_friend_add = Manage().on_command("同意好友", "同意好友申请", permission=SUPERUSER) @approve_friend_add.handle() async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): @@ -276,26 +243,24 @@ async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_Stat if msg: state["approve_friend_add"] - @approve_friend_add.got("approve_friend_add", "申请码GKD!") async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): apply_code = state["approve_friend_add"] quit_list = ["算了", "罢了"] if apply_code in quit_list: await approve_friend_add.finish("好吧...") - + try: await bot.set_friend_add_request(flag=apply_code, approve=True) except BaseException: await approve_friend_add.finish("同意失败...尝试下手动?") - data = Manege().load_friend_apply_list() + data = Manage().load_friend_apply_list() data.pop(apply_code) - Manege().save_friend_apply_list(data) + Manage().save_friend_apply_list(data) await approve_friend_add.finish("好欸!申请已通过!") -refuse_friend_add = Manege().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER) - +refuse_friend_add = Manage().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER) @refuse_friend_add.handle() async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): @@ -303,30 +268,28 @@ async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State if msg: state["refuse_friend_add"] - @refuse_friend_add.got("refuse_friend_add", "申请码GKD!") async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): apply_code = state["refuse_friend_add"] quit_list = ["算了", "罢了"] if apply_code in quit_list: await refuse_friend_add.finish("好吧...") - + try: await bot.set_friend_add_request(flag=apply_code, approve=False) except BaseException: await refuse_friend_add.finish("拒绝失败...尝试下手动?") - data = Manege().load_friend_apply_list() + data = Manage().load_friend_apply_list() data.pop(apply_code) - Manege().save_friend_apply_list(data) + Manage().save_friend_apply_list(data) await refuse_friend_add.finish("已拒绝!") -get_group_invite_list = Manege().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER) - +get_group_invite_list = Manage().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER) @get_group_invite_list.handle() async def _get_group_invite_list(bot: Bot, event: MessageEvent): - data = Manege().load_invite_apply_list() + data = Manage().load_invite_apply_list() temp_list = list() for i in data: apply_code = i @@ -340,8 +303,7 @@ async def _get_group_invite_list(bot: Bot, event: MessageEvent): await get_friend_add_list.finish(msg1) -approve_group_invite = Manege().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER) - +approve_group_invite = Manage().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER) @approve_group_invite.handle() async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): @@ -349,28 +311,24 @@ async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_St if msg: state["approve_group_invite"] - @approve_group_invite.got("approve_group_invite", "申请码GKD!") async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): apply_code = state["approve_group_invite"] quit_list = ["算了", "罢了"] if apply_code in quit_list: await approve_group_invite.finish("好吧...") - + try: - await bot.set_group_add_request( - flag=apply_code, sub_type="invite", approve=True - ) + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True) except BaseException: await approve_group_invite.finish("同意失败...尝试下手动?") - data = Manege().load_invite_apply_list() + data = Manage().load_invite_apply_list() data.pop(apply_code) - Manege().save_invite_apply_list(data) + Manage().save_invite_apply_list(data) await approve_group_invite.finish("好欸!申请已通过!") -refuse_group_invite = Manege().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER) - +refuse_group_invite = Manage().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER) @refuse_group_invite.handle() async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): @@ -378,31 +336,27 @@ async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_Sta if msg: state["refuse_group_invite"] - @refuse_group_invite.got("refuse_group_invite", "申请码GKD!") async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): apply_code = state["refuse_group_invite"] quit_list = ["算了", "罢了"] if apply_code in quit_list: await refuse_group_invite.finish("好吧...") - + try: - await bot.set_group_add_request( - flag=apply_code, sub_type="invite", approve=False - ) + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False) except BaseException: await refuse_group_invite.finish("拒绝失败...尝试下手动?") - data = Manege().load_invite_apply_list() + data = Manage().load_invite_apply_list() data.pop(apply_code) - Manege().save_invite_apply_list(data) + Manage().save_invite_apply_list(data) await refuse_group_invite.finish("已拒绝!") -track_error = Manege().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) - +track_error = Manage().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) @track_error.handle() async def _track_error(bot: Bot, event: MessageEvent): track_id = str(event.message).strip() - repo = await Manege().track_error(track_id) + repo = await Manage().track_error(track_id) await track_error.finish(repo) diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manage/data_source.py index 8e61492..74d81d4 100644 --- a/ATRI/plugins/manege/data_source.py +++ b/ATRI/plugins/manage/data_source.py @@ -8,9 +8,9 @@ from ATRI.utils import UbuntuPaste from ATRI.exceptions import ReadFileError, load_error -MANEGE_DIR = Path(".") / "data" / "database" / "manege" +MANAGE_DIR = Path(".") / "data" / "database" / "manege" ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential" -os.makedirs(MANEGE_DIR, exist_ok=True) +os.makedirs(MANAGE_DIR, exist_ok=True) os.makedirs(ESSENTIAL_DIR, exist_ok=True) @@ -27,7 +27,8 @@ __doc__ = """ """ -class Manege(Service): +class Manage(Service): + def __init__(self): Service.__init__(self, "管理", __doc__, True) @@ -42,7 +43,7 @@ class Manege(Service): } """ file_name = "block_user.json" - path = MANEGE_DIR / file_name + path = MANAGE_DIR / file_name if not path.is_file(): with open(path, "w", encoding="utf-8") as w: w.write(json.dumps({})) @@ -52,18 +53,19 @@ class Manege(Service): except BaseException: data = dict() return data - + + @staticmethod def _save_block_user_list(data: dict) -> None: file_name = "block_user.json" - path = MANEGE_DIR / file_name + path = MANAGE_DIR / file_name if not path.is_file(): with open(path, "w", encoding="utf-8") as w: w.write(json.dumps({})) - + with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(data, indent=4)) - + @staticmethod def _load_block_group_list() -> dict: """ @@ -75,70 +77,74 @@ class Manege(Service): } """ file_name = "block_group.json" - path = MANEGE_DIR / file_name + path = MANAGE_DIR / file_name if not path.is_file(): with open(path, "w", encoding="utf-8") as w: w.write(json.dumps({})) return dict() - + try: data = json.loads(path.read_bytes()) except BaseException: data = dict() return data - + @staticmethod def _save_block_group_list(data: dict) -> None: file_name = "block_group.json" - path = MANEGE_DIR / file_name + path = MANAGE_DIR / file_name if not path.is_file(): with open(path, "w", encoding="utf-8") as w: w.write(json.dumps({})) - + with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(data, indent=4)) - + @classmethod def block_user(cls, user_id: str) -> bool: data = cls._load_block_user_list() now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data[user_id] = {"time": now_time} + data[user_id] = { + "time": now_time + } try: cls._save_block_user_list(data) return True except BaseException: return False - + @classmethod def unblock_user(cls, user_id: str) -> bool: data: dict = cls._load_block_user_list() if user_id not in data: return False - + try: data.pop(user_id) cls._save_block_user_list(data) return True except BaseException: return False - + @classmethod def block_group(cls, group_id: str) -> bool: data = cls._load_block_group_list() now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data[group_id] = {"time": now_time} + data[group_id] = { + "time": now_time + } try: cls._save_block_group_list(data) return True except BaseException: return False - + @classmethod def unblock_group(cls, group_id: str) -> bool: data: dict = cls._load_block_group_list() if group_id not in data: return False - + try: data.pop(group_id) cls._save_block_group_list(data) @@ -158,7 +164,7 @@ class Manege(Service): data["enabled"] = is_enabled ServiceTools().save_service(data, service) return True - + @staticmethod def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: """ @@ -169,7 +175,7 @@ class Manege(Service): except BaseException: return False temp_list: list = data.get("disable_user", list()) - + if is_enabled: try: temp_list.remove(user_id) @@ -180,7 +186,7 @@ class Manege(Service): data["disable_user"] = temp_list ServiceTools().save_service(data, service) return True - + @staticmethod def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: """ @@ -192,7 +198,7 @@ class Manege(Service): except BaseException: return False temp_list: list = data.get("disable_group", list()) - + if is_enabled: try: temp_list.remove(group_id) @@ -218,7 +224,7 @@ class Manege(Service): except BaseException: data = dict() return data - + @staticmethod def save_friend_apply_list(data: dict) -> None: file_name = "friend_add.json" @@ -229,7 +235,7 @@ class Manege(Service): with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(data, indent=4)) - + @staticmethod def load_invite_apply_list() -> dict: file_name = "group_invite.json" @@ -244,7 +250,7 @@ class Manege(Service): except BaseException: data = dict() return data - + @staticmethod def save_invite_apply_list(data: dict) -> None: file_name = "group_invite.json" @@ -262,13 +268,16 @@ class Manege(Service): data = load_error(track_id) except ReadFileError: return "请检查ID是否正确..." - + prompt = data.get("prompt", "ignore") time = data.get("time", "ignore") content = data.get("content", "ignore") - + msg0 = TRACK_BACK_FORMAT.format( - track_id=track_id, prompt=prompt, time=time, content=content + track_id=track_id, + prompt=prompt, + time=time, + content=content ) repo = f"详细请移步此处~\n{await UbuntuPaste(content=msg0).paste()}" return repo |