diff options
Diffstat (limited to 'ATRI/service.py')
-rw-r--r-- | ATRI/service.py | 323 |
1 files changed, 167 insertions, 156 deletions
diff --git a/ATRI/service.py b/ATRI/service.py index 87e606c..79d3d42 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -3,17 +3,7 @@ import re import json from pathlib import Path from datetime import datetime -from typing import ( - Dict, - Any, - List, - Set, - Tuple, - Type, - Union, - Optional, - TYPE_CHECKING -) +from typing import Dict, Any, List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING from nonebot.matcher import Matcher from nonebot.permission import Permission from nonebot.plugin import on_message @@ -28,8 +18,8 @@ if TYPE_CHECKING: from nonebot.adapters import Bot, Event -SERVICE_DIR = Path('.') / 'ATRI' / 'data' / 'service' -SERVICES_DIR = SERVICE_DIR / 'services' +SERVICE_DIR = Path(".") / "ATRI" / "data" / "service" +SERVICES_DIR = SERVICE_DIR / "services" os.makedirs(SERVICE_DIR, exist_ok=True) os.makedirs(SERVICES_DIR, exist_ok=True) @@ -44,10 +34,7 @@ def _load_block_list() -> dict: try: data = json.loads(file.read_bytes()) except: - data = { - "user": {}, - "group": {} - } + data = {"user": {}, "group": {}} with open(file, "w") as r: r.write(json.dumps(data, indent=4)) return data @@ -61,7 +48,7 @@ def _save_block_list(data: dict) -> None: def _load_service_config(service: str, docs: str = None) -> dict: - file_name = service.replace('/', '') + ".json" + file_name = service.replace("/", "") + ".json" file = SERVICES_DIR / file_name try: data = json.loads(file.read_bytes()) @@ -71,7 +58,7 @@ def _load_service_config(service: str, docs: str = None) -> dict: "docs": docs, "enabled": True, "disable_user": {}, - "disable_group": {} + "disable_group": {}, } with open(file, "w") as r: r.write(json.dumps(service_info, indent=4)) @@ -80,7 +67,7 @@ def _load_service_config(service: str, docs: str = None) -> dict: def _save_service_config(service: str, data: dict) -> None: - file_name = service.replace('/', '') + ".json" + file_name = service.replace("/", "") + ".json" file = SERVICES_DIR / file_name with open(file, "w") as r: r.write(json.dumps(data, indent=4)) @@ -91,20 +78,21 @@ class Service: 集成一套服务管理,对功能信息进行持久化 计划搭配前端使用 """ + @staticmethod def manual_reg_service(service: str, docs: str = None): - file_name = service.replace('/', '') + ".json" + file_name = service.replace("/", "") + ".json" file = SERVICES_DIR / file_name service_info = { "command": service, "docs": docs, "enabled": True, "disable_user": {}, - "disable_group": {} + "disable_group": {}, } with open(file, "w") as r: r.write(json.dumps(service_info, indent=4)) - + @staticmethod def auth_service(service: str, user: str, group: str = None) -> bool: data = _load_service_config(service) @@ -115,193 +103,215 @@ class Service: return False else: return True - + @staticmethod - def control_service(service: str, - is_global: bool, - is_enabled: int, - user: str = None, - group: str = None) -> None: + def control_service( + service: str, + is_global: bool, + is_enabled: int, + user: str = None, + group: str = None, + ) -> None: data = _load_service_config(service) is_enabled = bool(is_enabled) - + if is_global: status = "disabled" if is_enabled else "enabled" - data['enabled'] = is_enabled + data["enabled"] = is_enabled log.info(f"\033[33mService: {service} has been {status}.\033[33m") else: if user: if not is_enabled: - data['disable_user'][user] = str(datetime.now()) - log.info(f"\033[33mNew service blocked user: {user}\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m") + data["disable_user"][user] = str(datetime.now()) + log.info( + f"\033[33mNew service blocked user: {user}\033[33m" + f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" + ) else: - if user in data['disable_user']: - del data['disable_user'][user] - log.info(f"\033[33mUser: {user} has been unblock\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m") + if user in data["disable_user"]: + del data["disable_user"][user] + log.info( + f"\033[33mUser: {user} has been unblock\033[33m" + f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" + ) else: if not is_enabled: - data['disable_group'][group] = str(datetime.now()) - log.info(f"\033[33mNew service blocked group: {group}\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m") + data["disable_group"][group] = str(datetime.now()) + log.info( + f"\033[33mNew service blocked group: {group}\033[33m" + f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" + ) else: - if group in data['disable_group']: - del data['disable_group'][group] - log.info(f"\033[33mGroup: {group} has been unblock\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m") + if group in data["disable_group"]: + del data["disable_group"][group] + log.info( + f"\033[33mGroup: {group} has been unblock\033[33m" + f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" + ) _save_service_config(service, data) - + @staticmethod - def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = True, - state: Optional[T_State] = None) -> Type[Matcher]: - matcher = Matcher.new("message", - Rule() & rule, - permission or Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - default_state=state) + def on_message( + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + *, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + block: bool = True, + state: Optional[T_State] = None, + ) -> Type[Matcher]: + matcher = Matcher.new( + "message", + Rule() & rule, + permission or Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + default_state=state, + ) return matcher @staticmethod - def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None) -> Type[Matcher]: - matcher = Matcher.new("notice", - Rule() & rule, - Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - default_state=state) + def on_notice( + rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None, + ) -> Type[Matcher]: + matcher = Matcher.new( + "notice", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + default_state=state, + ) return matcher @staticmethod - def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None) -> Type[Matcher]: - matcher = Matcher.new("request", - Rule() & rule, - Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - default_state=state) + def on_request( + rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None, + ) -> Type[Matcher]: + matcher = Matcher.new( + "request", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + default_state=state, + ) return matcher @classmethod - def on_command(cls, - cmd: Union[str, Tuple[str, ...]], - docs: Optional[str] = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, - **kwargs) -> Type[Matcher]: + def on_command( + cls, + cmd: Union[str, Tuple[str, ...]], + docs: Optional[str] = None, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, + **kwargs, + ) -> Type[Matcher]: async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): message = event.get_message() segment = message.pop(0) new_message = message.__class__( - str(segment).lstrip() - [len(state["_prefix"]["raw_command"]):].lstrip()) # type: ignore + str(segment).lstrip()[len(state["_prefix"]["raw_command"]) :].lstrip() + ) # type: ignore for new_segment in reversed(new_message): message.insert(0, new_segment) - + handlers = kwargs.pop("handlers", []) handlers.insert(0, _strip_cmd) - + commands = set([cmd]) | (aliases or set()) _load_service_config(str(cmd), docs) - return cls.on_message(command(*commands) & rule, - handlers=handlers, **kwargs) + return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs) @classmethod - def on_keyword(cls, - keywords: Set[str], - docs: Optional[str] = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - **kwargs) -> Type[Matcher]: + def on_keyword( + cls, + keywords: Set[str], + docs: Optional[str] = None, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + **kwargs, + ) -> Type[Matcher]: _load_service_config(list(keywords)[0], docs) return cls.on_message(keyword(*keywords) & rule, **kwargs) - + @classmethod - def on_regex(cls, - pattern: str, - flags: Union[int, re.RegexFlag] = 0, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - **kwargs) -> Type[Matcher]: + def on_regex( + cls, + pattern: str, + flags: Union[int, re.RegexFlag] = 0, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + **kwargs, + ) -> Type[Matcher]: return on_message(regex(pattern, flags) & rule, **kwargs) - - + class NetworkPost: - URL = ( - f"http://{Config.NetworkPost.host}:" - f"{Config.NetworkPost.port}/" - ) - + URL = f"http://{Config.NetworkPost.host}:" f"{Config.NetworkPost.port}/" + @classmethod - async def send_private_msg(cls, - user_id: int, - message: str, - auto_escape: bool = False) -> Dict[str, Any]: + async def send_private_msg( + cls, user_id: int, message: str, auto_escape: bool = False + ) -> Dict[str, Any]: url = cls.URL + "send_private_msg?" params = { "user_id": user_id, "message": message, - "auto_escape": f"{auto_escape}" + "auto_escape": f"{auto_escape}", } result = json.loads(await post_bytes(url, params)) log.debug(result) return result @classmethod - def send_group_msg(cls, - group_id: int, - message: Union[str], - auto_escape: Optional[bool] = ...) -> Dict[str, Any]: + def send_group_msg( + cls, group_id: int, message: Union[str], auto_escape: Optional[bool] = ... + ) -> Dict[str, Any]: ... @classmethod - async def send_msg(cls, - message_type: Optional[str] = "", - user_id: Optional[int] = None, - group_id: Optional[int] = None, - message = Union[str], - auto_escape: bool = False) -> Dict[str, Any]: + async def send_msg( + cls, + message_type: Optional[str] = "", + user_id: Optional[int] = None, + group_id: Optional[int] = None, + message=Union[str], + auto_escape: bool = False, + ) -> Dict[str, Any]: url = cls.URL + "send_msg?" params = { "message_type": "", "user_id": user_id, "group_id": group_id, "message": message, - "auto_escape": str(auto_escape) + "auto_escape": str(auto_escape), } result = json.loads(await post_bytes(url, params)) log.debug(result) return result - class Dormant: @staticmethod def is_dormant() -> bool: return False if is_sleep else True - + @staticmethod def control_dormant(is_enable: bool) -> None: global is_sleep @@ -309,37 +319,38 @@ class Service: is_sleep = True else: is_sleep = False - - + class BlockSystem: file_name = "ban.json" path = SERVICE_DIR / file_name - + @staticmethod def auth_user(user: str) -> bool: - return False if user in _load_block_list()['user'] else True - + return False if user in _load_block_list()["user"] else True + @staticmethod def auth_group(group: str) -> bool: - return False if group in _load_block_list()['group'] else True - + return False if group in _load_block_list()["group"] else True + @staticmethod - def control_list(is_enabled: bool, - user: str = None, - group: str = None) -> None: + def control_list(is_enabled: bool, user: str = None, group: str = None) -> None: data = _load_block_list() if user: if is_enabled: - data['user'][user] = str(datetime.now()) - log.info(f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m") + data["user"][user] = str(datetime.now()) + log.info( + f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m" + ) else: - del data['user'][str(user)] + del data["user"][str(user)] log.info(f"\033[33mUser {user} has been unblock.\033[33m") elif group: if is_enabled: - data['group'][group] = str(datetime.now()) - log.info(f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m") + data["group"][group] = str(datetime.now()) + log.info( + f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m" + ) else: - del data['group'][str(group)] + del data["group"][str(group)] log.info(f"\033[33mGroup {group} has been unblock.\033[33m") _save_block_list(data) |