diff options
Diffstat (limited to 'ATRI/plugins/manege/data_source.py')
-rw-r--r-- | ATRI/plugins/manege/data_source.py | 274 |
1 files changed, 0 insertions, 274 deletions
diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manege/data_source.py deleted file mode 100644 index 8e61492..0000000 --- a/ATRI/plugins/manege/data_source.py +++ /dev/null @@ -1,274 +0,0 @@ -import os -import json -from pathlib import Path -from datetime import datetime - -from ATRI.service import Service, ServiceTools -from ATRI.utils import UbuntuPaste -from ATRI.exceptions import ReadFileError, load_error - - -MANEGE_DIR = Path(".") / "data" / "database" / "manege" -ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential" -os.makedirs(MANEGE_DIR, exist_ok=True) -os.makedirs(ESSENTIAL_DIR, exist_ok=True) - - -TRACK_BACK_FORMAT = """ -Track ID:{track_id} -Prompt: {prompt} -Time: {time} -{content} -""".strip() - - -__doc__ = """ -控制bot的各项服务 -""" - - -class Manege(Service): - def __init__(self): - Service.__init__(self, "管理", __doc__, True) - - @staticmethod - def _load_block_user_list() -> dict: - """ - 文件结构: - { - "Block user ID": { - "time": "Block time" - } - } - """ - file_name = "block_user.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def _save_block_user_list(data: dict) -> None: - file_name = "block_user.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @staticmethod - def _load_block_group_list() -> dict: - """ - 文件结构: - { - "Block group ID": { - "time": "Block time" - } - } - """ - file_name = "block_group.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def _save_block_group_list(data: dict) -> None: - file_name = "block_group.json" - path = MANEGE_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @classmethod - def block_user(cls, user_id: str) -> bool: - data = cls._load_block_user_list() - now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data[user_id] = {"time": now_time} - try: - cls._save_block_user_list(data) - return True - except BaseException: - return False - - @classmethod - def unblock_user(cls, user_id: str) -> bool: - data: dict = cls._load_block_user_list() - if user_id not in data: - return False - - try: - data.pop(user_id) - cls._save_block_user_list(data) - return True - except BaseException: - return False - - @classmethod - def block_group(cls, group_id: str) -> bool: - data = cls._load_block_group_list() - now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data[group_id] = {"time": now_time} - try: - cls._save_block_group_list(data) - return True - except BaseException: - return False - - @classmethod - def unblock_group(cls, group_id: str) -> bool: - data: dict = cls._load_block_group_list() - if group_id not in data: - return False - - try: - data.pop(group_id) - cls._save_block_group_list(data) - return True - except BaseException: - return False - - @staticmethod - def control_global_service(service: str, is_enabled: bool) -> bool: - """ - Only SUPERUSER. - """ - try: - data = ServiceTools().load_service(service) - except BaseException: - return False - data["enabled"] = is_enabled - ServiceTools().save_service(data, service) - return True - - @staticmethod - def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: - """ - Only SUPERUSER. - """ - try: - data = ServiceTools().load_service(service) - except BaseException: - return False - temp_list: list = data.get("disable_user", list()) - - if is_enabled: - try: - temp_list.remove(user_id) - except BaseException: - return False - else: - temp_list.append(user_id) - data["disable_user"] = temp_list - ServiceTools().save_service(data, service) - return True - - @staticmethod - def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: - """ - SUPERUSER and GROUPADMIN or GROUPOWNER. - Only current group. - """ - try: - data = ServiceTools().load_service(service) - except BaseException: - return False - temp_list: list = data.get("disable_group", list()) - - if is_enabled: - try: - temp_list.remove(group_id) - except BaseException: - return False - else: - temp_list.append(group_id) - data["disable_group"] = temp_list - ServiceTools().save_service(data, service) - return True - - @staticmethod - def load_friend_apply_list() -> dict: - file_name = "friend_add.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def save_friend_apply_list(data: dict) -> None: - file_name = "friend_add.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @staticmethod - def load_invite_apply_list() -> dict: - file_name = "group_invite.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return dict() - - try: - data = json.loads(path.read_bytes()) - except BaseException: - data = dict() - return data - - @staticmethod - def save_invite_apply_list(data: dict) -> None: - file_name = "group_invite.json" - path = ESSENTIAL_DIR / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - - @staticmethod - async def track_error(track_id: str) -> str: - try: - data = load_error(track_id) - except ReadFileError: - return "请检查ID是否正确..." - - prompt = data.get("prompt", "ignore") - time = data.get("time", "ignore") - content = data.get("content", "ignore") - - msg0 = TRACK_BACK_FORMAT.format( - track_id=track_id, prompt=prompt, time=time, content=content - ) - repo = f"详细请移步此处~\n{await UbuntuPaste(content=msg0).paste()}" - return repo |