summaryrefslogtreecommitdiff
path: root/ATRI/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/service.py')
-rw-r--r--ATRI/service.py145
1 files changed, 82 insertions, 63 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
index 378f2ca..cc3243b 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -16,7 +16,7 @@ if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
-SERVICE_DIR = Path(".") / "ATRI" / "data" / "service"
+SERVICE_DIR = Path(".") / "data" / "service"
SERVICES_DIR = SERVICE_DIR / "services"
os.makedirs(SERVICE_DIR, exist_ok=True)
os.makedirs(SERVICES_DIR, exist_ok=True)
@@ -58,19 +58,17 @@ class Service:
"disable_group": []
}
"""
-
- def __init__(
- self,
- service: str,
- docs: str = None,
- only_admin: bool = False,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- state: Optional[T_State] = None,
- ):
+
+ def __init__(self,
+ service: str,
+ docs: str = None,
+ only_admin: bool = False,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ state: Optional[T_State] = None):
self.service = service
self.docs = docs
self.only_admin = only_admin
@@ -86,7 +84,7 @@ class Service:
service = self.service
if not docs:
docs = self.docs or str()
-
+
path = SERVICES_DIR / f"{service}.json"
data = ServiceInfo(
service=service,
@@ -95,33 +93,33 @@ class Service:
enabled=True,
only_admin=self.only_admin,
disable_user=list(),
- disable_group=list(),
+ disable_group=list()
)
try:
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data.dict(), indent=4))
except WriteError:
raise WriteError("Write service info failed!")
-
+
def save_service(self, service_data: dict, service: str = None) -> None:
if not service:
service = self.service
-
+
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
self._generate_service_config()
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(service_data, indent=4))
-
+
def load_service(self, service: str = None) -> dict:
if not service:
service = self.service
-
+
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
self._generate_service_config()
-
+
try:
data = json.loads(path.read_bytes())
except ReadFileError:
@@ -130,31 +128,29 @@ class Service:
self._generate_service_config()
data = json.loads(path.read_bytes())
return data
-
+
def _save_cmds(self, cmds: dict) -> None:
data = self.load_service(self.service)
temp_data: dict = data["cmd_list"]
temp_data.update(cmds)
self.save_service(data)
-
+
def _load_cmds(self) -> dict:
path = SERVICES_DIR / f"{self.service}.json"
if not path.is_file():
self._generate_service_config()
-
+
data = json.loads(path.read_bytes())
return data["cmd_list"]
- def on_message(
- self,
- docs: str = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- handlers: Optional[List[T_Handler]] = None,
- block: bool = True,
- priority: int = None,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
+ def on_message(self,
+ docs: str = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ handlers: Optional[List[T_Handler]] = None,
+ block: bool = True,
+ priority: int = None,
+ state: Optional[T_State] = None) -> Type[Matcher]:
if not rule:
rule = self.rule
if not permission:
@@ -165,7 +161,7 @@ class Service:
priority = self.priority
if not state:
state = self.state
-
+
if docs:
a = 0
cmd_list = self._load_cmds()
@@ -175,10 +171,14 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
+
+ cmd_list[_type] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
self._save_cmds(cmd_list)
-
+
matcher = Matcher.new(
"message",
Rule() & rule,
@@ -200,10 +200,14 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
+
+ cmd_list[_type] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
self._save_cmds(cmd_list)
-
+
matcher = Matcher.new(
"notice",
Rule() & self.rule,
@@ -225,10 +229,14 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
+
+ cmd_list[_type] =CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
self._save_cmds(cmd_list)
-
+
matcher = Matcher.new(
"request",
Rule() & self.rule,
@@ -255,10 +263,14 @@ class Service:
rule = self.rule
if not aliases:
aliases = set()
-
- cmd_list[cmd] = CommandInfo(type=_type, docs=docs, aliases=list(aliases)).dict()
+
+ cmd_list[cmd] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list(aliases)
+ ).dict()
self._save_cmds(cmd_list)
-
+
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
@@ -272,9 +284,7 @@ class Service:
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
- return self.on_message(
- rule=command(*commands) & rule, handlers=handlers, **kwargs
- )
+ return self.on_message(rule=command(*commands) & rule, handlers=handlers, **kwargs)
def on_keyword(
self,
@@ -285,7 +295,7 @@ class Service:
) -> Type[Matcher]:
if not rule:
rule = self.rule
-
+
a = 0
cmd_list = self._load_cmds()
while True:
@@ -294,10 +304,14 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
+
+ cmd_list[_type] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
self._save_cmds(cmd_list)
-
+
return self.on_message(rule=keyword(*keywords) & rule, **kwargs)
def on_regex(
@@ -311,15 +325,20 @@ class Service:
_type = "regex"
if not rule:
rule = self.rule
-
+
cmd_list = self._load_cmds()
- cmd_list[pattern] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
+ cmd_list[pattern] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
self._save_cmds(cmd_list)
-
+
return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)
class ServiceTools(object):
+
@staticmethod
def save_service(service_data: dict, service: str) -> None:
path = SERVICES_DIR / f"{service}.json"
@@ -329,10 +348,10 @@ class ServiceTools(object):
"Please delete all file in data/service/services.\n"
"Next reboot bot."
)
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(service_data, indent=4))
-
+
@staticmethod
def load_service(service: str) -> dict:
path = SERVICES_DIR / f"{service}.json"
@@ -342,7 +361,7 @@ class ServiceTools(object):
"Please delete all file in data/service/services.\n"
"Next reboot bot."
)
-
+
with open(path, "r", encoding="utf-8") as r:
data = json.loads(r.read())
return data
@@ -350,11 +369,11 @@ class ServiceTools(object):
@classmethod
def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool:
data = cls.load_service(service)
-
+
auth_global = data.get("enabled", True)
auth_user = data.get("disable_user", list())
auth_group = data.get("disable_group", list())
-
+
if user_id:
if user_id in auth_user:
return False
@@ -364,7 +383,7 @@ class ServiceTools(object):
return False
else:
return True
-
+
if not auth_global:
return False
else: