summaryrefslogtreecommitdiff
path: root/ATRI/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/service.py')
-rw-r--r--ATRI/service.py323
1 files changed, 167 insertions, 156 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
index 87e606c..79d3d42 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -3,17 +3,7 @@ import re
import json
from pathlib import Path
from datetime import datetime
-from typing import (
- Dict,
- Any,
- List,
- Set,
- Tuple,
- Type,
- Union,
- Optional,
- TYPE_CHECKING
-)
+from typing import Dict, Any, List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.plugin import on_message
@@ -28,8 +18,8 @@ if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
-SERVICE_DIR = Path('.') / 'ATRI' / 'data' / 'service'
-SERVICES_DIR = SERVICE_DIR / 'services'
+SERVICE_DIR = Path(".") / "ATRI" / "data" / "service"
+SERVICES_DIR = SERVICE_DIR / "services"
os.makedirs(SERVICE_DIR, exist_ok=True)
os.makedirs(SERVICES_DIR, exist_ok=True)
@@ -44,10 +34,7 @@ def _load_block_list() -> dict:
try:
data = json.loads(file.read_bytes())
except:
- data = {
- "user": {},
- "group": {}
- }
+ data = {"user": {}, "group": {}}
with open(file, "w") as r:
r.write(json.dumps(data, indent=4))
return data
@@ -61,7 +48,7 @@ def _save_block_list(data: dict) -> None:
def _load_service_config(service: str, docs: str = None) -> dict:
- file_name = service.replace('/', '') + ".json"
+ file_name = service.replace("/", "") + ".json"
file = SERVICES_DIR / file_name
try:
data = json.loads(file.read_bytes())
@@ -71,7 +58,7 @@ def _load_service_config(service: str, docs: str = None) -> dict:
"docs": docs,
"enabled": True,
"disable_user": {},
- "disable_group": {}
+ "disable_group": {},
}
with open(file, "w") as r:
r.write(json.dumps(service_info, indent=4))
@@ -80,7 +67,7 @@ def _load_service_config(service: str, docs: str = None) -> dict:
def _save_service_config(service: str, data: dict) -> None:
- file_name = service.replace('/', '') + ".json"
+ file_name = service.replace("/", "") + ".json"
file = SERVICES_DIR / file_name
with open(file, "w") as r:
r.write(json.dumps(data, indent=4))
@@ -91,20 +78,21 @@ class Service:
集成一套服务管理,对功能信息进行持久化
计划搭配前端使用
"""
+
@staticmethod
def manual_reg_service(service: str, docs: str = None):
- file_name = service.replace('/', '') + ".json"
+ file_name = service.replace("/", "") + ".json"
file = SERVICES_DIR / file_name
service_info = {
"command": service,
"docs": docs,
"enabled": True,
"disable_user": {},
- "disable_group": {}
+ "disable_group": {},
}
with open(file, "w") as r:
r.write(json.dumps(service_info, indent=4))
-
+
@staticmethod
def auth_service(service: str, user: str, group: str = None) -> bool:
data = _load_service_config(service)
@@ -115,193 +103,215 @@ class Service:
return False
else:
return True
-
+
@staticmethod
- def control_service(service: str,
- is_global: bool,
- is_enabled: int,
- user: str = None,
- group: str = None) -> None:
+ def control_service(
+ service: str,
+ is_global: bool,
+ is_enabled: int,
+ user: str = None,
+ group: str = None,
+ ) -> None:
data = _load_service_config(service)
is_enabled = bool(is_enabled)
-
+
if is_global:
status = "disabled" if is_enabled else "enabled"
- data['enabled'] = is_enabled
+ data["enabled"] = is_enabled
log.info(f"\033[33mService: {service} has been {status}.\033[33m")
else:
if user:
if not is_enabled:
- data['disable_user'][user] = str(datetime.now())
- log.info(f"\033[33mNew service blocked user: {user}\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
+ data["disable_user"][user] = str(datetime.now())
+ log.info(
+ f"\033[33mNew service blocked user: {user}\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
+ )
else:
- if user in data['disable_user']:
- del data['disable_user'][user]
- log.info(f"\033[33mUser: {user} has been unblock\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
+ if user in data["disable_user"]:
+ del data["disable_user"][user]
+ log.info(
+ f"\033[33mUser: {user} has been unblock\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
+ )
else:
if not is_enabled:
- data['disable_group'][group] = str(datetime.now())
- log.info(f"\033[33mNew service blocked group: {group}\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
+ data["disable_group"][group] = str(datetime.now())
+ log.info(
+ f"\033[33mNew service blocked group: {group}\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
+ )
else:
- if group in data['disable_group']:
- del data['disable_group'][group]
- log.info(f"\033[33mGroup: {group} has been unblock\033[33m"
- f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
+ if group in data["disable_group"]:
+ del data["disable_group"][group]
+ log.info(
+ f"\033[33mGroup: {group} has been unblock\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m"
+ )
_save_service_config(service, data)
-
+
@staticmethod
- def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = True,
- state: Optional[T_State] = None) -> Type[Matcher]:
- matcher = Matcher.new("message",
- Rule() & rule,
- permission or Permission(),
- temp=temp,
- priority=priority,
- block=block,
- handlers=handlers,
- default_state=state)
+ def on_message(
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = True,
+ state: Optional[T_State] = None,
+ ) -> Type[Matcher]:
+ matcher = Matcher.new(
+ "message",
+ Rule() & rule,
+ permission or Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state,
+ )
return matcher
@staticmethod
- def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = False,
- state: Optional[T_State] = None) -> Type[Matcher]:
- matcher = Matcher.new("notice",
- Rule() & rule,
- Permission(),
- temp=temp,
- priority=priority,
- block=block,
- handlers=handlers,
- default_state=state)
+ def on_notice(
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = False,
+ state: Optional[T_State] = None,
+ ) -> Type[Matcher]:
+ matcher = Matcher.new(
+ "notice",
+ Rule() & rule,
+ Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state,
+ )
return matcher
@staticmethod
- def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None,
- *,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- block: bool = False,
- state: Optional[T_State] = None) -> Type[Matcher]:
- matcher = Matcher.new("request",
- Rule() & rule,
- Permission(),
- temp=temp,
- priority=priority,
- block=block,
- handlers=handlers,
- default_state=state)
+ def on_request(
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ *,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ block: bool = False,
+ state: Optional[T_State] = None,
+ ) -> Type[Matcher]:
+ matcher = Matcher.new(
+ "request",
+ Rule() & rule,
+ Permission(),
+ temp=temp,
+ priority=priority,
+ block=block,
+ handlers=handlers,
+ default_state=state,
+ )
return matcher
@classmethod
- def on_command(cls,
- cmd: Union[str, Tuple[str, ...]],
- docs: Optional[str] = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
- **kwargs) -> Type[Matcher]:
+ def on_command(
+ cls,
+ cmd: Union[str, Tuple[str, ...]],
+ docs: Optional[str] = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
+ **kwargs,
+ ) -> Type[Matcher]:
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
- str(segment).lstrip()
- [len(state["_prefix"]["raw_command"]):].lstrip()) # type: ignore
+ str(segment).lstrip()[len(state["_prefix"]["raw_command"]) :].lstrip()
+ ) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
-
+
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
-
+
commands = set([cmd]) | (aliases or set())
_load_service_config(str(cmd), docs)
- return cls.on_message(command(*commands) & rule,
- handlers=handlers, **kwargs)
+ return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs)
@classmethod
- def on_keyword(cls,
- keywords: Set[str],
- docs: Optional[str] = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- **kwargs) -> Type[Matcher]:
+ def on_keyword(
+ cls,
+ keywords: Set[str],
+ docs: Optional[str] = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ **kwargs,
+ ) -> Type[Matcher]:
_load_service_config(list(keywords)[0], docs)
return cls.on_message(keyword(*keywords) & rule, **kwargs)
-
+
@classmethod
- def on_regex(cls,
- pattern: str,
- flags: Union[int, re.RegexFlag] = 0,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- **kwargs) -> Type[Matcher]:
+ def on_regex(
+ cls,
+ pattern: str,
+ flags: Union[int, re.RegexFlag] = 0,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ **kwargs,
+ ) -> Type[Matcher]:
return on_message(regex(pattern, flags) & rule, **kwargs)
-
-
+
class NetworkPost:
- URL = (
- f"http://{Config.NetworkPost.host}:"
- f"{Config.NetworkPost.port}/"
- )
-
+ URL = f"http://{Config.NetworkPost.host}:" f"{Config.NetworkPost.port}/"
+
@classmethod
- async def send_private_msg(cls,
- user_id: int,
- message: str,
- auto_escape: bool = False) -> Dict[str, Any]:
+ async def send_private_msg(
+ cls, user_id: int, message: str, auto_escape: bool = False
+ ) -> Dict[str, Any]:
url = cls.URL + "send_private_msg?"
params = {
"user_id": user_id,
"message": message,
- "auto_escape": f"{auto_escape}"
+ "auto_escape": f"{auto_escape}",
}
result = json.loads(await post_bytes(url, params))
log.debug(result)
return result
@classmethod
- def send_group_msg(cls,
- group_id: int,
- message: Union[str],
- auto_escape: Optional[bool] = ...) -> Dict[str, Any]:
+ def send_group_msg(
+ cls, group_id: int, message: Union[str], auto_escape: Optional[bool] = ...
+ ) -> Dict[str, Any]:
...
@classmethod
- async def send_msg(cls,
- message_type: Optional[str] = "",
- user_id: Optional[int] = None,
- group_id: Optional[int] = None,
- message = Union[str],
- auto_escape: bool = False) -> Dict[str, Any]:
+ async def send_msg(
+ cls,
+ message_type: Optional[str] = "",
+ user_id: Optional[int] = None,
+ group_id: Optional[int] = None,
+ message=Union[str],
+ auto_escape: bool = False,
+ ) -> Dict[str, Any]:
url = cls.URL + "send_msg?"
params = {
"message_type": "",
"user_id": user_id,
"group_id": group_id,
"message": message,
- "auto_escape": str(auto_escape)
+ "auto_escape": str(auto_escape),
}
result = json.loads(await post_bytes(url, params))
log.debug(result)
return result
-
class Dormant:
@staticmethod
def is_dormant() -> bool:
return False if is_sleep else True
-
+
@staticmethod
def control_dormant(is_enable: bool) -> None:
global is_sleep
@@ -309,37 +319,38 @@ class Service:
is_sleep = True
else:
is_sleep = False
-
-
+
class BlockSystem:
file_name = "ban.json"
path = SERVICE_DIR / file_name
-
+
@staticmethod
def auth_user(user: str) -> bool:
- return False if user in _load_block_list()['user'] else True
-
+ return False if user in _load_block_list()["user"] else True
+
@staticmethod
def auth_group(group: str) -> bool:
- return False if group in _load_block_list()['group'] else True
-
+ return False if group in _load_block_list()["group"] else True
+
@staticmethod
- def control_list(is_enabled: bool,
- user: str = None,
- group: str = None) -> None:
+ def control_list(is_enabled: bool, user: str = None, group: str = None) -> None:
data = _load_block_list()
if user:
if is_enabled:
- data['user'][user] = str(datetime.now())
- log.info(f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m")
+ data["user"][user] = str(datetime.now())
+ log.info(
+ f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m"
+ )
else:
- del data['user'][str(user)]
+ del data["user"][str(user)]
log.info(f"\033[33mUser {user} has been unblock.\033[33m")
elif group:
if is_enabled:
- data['group'][group] = str(datetime.now())
- log.info(f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m")
+ data["group"][group] = str(datetime.now())
+ log.info(
+ f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m"
+ )
else:
- del data['group'][str(group)]
+ del data["group"][str(group)]
log.info(f"\033[33mGroup {group} has been unblock.\033[33m")
_save_block_list(data)