summaryrefslogtreecommitdiff
path: root/ATRI/plugins/manage
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/manage')
-rw-r--r--ATRI/plugins/manage/__init__.py362
-rw-r--r--ATRI/plugins/manage/data_source.py283
2 files changed, 645 insertions, 0 deletions
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
new file mode 100644
index 0000000..c1dfb3b
--- /dev/null
+++ b/ATRI/plugins/manage/__init__.py
@@ -0,0 +1,362 @@
+import re
+
+from nonebot.typing import T_State
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
+from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN
+
+from .data_source import Manage
+
+
+block_user = Manage().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER)
+
+@block_user.handle()
+async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["block_user"] = msg
+
+@block_user.got("block_user", "哪位?GKD!")
+async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = state["block_user"]
+ quit_list = ["算了", "罢了"]
+ if user_id in quit_list:
+ await block_user.finish("...看来有人逃过一劫呢")
+
+ is_ok = Manage().block_user(user_id)
+ if not is_ok:
+ await block_user.finish("kuso!封禁失败了...")
+
+ await block_user.finish(f"用户 {user_id} 危!")
+
+
+unblock_user = Manage().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER)
+
+@unblock_user.handle()
+async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["unblock_user"] = msg
+
+@unblock_user.got("unblock_user", "哪位?GKD!")
+async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = state["unblock_user"]
+ quit_list = ["算了", "罢了"]
+ if user_id in quit_list:
+ await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了")
+
+ is_ok = Manage().unblock_user(user_id)
+ if not is_ok:
+ await unblock_user.finish("kuso!解封失败了...")
+
+ await unblock_user.finish(f"好欸!{user_id} 重获新生!")
+
+
+block_group = Manage().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER)
+
+@block_group.handle()
+async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["block_group"] = msg
+
+@block_group.got("block_group", "哪个群?GKD!")
+async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State):
+ group_id = state["block_group"]
+ quit_list = ["算了", "罢了"]
+ if group_id in quit_list:
+ await block_group.finish("...看来有一群逃过一劫呢")
+
+ is_ok = Manage().block_group(group_id)
+ if not is_ok:
+ await block_group.finish("kuso!封禁失败了...")
+
+ await block_group.finish(f"群 {group_id} 危!")
+
+
+unblock_group = Manage().on_command("解封群", "对目标群进行解封", permission=SUPERUSER)
+
+@unblock_group.handle()
+async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["unblock_group"] = msg
+
+@unblock_group.got("unblock_group", "哪个群?GKD!")
+async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
+ group_id = state["unblock_group"]
+ quit_list = ["算了", "罢了"]
+ if group_id in quit_list:
+ await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了")
+
+ is_ok = Manage().unblock_group(group_id)
+ if not is_ok:
+ await unblock_group.finish("kuso!解封失败了...")
+
+ await unblock_group.finish(f"好欸!群 {group_id} 重获新生!")
+
+
+global_block_service = Manage().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER)
+
+@global_block_service.handle()
+async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["global_block_service"] = msg
+
+@global_block_service.got("global_block_service", "阿...是哪个服务呢")
+async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State):
+ block_service = state["global_block_service"]
+ quit_list = ["算了", "罢了"]
+ if block_service in quit_list:
+ await global_block_service.finish("好吧...")
+
+ is_ok = Manage().control_global_service(block_service, False)
+ if not is_ok:
+ await global_block_service.finish("kuso!禁用失败了...")
+
+ await global_block_service.finish(f"服务 {block_service} 已被禁用")
+
+
+global_unblock_service = Manage().on_command("全局启用", "全局启用某服务", permission=SUPERUSER)
+
+@global_unblock_service.handle()
+async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["global_unblock_service"] = msg
+
+@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢")
+async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
+ unblock_service = state["global_unblock_service"]
+ quit_list = ["算了", "罢了"]
+ if unblock_service in quit_list:
+ await global_unblock_service.finish("好吧...")
+
+ is_ok = Manage().control_global_service(unblock_service, True)
+ if not is_ok:
+ await global_unblock_service.finish("kuso!启用服务失败了...")
+
+ await global_unblock_service.finish(f"服务 {unblock_service} 已启用")
+
+
+user_block_service = Manage().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER)
+
+@user_block_service.handle()
+async def _user_block_service(bot: Bot, event: MessageEvent):
+ msg = str(event.message).strip()
+ pattern = r"对用户(.*?)禁用(.*)"
+ reg = re.findall(pattern, msg)
+ aim_user = reg[0]
+ aim_service = reg[1]
+
+ is_ok = Manage().control_user_service(aim_service, aim_user, False)
+ if not is_ok:
+ await user_block_service.finish("禁用失败...请检查服务名是否正确")
+ await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}")
+
+
+
+user_unblock_service = Manage().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER)
+
+@user_unblock_service.handle()
+async def _user_unblock_service(bot: Bot, event: MessageEvent):
+ msg = str(event.message).strip()
+ pattern = r"对用户(.*?)启用(.*)"
+ reg = re.findall(pattern, msg)
+ aim_user = reg[0]
+ aim_service = reg[1]
+
+ is_ok = Manage().control_user_service(aim_service, aim_user, True)
+ if not is_ok:
+ await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中")
+ await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}")
+
+
+group_block_service = Manage().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN)
+
+@group_block_service.handle()
+async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["group_block_service"] = msg
+
+@group_block_service.got("group_block_service", "阿...是哪个服务呢")
+async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+ aim_service = state["group_block_service"]
+ group_id = str(event.group_id)
+ quit_list = ["算了", "罢了"]
+ if aim_service in quit_list:
+ await group_block_service.finish("好吧...")
+
+ is_ok = Manage().control_group_service(aim_service, group_id, False)
+ if not is_ok:
+ await group_block_service.finish("禁用失败...请检查服务名是否输入正确")
+ await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}")
+
+
+group_unblock_service = Manage().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN)
+
+@group_unblock_service.handle()
+async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["group_unblock_service"] = msg
+
+@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢")
+async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+ aim_service = state["group_unblock_service"]
+ group_id = str(event.group_id)
+ quit_list = ["算了", "罢了"]
+ if aim_service in quit_list:
+ await group_unblock_service.finish("好吧...")
+
+ is_ok = Manage().control_group_service(aim_service, group_id, True)
+ if not is_ok:
+ await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中")
+ await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}")
+
+
+get_friend_add_list = Manage().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER)
+
+@get_friend_add_list.handle()
+async def _get_friend_add_list(bot: Bot, event: MessageEvent):
+ data = Manage().load_friend_apply_list()
+ temp_list = list()
+ for i in data:
+ apply_code = i
+ apply_user = data[i]["user_id"]
+ apply_comment = data[i]["comment"]
+ temp_msg = f"{apply_user} | {apply_comment} | {apply_code}"
+ temp_list.append(temp_msg)
+
+ msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list))
+ msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定"
+ await get_friend_add_list.finish(msg1)
+
+
+approve_friend_add = Manage().on_command("同意好友", "同意好友申请", permission=SUPERUSER)
+
+@approve_friend_add.handle()
+async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["approve_friend_add"]
+
+@approve_friend_add.got("approve_friend_add", "申请码GKD!")
+async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
+ apply_code = state["approve_friend_add"]
+ quit_list = ["算了", "罢了"]
+ if apply_code in quit_list:
+ await approve_friend_add.finish("好吧...")
+
+ try:
+ await bot.set_friend_add_request(flag=apply_code, approve=True)
+ except BaseException:
+ await approve_friend_add.finish("同意失败...尝试下手动?")
+ data = Manage().load_friend_apply_list()
+ data.pop(apply_code)
+ Manage().save_friend_apply_list(data)
+ await approve_friend_add.finish("好欸!申请已通过!")
+
+
+refuse_friend_add = Manage().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER)
+
+@refuse_friend_add.handle()
+async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["refuse_friend_add"]
+
+@refuse_friend_add.got("refuse_friend_add", "申请码GKD!")
+async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
+ apply_code = state["refuse_friend_add"]
+ quit_list = ["算了", "罢了"]
+ if apply_code in quit_list:
+ await refuse_friend_add.finish("好吧...")
+
+ try:
+ await bot.set_friend_add_request(flag=apply_code, approve=False)
+ except BaseException:
+ await refuse_friend_add.finish("拒绝失败...尝试下手动?")
+ data = Manage().load_friend_apply_list()
+ data.pop(apply_code)
+ Manage().save_friend_apply_list(data)
+ await refuse_friend_add.finish("已拒绝!")
+
+
+get_group_invite_list = Manage().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER)
+
+@get_group_invite_list.handle()
+async def _get_group_invite_list(bot: Bot, event: MessageEvent):
+ data = Manage().load_invite_apply_list()
+ temp_list = list()
+ for i in data:
+ apply_code = i
+ apply_user = data[i]["user_id"]
+ apply_comment = data[i]["comment"]
+ temp_msg = f"{apply_user} | {apply_comment} | {apply_code}"
+ temp_list.append(temp_msg)
+
+ msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list))
+ msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定"
+ await get_friend_add_list.finish(msg1)
+
+
+approve_group_invite = Manage().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER)
+
+@approve_group_invite.handle()
+async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["approve_group_invite"]
+
+@approve_group_invite.got("approve_group_invite", "申请码GKD!")
+async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
+ apply_code = state["approve_group_invite"]
+ quit_list = ["算了", "罢了"]
+ if apply_code in quit_list:
+ await approve_group_invite.finish("好吧...")
+
+ try:
+ await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True)
+ except BaseException:
+ await approve_group_invite.finish("同意失败...尝试下手动?")
+ data = Manage().load_invite_apply_list()
+ data.pop(apply_code)
+ Manage().save_invite_apply_list(data)
+ await approve_group_invite.finish("好欸!申请已通过!")
+
+
+refuse_group_invite = Manage().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER)
+
+@refuse_group_invite.handle()
+async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ if msg:
+ state["refuse_group_invite"]
+
+@refuse_group_invite.got("refuse_group_invite", "申请码GKD!")
+async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
+ apply_code = state["refuse_group_invite"]
+ quit_list = ["算了", "罢了"]
+ if apply_code in quit_list:
+ await refuse_group_invite.finish("好吧...")
+
+ try:
+ await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False)
+ except BaseException:
+ await refuse_group_invite.finish("拒绝失败...尝试下手动?")
+ data = Manage().load_invite_apply_list()
+ data.pop(apply_code)
+ Manage().save_invite_apply_list(data)
+ await refuse_group_invite.finish("已拒绝!")
+
+
+track_error = Manage().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"})
+
+@track_error.handle()
+async def _track_error(bot: Bot, event: MessageEvent):
+ track_id = str(event.message).strip()
+ repo = await Manage().track_error(track_id)
+ await track_error.finish(repo)
diff --git a/ATRI/plugins/manage/data_source.py b/ATRI/plugins/manage/data_source.py
new file mode 100644
index 0000000..74d81d4
--- /dev/null
+++ b/ATRI/plugins/manage/data_source.py
@@ -0,0 +1,283 @@
+import os
+import json
+from pathlib import Path
+from datetime import datetime
+
+from ATRI.service import Service, ServiceTools
+from ATRI.utils import UbuntuPaste
+from ATRI.exceptions import ReadFileError, load_error
+
+
+MANAGE_DIR = Path(".") / "data" / "database" / "manege"
+ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential"
+os.makedirs(MANAGE_DIR, exist_ok=True)
+os.makedirs(ESSENTIAL_DIR, exist_ok=True)
+
+
+TRACK_BACK_FORMAT = """
+Track ID:{track_id}
+Prompt: {prompt}
+Time: {time}
+{content}
+""".strip()
+
+
+__doc__ = """
+控制bot的各项服务
+"""
+
+
+class Manage(Service):
+
+ def __init__(self):
+ Service.__init__(self, "管理", __doc__, True)
+
+ @staticmethod
+ def _load_block_user_list() -> dict:
+ """
+ 文件结构:
+ {
+ "Block user ID": {
+ "time": "Block time"
+ }
+ }
+ """
+ file_name = "block_user.json"
+ path = MANAGE_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ return dict()
+ try:
+ data = json.loads(path.read_bytes())
+ except BaseException:
+ data = dict()
+ return data
+
+
+ @staticmethod
+ def _save_block_user_list(data: dict) -> None:
+ file_name = "block_user.json"
+ path = MANAGE_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+
+ @staticmethod
+ def _load_block_group_list() -> dict:
+ """
+ 文件结构:
+ {
+ "Block group ID": {
+ "time": "Block time"
+ }
+ }
+ """
+ file_name = "block_group.json"
+ path = MANAGE_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ return dict()
+
+ try:
+ data = json.loads(path.read_bytes())
+ except BaseException:
+ data = dict()
+ return data
+
+ @staticmethod
+ def _save_block_group_list(data: dict) -> None:
+ file_name = "block_group.json"
+ path = MANAGE_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+
+ @classmethod
+ def block_user(cls, user_id: str) -> bool:
+ data = cls._load_block_user_list()
+ now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ data[user_id] = {
+ "time": now_time
+ }
+ try:
+ cls._save_block_user_list(data)
+ return True
+ except BaseException:
+ return False
+
+ @classmethod
+ def unblock_user(cls, user_id: str) -> bool:
+ data: dict = cls._load_block_user_list()
+ if user_id not in data:
+ return False
+
+ try:
+ data.pop(user_id)
+ cls._save_block_user_list(data)
+ return True
+ except BaseException:
+ return False
+
+ @classmethod
+ def block_group(cls, group_id: str) -> bool:
+ data = cls._load_block_group_list()
+ now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ data[group_id] = {
+ "time": now_time
+ }
+ try:
+ cls._save_block_group_list(data)
+ return True
+ except BaseException:
+ return False
+
+ @classmethod
+ def unblock_group(cls, group_id: str) -> bool:
+ data: dict = cls._load_block_group_list()
+ if group_id not in data:
+ return False
+
+ try:
+ data.pop(group_id)
+ cls._save_block_group_list(data)
+ return True
+ except BaseException:
+ return False
+
+ @staticmethod
+ def control_global_service(service: str, is_enabled: bool) -> bool:
+ """
+ Only SUPERUSER.
+ """
+ try:
+ data = ServiceTools().load_service(service)
+ except BaseException:
+ return False
+ data["enabled"] = is_enabled
+ ServiceTools().save_service(data, service)
+ return True
+
+ @staticmethod
+ def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool:
+ """
+ Only SUPERUSER.
+ """
+ try:
+ data = ServiceTools().load_service(service)
+ except BaseException:
+ return False
+ temp_list: list = data.get("disable_user", list())
+
+ if is_enabled:
+ try:
+ temp_list.remove(user_id)
+ except BaseException:
+ return False
+ else:
+ temp_list.append(user_id)
+ data["disable_user"] = temp_list
+ ServiceTools().save_service(data, service)
+ return True
+
+ @staticmethod
+ def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool:
+ """
+ SUPERUSER and GROUPADMIN or GROUPOWNER.
+ Only current group.
+ """
+ try:
+ data = ServiceTools().load_service(service)
+ except BaseException:
+ return False
+ temp_list: list = data.get("disable_group", list())
+
+ if is_enabled:
+ try:
+ temp_list.remove(group_id)
+ except BaseException:
+ return False
+ else:
+ temp_list.append(group_id)
+ data["disable_group"] = temp_list
+ ServiceTools().save_service(data, service)
+ return True
+
+ @staticmethod
+ def load_friend_apply_list() -> dict:
+ file_name = "friend_add.json"
+ path = ESSENTIAL_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ return dict()
+
+ try:
+ data = json.loads(path.read_bytes())
+ except BaseException:
+ data = dict()
+ return data
+
+ @staticmethod
+ def save_friend_apply_list(data: dict) -> None:
+ file_name = "friend_add.json"
+ path = ESSENTIAL_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+
+ @staticmethod
+ def load_invite_apply_list() -> dict:
+ file_name = "group_invite.json"
+ path = ESSENTIAL_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ return dict()
+
+ try:
+ data = json.loads(path.read_bytes())
+ except BaseException:
+ data = dict()
+ return data
+
+ @staticmethod
+ def save_invite_apply_list(data: dict) -> None:
+ file_name = "group_invite.json"
+ path = ESSENTIAL_DIR / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+
+ @staticmethod
+ async def track_error(track_id: str) -> str:
+ try:
+ data = load_error(track_id)
+ except ReadFileError:
+ return "请检查ID是否正确..."
+
+ prompt = data.get("prompt", "ignore")
+ time = data.get("time", "ignore")
+ content = data.get("content", "ignore")
+
+ msg0 = TRACK_BACK_FORMAT.format(
+ track_id=track_id,
+ prompt=prompt,
+ time=time,
+ content=content
+ )
+ repo = f"详细请移步此处~\n{await UbuntuPaste(content=msg0).paste()}"
+ return repo