summaryrefslogtreecommitdiff
path: root/ATRI/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/service.py')
-rw-r--r--ATRI/service.py130
1 files changed, 74 insertions, 56 deletions
diff --git a/ATRI/service.py b/ATRI/service.py
index ab870fd..df2e8d5 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -19,7 +19,7 @@ from nonebot.typing import T_State, T_Handler, T_RuleChecker
from nonebot.rule import Rule, command, keyword
from .log import logger as log
-from .config import config
+from .config import Config
from .utils.request import post_bytes
if TYPE_CHECKING:
@@ -32,8 +32,8 @@ os.makedirs(SERVICE_DIR, exist_ok=True)
os.makedirs(SERVICES_DIR, exist_ok=True)
-matcher_list: list = []
is_sleep: bool = False
+matcher_list: list = []
def _load_block_list() -> dict:
@@ -43,8 +43,8 @@ def _load_block_list() -> dict:
data = json.loads(file.read_bytes())
except:
data = {
- "user": [],
- "group": []
+ "user": {},
+ "group": {}
}
with open(file, "w") as r:
r.write(json.dumps(data, indent=4))
@@ -59,16 +59,17 @@ def _save_block_list(data: dict) -> None:
def _load_service_config(service: str, docs: str = None) -> dict:
- file_name = service + ".json"
+ file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
try:
data = json.loads(file.read_bytes())
except:
service_info = {
- "name": service,
+ "command": service,
"docs": docs,
- "disable_user": _load_block_list()['user'],
- "disable_group": _load_block_list()['group']
+ "enabled": True,
+ "disable_user": {},
+ "disable_group": {}
}
with open(file, "w") as r:
r.write(json.dumps(service_info, indent=4))
@@ -77,7 +78,7 @@ def _load_service_config(service: str, docs: str = None) -> dict:
def _save_service_config(service: str, data: dict) -> None:
- file_name = service + ".json"
+ file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
with open(file, "w") as r:
r.write(json.dumps(data, indent=4))
@@ -85,18 +86,19 @@ def _save_service_config(service: str, data: dict) -> None:
class Service:
"""
- 集成一套服务管理,block准确至个人
+ 集成一套服务管理,对功能信息进行持久化
计划搭配前端使用
"""
@staticmethod
def manual_reg_service(service: str):
- file_name = service + ".json"
+ file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
service_info = {
"name": service,
"docs": None,
- "disable_user": _load_block_list()['user'],
- "disable_group": _load_block_list()['group']
+ "enabled": True,
+ "disable_user": {},
+ "disable_group": {}
}
with open(file, "w") as r:
r.write(json.dumps(service_info, indent=4))
@@ -107,16 +109,36 @@ class Service:
return False if group in data["disable_group"] else True
@staticmethod
- def control_service(service: str, group: int, is_enable: bool) -> None:
+ def control_service(service: str,
+ is_global: bool,
+ is_enabled: bool,
+ user: Optional[int] = None,
+ group: Optional[int] = None) -> None:
data = _load_service_config(service)
- sv_group = data.get('disable_group', [])
- if is_enable:
- sv_group.remove(group)
- log.info(f"Service {service} has been enabled.")
+
+ if is_global:
+ status = "disabled" if is_enabled else "enabled"
+ data['enbaled'] = is_enabled
+ log.info(f"Service: {service} has been {status}.")
else:
- sv_group.append(group)
- log.info(f"Service {service} has been disabled.")
- data["disable_group"] = sv_group
+ if user:
+ if is_enabled:
+ data['disable_user'][user] = str(datetime.now())
+ log.info(f"New service blocked user: {user}"
+ f" | Service: {service} | Time: {datetime.now()}")
+ else:
+ del data['disable_user'][user]
+ log.info(f"User: {user} has been unblock"
+ f" | Service: {service} | Time: {datetime.now()}")
+ else:
+ if is_enabled:
+ data['disable_group'][group] = str(datetime.now())
+ log.info(f"New service blocked group: {group}"
+ f" | Service: {service} | Time: {datetime.now()}")
+ else:
+ del data['disable_group'][group]
+ log.info(f"Group: {group} has been unblock"
+ f" | Service: {service} | Time: {datetime.now()}")
_save_service_config(service, data)
@staticmethod
@@ -139,9 +161,7 @@ class Service:
return matcher
@staticmethod
- def on_notice(name: str,
- docs: Optional[str] = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[T_Handler]] = None,
temp: bool = False,
@@ -156,14 +176,10 @@ class Service:
block=block,
handlers=handlers,
default_state=state)
- _load_service_config(name, docs)
- matcher_list.append(name)
return matcher
@staticmethod
- def on_request(name: str,
- docs: Optional[str] = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[T_Handler]] = None,
temp: bool = False,
@@ -178,13 +194,10 @@ class Service:
block=block,
handlers=handlers,
default_state=state)
- _load_service_config(name, docs)
- matcher_list.append(name)
return matcher
@classmethod
def on_command(cls,
- name: str,
cmd: Union[str, Tuple[str, ...]],
docs: Optional[str] = None,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
@@ -203,34 +216,31 @@ class Service:
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
- _load_service_config(name, docs)
- matcher_list.append(name)
+ _load_service_config(str(cmd), docs)
return cls.on_message(command(*commands) & rule,
handlers=handlers, **kwargs)
@classmethod
def on_keyword(cls,
- name: str,
keywords: Set[str],
docs: Optional[str] = None,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
**kwargs) -> Type[Matcher]:
- _load_service_config(name, docs)
- matcher_list.append(name)
+ _load_service_config(list(keywords)[0], docs)
return cls.on_message(keyword(*keywords) & rule, **kwargs)
class NetworkPost:
URL = (
- f"http://{config['NetworkPost']['host']}:"
- f"{config['NetworkPost']['port']}/"
+ f"http://{Config.NetworkPost.host}:"
+ f"{Config.NetworkPost.port}/"
)
@classmethod
async def send_private_msg(cls,
user_id: int,
message: str,
- auto_escape: bool = False): # -> Dict[str, Any]
+ auto_escape: bool = False) -> Dict[str, Any]:
url = cls.URL + "send_private_msg?"
params = {
"user_id": user_id,
@@ -249,13 +259,23 @@ class Service:
...
@classmethod
- def send_msg(cls,
- message_type: Optional[str] = ...,
- user_id: Optional[int] = ...,
- group_id: Optional[int] = ...,
+ async def send_msg(cls,
+ message_type: Optional[str] = "",
+ user_id: Optional[int] = None,
+ group_id: Optional[int] = None,
message = Union[str],
- auto_escape: bool = ...) -> Dict[str, Any]:
- ...
+ auto_escape: bool = False) -> Dict[str, Any]:
+ url = cls.URL + "send_msg?"
+ params = {
+ "message_type": "",
+ "user_id": user_id,
+ "group_id": group_id,
+ "message": message,
+ "auto_escape": str(auto_escape)
+ }
+ result = json.loads(await post_bytes(url, params))
+ log.debug(result)
+ return result
@classmethod
def delete_msg(cls,
@@ -448,24 +468,22 @@ class Service:
@classmethod
def control_list(cls,
- is_enable: bool,
+ is_enabled: bool,
user: Optional[int] = None,
group: Optional[int] = None) -> None:
data = _load_block_list()
if user:
- if is_enable:
- data['user'][user] = datetime.now().__str__
+ if is_enabled:
+ data['user'][user] = str(datetime.now())
log.info(f"New blocked user: {user} | Time: {datetime.now()}")
else:
- del data[user]
+ del data['user'][str(user)]
log.info(f"User {user} has been unblock.")
elif group:
- if is_enable:
- data['group'][group] = datetime.now().__str__
+ if is_enabled:
+ data['group'][group] = str(datetime.now())
log.info(f"New blocked group: {group} | Time: {datetime.now()}")
else:
- del data[user]
+ del data['group'][str(group)]
log.info(f"Group {group} has been unblock.")
-
- with open(cls.path, "w") as r:
- json.dump(data, r)
+ _save_block_list(data)