summaryrefslogtreecommitdiff
path: root/ATRI/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/service.py')
-rw-r--r--ATRI/service.py568
1 files changed, 301 insertions, 267 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
index 989179b..4798fd5 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -2,17 +2,15 @@ import os
import re
import json
from pathlib import Path
-from datetime import datetime
-from typing import Dict, Any, List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING
+from pydantic import BaseModel
+from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING
+
from nonebot.matcher import Matcher
from nonebot.permission import Permission
-from nonebot.plugin import on_message
from nonebot.typing import T_State, T_Handler, T_RuleChecker
from nonebot.rule import Rule, command, keyword, regex
-from .log import logger as log
-from .config import NetworkPost
-from .utils.request import post_bytes
+from ATRI.exceptions import ReadFileError, WriteError
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
@@ -24,148 +22,168 @@ os.makedirs(SERVICE_DIR, exist_ok=True)
os.makedirs(SERVICES_DIR, exist_ok=True)
-is_sleep: bool = False
-matcher_list: list = []
-
-
-def _load_block_list() -> dict:
- file_name = "ban.json"
- file = SERVICE_DIR / file_name
- try:
- data = json.loads(file.read_bytes())
- except:
- data = {"user": {}, "group": {}}
- with open(file, "w") as r:
- r.write(json.dumps(data, indent=4))
- return data
-
-
-def _save_block_list(data: dict) -> None:
- file_name = "ban.json"
- file = SERVICE_DIR / file_name
- with open(file, "w") as r:
- r.write(json.dumps(data, indent=4))
+class ServiceInfo(BaseModel):
+ service: str
+ docs: str
+ cmd_list: dict
+ enabled: bool
+ only_admin: bool
+ disable_user: list
+ disable_group: list
-def _load_service_config(service: str, docs: str = None) -> dict:
- file_name = service.replace("/", "") + ".json"
- file = SERVICES_DIR / file_name
- try:
- data = json.loads(file.read_bytes())
- except:
- service_info = {
- "command": service,
- "docs": docs,
- "enabled": True,
- "disable_user": {},
- "disable_group": {},
- }
- with open(file, "w") as r:
- r.write(json.dumps(service_info, indent=4))
- data = service_info
- return data
-
-
-def _save_service_config(service: str, data: dict) -> None:
- file_name = service.replace("/", "") + ".json"
- file = SERVICES_DIR / file_name
- with open(file, "w") as r:
- r.write(json.dumps(data, indent=4))
+class CommandInfo(BaseModel):
+ type: str
+ docs: str
+ aliases: list
class Service:
"""
集成一套服务管理,对功能信息进行持久化
- 计划搭配前端使用
+ 服务文件结构:
+ {
+ "service": "Service name",
+ "docs": "Main helps and commands",
+ "cmd_list": {
+ "/cmd0": {
+ "type": "Command type",
+ "docs": "Command help",
+ "aliases": ["More trigger ways."]
+ }
+ },
+ "enabled": True,
+ "only_admin": False,
+ "disable_user": [],
+ "disable_group": []
+ }
"""
-
- @staticmethod
- def manual_reg_service(service: str, docs: str = None):
- file_name = service.replace("/", "") + ".json"
- file = SERVICES_DIR / file_name
- service_info = {
- "command": service,
- "docs": docs,
- "enabled": True,
- "disable_user": {},
- "disable_group": {},
- }
- with open(file, "w") as r:
- r.write(json.dumps(service_info, indent=4))
-
- @staticmethod
- def auth_service(service: str, user: str, group: str = None) -> bool:
- data = _load_service_config(service)
- if user in data["disable_user"]:
- return False
- else:
- if group in data["disable_group"]:
- return False
- else:
- return True
-
- @staticmethod
- def control_service(
- service: str,
- is_global: bool,
- is_enabled: int,
- user: str = None,
- group: str = None,
- ) -> None:
- data = _load_service_config(service)
- is_enabled = bool(is_enabled)
-
- if is_global:
- status = "disabled" if is_enabled else "enabled"
- data["enabled"] = is_enabled
- log.info(f"\033[33mService: {service} has been {status}.\033[33m")
- else:
- if user:
- if not is_enabled:
- data["disable_user"][user] = str(datetime.now())
- log.info(
- f"\033[33mNew service blocked user: {user}\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
- else:
- if user in data["disable_user"]:
- del data["disable_user"][user]
- log.info(
- f"\033[33mUser: {user} has been unblock\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
- else:
- if not is_enabled:
- data["disable_group"][group] = str(datetime.now())
- log.info(
- f"\033[33mNew service blocked group: {group}\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
+
+ def __init__(self,
+ service: str,
+ docs: str = None,
+ only_admin: bool = False,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ state: Optional[T_State] = None):
+ self.service = service
+ self.docs = docs
+ self.only_admin = only_admin
+ self.rule = rule
+ self.permission = permission
+ self.handlers = handlers
+ self.temp = temp
+ self.priority = priority
+ self.state = state
+
+ def _generate_service_config(self, service: str = None, docs: str = None) -> None:
+ if not service:
+ service = self.service
+ if not docs:
+ docs = self.docs or str()
+
+ path = SERVICES_DIR / f"{service}.json"
+ data = ServiceInfo(
+ service=service,
+ docs=docs,
+ cmd_list=dict(),
+ enabled=True,
+ only_admin=self.only_admin,
+ disable_user=list(),
+ disable_group=list()
+ )
+ try:
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data.dict(), indent=4))
+ except WriteError:
+ raise WriteError("Write service info failed!")
+
+ def save_service(self, service_data: dict, service: str = None) -> None:
+ if not service:
+ service = self.service
+
+ path = SERVICES_DIR / f"{service}.json"
+ if not path.is_file():
+ self._generate_service_config()
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(service_data, indent=4))
+
+ def load_service(self, service: str = None) -> dict:
+ if not service:
+ service = self.service
+
+ path = SERVICES_DIR / f"{service}.json"
+ if not path.is_file():
+ self._generate_service_config()
+
+ try:
+ data = json.loads(path.read_bytes())
+ except ReadFileError:
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ self._generate_service_config()
+ data = json.loads(path.read_bytes())
+ return data
+
+ def _save_cmds(self, cmds: dict) -> None:
+ data = self.load_service(self.service)
+ temp_data: dict = data["cmd_list"]
+ temp_data.update(cmds)
+ self.save_service(data)
+
+ def _load_cmds(self) -> dict:
+ path = SERVICES_DIR / f"{self.service}.json"
+ if not path.is_file():
+ self._generate_service_config()
+
+ data = json.loads(path.read_bytes())
+ return data["cmd_list"]
+
+ def on_message(self,
+ docs: str = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ handlers: Optional[List[T_Handler]] = None,
+ block: bool = True,
+ priority: int = None,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ if not rule:
+ rule = self.rule
+ if not permission:
+ permission = self.permission
+ if not handlers:
+ handlers = self.handlers
+ if not priority:
+ priority = self.priority
+ if not state:
+ state = self.state
+
+ if docs:
+ a = 0
+ cmd_list = self._load_cmds()
+ while True:
+ _type = "message" + str(a)
+ if _type not in cmd_list:
+ break
else:
- if group in data["disable_group"]:
- del data["disable_group"][group]
- log.info(
- f"\033[33mGroup: {group} has been unblock\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
- _save_service_config(service, data)
-
- @staticmethod
- def on_message(
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = True,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
+ a += 1
+
+ cmd_list[_type] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
+ self._save_cmds(cmd_list)
+
matcher = Matcher.new(
"message",
Rule() & rule,
permission or Permission(),
- temp=temp,
+ temp=self.temp,
priority=priority,
block=block,
handlers=handlers,
@@ -173,59 +191,86 @@ class Service:
)
return matcher
- @staticmethod
- def on_notice(
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = False,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
+ def on_notice(self, docs: str, block: bool = True) -> Type[Matcher]:
+ a = 0
+ cmd_list = self._load_cmds()
+ while True:
+ _type = "notice" + str(a)
+ if _type not in cmd_list:
+ break
+ else:
+ a += 1
+
+ cmd_list[_type] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
+ self._save_cmds(cmd_list)
+
matcher = Matcher.new(
"notice",
- Rule() & rule,
+ Rule() & self.rule,
Permission(),
- temp=temp,
- priority=priority,
+ temp=self.temp,
+ priority=self.priority,
block=block,
- handlers=handlers,
- default_state=state,
+ handlers=self.handlers,
+ default_state=self.state,
)
return matcher
- @staticmethod
- def on_request(
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = False,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
+ def on_request(self, docs: str, block: bool = True) -> Type[Matcher]:
+ a = 0
+ cmd_list = self._load_cmds()
+ while True:
+ _type = "request" + str(a)
+ if _type not in cmd_list:
+ break
+ else:
+ a += 1
+
+ cmd_list[_type] =CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
+ self._save_cmds(cmd_list)
+
matcher = Matcher.new(
"request",
- Rule() & rule,
+ Rule() & self.rule,
Permission(),
- temp=temp,
- priority=priority,
+ temp=self.temp,
+ priority=self.priority,
block=block,
- handlers=handlers,
- default_state=state,
+ handlers=self.handlers,
+ default_state=self.state,
)
return matcher
- @classmethod
def on_command(
- cls,
+ self,
cmd: Union[str, Tuple[str, ...]],
- docs: Optional[str] = None,
+ docs: str,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
**kwargs,
) -> Type[Matcher]:
+ _type = "command"
+ cmd_list = self._load_cmds()
+ if not rule:
+ rule = self.rule
+ if not aliases:
+ aliases = set()
+
+ cmd_list[cmd] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list(aliases)
+ ).dict()
+ self._save_cmds(cmd_list)
+
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
@@ -239,118 +284,107 @@ class Service:
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
- _load_service_config(str(cmd), docs)
- return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs)
+ return self.on_message(rule=command(*commands) & rule, handlers=handlers, **kwargs)
- @classmethod
def on_keyword(
- cls,
+ self,
keywords: Set[str],
- docs: Optional[str] = None,
+ docs: str,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
**kwargs,
) -> Type[Matcher]:
- _load_service_config(list(keywords)[0], docs)
- return cls.on_message(keyword(*keywords) & rule, **kwargs)
+ if not rule:
+ rule = self.rule
+
+ a = 0
+ cmd_list = self._load_cmds()
+ while True:
+ _type = "keyword" + str(a)
+ if _type not in cmd_list:
+ break
+ else:
+ a += 1
+
+ cmd_list[_type] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
+ self._save_cmds(cmd_list)
+
+ return self.on_message(rule=keyword(*keywords) & rule, **kwargs)
- @classmethod
def on_regex(
- cls,
+ self,
pattern: str,
+ docs: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
**kwargs,
) -> Type[Matcher]:
- return on_message(regex(pattern, flags) & rule, **kwargs)
-
- class NetworkPost:
- URL = f"http://{NetworkPost.host}:" f"{NetworkPost.port}/"
-
- @classmethod
- async def send_private_msg(
- cls, user_id: int, message: str, auto_escape: bool = False
- ) -> Dict[str, Any]:
- url = cls.URL + "send_private_msg?"
- params = {
- "user_id": user_id,
- "message": message,
- "auto_escape": f"{auto_escape}",
- }
- result = json.loads(await post_bytes(url, params))
- log.debug(result)
- return result
-
- @classmethod
- def send_group_msg(
- cls, group_id: int, message: Union[str], auto_escape: Optional[bool] = ...
- ) -> Dict[str, Any]:
- ...
-
- @classmethod
- async def send_msg(
- cls,
- message_type: Optional[str] = "",
- user_id: Optional[int] = None,
- group_id: Optional[int] = None,
- message=Union[str],
- auto_escape: bool = False,
- ) -> Dict[str, Any]:
- url = cls.URL + "send_msg?"
- params = {
- "message_type": "",
- "user_id": user_id,
- "group_id": group_id,
- "message": message,
- "auto_escape": str(auto_escape),
- }
- result = json.loads(await post_bytes(url, params))
- log.debug(result)
- return result
+ _type = "regex"
+ if not rule:
+ rule = self.rule
+
+ cmd_list = self._load_cmds()
+ cmd_list[pattern] = CommandInfo(
+ type=_type,
+ docs=docs,
+ aliases=list()
+ ).dict()
+ self._save_cmds(cmd_list)
+
+ return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)
+
+
+class ServiceTools(object):
+
+ @staticmethod
+ def save_service(service_data: dict, service: str) -> None:
+ path = SERVICES_DIR / f"{service}.json"
+ if not path.is_file():
+ raise ReadFileError(
+ f"Can't find service: ({service}) file.\n"
+ "Please delete all file in data/service/services.\n"
+ "Next reboot bot."
+ )
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(service_data, indent=4))
+
+ @staticmethod
+ def load_service(service: str) -> dict:
+ path = SERVICES_DIR / f"{service}.json"
+ if not path.is_file():
+ raise ReadFileError(
+ f"Can't find service: ({service}) file.\n"
+ "Please delete all file in data/service/services.\n"
+ "Next reboot bot."
+ )
+
+ with open(path, "r", encoding="utf-8") as r:
+ data = json.loads(r.read())
+ return data
- class Dormant:
- @staticmethod
- def is_dormant() -> bool:
- return False if is_sleep else True
+ @classmethod
+ def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool:
+ data = cls.load_service(service)
+
+ auth_global = data.get("enabled", True)
+ auth_user = data.get("disable_user", list())
+ auth_group = data.get("disable_group", list())
+
+ if user_id:
+ if user_id in auth_user:
+ return False
- @staticmethod
- def control_dormant(is_enable: bool) -> None:
- global is_sleep
- if is_enable:
- is_sleep = True
+ if group_id:
+ if group_id in auth_group:
+ return False
else:
- is_sleep = False
-
- class BlockSystem:
- file_name = "ban.json"
- path = SERVICE_DIR / file_name
-
- @staticmethod
- def auth_user(user: str) -> bool:
- return False if user in _load_block_list()["user"] else True
-
- @staticmethod
- def auth_group(group: str) -> bool:
- return False if group in _load_block_list()["group"] else True
-
- @staticmethod
- def control_list(is_enabled: bool, user: str = None, group: str = None) -> None:
- data = _load_block_list()
- if user:
- if is_enabled:
- data["user"][user] = str(datetime.now())
- log.info(
- f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m"
- )
- else:
- del data["user"][str(user)]
- log.info(f"\033[33mUser {user} has been unblock.\033[33m")
- elif group:
- if is_enabled:
- data["group"][group] = str(datetime.now())
- log.info(
- f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m"
- )
- else:
- del data["group"][str(group)]
- log.info(f"\033[33mGroup {group} has been unblock.\033[33m")
- _save_block_list(data)
+ return True
+
+ if not auth_global:
+ return False
+ else:
+ return True