diff options
Diffstat (limited to 'ATRI/service.py')
-rw-r--r-- | ATRI/service.py | 568 |
1 files changed, 301 insertions, 267 deletions
diff --git a/ATRI/service.py b/ATRI/service.py index 989179b..4798fd5 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -2,17 +2,15 @@ import os import re import json from pathlib import Path -from datetime import datetime -from typing import Dict, Any, List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING +from pydantic import BaseModel +from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING + from nonebot.matcher import Matcher from nonebot.permission import Permission -from nonebot.plugin import on_message from nonebot.typing import T_State, T_Handler, T_RuleChecker from nonebot.rule import Rule, command, keyword, regex -from .log import logger as log -from .config import NetworkPost -from .utils.request import post_bytes +from ATRI.exceptions import ReadFileError, WriteError if TYPE_CHECKING: from nonebot.adapters import Bot, Event @@ -24,148 +22,168 @@ os.makedirs(SERVICE_DIR, exist_ok=True) os.makedirs(SERVICES_DIR, exist_ok=True) -is_sleep: bool = False -matcher_list: list = [] - - -def _load_block_list() -> dict: - file_name = "ban.json" - file = SERVICE_DIR / file_name - try: - data = json.loads(file.read_bytes()) - except: - data = {"user": {}, "group": {}} - with open(file, "w") as r: - r.write(json.dumps(data, indent=4)) - return data - - -def _save_block_list(data: dict) -> None: - file_name = "ban.json" - file = SERVICE_DIR / file_name - with open(file, "w") as r: - r.write(json.dumps(data, indent=4)) +class ServiceInfo(BaseModel): + service: str + docs: str + cmd_list: dict + enabled: bool + only_admin: bool + disable_user: list + disable_group: list -def _load_service_config(service: str, docs: str = None) -> dict: - file_name = service.replace("/", "") + ".json" - file = SERVICES_DIR / file_name - try: - data = json.loads(file.read_bytes()) - except: - service_info = { - "command": service, - "docs": docs, - "enabled": True, - "disable_user": {}, - "disable_group": {}, - } - with open(file, "w") as r: - r.write(json.dumps(service_info, indent=4)) - data = service_info - return data - - -def _save_service_config(service: str, data: dict) -> None: - file_name = service.replace("/", "") + ".json" - file = SERVICES_DIR / file_name - with open(file, "w") as r: - r.write(json.dumps(data, indent=4)) +class CommandInfo(BaseModel): + type: str + docs: str + aliases: list class Service: """ 集成一套服务管理,对功能信息进行持久化 - 计划搭配前端使用 + 服务文件结构: + { + "service": "Service name", + "docs": "Main helps and commands", + "cmd_list": { + "/cmd0": { + "type": "Command type", + "docs": "Command help", + "aliases": ["More trigger ways."] + } + }, + "enabled": True, + "only_admin": False, + "disable_user": [], + "disable_group": [] + } """ - - @staticmethod - def manual_reg_service(service: str, docs: str = None): - file_name = service.replace("/", "") + ".json" - file = SERVICES_DIR / file_name - service_info = { - "command": service, - "docs": docs, - "enabled": True, - "disable_user": {}, - "disable_group": {}, - } - with open(file, "w") as r: - r.write(json.dumps(service_info, indent=4)) - - @staticmethod - def auth_service(service: str, user: str, group: str = None) -> bool: - data = _load_service_config(service) - if user in data["disable_user"]: - return False - else: - if group in data["disable_group"]: - return False - else: - return True - - @staticmethod - def control_service( - service: str, - is_global: bool, - is_enabled: int, - user: str = None, - group: str = None, - ) -> None: - data = _load_service_config(service) - is_enabled = bool(is_enabled) - - if is_global: - status = "disabled" if is_enabled else "enabled" - data["enabled"] = is_enabled - log.info(f"\033[33mService: {service} has been {status}.\033[33m") - else: - if user: - if not is_enabled: - data["disable_user"][user] = str(datetime.now()) - log.info( - f"\033[33mNew service blocked user: {user}\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) - else: - if user in data["disable_user"]: - del data["disable_user"][user] - log.info( - f"\033[33mUser: {user} has been unblock\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) - else: - if not is_enabled: - data["disable_group"][group] = str(datetime.now()) - log.info( - f"\033[33mNew service blocked group: {group}\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) + + def __init__(self, + service: str, + docs: str = None, + only_admin: bool = False, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + state: Optional[T_State] = None): + self.service = service + self.docs = docs + self.only_admin = only_admin + self.rule = rule + self.permission = permission + self.handlers = handlers + self.temp = temp + self.priority = priority + self.state = state + + def _generate_service_config(self, service: str = None, docs: str = None) -> None: + if not service: + service = self.service + if not docs: + docs = self.docs or str() + + path = SERVICES_DIR / f"{service}.json" + data = ServiceInfo( + service=service, + docs=docs, + cmd_list=dict(), + enabled=True, + only_admin=self.only_admin, + disable_user=list(), + disable_group=list() + ) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + except WriteError: + raise WriteError("Write service info failed!") + + def save_service(self, service_data: dict, service: str = None) -> None: + if not service: + service = self.service + + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + self._generate_service_config() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(service_data, indent=4)) + + def load_service(self, service: str = None) -> dict: + if not service: + service = self.service + + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + self._generate_service_config() + + try: + data = json.loads(path.read_bytes()) + except ReadFileError: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + self._generate_service_config() + data = json.loads(path.read_bytes()) + return data + + def _save_cmds(self, cmds: dict) -> None: + data = self.load_service(self.service) + temp_data: dict = data["cmd_list"] + temp_data.update(cmds) + self.save_service(data) + + def _load_cmds(self) -> dict: + path = SERVICES_DIR / f"{self.service}.json" + if not path.is_file(): + self._generate_service_config() + + data = json.loads(path.read_bytes()) + return data["cmd_list"] + + def on_message(self, + docs: str = None, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + handlers: Optional[List[T_Handler]] = None, + block: bool = True, + priority: int = None, + state: Optional[T_State] = None) -> Type[Matcher]: + if not rule: + rule = self.rule + if not permission: + permission = self.permission + if not handlers: + handlers = self.handlers + if not priority: + priority = self.priority + if not state: + state = self.state + + if docs: + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "message" + str(a) + if _type not in cmd_list: + break else: - if group in data["disable_group"]: - del data["disable_group"][group] - log.info( - f"\033[33mGroup: {group} has been unblock\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) - _save_service_config(service, data) - - @staticmethod - def on_message( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = True, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + a += 1 + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + matcher = Matcher.new( "message", Rule() & rule, permission or Permission(), - temp=temp, + temp=self.temp, priority=priority, block=block, handlers=handlers, @@ -173,59 +191,86 @@ class Service: ) return matcher - @staticmethod - def on_notice( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + def on_notice(self, docs: str, block: bool = True) -> Type[Matcher]: + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "notice" + str(a) + if _type not in cmd_list: + break + else: + a += 1 + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + matcher = Matcher.new( "notice", - Rule() & rule, + Rule() & self.rule, Permission(), - temp=temp, - priority=priority, + temp=self.temp, + priority=self.priority, block=block, - handlers=handlers, - default_state=state, + handlers=self.handlers, + default_state=self.state, ) return matcher - @staticmethod - def on_request( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + def on_request(self, docs: str, block: bool = True) -> Type[Matcher]: + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "request" + str(a) + if _type not in cmd_list: + break + else: + a += 1 + + cmd_list[_type] =CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + matcher = Matcher.new( "request", - Rule() & rule, + Rule() & self.rule, Permission(), - temp=temp, - priority=priority, + temp=self.temp, + priority=self.priority, block=block, - handlers=handlers, - default_state=state, + handlers=self.handlers, + default_state=self.state, ) return matcher - @classmethod def on_command( - cls, + self, cmd: Union[str, Tuple[str, ...]], - docs: Optional[str] = None, + docs: str, rule: Optional[Union[Rule, T_RuleChecker]] = None, aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, **kwargs, ) -> Type[Matcher]: + _type = "command" + cmd_list = self._load_cmds() + if not rule: + rule = self.rule + if not aliases: + aliases = set() + + cmd_list[cmd] = CommandInfo( + type=_type, + docs=docs, + aliases=list(aliases) + ).dict() + self._save_cmds(cmd_list) + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): message = event.get_message() segment = message.pop(0) @@ -239,118 +284,107 @@ class Service: handlers.insert(0, _strip_cmd) commands = set([cmd]) | (aliases or set()) - _load_service_config(str(cmd), docs) - return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs) + return self.on_message(rule=command(*commands) & rule, handlers=handlers, **kwargs) - @classmethod def on_keyword( - cls, + self, keywords: Set[str], - docs: Optional[str] = None, + docs: str, rule: Optional[Union[Rule, T_RuleChecker]] = None, **kwargs, ) -> Type[Matcher]: - _load_service_config(list(keywords)[0], docs) - return cls.on_message(keyword(*keywords) & rule, **kwargs) + if not rule: + rule = self.rule + + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "keyword" + str(a) + if _type not in cmd_list: + break + else: + a += 1 + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + + return self.on_message(rule=keyword(*keywords) & rule, **kwargs) - @classmethod def on_regex( - cls, + self, pattern: str, + docs: str, flags: Union[int, re.RegexFlag] = 0, rule: Optional[Union[Rule, T_RuleChecker]] = None, **kwargs, ) -> Type[Matcher]: - return on_message(regex(pattern, flags) & rule, **kwargs) - - class NetworkPost: - URL = f"http://{NetworkPost.host}:" f"{NetworkPost.port}/" - - @classmethod - async def send_private_msg( - cls, user_id: int, message: str, auto_escape: bool = False - ) -> Dict[str, Any]: - url = cls.URL + "send_private_msg?" - params = { - "user_id": user_id, - "message": message, - "auto_escape": f"{auto_escape}", - } - result = json.loads(await post_bytes(url, params)) - log.debug(result) - return result - - @classmethod - def send_group_msg( - cls, group_id: int, message: Union[str], auto_escape: Optional[bool] = ... - ) -> Dict[str, Any]: - ... - - @classmethod - async def send_msg( - cls, - message_type: Optional[str] = "", - user_id: Optional[int] = None, - group_id: Optional[int] = None, - message=Union[str], - auto_escape: bool = False, - ) -> Dict[str, Any]: - url = cls.URL + "send_msg?" - params = { - "message_type": "", - "user_id": user_id, - "group_id": group_id, - "message": message, - "auto_escape": str(auto_escape), - } - result = json.loads(await post_bytes(url, params)) - log.debug(result) - return result + _type = "regex" + if not rule: + rule = self.rule + + cmd_list = self._load_cmds() + cmd_list[pattern] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + + return self.on_message(rule=regex(pattern, flags) & rule, **kwargs) + + +class ServiceTools(object): + + @staticmethod + def save_service(service_data: dict, service: str) -> None: + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + raise ReadFileError( + f"Can't find service: ({service}) file.\n" + "Please delete all file in data/service/services.\n" + "Next reboot bot." + ) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(service_data, indent=4)) + + @staticmethod + def load_service(service: str) -> dict: + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + raise ReadFileError( + f"Can't find service: ({service}) file.\n" + "Please delete all file in data/service/services.\n" + "Next reboot bot." + ) + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data - class Dormant: - @staticmethod - def is_dormant() -> bool: - return False if is_sleep else True + @classmethod + def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool: + data = cls.load_service(service) + + auth_global = data.get("enabled", True) + auth_user = data.get("disable_user", list()) + auth_group = data.get("disable_group", list()) + + if user_id: + if user_id in auth_user: + return False - @staticmethod - def control_dormant(is_enable: bool) -> None: - global is_sleep - if is_enable: - is_sleep = True + if group_id: + if group_id in auth_group: + return False else: - is_sleep = False - - class BlockSystem: - file_name = "ban.json" - path = SERVICE_DIR / file_name - - @staticmethod - def auth_user(user: str) -> bool: - return False if user in _load_block_list()["user"] else True - - @staticmethod - def auth_group(group: str) -> bool: - return False if group in _load_block_list()["group"] else True - - @staticmethod - def control_list(is_enabled: bool, user: str = None, group: str = None) -> None: - data = _load_block_list() - if user: - if is_enabled: - data["user"][user] = str(datetime.now()) - log.info( - f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m" - ) - else: - del data["user"][str(user)] - log.info(f"\033[33mUser {user} has been unblock.\033[33m") - elif group: - if is_enabled: - data["group"][group] = str(datetime.now()) - log.info( - f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m" - ) - else: - del data["group"][str(group)] - log.info(f"\033[33mGroup {group} has been unblock.\033[33m") - _save_block_list(data) + return True + + if not auth_global: + return False + else: + return True |