diff options
Diffstat (limited to 'ATRI/service.py')
-rw-r--r-- | ATRI/service.py | 455 |
1 files changed, 455 insertions, 0 deletions
diff --git a/ATRI/service.py b/ATRI/service.py new file mode 100644 index 0000000..8ce3c22 --- /dev/null +++ b/ATRI/service.py @@ -0,0 +1,455 @@ +#!/usr/bin/env python3 +# -*- coding:utf-8 -*- +''' +File: service.py +Created Date: 2021-02-27 11:01:39 +Author: Kyomotoi +Email: [email protected] +License: GPLv3 +Project: https://github.com/Kyomotoi/ATRI +-------- +Last Modified: Sunday, 7th March 2021 2:56:28 pm +Modified By: Kyomotoi ([email protected]) +-------- +Copyright (c) 2021 Kyomotoi +''' + +import os +import json +from pathlib import Path +from datetime import datetime +from typing import ( + Dict, + Any, + List, + Set, + Tuple, + Type, + Union, + Optional, + TYPE_CHECKING +) +from nonebot.matcher import Matcher +from nonebot.permission import Permission +from nonebot.typing import T_State, T_Handler, T_RuleChecker +from nonebot.rule import Rule, command, keyword + +from .log import logger as log +from .config import config +from .utils.request import post_bytes + +if TYPE_CHECKING: + from nonebot.adapters import Bot, Event + + +SERVICE_DIR = Path('.') / 'ATRI' / 'data' / 'service' +os.makedirs(SERVICE_DIR, exist_ok=True) + + +matcher_list: list = [] +is_sleep: bool = False + + +def _load_service_config(service: str, docs: str = None) -> dict: + file_name = service + ".json" + file = SERVICE_DIR / file_name + try: + data = json.loads(file.read_bytes()) + except: + service_info = { + "name": service, + "docs": docs, + "disable_group": [] + } + with open(file, "w") as r: + r.write(json.dumps(service_info, indent=4)) + data = service_info + return data + + +def _save_service_config(service: str, data: dict) -> None: + file_name = service + ".json" + file = SERVICE_DIR / file_name + with open(file, "w") as r: + r.write(json.dumps(data, indent=4)) + + +class Service: + """ + 集成一套服务管理,block准确至个人 + 计划搭配前端使用 + """ + @staticmethod + def manual_reg_service(service: str): + file_name = service + ".json" + file = SERVICE_DIR / file_name + service_info = { + "name": service, + "docs": None, + "disable_group": [] + } + with open(file, "w") as r: + r.write(json.dumps(service_info, indent=4)) + + @staticmethod + def auth_service(service: str, group: Optional[int] = None) -> bool: + data = _load_service_config(service) + return False if group in data["disable_group"] else True + + @staticmethod + def control_service(service: str, group: int, is_enable: bool) -> None: + data = _load_service_config(service) + sv_group = data.get('disable_group', []) + if is_enable: + sv_group.remove(group) + log.info(f"Service {service} has been enabled.") + else: + sv_group.append(group) + log.info(f"Service {service} has been disabled.") + data["disable_group"] = sv_group + _save_service_config(service, data) + + @staticmethod + def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + *, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + block: bool = True, + state: Optional[T_State] = None) -> Type[Matcher]: + matcher = Matcher.new("message", + Rule() & rule, + permission or Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + default_state=state) + return matcher + + @staticmethod + def on_notice(name: str, + docs: str, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None) -> Type[Matcher]: + matcher = Matcher.new("notice", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + default_state=state) + _load_service_config(name, docs) + matcher_list.append(name) + return matcher + + @staticmethod + def on_request(name: str, + docs: str, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None) -> Type[Matcher]: + matcher = Matcher.new("request", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + default_state=state) + _load_service_config(name, docs) + matcher_list.append(name) + return matcher + + @classmethod + def on_command(cls, + name: str, + docs: str, + cmd: Union[str, Tuple[str, ...]], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, + **kwargs) -> Type[Matcher]: + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): + message = event.get_message() + segment = message.pop(0) + new_message = message.__class__( + str(segment).lstrip() + [len(state["_prefix"]["raw_command"]):].lstrip()) # type: ignore + for new_segment in reversed(new_message): + message.insert(0, new_segment) + + handlers = kwargs.pop("handlers", []) + handlers.insert(0, _strip_cmd) + + commands = set([cmd]) | (aliases or set()) + _load_service_config(name, docs) + matcher_list.append(name) + return cls.on_message(command(*commands) & rule, + handlers=handlers, **kwargs) + + @classmethod + def on_keyword(cls, + name: str, + docs: str, + keywords: Set[str], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + **kwargs) -> Type[Matcher]: + _load_service_config(name, docs) + matcher_list.append(name) + return cls.on_message(keyword(*keywords) & rule, **kwargs) + + + class NetworkPost: + URL = ( + f"http://{config['NetworkPost']['host']}:" + f"{config['NetworkPost']['port']}/" + ) + + @classmethod + async def send_private_msg(cls, + user_id: int, + message: str, + auto_escape: bool = False): # -> Dict[str, Any] + url = cls.URL + "send_private_msg?" + params = { + "user_id": user_id, + "message": message, + "auto_escape": f"{auto_escape}" + } + result = json.loads(await post_bytes(url, params)) + log.debug(result) + return result + + @classmethod + def send_group_msg(cls, + group_id: int, + message: Union[str], + auto_escape: Optional[bool] = ...) -> Dict[str, Any]: + ... + + @classmethod + def send_msg(cls, + message_type: Optional[str] = ..., + user_id: Optional[int] = ..., + group_id: Optional[int] = ..., + message = Union[str], + auto_escape: bool = ...) -> Dict[str, Any]: + ... + + @classmethod + def delete_msg(cls, + message_id: int): + ... + + @classmethod + def get_msg(cls, + message_id: int) -> Dict[str, Any]: + ... + + @classmethod + def get_forward_msg(cls, + id: int): + ... + + @classmethod + def send_like(cls, + user_id: int, + times: int = ...): + ... + + @classmethod + def set_group_kick(cls, + group_id: int, + user_id: int, + reject_add_request: bool = ...): + ... + + @classmethod + def set_group_ban(cls, + group_id: int, + user_id: int, + duration: int = ...): + ... + + @classmethod + def set_group_anonymous_ban(cls, + group_id: int, + anonymous: Optional[Dict[str, Any]] = ..., + flag: Optional[str] = ..., + duration: int = ...): + ... + + @classmethod + def set_group_whole_ban(cls, + group_id: int, + enable: bool = ...): + ... + + @classmethod + def set_group_admin(cls, + group_id: int, + user_id: int, + enable: bool = ...): + ... + + @classmethod + def set_group_anonymous(cls, + group_id: int, + enable: bool = ...): + ... + + @classmethod + def set_group_card(cls): + ... + + @classmethod + def set_group_name(cls): + ... + + @classmethod + def set_group_leave(cls): + ... + + @classmethod + def set_group_special_title(cls): + ... + + @classmethod + def set_friend_add_request(cls): + ... + + @classmethod + def set_group_add_request(cls): + ... + + @classmethod + def get_login_info(cls): + ... + + @classmethod + def get_stranger_info(cls): + ... + + @classmethod + def get_friend_list(cls): + ... + + @classmethod + def get_group_info(cls): + ... + + @classmethod + def get_group_list(cls): + ... + + @classmethod + def get_group_member_info(cls): + ... + + @classmethod + def get_group_member_list(cls): + ... + + @classmethod + def get_group_honor_info(cls): + ... + + @classmethod + def get_cookies(cls): + ... + + @classmethod + def get_csrf_token(cls): + ... + + @classmethod + def get_credentials(cls): + ... + + @classmethod + def get_record(cls): + ... + + @classmethod + def get_image(cls): + ... + + @classmethod + def can_send_image(cls): + ... + + @classmethod + def can_send_record(cls): + ... + + @classmethod + def get_status(cls): + ... + + @classmethod + def get_version_info(cls): + ... + + @classmethod + def set_restart(cls): + ... + + @classmethod + def clean_cache(cls): + ... + + + class Dormant: + @staticmethod + def is_dormant() -> bool: + return False if is_sleep else True + + @staticmethod + def control_dormant(is_enable: bool) -> None: + global is_sleep + if is_enable: + is_sleep = True + else: + is_sleep = False + + + class BlockSystem: + file_name = "ban.json" + path = SERVICE_DIR / file_name + + @classmethod + def get_list(cls) -> dict: + try: + return json.loads(cls.path.read_bytes()) + except: + with open(cls.path, "w") as r: + json.dump("{}", r) + return {} + + @classmethod + def auth_user(cls, user: int) -> bool: + return False if user in cls.get_list() else True + + @classmethod + def control_list(cls, user: int, is_enable: bool) -> None: + data = cls.get_list() + if is_enable: + data[user] = datetime.now().__str__ + log.info(f"New blocked user: {user} | Time: {datetime.now()}") + else: + del data[user] + log.info(f"User {user} has been unblock.") + + with open(cls.path, "w") as r: + json.dump(data, r)
\ No newline at end of file |