diff options
-rw-r--r-- | ATRI/service.py | 202 |
1 files changed, 137 insertions, 65 deletions
diff --git a/ATRI/service.py b/ATRI/service.py index 53338ba..0fec819 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -15,6 +15,8 @@ from nonebot.typing import ( T_PermissionChecker, ) from nonebot.rule import Rule, command, keyword, regex +from nonebot.adapters import Bot, Event +from nonebot.adapters.onebot.v11 import PrivateMessageEvent, GroupMessageEvent from ATRI.exceptions import ReadFileError, WriteFileError @@ -41,8 +43,8 @@ class CommandInfo(BaseModel): class Service: """ - 集成一套服务管理,对功能信息进行持久化 - 服务文件结构: + 集成一套服务管理, 对功能信息持久化 + 服务文件结构: { "service": "Service name", "docs": "Main helps and commands", @@ -60,38 +62,85 @@ class Service: } """ - def __init__( - self, - service: str, - docs: str, - only_admin: bool = False, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - state: Optional[T_State] = None, - main_cmd: str = str(), - ): + def __init__(self, service: str): + """初始化一个服务""" + + super().__init__() + if not service: + return + self.service = service - self.docs = docs - self.only_admin = only_admin - self.rule = rule - self.permission = permission - self.handlers = handlers - self.temp = temp - self.priority = priority - self.state = state - self.main_cmd = (main_cmd,) - - def _generate_service_config(self, service: str, docs: str = str()) -> None: + self._only_admin = False + self._rule = is_in_service(service) + self._permission = None + self._handlers = None + self._temp = False + self._priority = 1 + self._state = None + self._main_cmd = (str(),) + + def document(self, context: str) -> "Service": + """为服务添加说明""" + + self.docs = context + return self + + def only_admin(self, _is: bool) -> "Service": + """标记服务仅管理员可用""" + + self._only_admin = _is + return self + + def rule(self, rule: Optional[Union[Rule, T_RuleChecker]]) -> "Service": + """为服务添加触发判定""" + + self._rule = self._rule & rule + return self + + def permission(self, perm: Optional[Permission]) -> "Service": + """为服务添加权限判定""" + + self._permission = perm + return self + + def handlers(self, hand: Optional[List[T_Handler]]) -> "Service": + """为服务设置处理函数""" + + self._handlers = hand + return self + + def temp(self, _is: bool) -> "Service": + """设置是否为一次性服务""" + + self._temp = _is + return self + + def priority(self, level: int) -> "Service": + """为服务设置优先级等级""" + + self._priority = level + return self + + def state(self, state: Optional[T_State]) -> "Service": + """为服务设置处理类型""" + + self._state = state + return self + + def main_cmd(self, cmd: str) -> "Service": + """为服务命令设置前缀""" + + self._main_cmd = (cmd,) + return self + + def __generate_service_config(self, service: str, docs: str = str()) -> None: path = SERVICES_DIR / f"{service}.json" data = ServiceInfo( service=service, docs=docs, cmd_list=dict(), enabled=True, - only_admin=self.only_admin, + only_admin=self._only_admin, disable_user=list(), disable_group=list(), ) @@ -107,7 +156,7 @@ class Service: path = SERVICES_DIR / f"{service}.json" if not path.is_file(): - self._generate_service_config(service, self.docs) + self.__generate_service_config(service, self.docs) with open(path, "w", encoding="utf-8") as w: w.write(json.dumps(service_data, indent=4)) @@ -115,27 +164,27 @@ class Service: def load_service(self, service: str) -> dict: path = SERVICES_DIR / f"{service}.json" if not path.is_file(): - self._generate_service_config(service, self.docs) + self.__generate_service_config(service, self.docs) try: data = json.loads(path.read_bytes()) except Exception: with open(path, "w", encoding="utf-8") as w: w.write(json.dumps({})) - self._generate_service_config(service, self.docs) + self.__generate_service_config(service, self.docs) data = json.loads(path.read_bytes()) return data - def _save_cmds(self, cmds: dict) -> None: + def __save_cmds(self, cmds: dict) -> None: data = self.load_service(self.service) temp_data: dict = data["cmd_list"] temp_data.update(cmds) self.save_service(data, self.service) - def _load_cmds(self) -> dict: + def __load_cmds(self) -> dict: path = SERVICES_DIR / f"{self.service}.json" if not path.is_file(): - self._generate_service_config(self.service, self.docs) + self.__generate_service_config(self.service, self.docs) data = json.loads(path.read_bytes()) return data["cmd_list"] @@ -152,30 +201,30 @@ class Service: state: Optional[T_State] = None, ) -> Type[Matcher]: if not rule: - rule = self.rule + rule = self._rule if not permission: - permission = self.permission + permission = self._permission if not handlers: - handlers = self.handlers + handlers = self._handlers if not state: - state = self.state + state = self._state if name: - cmd_list = self._load_cmds() + cmd_list = self.__load_cmds() name = name + "-onmsg" cmd_list[name] = CommandInfo( type="message", docs=docs, aliases=list() ).dict() - self._save_cmds(cmd_list) + self.__save_cmds(cmd_list) matcher = Matcher.new( "message", Rule() & rule, Permission() | permission, module=ModuleType(self.service), - temp=self.temp, + temp=self._temp, priority=priority, block=block, handlers=handlers, @@ -184,44 +233,44 @@ class Service: return matcher def on_notice(self, name: str, docs: str, block: bool = True) -> Type[Matcher]: - cmd_list = self._load_cmds() + cmd_list = self.__load_cmds() name = name + "-onntc" cmd_list[name] = CommandInfo(type="notice", docs=docs, aliases=list()).dict() - self._save_cmds(cmd_list) + self.__save_cmds(cmd_list) matcher = Matcher.new( "notice", - Rule() & self.rule, + Rule() & self._rule, Permission(), module=ModuleType(self.service), - temp=self.temp, - priority=self.priority, + temp=self._temp, + priority=self._priority, block=block, - handlers=self.handlers, - default_state=self.state, + handlers=self._handlers, + default_state=self._state, ) return matcher def on_request(self, name: str, docs: str, block: bool = True) -> Type[Matcher]: - cmd_list = self._load_cmds() + cmd_list = self.__load_cmds() name = name + "-onreq" cmd_list[name] = CommandInfo(type="request", docs=docs, aliases=list()).dict() - self._save_cmds(cmd_list) + self.__save_cmds(cmd_list) matcher = Matcher.new( "request", - Rule() & self.rule, + Rule() & self._rule, Permission(), module=ModuleType(self.service), - temp=self.temp, - priority=self.priority, + temp=self._temp, + priority=self._priority, block=block, - handlers=self.handlers, - default_state=self.state, + handlers=self._handlers, + default_state=self._state, ) return matcher @@ -233,9 +282,9 @@ class Service: aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, **kwargs, ) -> Type[Matcher]: - cmd_list = self._load_cmds() + cmd_list = self.__load_cmds() if not rule: - rule = self.rule + rule = self._rule if not aliases: aliases = set() @@ -245,7 +294,7 @@ class Service: cmd_list[cmd] = CommandInfo( type="command", docs=docs, aliases=list(aliases) ).dict() - self._save_cmds(cmd_list) + self.__save_cmds(cmd_list) commands = set([cmd]) | (aliases or set()) return self.on_message(rule=command(*commands) & rule, block=True, **kwargs) @@ -257,14 +306,14 @@ class Service: **kwargs, ) -> Type[Matcher]: if not rule: - rule = self.rule + rule = self._rule name = list(keywords)[0] + "-onkw" - cmd_list = self._load_cmds() + cmd_list = self.__load_cmds() cmd_list[name] = CommandInfo(type="keyword", docs=docs, aliases=keywords).dict() - self._save_cmds(cmd_list) + self.__save_cmds(cmd_list) return self.on_message(rule=keyword(*keywords) & rule, **kwargs) @@ -277,17 +326,17 @@ class Service: **kwargs, ) -> Type[Matcher]: if not rule: - rule = self.rule + rule = self._rule - cmd_list = self._load_cmds() + cmd_list = self.__load_cmds() cmd_list[pattern] = CommandInfo(type="regex", docs=docs, aliases=list()).dict() - self._save_cmds(cmd_list) + self.__save_cmds(cmd_list) return self.on_message(rule=regex(pattern, flags) & rule, **kwargs) def cmd_as_group(self, cmd: str, docs: str, **kwargs) -> Type[Matcher]: sub_cmd = (cmd,) if isinstance(cmd, str) else cmd - _cmd = self.main_cmd + sub_cmd + _cmd = self._main_cmd + sub_cmd if "aliases" in kwargs: del kwargs["aliases"] @@ -295,7 +344,9 @@ class Service: return self.on_command(_cmd, docs, **kwargs) -class ServiceTools(object): +class ServiceTools: + """针对服务的工具类""" + @staticmethod def save_service(service_data: dict, service: str): path = SERVICES_DIR / f"{service}.json" @@ -345,3 +396,24 @@ class ServiceTools(object): data = cls.load_service(service) data["enabled"] = is_enabled cls.save_service(data, service) + + +def is_in_service(service: str) -> Rule: + async def _is_in_service(bot: Bot, event: Event) -> bool: + result = ServiceTools().auth_service(service) + if not result: + return False + + if isinstance(event, PrivateMessageEvent): + user_id = event.get_user_id() + result = ServiceTools().auth_service(service, user_id) + return result + elif isinstance(event, GroupMessageEvent): + user_id = event.get_user_id() + group_id = str(event.group_id) + result = ServiceTools().auth_service(service, user_id, group_id) + return result + else: + return True + + return Rule(_is_in_service) |