diff options
Diffstat (limited to 'ATRI/service.py')
-rw-r--r-- | ATRI/service.py | 130 |
1 files changed, 74 insertions, 56 deletions
diff --git a/ATRI/service.py b/ATRI/service.py index ab870fd..df2e8d5 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -19,7 +19,7 @@ from nonebot.typing import T_State, T_Handler, T_RuleChecker from nonebot.rule import Rule, command, keyword from .log import logger as log -from .config import config +from .config import Config from .utils.request import post_bytes if TYPE_CHECKING: @@ -32,8 +32,8 @@ os.makedirs(SERVICE_DIR, exist_ok=True) os.makedirs(SERVICES_DIR, exist_ok=True) -matcher_list: list = [] is_sleep: bool = False +matcher_list: list = [] def _load_block_list() -> dict: @@ -43,8 +43,8 @@ def _load_block_list() -> dict: data = json.loads(file.read_bytes()) except: data = { - "user": [], - "group": [] + "user": {}, + "group": {} } with open(file, "w") as r: r.write(json.dumps(data, indent=4)) @@ -59,16 +59,17 @@ def _save_block_list(data: dict) -> None: def _load_service_config(service: str, docs: str = None) -> dict: - file_name = service + ".json" + file_name = service.replace('/', '') + ".json" file = SERVICES_DIR / file_name try: data = json.loads(file.read_bytes()) except: service_info = { - "name": service, + "command": service, "docs": docs, - "disable_user": _load_block_list()['user'], - "disable_group": _load_block_list()['group'] + "enabled": True, + "disable_user": {}, + "disable_group": {} } with open(file, "w") as r: r.write(json.dumps(service_info, indent=4)) @@ -77,7 +78,7 @@ def _load_service_config(service: str, docs: str = None) -> dict: def _save_service_config(service: str, data: dict) -> None: - file_name = service + ".json" + file_name = service.replace('/', '') + ".json" file = SERVICES_DIR / file_name with open(file, "w") as r: r.write(json.dumps(data, indent=4)) @@ -85,18 +86,19 @@ def _save_service_config(service: str, data: dict) -> None: class Service: """ - 集成一套服务管理,block准确至个人 + 集成一套服务管理,对功能信息进行持久化 计划搭配前端使用 """ @staticmethod def manual_reg_service(service: str): - file_name = service + ".json" + file_name = service.replace('/', '') + ".json" file = SERVICES_DIR / file_name service_info = { "name": service, "docs": None, - "disable_user": _load_block_list()['user'], - "disable_group": _load_block_list()['group'] + "enabled": True, + "disable_user": {}, + "disable_group": {} } with open(file, "w") as r: r.write(json.dumps(service_info, indent=4)) @@ -107,16 +109,36 @@ class Service: return False if group in data["disable_group"] else True @staticmethod - def control_service(service: str, group: int, is_enable: bool) -> None: + def control_service(service: str, + is_global: bool, + is_enabled: bool, + user: Optional[int] = None, + group: Optional[int] = None) -> None: data = _load_service_config(service) - sv_group = data.get('disable_group', []) - if is_enable: - sv_group.remove(group) - log.info(f"Service {service} has been enabled.") + + if is_global: + status = "disabled" if is_enabled else "enabled" + data['enbaled'] = is_enabled + log.info(f"Service: {service} has been {status}.") else: - sv_group.append(group) - log.info(f"Service {service} has been disabled.") - data["disable_group"] = sv_group + if user: + if is_enabled: + data['disable_user'][user] = str(datetime.now()) + log.info(f"New service blocked user: {user}" + f" | Service: {service} | Time: {datetime.now()}") + else: + del data['disable_user'][user] + log.info(f"User: {user} has been unblock" + f" | Service: {service} | Time: {datetime.now()}") + else: + if is_enabled: + data['disable_group'][group] = str(datetime.now()) + log.info(f"New service blocked group: {group}" + f" | Service: {service} | Time: {datetime.now()}") + else: + del data['disable_group'][group] + log.info(f"Group: {group} has been unblock" + f" | Service: {service} | Time: {datetime.now()}") _save_service_config(service, data) @staticmethod @@ -139,9 +161,7 @@ class Service: return matcher @staticmethod - def on_notice(name: str, - docs: Optional[str] = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, + def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None, *, handlers: Optional[List[T_Handler]] = None, temp: bool = False, @@ -156,14 +176,10 @@ class Service: block=block, handlers=handlers, default_state=state) - _load_service_config(name, docs) - matcher_list.append(name) return matcher @staticmethod - def on_request(name: str, - docs: Optional[str] = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, + def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None, *, handlers: Optional[List[T_Handler]] = None, temp: bool = False, @@ -178,13 +194,10 @@ class Service: block=block, handlers=handlers, default_state=state) - _load_service_config(name, docs) - matcher_list.append(name) return matcher @classmethod def on_command(cls, - name: str, cmd: Union[str, Tuple[str, ...]], docs: Optional[str] = None, rule: Optional[Union[Rule, T_RuleChecker]] = None, @@ -203,34 +216,31 @@ class Service: handlers.insert(0, _strip_cmd) commands = set([cmd]) | (aliases or set()) - _load_service_config(name, docs) - matcher_list.append(name) + _load_service_config(str(cmd), docs) return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs) @classmethod def on_keyword(cls, - name: str, keywords: Set[str], docs: Optional[str] = None, rule: Optional[Union[Rule, T_RuleChecker]] = None, **kwargs) -> Type[Matcher]: - _load_service_config(name, docs) - matcher_list.append(name) + _load_service_config(list(keywords)[0], docs) return cls.on_message(keyword(*keywords) & rule, **kwargs) class NetworkPost: URL = ( - f"http://{config['NetworkPost']['host']}:" - f"{config['NetworkPost']['port']}/" + f"http://{Config.NetworkPost.host}:" + f"{Config.NetworkPost.port}/" ) @classmethod async def send_private_msg(cls, user_id: int, message: str, - auto_escape: bool = False): # -> Dict[str, Any] + auto_escape: bool = False) -> Dict[str, Any]: url = cls.URL + "send_private_msg?" params = { "user_id": user_id, @@ -249,13 +259,23 @@ class Service: ... @classmethod - def send_msg(cls, - message_type: Optional[str] = ..., - user_id: Optional[int] = ..., - group_id: Optional[int] = ..., + async def send_msg(cls, + message_type: Optional[str] = "", + user_id: Optional[int] = None, + group_id: Optional[int] = None, message = Union[str], - auto_escape: bool = ...) -> Dict[str, Any]: - ... + auto_escape: bool = False) -> Dict[str, Any]: + url = cls.URL + "send_msg?" + params = { + "message_type": "", + "user_id": user_id, + "group_id": group_id, + "message": message, + "auto_escape": str(auto_escape) + } + result = json.loads(await post_bytes(url, params)) + log.debug(result) + return result @classmethod def delete_msg(cls, @@ -448,24 +468,22 @@ class Service: @classmethod def control_list(cls, - is_enable: bool, + is_enabled: bool, user: Optional[int] = None, group: Optional[int] = None) -> None: data = _load_block_list() if user: - if is_enable: - data['user'][user] = datetime.now().__str__ + if is_enabled: + data['user'][user] = str(datetime.now()) log.info(f"New blocked user: {user} | Time: {datetime.now()}") else: - del data[user] + del data['user'][str(user)] log.info(f"User {user} has been unblock.") elif group: - if is_enable: - data['group'][group] = datetime.now().__str__ + if is_enabled: + data['group'][group] = str(datetime.now()) log.info(f"New blocked group: {group} | Time: {datetime.now()}") else: - del data[user] + del data['group'][str(group)] log.info(f"Group {group} has been unblock.") - - with open(cls.path, "w") as r: - json.dump(data, r) + _save_block_list(data) |