summaryrefslogtreecommitdiff
path: root/ATRI/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/service.py')
-rw-r--r--ATRI/service.py455
1 files changed, 455 insertions, 0 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
new file mode 100644
index 0000000..8ce3c22
--- /dev/null
+++ b/ATRI/service.py
@@ -0,0 +1,455 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+'''
+File: service.py
+Created Date: 2021-02-27 11:01:39
+Author: Kyomotoi
+License: GPLv3
+Project: https://github.com/Kyomotoi/ATRI
+--------
+Last Modified: Sunday, 7th March 2021 2:56:28 pm
+Modified By: Kyomotoi ([email protected])
+--------
+Copyright (c) 2021 Kyomotoi
+'''
+
+import os
+import json
+from pathlib import Path
+from datetime import datetime
+from typing import (
+ Dict,
+ Any,
+ List,
+ Set,
+ Tuple,
+ Type,
+ Union,
+ Optional,
+ TYPE_CHECKING
+)
+from nonebot.matcher import Matcher
+from nonebot.permission import Permission
+from nonebot.typing import T_State, T_Handler, T_RuleChecker
+from nonebot.rule import Rule, command, keyword
+
+from .log import logger as log
+from .config import config
+from .utils.request import post_bytes
+
+if TYPE_CHECKING:
+ from nonebot.adapters import Bot, Event
+
+
+SERVICE_DIR = Path('.') / 'ATRI' / 'data' / 'service'
+os.makedirs(SERVICE_DIR, exist_ok=True)
+
+
+matcher_list: list = []
+is_sleep: bool = False
+
+
+def _load_service_config(service: str, docs: str = None) -> dict:
+ file_name = service + ".json"
+ file = SERVICE_DIR / file_name
+ try:
+ data = json.loads(file.read_bytes())
+ except:
+ service_info = {
+ "name": service,
+ "docs": docs,
+ "disable_group": []
+ }
+ with open(file, "w") as r:
+ r.write(json.dumps(service_info, indent=4))
+ data = service_info
+ return data
+
+
+def _save_service_config(service: str, data: dict) -> None:
+ file_name = service + ".json"
+ file = SERVICE_DIR / file_name
+ with open(file, "w") as r:
+ r.write(json.dumps(data, indent=4))
+
+
+class Service:
+ """
+ 集成一套服务管理,block准确至个人
+ 计划搭配前端使用
+ """
+ @staticmethod
+ def manual_reg_service(service: str):
+ file_name = service + ".json"
+ file = SERVICE_DIR / file_name
+ service_info = {
+ "name": service,
+ "docs": None,
+ "disable_group": []
+ }
+ with open(file, "w") as r:
+ r.write(json.dumps(service_info, indent=4))
+
+ @staticmethod
+ def auth_service(service: str, group: Optional[int] = None) -> bool:
+ data = _load_service_config(service)
+ return False if group in data["disable_group"] else True
+
+ @staticmethod
+ def control_service(service: str, group: int, is_enable: bool) -> None:
+ data = _load_service_config(service)
+ sv_group = data.get('disable_group', [])
+ if is_enable:
+ sv_group.remove(group)
+ log.info(f"Service {service} has been enabled.")
+ else:
+ sv_group.append(group)
+ log.info(f"Service {service} has been disabled.")
+ data["disable_group"] = sv_group
+ _save_service_config(service, data)
+
+ @staticmethod
+ def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = True,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ matcher = Matcher.new("message",
+ Rule() & rule,
+ permission or Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state)
+ return matcher
+
+ @staticmethod
+ def on_notice(name: str,
+ docs: str,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = False,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ matcher = Matcher.new("notice",
+ Rule() & rule,
+ Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state)
+ _load_service_config(name, docs)
+ matcher_list.append(name)
+ return matcher
+
+ @staticmethod
+ def on_request(name: str,
+ docs: str,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = False,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ matcher = Matcher.new("request",
+ Rule() & rule,
+ Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state)
+ _load_service_config(name, docs)
+ matcher_list.append(name)
+ return matcher
+
+ @classmethod
+ def on_command(cls,
+ name: str,
+ docs: str,
+ cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
+ **kwargs) -> Type[Matcher]:
+ async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
+ message = event.get_message()
+ segment = message.pop(0)
+ new_message = message.__class__(
+ str(segment).lstrip()
+ [len(state["_prefix"]["raw_command"]):].lstrip()) # type: ignore
+ for new_segment in reversed(new_message):
+ message.insert(0, new_segment)
+
+ handlers = kwargs.pop("handlers", [])
+ handlers.insert(0, _strip_cmd)
+
+ commands = set([cmd]) | (aliases or set())
+ _load_service_config(name, docs)
+ matcher_list.append(name)
+ return cls.on_message(command(*commands) & rule,
+ handlers=handlers, **kwargs)
+
+ @classmethod
+ def on_keyword(cls,
+ name: str,
+ docs: str,
+ keywords: Set[str],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ **kwargs) -> Type[Matcher]:
+ _load_service_config(name, docs)
+ matcher_list.append(name)
+ return cls.on_message(keyword(*keywords) & rule, **kwargs)
+
+
+ class NetworkPost:
+ URL = (
+ f"http://{config['NetworkPost']['host']}:"
+ f"{config['NetworkPost']['port']}/"
+ )
+
+ @classmethod
+ async def send_private_msg(cls,
+ user_id: int,
+ message: str,
+ auto_escape: bool = False): # -> Dict[str, Any]
+ url = cls.URL + "send_private_msg?"
+ params = {
+ "user_id": user_id,
+ "message": message,
+ "auto_escape": f"{auto_escape}"
+ }
+ result = json.loads(await post_bytes(url, params))
+ log.debug(result)
+ return result
+
+ @classmethod
+ def send_group_msg(cls,
+ group_id: int,
+ message: Union[str],
+ auto_escape: Optional[bool] = ...) -> Dict[str, Any]:
+ ...
+
+ @classmethod
+ def send_msg(cls,
+ message_type: Optional[str] = ...,
+ user_id: Optional[int] = ...,
+ group_id: Optional[int] = ...,
+ message = Union[str],
+ auto_escape: bool = ...) -> Dict[str, Any]:
+ ...
+
+ @classmethod
+ def delete_msg(cls,
+ message_id: int):
+ ...
+
+ @classmethod
+ def get_msg(cls,
+ message_id: int) -> Dict[str, Any]:
+ ...
+
+ @classmethod
+ def get_forward_msg(cls,
+ id: int):
+ ...
+
+ @classmethod
+ def send_like(cls,
+ user_id: int,
+ times: int = ...):
+ ...
+
+ @classmethod
+ def set_group_kick(cls,
+ group_id: int,
+ user_id: int,
+ reject_add_request: bool = ...):
+ ...
+
+ @classmethod
+ def set_group_ban(cls,
+ group_id: int,
+ user_id: int,
+ duration: int = ...):
+ ...
+
+ @classmethod
+ def set_group_anonymous_ban(cls,
+ group_id: int,
+ anonymous: Optional[Dict[str, Any]] = ...,
+ flag: Optional[str] = ...,
+ duration: int = ...):
+ ...
+
+ @classmethod
+ def set_group_whole_ban(cls,
+ group_id: int,
+ enable: bool = ...):
+ ...
+
+ @classmethod
+ def set_group_admin(cls,
+ group_id: int,
+ user_id: int,
+ enable: bool = ...):
+ ...
+
+ @classmethod
+ def set_group_anonymous(cls,
+ group_id: int,
+ enable: bool = ...):
+ ...
+
+ @classmethod
+ def set_group_card(cls):
+ ...
+
+ @classmethod
+ def set_group_name(cls):
+ ...
+
+ @classmethod
+ def set_group_leave(cls):
+ ...
+
+ @classmethod
+ def set_group_special_title(cls):
+ ...
+
+ @classmethod
+ def set_friend_add_request(cls):
+ ...
+
+ @classmethod
+ def set_group_add_request(cls):
+ ...
+
+ @classmethod
+ def get_login_info(cls):
+ ...
+
+ @classmethod
+ def get_stranger_info(cls):
+ ...
+
+ @classmethod
+ def get_friend_list(cls):
+ ...
+
+ @classmethod
+ def get_group_info(cls):
+ ...
+
+ @classmethod
+ def get_group_list(cls):
+ ...
+
+ @classmethod
+ def get_group_member_info(cls):
+ ...
+
+ @classmethod
+ def get_group_member_list(cls):
+ ...
+
+ @classmethod
+ def get_group_honor_info(cls):
+ ...
+
+ @classmethod
+ def get_cookies(cls):
+ ...
+
+ @classmethod
+ def get_csrf_token(cls):
+ ...
+
+ @classmethod
+ def get_credentials(cls):
+ ...
+
+ @classmethod
+ def get_record(cls):
+ ...
+
+ @classmethod
+ def get_image(cls):
+ ...
+
+ @classmethod
+ def can_send_image(cls):
+ ...
+
+ @classmethod
+ def can_send_record(cls):
+ ...
+
+ @classmethod
+ def get_status(cls):
+ ...
+
+ @classmethod
+ def get_version_info(cls):
+ ...
+
+ @classmethod
+ def set_restart(cls):
+ ...
+
+ @classmethod
+ def clean_cache(cls):
+ ...
+
+
+ class Dormant:
+ @staticmethod
+ def is_dormant() -> bool:
+ return False if is_sleep else True
+
+ @staticmethod
+ def control_dormant(is_enable: bool) -> None:
+ global is_sleep
+ if is_enable:
+ is_sleep = True
+ else:
+ is_sleep = False
+
+
+ class BlockSystem:
+ file_name = "ban.json"
+ path = SERVICE_DIR / file_name
+
+ @classmethod
+ def get_list(cls) -> dict:
+ try:
+ return json.loads(cls.path.read_bytes())
+ except:
+ with open(cls.path, "w") as r:
+ json.dump("{}", r)
+ return {}
+
+ @classmethod
+ def auth_user(cls, user: int) -> bool:
+ return False if user in cls.get_list() else True
+
+ @classmethod
+ def control_list(cls, user: int, is_enable: bool) -> None:
+ data = cls.get_list()
+ if is_enable:
+ data[user] = datetime.now().__str__
+ log.info(f"New blocked user: {user} | Time: {datetime.now()}")
+ else:
+ del data[user]
+ log.info(f"User {user} has been unblock.")
+
+ with open(cls.path, "w") as r:
+ json.dump(data, r) \ No newline at end of file