diff options
| -rw-r--r-- | ATRI/service.py | 202 | 
1 files changed, 137 insertions, 65 deletions
| diff --git a/ATRI/service.py b/ATRI/service.py index 53338ba..0fec819 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -15,6 +15,8 @@ from nonebot.typing import (      T_PermissionChecker,  )  from nonebot.rule import Rule, command, keyword, regex +from nonebot.adapters import Bot, Event +from nonebot.adapters.onebot.v11 import PrivateMessageEvent, GroupMessageEvent  from ATRI.exceptions import ReadFileError, WriteFileError @@ -41,8 +43,8 @@ class CommandInfo(BaseModel):  class Service:      """ -    集成一套服务管理,对功能信息进行持久化 -    服务文件结构: +    集成一套服务管理, 对功能信息持久化 +    服务文件结构:      {          "service": "Service name",          "docs": "Main helps and commands", @@ -60,38 +62,85 @@ class Service:      }      """ -    def __init__( -        self, -        service: str, -        docs: str, -        only_admin: bool = False, -        rule: Optional[Union[Rule, T_RuleChecker]] = None, -        permission: Optional[Permission] = None, -        handlers: Optional[List[T_Handler]] = None, -        temp: bool = False, -        priority: int = 1, -        state: Optional[T_State] = None, -        main_cmd: str = str(), -    ): +    def __init__(self, service: str): +        """初始化一个服务""" + +        super().__init__() +        if not service: +            return +          self.service = service -        self.docs = docs -        self.only_admin = only_admin -        self.rule = rule -        self.permission = permission -        self.handlers = handlers -        self.temp = temp -        self.priority = priority -        self.state = state -        self.main_cmd = (main_cmd,) - -    def _generate_service_config(self, service: str, docs: str = str()) -> None: +        self._only_admin = False +        self._rule = is_in_service(service) +        self._permission = None +        self._handlers = None +        self._temp = False +        self._priority = 1 +        self._state = None +        self._main_cmd = (str(),) + +    def document(self, context: str) -> "Service": +        """为服务添加说明""" + +        self.docs = context +        return self + +    def only_admin(self, _is: bool) -> "Service": +        """标记服务仅管理员可用""" + +        self._only_admin = _is +        return self + +    def rule(self, rule: Optional[Union[Rule, T_RuleChecker]]) -> "Service": +        """为服务添加触发判定""" + +        self._rule = self._rule & rule +        return self + +    def permission(self, perm: Optional[Permission]) -> "Service": +        """为服务添加权限判定""" + +        self._permission = perm +        return self + +    def handlers(self, hand: Optional[List[T_Handler]]) -> "Service": +        """为服务设置处理函数""" + +        self._handlers = hand +        return self + +    def temp(self, _is: bool) -> "Service": +        """设置是否为一次性服务""" + +        self._temp = _is +        return self + +    def priority(self, level: int) -> "Service": +        """为服务设置优先级等级""" + +        self._priority = level +        return self + +    def state(self, state: Optional[T_State]) -> "Service": +        """为服务设置处理类型""" + +        self._state = state +        return self + +    def main_cmd(self, cmd: str) -> "Service": +        """为服务命令设置前缀""" + +        self._main_cmd = (cmd,) +        return self + +    def __generate_service_config(self, service: str, docs: str = str()) -> None:          path = SERVICES_DIR / f"{service}.json"          data = ServiceInfo(              service=service,              docs=docs,              cmd_list=dict(),              enabled=True, -            only_admin=self.only_admin, +            only_admin=self._only_admin,              disable_user=list(),              disable_group=list(),          ) @@ -107,7 +156,7 @@ class Service:          path = SERVICES_DIR / f"{service}.json"          if not path.is_file(): -            self._generate_service_config(service, self.docs) +            self.__generate_service_config(service, self.docs)          with open(path, "w", encoding="utf-8") as w:              w.write(json.dumps(service_data, indent=4)) @@ -115,27 +164,27 @@ class Service:      def load_service(self, service: str) -> dict:          path = SERVICES_DIR / f"{service}.json"          if not path.is_file(): -            self._generate_service_config(service, self.docs) +            self.__generate_service_config(service, self.docs)          try:              data = json.loads(path.read_bytes())          except Exception:              with open(path, "w", encoding="utf-8") as w:                  w.write(json.dumps({})) -            self._generate_service_config(service, self.docs) +            self.__generate_service_config(service, self.docs)              data = json.loads(path.read_bytes())          return data -    def _save_cmds(self, cmds: dict) -> None: +    def __save_cmds(self, cmds: dict) -> None:          data = self.load_service(self.service)          temp_data: dict = data["cmd_list"]          temp_data.update(cmds)          self.save_service(data, self.service) -    def _load_cmds(self) -> dict: +    def __load_cmds(self) -> dict:          path = SERVICES_DIR / f"{self.service}.json"          if not path.is_file(): -            self._generate_service_config(self.service, self.docs) +            self.__generate_service_config(self.service, self.docs)          data = json.loads(path.read_bytes())          return data["cmd_list"] @@ -152,30 +201,30 @@ class Service:          state: Optional[T_State] = None,      ) -> Type[Matcher]:          if not rule: -            rule = self.rule +            rule = self._rule          if not permission: -            permission = self.permission +            permission = self._permission          if not handlers: -            handlers = self.handlers +            handlers = self._handlers          if not state: -            state = self.state +            state = self._state          if name: -            cmd_list = self._load_cmds() +            cmd_list = self.__load_cmds()              name = name + "-onmsg"              cmd_list[name] = CommandInfo(                  type="message", docs=docs, aliases=list()              ).dict() -            self._save_cmds(cmd_list) +            self.__save_cmds(cmd_list)          matcher = Matcher.new(              "message",              Rule() & rule,              Permission() | permission,              module=ModuleType(self.service), -            temp=self.temp, +            temp=self._temp,              priority=priority,              block=block,              handlers=handlers, @@ -184,44 +233,44 @@ class Service:          return matcher      def on_notice(self, name: str, docs: str, block: bool = True) -> Type[Matcher]: -        cmd_list = self._load_cmds() +        cmd_list = self.__load_cmds()          name = name + "-onntc"          cmd_list[name] = CommandInfo(type="notice", docs=docs, aliases=list()).dict() -        self._save_cmds(cmd_list) +        self.__save_cmds(cmd_list)          matcher = Matcher.new(              "notice", -            Rule() & self.rule, +            Rule() & self._rule,              Permission(),              module=ModuleType(self.service), -            temp=self.temp, -            priority=self.priority, +            temp=self._temp, +            priority=self._priority,              block=block, -            handlers=self.handlers, -            default_state=self.state, +            handlers=self._handlers, +            default_state=self._state,          )          return matcher      def on_request(self, name: str, docs: str, block: bool = True) -> Type[Matcher]: -        cmd_list = self._load_cmds() +        cmd_list = self.__load_cmds()          name = name + "-onreq"          cmd_list[name] = CommandInfo(type="request", docs=docs, aliases=list()).dict() -        self._save_cmds(cmd_list) +        self.__save_cmds(cmd_list)          matcher = Matcher.new(              "request", -            Rule() & self.rule, +            Rule() & self._rule,              Permission(),              module=ModuleType(self.service), -            temp=self.temp, -            priority=self.priority, +            temp=self._temp, +            priority=self._priority,              block=block, -            handlers=self.handlers, -            default_state=self.state, +            handlers=self._handlers, +            default_state=self._state,          )          return matcher @@ -233,9 +282,9 @@ class Service:          aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,          **kwargs,      ) -> Type[Matcher]: -        cmd_list = self._load_cmds() +        cmd_list = self.__load_cmds()          if not rule: -            rule = self.rule +            rule = self._rule          if not aliases:              aliases = set() @@ -245,7 +294,7 @@ class Service:          cmd_list[cmd] = CommandInfo(              type="command", docs=docs, aliases=list(aliases)          ).dict() -        self._save_cmds(cmd_list) +        self.__save_cmds(cmd_list)          commands = set([cmd]) | (aliases or set())          return self.on_message(rule=command(*commands) & rule, block=True, **kwargs) @@ -257,14 +306,14 @@ class Service:          **kwargs,      ) -> Type[Matcher]:          if not rule: -            rule = self.rule +            rule = self._rule          name = list(keywords)[0] + "-onkw" -        cmd_list = self._load_cmds() +        cmd_list = self.__load_cmds()          cmd_list[name] = CommandInfo(type="keyword", docs=docs, aliases=keywords).dict() -        self._save_cmds(cmd_list) +        self.__save_cmds(cmd_list)          return self.on_message(rule=keyword(*keywords) & rule, **kwargs) @@ -277,17 +326,17 @@ class Service:          **kwargs,      ) -> Type[Matcher]:          if not rule: -            rule = self.rule +            rule = self._rule -        cmd_list = self._load_cmds() +        cmd_list = self.__load_cmds()          cmd_list[pattern] = CommandInfo(type="regex", docs=docs, aliases=list()).dict() -        self._save_cmds(cmd_list) +        self.__save_cmds(cmd_list)          return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)      def cmd_as_group(self, cmd: str, docs: str, **kwargs) -> Type[Matcher]:          sub_cmd = (cmd,) if isinstance(cmd, str) else cmd -        _cmd = self.main_cmd + sub_cmd +        _cmd = self._main_cmd + sub_cmd          if "aliases" in kwargs:              del kwargs["aliases"] @@ -295,7 +344,9 @@ class Service:          return self.on_command(_cmd, docs, **kwargs) -class ServiceTools(object): +class ServiceTools: +    """针对服务的工具类""" +      @staticmethod      def save_service(service_data: dict, service: str):          path = SERVICES_DIR / f"{service}.json" @@ -345,3 +396,24 @@ class ServiceTools(object):          data = cls.load_service(service)          data["enabled"] = is_enabled          cls.save_service(data, service) + + +def is_in_service(service: str) -> Rule: +    async def _is_in_service(bot: Bot, event: Event) -> bool: +        result = ServiceTools().auth_service(service) +        if not result: +            return False + +        if isinstance(event, PrivateMessageEvent): +            user_id = event.get_user_id() +            result = ServiceTools().auth_service(service, user_id) +            return result +        elif isinstance(event, GroupMessageEvent): +            user_id = event.get_user_id() +            group_id = str(event.group_id) +            result = ServiceTools().auth_service(service, user_id, group_id) +            return result +        else: +            return True + +    return Rule(_is_in_service) | 
