diff options
Diffstat (limited to 'ATRI/service.py')
-rw-r--r-- | ATRI/service.py | 145 |
1 files changed, 82 insertions, 63 deletions
diff --git a/ATRI/service.py b/ATRI/service.py index 378f2ca..cc3243b 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -16,7 +16,7 @@ if TYPE_CHECKING: from nonebot.adapters import Bot, Event -SERVICE_DIR = Path(".") / "ATRI" / "data" / "service" +SERVICE_DIR = Path(".") / "data" / "service" SERVICES_DIR = SERVICE_DIR / "services" os.makedirs(SERVICE_DIR, exist_ok=True) os.makedirs(SERVICES_DIR, exist_ok=True) @@ -58,19 +58,17 @@ class Service: "disable_group": [] } """ - - def __init__( - self, - service: str, - docs: str = None, - only_admin: bool = False, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - state: Optional[T_State] = None, - ): + + def __init__(self, + service: str, + docs: str = None, + only_admin: bool = False, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + state: Optional[T_State] = None): self.service = service self.docs = docs self.only_admin = only_admin @@ -86,7 +84,7 @@ class Service: service = self.service if not docs: docs = self.docs or str() - + path = SERVICES_DIR / f"{service}.json" data = ServiceInfo( service=service, @@ -95,33 +93,33 @@ class Service: enabled=True, only_admin=self.only_admin, disable_user=list(), - disable_group=list(), + disable_group=list() ) try: with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(data.dict(), indent=4)) except WriteError: raise WriteError("Write service info failed!") - + def save_service(self, service_data: dict, service: str = None) -> None: if not service: service = self.service - + path = SERVICES_DIR / f"{service}.json" if not path.is_file(): self._generate_service_config() with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(service_data, indent=4)) - + def load_service(self, service: str = None) -> dict: if not service: service = self.service - + path = SERVICES_DIR / f"{service}.json" if not path.is_file(): self._generate_service_config() - + try: data = json.loads(path.read_bytes()) except ReadFileError: @@ -130,31 +128,29 @@ class Service: self._generate_service_config() data = json.loads(path.read_bytes()) return data - + def _save_cmds(self, cmds: dict) -> None: data = self.load_service(self.service) temp_data: dict = data["cmd_list"] temp_data.update(cmds) self.save_service(data) - + def _load_cmds(self) -> dict: path = SERVICES_DIR / f"{self.service}.json" if not path.is_file(): self._generate_service_config() - + data = json.loads(path.read_bytes()) return data["cmd_list"] - def on_message( - self, - docs: str = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - handlers: Optional[List[T_Handler]] = None, - block: bool = True, - priority: int = None, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + def on_message(self, + docs: str = None, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + handlers: Optional[List[T_Handler]] = None, + block: bool = True, + priority: int = None, + state: Optional[T_State] = None) -> Type[Matcher]: if not rule: rule = self.rule if not permission: @@ -165,7 +161,7 @@ class Service: priority = self.priority if not state: state = self.state - + if docs: a = 0 cmd_list = self._load_cmds() @@ -175,10 +171,14 @@ class Service: break else: a += 1 - - cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict() + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() self._save_cmds(cmd_list) - + matcher = Matcher.new( "message", Rule() & rule, @@ -200,10 +200,14 @@ class Service: break else: a += 1 - - cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict() + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() self._save_cmds(cmd_list) - + matcher = Matcher.new( "notice", Rule() & self.rule, @@ -225,10 +229,14 @@ class Service: break else: a += 1 - - cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict() + + cmd_list[_type] =CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() self._save_cmds(cmd_list) - + matcher = Matcher.new( "request", Rule() & self.rule, @@ -255,10 +263,14 @@ class Service: rule = self.rule if not aliases: aliases = set() - - cmd_list[cmd] = CommandInfo(type=_type, docs=docs, aliases=list(aliases)).dict() + + cmd_list[cmd] = CommandInfo( + type=_type, + docs=docs, + aliases=list(aliases) + ).dict() self._save_cmds(cmd_list) - + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): message = event.get_message() segment = message.pop(0) @@ -272,9 +284,7 @@ class Service: handlers.insert(0, _strip_cmd) commands = set([cmd]) | (aliases or set()) - return self.on_message( - rule=command(*commands) & rule, handlers=handlers, **kwargs - ) + return self.on_message(rule=command(*commands) & rule, handlers=handlers, **kwargs) def on_keyword( self, @@ -285,7 +295,7 @@ class Service: ) -> Type[Matcher]: if not rule: rule = self.rule - + a = 0 cmd_list = self._load_cmds() while True: @@ -294,10 +304,14 @@ class Service: break else: a += 1 - - cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict() + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() self._save_cmds(cmd_list) - + return self.on_message(rule=keyword(*keywords) & rule, **kwargs) def on_regex( @@ -311,15 +325,20 @@ class Service: _type = "regex" if not rule: rule = self.rule - + cmd_list = self._load_cmds() - cmd_list[pattern] = CommandInfo(type=_type, docs=docs, aliases=list()).dict() + cmd_list[pattern] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() self._save_cmds(cmd_list) - + return self.on_message(rule=regex(pattern, flags) & rule, **kwargs) class ServiceTools(object): + @staticmethod def save_service(service_data: dict, service: str) -> None: path = SERVICES_DIR / f"{service}.json" @@ -329,10 +348,10 @@ class ServiceTools(object): "Please delete all file in data/service/services.\n" "Next reboot bot." ) - + with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(service_data, indent=4)) - + @staticmethod def load_service(service: str) -> dict: path = SERVICES_DIR / f"{service}.json" @@ -342,7 +361,7 @@ class ServiceTools(object): "Please delete all file in data/service/services.\n" "Next reboot bot." ) - + with open(path, "r", encoding="utf-8") as r: data = json.loads(r.read()) return data @@ -350,11 +369,11 @@ class ServiceTools(object): @classmethod def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool: data = cls.load_service(service) - + auth_global = data.get("enabled", True) auth_user = data.get("disable_user", list()) auth_group = data.get("disable_group", list()) - + if user_id: if user_id in auth_user: return False @@ -364,7 +383,7 @@ class ServiceTools(object): return False else: return True - + if not auth_global: return False else: |