summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ATRI/service.py202
1 files changed, 137 insertions, 65 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
index 53338ba..0fec819 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -15,6 +15,8 @@ from nonebot.typing import (
T_PermissionChecker,
)
from nonebot.rule import Rule, command, keyword, regex
+from nonebot.adapters import Bot, Event
+from nonebot.adapters.onebot.v11 import PrivateMessageEvent, GroupMessageEvent
from ATRI.exceptions import ReadFileError, WriteFileError
@@ -41,8 +43,8 @@ class CommandInfo(BaseModel):
class Service:
"""
- 集成一套服务管理,对功能信息进行持久化
- 服务文件结构:
+ 集成一套服务管理, 对功能信息持久化
+ 服务文件结构:
{
"service": "Service name",
"docs": "Main helps and commands",
@@ -60,38 +62,85 @@ class Service:
}
"""
- def __init__(
- self,
- service: str,
- docs: str,
- only_admin: bool = False,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- state: Optional[T_State] = None,
- main_cmd: str = str(),
- ):
+ def __init__(self, service: str):
+ """初始化一个服务"""
+
+ super().__init__()
+ if not service:
+ return
+
self.service = service
- self.docs = docs
- self.only_admin = only_admin
- self.rule = rule
- self.permission = permission
- self.handlers = handlers
- self.temp = temp
- self.priority = priority
- self.state = state
- self.main_cmd = (main_cmd,)
-
- def _generate_service_config(self, service: str, docs: str = str()) -> None:
+ self._only_admin = False
+ self._rule = is_in_service(service)
+ self._permission = None
+ self._handlers = None
+ self._temp = False
+ self._priority = 1
+ self._state = None
+ self._main_cmd = (str(),)
+
+ def document(self, context: str) -> "Service":
+ """为服务添加说明"""
+
+ self.docs = context
+ return self
+
+ def only_admin(self, _is: bool) -> "Service":
+ """标记服务仅管理员可用"""
+
+ self._only_admin = _is
+ return self
+
+ def rule(self, rule: Optional[Union[Rule, T_RuleChecker]]) -> "Service":
+ """为服务添加触发判定"""
+
+ self._rule = self._rule & rule
+ return self
+
+ def permission(self, perm: Optional[Permission]) -> "Service":
+ """为服务添加权限判定"""
+
+ self._permission = perm
+ return self
+
+ def handlers(self, hand: Optional[List[T_Handler]]) -> "Service":
+ """为服务设置处理函数"""
+
+ self._handlers = hand
+ return self
+
+ def temp(self, _is: bool) -> "Service":
+ """设置是否为一次性服务"""
+
+ self._temp = _is
+ return self
+
+ def priority(self, level: int) -> "Service":
+ """为服务设置优先级等级"""
+
+ self._priority = level
+ return self
+
+ def state(self, state: Optional[T_State]) -> "Service":
+ """为服务设置处理类型"""
+
+ self._state = state
+ return self
+
+ def main_cmd(self, cmd: str) -> "Service":
+ """为服务命令设置前缀"""
+
+ self._main_cmd = (cmd,)
+ return self
+
+ def __generate_service_config(self, service: str, docs: str = str()) -> None:
path = SERVICES_DIR / f"{service}.json"
data = ServiceInfo(
service=service,
docs=docs,
cmd_list=dict(),
enabled=True,
- only_admin=self.only_admin,
+ only_admin=self._only_admin,
disable_user=list(),
disable_group=list(),
)
@@ -107,7 +156,7 @@ class Service:
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
- self._generate_service_config(service, self.docs)
+ self.__generate_service_config(service, self.docs)
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(service_data, indent=4))
@@ -115,27 +164,27 @@ class Service:
def load_service(self, service: str) -> dict:
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
- self._generate_service_config(service, self.docs)
+ self.__generate_service_config(service, self.docs)
try:
data = json.loads(path.read_bytes())
except Exception:
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
- self._generate_service_config(service, self.docs)
+ self.__generate_service_config(service, self.docs)
data = json.loads(path.read_bytes())
return data
- def _save_cmds(self, cmds: dict) -> None:
+ def __save_cmds(self, cmds: dict) -> None:
data = self.load_service(self.service)
temp_data: dict = data["cmd_list"]
temp_data.update(cmds)
self.save_service(data, self.service)
- def _load_cmds(self) -> dict:
+ def __load_cmds(self) -> dict:
path = SERVICES_DIR / f"{self.service}.json"
if not path.is_file():
- self._generate_service_config(self.service, self.docs)
+ self.__generate_service_config(self.service, self.docs)
data = json.loads(path.read_bytes())
return data["cmd_list"]
@@ -152,30 +201,30 @@ class Service:
state: Optional[T_State] = None,
) -> Type[Matcher]:
if not rule:
- rule = self.rule
+ rule = self._rule
if not permission:
- permission = self.permission
+ permission = self._permission
if not handlers:
- handlers = self.handlers
+ handlers = self._handlers
if not state:
- state = self.state
+ state = self._state
if name:
- cmd_list = self._load_cmds()
+ cmd_list = self.__load_cmds()
name = name + "-onmsg"
cmd_list[name] = CommandInfo(
type="message", docs=docs, aliases=list()
).dict()
- self._save_cmds(cmd_list)
+ self.__save_cmds(cmd_list)
matcher = Matcher.new(
"message",
Rule() & rule,
Permission() | permission,
module=ModuleType(self.service),
- temp=self.temp,
+ temp=self._temp,
priority=priority,
block=block,
handlers=handlers,
@@ -184,44 +233,44 @@ class Service:
return matcher
def on_notice(self, name: str, docs: str, block: bool = True) -> Type[Matcher]:
- cmd_list = self._load_cmds()
+ cmd_list = self.__load_cmds()
name = name + "-onntc"
cmd_list[name] = CommandInfo(type="notice", docs=docs, aliases=list()).dict()
- self._save_cmds(cmd_list)
+ self.__save_cmds(cmd_list)
matcher = Matcher.new(
"notice",
- Rule() & self.rule,
+ Rule() & self._rule,
Permission(),
module=ModuleType(self.service),
- temp=self.temp,
- priority=self.priority,
+ temp=self._temp,
+ priority=self._priority,
block=block,
- handlers=self.handlers,
- default_state=self.state,
+ handlers=self._handlers,
+ default_state=self._state,
)
return matcher
def on_request(self, name: str, docs: str, block: bool = True) -> Type[Matcher]:
- cmd_list = self._load_cmds()
+ cmd_list = self.__load_cmds()
name = name + "-onreq"
cmd_list[name] = CommandInfo(type="request", docs=docs, aliases=list()).dict()
- self._save_cmds(cmd_list)
+ self.__save_cmds(cmd_list)
matcher = Matcher.new(
"request",
- Rule() & self.rule,
+ Rule() & self._rule,
Permission(),
module=ModuleType(self.service),
- temp=self.temp,
- priority=self.priority,
+ temp=self._temp,
+ priority=self._priority,
block=block,
- handlers=self.handlers,
- default_state=self.state,
+ handlers=self._handlers,
+ default_state=self._state,
)
return matcher
@@ -233,9 +282,9 @@ class Service:
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
**kwargs,
) -> Type[Matcher]:
- cmd_list = self._load_cmds()
+ cmd_list = self.__load_cmds()
if not rule:
- rule = self.rule
+ rule = self._rule
if not aliases:
aliases = set()
@@ -245,7 +294,7 @@ class Service:
cmd_list[cmd] = CommandInfo(
type="command", docs=docs, aliases=list(aliases)
).dict()
- self._save_cmds(cmd_list)
+ self.__save_cmds(cmd_list)
commands = set([cmd]) | (aliases or set())
return self.on_message(rule=command(*commands) & rule, block=True, **kwargs)
@@ -257,14 +306,14 @@ class Service:
**kwargs,
) -> Type[Matcher]:
if not rule:
- rule = self.rule
+ rule = self._rule
name = list(keywords)[0] + "-onkw"
- cmd_list = self._load_cmds()
+ cmd_list = self.__load_cmds()
cmd_list[name] = CommandInfo(type="keyword", docs=docs, aliases=keywords).dict()
- self._save_cmds(cmd_list)
+ self.__save_cmds(cmd_list)
return self.on_message(rule=keyword(*keywords) & rule, **kwargs)
@@ -277,17 +326,17 @@ class Service:
**kwargs,
) -> Type[Matcher]:
if not rule:
- rule = self.rule
+ rule = self._rule
- cmd_list = self._load_cmds()
+ cmd_list = self.__load_cmds()
cmd_list[pattern] = CommandInfo(type="regex", docs=docs, aliases=list()).dict()
- self._save_cmds(cmd_list)
+ self.__save_cmds(cmd_list)
return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)
def cmd_as_group(self, cmd: str, docs: str, **kwargs) -> Type[Matcher]:
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
- _cmd = self.main_cmd + sub_cmd
+ _cmd = self._main_cmd + sub_cmd
if "aliases" in kwargs:
del kwargs["aliases"]
@@ -295,7 +344,9 @@ class Service:
return self.on_command(_cmd, docs, **kwargs)
-class ServiceTools(object):
+class ServiceTools:
+ """针对服务的工具类"""
+
@staticmethod
def save_service(service_data: dict, service: str):
path = SERVICES_DIR / f"{service}.json"
@@ -345,3 +396,24 @@ class ServiceTools(object):
data = cls.load_service(service)
data["enabled"] = is_enabled
cls.save_service(data, service)
+
+
+def is_in_service(service: str) -> Rule:
+ async def _is_in_service(bot: Bot, event: Event) -> bool:
+ result = ServiceTools().auth_service(service)
+ if not result:
+ return False
+
+ if isinstance(event, PrivateMessageEvent):
+ user_id = event.get_user_id()
+ result = ServiceTools().auth_service(service, user_id)
+ return result
+ elif isinstance(event, GroupMessageEvent):
+ user_id = event.get_user_id()
+ group_id = str(event.group_id)
+ result = ServiceTools().auth_service(service, user_id, group_id)
+ return result
+ else:
+ return True
+
+ return Rule(_is_in_service)