summaryrefslogtreecommitdiff
path: root/ATRI/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/service.py')
-rw-r--r--ATRI/service.py325
1 files changed, 157 insertions, 168 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
index 79d3d42..241d367 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -3,7 +3,17 @@ import re
import json
from pathlib import Path
from datetime import datetime
-from typing import Dict, Any, List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING
+from typing import (
+ Dict,
+ Any,
+ List,
+ Set,
+ Tuple,
+ Type,
+ Union,
+ Optional,
+ TYPE_CHECKING
+)
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.plugin import on_message
@@ -11,15 +21,15 @@ from nonebot.typing import T_State, T_Handler, T_RuleChecker
from nonebot.rule import Rule, command, keyword, regex
from .log import logger as log
-from .config import Config
+from .config import NetworkPost
from .utils.request import post_bytes
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
-SERVICE_DIR = Path(".") / "ATRI" / "data" / "service"
-SERVICES_DIR = SERVICE_DIR / "services"
+SERVICE_DIR = Path('.') / 'ATRI' / 'data' / 'service'
+SERVICES_DIR = SERVICE_DIR / 'services'
os.makedirs(SERVICE_DIR, exist_ok=True)
os.makedirs(SERVICES_DIR, exist_ok=True)
@@ -34,7 +44,10 @@ def _load_block_list() -> dict:
try:
data = json.loads(file.read_bytes())
except:
- data = {"user": {}, "group": {}}
+ data = {
+ "user": {},
+ "group": {}
+ }
with open(file, "w") as r:
r.write(json.dumps(data, indent=4))
return data
@@ -48,7 +61,7 @@ def _save_block_list(data: dict) -> None:
def _load_service_config(service: str, docs: str = None) -> dict:
- file_name = service.replace("/", "") + ".json"
+ file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
try:
data = json.loads(file.read_bytes())
@@ -58,7 +71,7 @@ def _load_service_config(service: str, docs: str = None) -> dict:
"docs": docs,
"enabled": True,
"disable_user": {},
- "disable_group": {},
+ "disable_group": {}
}
with open(file, "w") as r:
r.write(json.dumps(service_info, indent=4))
@@ -67,7 +80,7 @@ def _load_service_config(service: str, docs: str = None) -> dict:
def _save_service_config(service: str, data: dict) -> None:
- file_name = service.replace("/", "") + ".json"
+ file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
with open(file, "w") as r:
r.write(json.dumps(data, indent=4))
@@ -78,21 +91,20 @@ class Service:
集成一套服务管理,对功能信息进行持久化
计划搭配前端使用
"""
-
@staticmethod
def manual_reg_service(service: str, docs: str = None):
- file_name = service.replace("/", "") + ".json"
+ file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
service_info = {
"command": service,
"docs": docs,
"enabled": True,
"disable_user": {},
- "disable_group": {},
+ "disable_group": {}
}
with open(file, "w") as r:
r.write(json.dumps(service_info, indent=4))
-
+
@staticmethod
def auth_service(service: str, user: str, group: str = None) -> bool:
data = _load_service_config(service)
@@ -103,215 +115,193 @@ class Service:
return False
else:
return True
-
+
@staticmethod
- def control_service(
- service: str,
- is_global: bool,
- is_enabled: int,
- user: str = None,
- group: str = None,
- ) -> None:
+ def control_service(service: str,
+ is_global: bool,
+ is_enabled: int,
+ user: str = None,
+ group: str = None) -> None:
data = _load_service_config(service)
is_enabled = bool(is_enabled)
-
+
if is_global:
status = "disabled" if is_enabled else "enabled"
- data["enabled"] = is_enabled
+ data['enabled'] = is_enabled
log.info(f"\033[33mService: {service} has been {status}.\033[33m")
else:
if user:
if not is_enabled:
- data["disable_user"][user] = str(datetime.now())
- log.info(
- f"\033[33mNew service blocked user: {user}\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
+ data['disable_user'][user] = str(datetime.now())
+ log.info(f"\033[33mNew service blocked user: {user}\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
else:
- if user in data["disable_user"]:
- del data["disable_user"][user]
- log.info(
- f"\033[33mUser: {user} has been unblock\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
+ if user in data['disable_user']:
+ del data['disable_user'][user]
+ log.info(f"\033[33mUser: {user} has been unblock\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
else:
if not is_enabled:
- data["disable_group"][group] = str(datetime.now())
- log.info(
- f"\033[33mNew service blocked group: {group}\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
+ data['disable_group'][group] = str(datetime.now())
+ log.info(f"\033[33mNew service blocked group: {group}\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
else:
- if group in data["disable_group"]:
- del data["disable_group"][group]
- log.info(
- f"\033[33mGroup: {group} has been unblock\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
- )
+ if group in data['disable_group']:
+ del data['disable_group'][group]
+ log.info(f"\033[33mGroup: {group} has been unblock\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
_save_service_config(service, data)
-
+
@staticmethod
- def on_message(
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = True,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
- matcher = Matcher.new(
- "message",
- Rule() & rule,
- permission or Permission(),
- temp=temp,
- priority=priority,
- block=block,
- handlers=handlers,
- default_state=state,
- )
+ def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = True,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ matcher = Matcher.new("message",
+ Rule() & rule,
+ permission or Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state)
return matcher
@staticmethod
- def on_notice(
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = False,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
- matcher = Matcher.new(
- "notice",
- Rule() & rule,
- Permission(),
- temp=temp,
- priority=priority,
- block=block,
- handlers=handlers,
- default_state=state,
- )
+ def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = False,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ matcher = Matcher.new("notice",
+ Rule() & rule,
+ Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state)
return matcher
@staticmethod
- def on_request(
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = False,
- state: Optional[T_State] = None,
- ) -> Type[Matcher]:
- matcher = Matcher.new(
- "request",
- Rule() & rule,
- Permission(),
- temp=temp,
- priority=priority,
- block=block,
- handlers=handlers,
- default_state=state,
- )
+ def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = False,
+ state: Optional[T_State] = None) -> Type[Matcher]:
+ matcher = Matcher.new("request",
+ Rule() & rule,
+ Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state)
return matcher
@classmethod
- def on_command(
- cls,
- cmd: Union[str, Tuple[str, ...]],
- docs: Optional[str] = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
- **kwargs,
- ) -> Type[Matcher]:
+ def on_command(cls,
+ cmd: Union[str, Tuple[str, ...]],
+ docs: Optional[str] = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
+ **kwargs) -> Type[Matcher]:
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
- str(segment).lstrip()[len(state["_prefix"]["raw_command"]) :].lstrip()
- ) # type: ignore
+ str(segment).lstrip()
+ [len(state["_prefix"]["raw_command"]):].lstrip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
-
+
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
-
+
commands = set([cmd]) | (aliases or set())
_load_service_config(str(cmd), docs)
- return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs)
+ return cls.on_message(command(*commands) & rule,
+ handlers=handlers, **kwargs)
@classmethod
- def on_keyword(
- cls,
- keywords: Set[str],
- docs: Optional[str] = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- **kwargs,
- ) -> Type[Matcher]:
+ def on_keyword(cls,
+ keywords: Set[str],
+ docs: Optional[str] = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ **kwargs) -> Type[Matcher]:
_load_service_config(list(keywords)[0], docs)
return cls.on_message(keyword(*keywords) & rule, **kwargs)
-
+
@classmethod
- def on_regex(
- cls,
- pattern: str,
- flags: Union[int, re.RegexFlag] = 0,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- **kwargs,
- ) -> Type[Matcher]:
+ def on_regex(cls,
+ pattern: str,
+ flags: Union[int, re.RegexFlag] = 0,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ **kwargs) -> Type[Matcher]:
return on_message(regex(pattern, flags) & rule, **kwargs)
-
+
+
class NetworkPost:
- URL = f"http://{Config.NetworkPost.host}:" f"{Config.NetworkPost.port}/"
-
+ URL = (
+ f"http://{NetworkPost.host}:"
+ f"{NetworkPost.port}/"
+ )
+
@classmethod
- async def send_private_msg(
- cls, user_id: int, message: str, auto_escape: bool = False
- ) -> Dict[str, Any]:
+ async def send_private_msg(cls,
+ user_id: int,
+ message: str,
+ auto_escape: bool = False) -> Dict[str, Any]:
url = cls.URL + "send_private_msg?"
params = {
"user_id": user_id,
"message": message,
- "auto_escape": f"{auto_escape}",
+ "auto_escape": f"{auto_escape}"
}
result = json.loads(await post_bytes(url, params))
log.debug(result)
return result
@classmethod
- def send_group_msg(
- cls, group_id: int, message: Union[str], auto_escape: Optional[bool] = ...
- ) -> Dict[str, Any]:
+ def send_group_msg(cls,
+ group_id: int,
+ message: Union[str],
+ auto_escape: Optional[bool] = ...) -> Dict[str, Any]:
...
@classmethod
- async def send_msg(
- cls,
- message_type: Optional[str] = "",
- user_id: Optional[int] = None,
- group_id: Optional[int] = None,
- message=Union[str],
- auto_escape: bool = False,
- ) -> Dict[str, Any]:
+ async def send_msg(cls,
+ message_type: Optional[str] = "",
+ user_id: Optional[int] = None,
+ group_id: Optional[int] = None,
+ message = Union[str],
+ auto_escape: bool = False) -> Dict[str, Any]:
url = cls.URL + "send_msg?"
params = {
"message_type": "",
"user_id": user_id,
"group_id": group_id,
"message": message,
- "auto_escape": str(auto_escape),
+ "auto_escape": str(auto_escape)
}
result = json.loads(await post_bytes(url, params))
log.debug(result)
return result
+
class Dormant:
@staticmethod
def is_dormant() -> bool:
return False if is_sleep else True
-
+
@staticmethod
def control_dormant(is_enable: bool) -> None:
global is_sleep
@@ -319,38 +309,37 @@ class Service:
is_sleep = True
else:
is_sleep = False
-
+
+
class BlockSystem:
file_name = "ban.json"
path = SERVICE_DIR / file_name
-
+
@staticmethod
def auth_user(user: str) -> bool:
- return False if user in _load_block_list()["user"] else True
-
+ return False if user in _load_block_list()['user'] else True
+
@staticmethod
def auth_group(group: str) -> bool:
- return False if group in _load_block_list()["group"] else True
-
+ return False if group in _load_block_list()['group'] else True
+
@staticmethod
- def control_list(is_enabled: bool, user: str = None, group: str = None) -> None:
+ def control_list(is_enabled: bool,
+ user: str = None,
+ group: str = None) -> None:
data = _load_block_list()
if user:
if is_enabled:
- data["user"][user] = str(datetime.now())
- log.info(
- f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m"
- )
+ data['user'][user] = str(datetime.now())
+ log.info(f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m")
else:
- del data["user"][str(user)]
+ del data['user'][str(user)]
log.info(f"\033[33mUser {user} has been unblock.\033[33m")
elif group:
if is_enabled:
- data["group"][group] = str(datetime.now())
- log.info(
- f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m"
- )
+ data['group'][group] = str(datetime.now())
+ log.info(f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m")
else:
- del data["group"][str(group)]
+ del data['group'][str(group)]
log.info(f"\033[33mGroup {group} has been unblock.\033[33m")
_save_block_list(data)