summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2021-04-11 15:03:35 +0800
committerKyomotoi <[email protected]>2021-04-11 15:03:35 +0800
commitcbf4900c5c3b1de65e55ce480d4b453091528e84 (patch)
tree6bde5ad2d8f6f4010cb317b2b72be36f17b503d4 /ATRI
parent212424da74fecbd77c9824e422ea8b028b330c9e (diff)
downloadATRI-cbf4900c5c3b1de65e55ce480d4b453091528e84.tar.gz
ATRI-cbf4900c5c3b1de65e55ce480d4b453091528e84.tar.bz2
ATRI-cbf4900c5c3b1de65e55ce480d4b453091528e84.zip
🚀✨⚡️🎨🐛 正式推送
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/config.py1
-rw-r--r--ATRI/exceptions.py2
-rw-r--r--ATRI/plugins/admin.py227
-rw-r--r--ATRI/plugins/anime-search/__init__.py17
-rw-r--r--ATRI/plugins/call-owner.py27
-rw-r--r--ATRI/plugins/code-runner.py18
-rw-r--r--ATRI/plugins/curse/__init__.py27
-rw-r--r--ATRI/plugins/drifting-bottle/__init__.py5
-rw-r--r--ATRI/plugins/essential.py160
-rw-r--r--ATRI/plugins/funny.py53
-rw-r--r--ATRI/plugins/github.py3
-rw-r--r--ATRI/plugins/help.py12
-rw-r--r--ATRI/plugins/hitokoto.py10
-rw-r--r--ATRI/plugins/key-repo/__init__.py23
-rw-r--r--ATRI/plugins/nsfw.py16
-rw-r--r--ATRI/plugins/rich/__init__.py19
-rw-r--r--ATRI/plugins/status.py24
-rw-r--r--ATRI/plugins/utils/__init__.py17
-rw-r--r--ATRI/plugins/utils/data_source.py19
-rw-r--r--ATRI/rule.py20
-rw-r--r--ATRI/service.py78
21 files changed, 450 insertions, 328 deletions
diff --git a/ATRI/config.py b/ATRI/config.py
index 07493e4..07af8f3 100644
--- a/ATRI/config.py
+++ b/ATRI/config.py
@@ -42,6 +42,7 @@ class Config(BaseConfig):
config: dict = config['NsfwCheck']
enabled: bool = bool(config.get('enabled', False))
+ passing_rate: float = float(config.get('passing_rate', 0.8))
host: str = config.get('host', '127.0.0.1')
port: int = int(config.get('port', 5000))
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py
index d7c75eb..6026cfa 100644
--- a/ATRI/exceptions.py
+++ b/ATRI/exceptions.py
@@ -95,7 +95,7 @@ async def _track_error(matcher: Matcher,
"[WARNING] 这是一个错误... ;w;\n"
f"追踪ID: {track_id}\n"
f"触发原因: {prompt}\n"
- "键入 来杯红茶 以联系维护者"
+ "键入 /来杯红茶 以联系维护者"
)
await bot.send(event, msg)
diff --git a/ATRI/plugins/admin.py b/ATRI/plugins/admin.py
index cd902e4..3022723 100644
--- a/ATRI/plugins/admin.py
+++ b/ATRI/plugins/admin.py
@@ -1,92 +1,40 @@
-import os
import json
-import time
+import asyncio
+from random import randint
from pathlib import Path
-from datetime import datetime
from nonebot.permission import SUPERUSER
from nonebot.adapters.cqhttp import (
Bot,
MessageEvent,
- GroupMessageEvent
+ GroupMessageEvent,
+ PrivateMessageEvent
)
from nonebot.typing import T_State
from ATRI.config import Config
from ATRI.service import Service as sv
-from ATRI.exceptions import WriteError, load_error
+from ATRI.exceptions import load_error
from ATRI.utils.file import open_file
-from ATRI.log import (
- logger,
- LOGGER_DIR,
- NOW_TIME
-)
-
-
-ADMIN_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'admin'
-os.makedirs(ADMIN_DIR, exist_ok=True)
-
-# 收集bot所在的群聊聊天记录
-chat_monitor = sv.on_message()
-@chat_monitor.handle()
-async def _chat_monitor(bot: Bot, event: GroupMessageEvent) -> None:
- now_time = datetime.now().strftime('%Y-%m-%d')
- GROUP_DIR = ADMIN_DIR / f"{event.group_id}"
- path = GROUP_DIR / f"{now_time}.chat.json"
- now_time = datetime.now().strftime('%Y%m%d-%H%M%S')
-
- if not GROUP_DIR.exists():
- GROUP_DIR.mkdir()
-
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
- data[event.message_id] = {
- "date": now_time,
- "time": time.time(),
- "post_type": event.post_type,
- "sub_type": event.sub_type,
- "user_id": event.user_id,
- "group_id": event.group_id,
- "message_type": event.message_type,
- "message": event.message.__str__(),
- "raw_message": event.raw_message,
- "font": event.font,
- "sender": {
- "user_id": event.sender.user_id,
- "nickname": event.sender.nickname,
- "sex": event.sender.sex,
- "age": event.sender.age,
- "card": event.sender.card,
- "area": event.sender.area,
- "level": event.sender.level,
- "role": event.sender.role,
- "title": event.sender.title
- },
- "to_me": event.to_me
- }
- try:
- with open(path, 'w', encoding='utf-8') as r:
- r.write(
- json.dumps(
- data, indent=4
- )
- )
- logger.debug(f"写入消息成功,id: {event.message_id}")
- except WriteError:
- logger.error("消息记录失败,可能是缺少文件的原因!")
- else:
- pass
+ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
-ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
+__doc__ = """
+好友申请处理
+权限组:维护者
+用法:
+ /fr list
+ /fr (y/n) reqid
+补充:
+ reqid: 申请码
+"""
request_friend = sv.on_command(
- cmd="好友申请",
- docs="好友申请处理",
+ cmd="friendreq",
+ aliases={'fr'},
+ docs=__doc__,
permission=SUPERUSER
)
@@ -124,9 +72,20 @@ async def _request_friend(bot: Bot, event: MessageEvent) -> None:
await request_friend.finish("阿...请检查输入——!")
+__doc__ = """
+群聊申请处理
+权限组:维护者
+用法:
+ /gr list
+ /gr (y/n) reqid
+补充:
+ reqid: 申请码
+"""
+
request_group = sv.on_command(
- cmd="群聊申请",
- docs="群聊申请处理",
+ cmd="groupreq",
+ aliases={'gr'},
+ docs=__doc__,
permission=SUPERUSER
)
@@ -174,9 +133,17 @@ async def _request_group(bot: Bot, event: MessageEvent) -> None:
await request_friend.finish("阿...请检查输入——!")
+__doc__ = """
+广播
+权限组:维护者
+用法:
+ /bc 内容
+"""
+
broadcast = sv.on_command(
- cmd="/bc",
- docs="广播\n用法:/bc 广播内容",
+ cmd="boradcast",
+ aliases={'bc'},
+ docs=__doc__,
permission=SUPERUSER
)
@@ -194,6 +161,7 @@ async def _bd(bot: Bot, event: MessageEvent, state: T_State) -> None:
err_list = []
for group in group_list:
+ await asyncio.sleep(randint(0, 2))
try:
await bot.send_group_msg(group_id=group["group_id"],
message=msg)
@@ -215,9 +183,16 @@ async def _bd(bot: Bot, event: MessageEvent, state: T_State) -> None:
await broadcast.finish(repo_msg)
+__doc__ = """
+错误堆栈查看
+权限组:维护者
+用法:
+ /track 追踪ID
+"""
+
track_error = sv.on_command(
- cmd="/track",
- docs="报错堆栈查看\n用法:/track 追踪ID",
+ cmd="track",
+ docs=__doc__,
permission=SUPERUSER
)
@@ -247,19 +222,33 @@ async def _(bot: Bot, event: MessageEvent, state: T_State) -> None:
await track_error.finish(msg0)
+__doc__ = """
+获取控制台信息
+权限组:维护者
+用法:
+ /gl level line
+补充:
+ level: 等级(info, warning, error, debug)
+ line: 行数(最近20行:-20)
+"""
+
get_log = sv.on_command(
- cmd="/getlog",
- docs="获取控制台信息\n用法:/getlog 等级:info,warning,debug 行数:比如-20即最近20行",
+ cmd="getlog",
+ aliases={'gl'},
+ docs=__doc__,
permission=SUPERUSER
)
@get_log.handle()
-async def _get_log(bot: Bot, event: MessageEvent) -> None:
+async def _get_log(bot: Bot, event: GroupMessageEvent) -> None:
+ user = str(event.user_id)
+ group = event.group_id
+ node = []
msg = str(event.message).split(" ")
try:
rows = msg[1]
except:
- await get_log.finish("格式/getlog level rows")
+ await get_log.finish("格式/gl level rows")
if msg[0] == "info":
level = "info"
@@ -270,22 +259,35 @@ async def _get_log(bot: Bot, event: MessageEvent) -> None:
elif msg[0] == "debug":
level = "debug"
else:
- await get_log.finish("格式/getlog level rows")
+ await get_log.finish("格式/gl level rows")
path = LOGGER_DIR / level / f"{NOW_TIME}-INFO.log" # type: ignore
logs = await open_file(path, "readlines")
try:
content = logs[int(rows):] # type: ignore
+ repo = "\n".join(content).replace("ATRI", "ATRI")
+ node = [{
+ "type": "node",
+ "data": {"name": "ERROR REPO", "uin": user, "content": repo}
+ }]
except IndexError:
await get_log.finish(f"行数错误...max: {len(logs)}") # type: ignore
- await get_log.finish("\n".join(content).replace("ATRI", "ATRI")) # type: ignore
+ await bot.send_group_forward_msg(group_id=group, messages=node)
+__doc__ = """
+紧急停机
+权限组:维护者
+用法:
+ /st
+"""
+
shutdown = sv.on_command(
- cmd="/st",
- docs="紧急停机",
+ cmd="shutdown",
+ aliases={'st'},
+ docs=__doc__,
permission=SUPERUSER
)
@@ -308,16 +310,20 @@ __doc__ = """
懒得和你废话,block了
权限组:维护者
用法:
- /b user,group 0,1
+ /b u+uid,g+gid 0,1
补充:
- user:QQ号
- group:QQ群号
+ uid:QQ号
+ gid:QQ群号
0,1:对应布尔值False, True
范围为全局
+示例:
+ /b u114514 1
+ 执行对QQ号为114514的封禁
"""
block = sv.on_command(
- cmd="/b",
+ cmd="block",
+ aliases={'b'},
docs="懒得和你废话,block了\n用法:/b u,g 0,1",
permission=SUPERUSER
)
@@ -326,7 +332,7 @@ block = sv.on_command(
async def _block(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message).split(' ')
_type = msg[0]
- arg = int(msg[1])
+ arg = msg[1]
is_enabled = bool(int(msg[2]))
b_type = ""
@@ -349,7 +355,7 @@ __doc__ = """
权限组:维护者,群管理
用法:
对于维护者:
- /s 目标指令 u+int,g+int,global 0,1
+ /s 目标指令 u+uid,g+gid,global 0,1
对于群管理:
/s 目标指令 0,1
补充:
@@ -359,44 +365,53 @@ __doc__ = """
0,1:对应布尔值False, True
示例:
对于维护者:
- /s /status u123456789 0
+ /s /status u123456789 1
对于群管理:
- /s /status 0
+ /s /status 1
"""
service_control = sv.on_command(
- cmd="/s",
+ cmd='service',
+ aliases={'s'},
docs=__doc__,
permission=SUPERUSER
)
@service_control.handle()
-async def _service_control(bot: Bot, event: MessageEvent) -> None:
+async def _service_control(bot: Bot, event: GroupMessageEvent) -> None:
msg = str(event.message).split(' ')
- user = event.user_id
+ user = str(event.user_id)
cmd = msg[0]
_type = msg[1]
- is_enabled = bool(msg[2])
- status = "封禁" if is_enabled else "解封"
+ if msg[0] == "":
+ await service_control.finish('请检查输入~!')
if user in Config.BotSelfConfig.superusers:
+ is_enabled = int(msg[2])
+ status = "启用" if bool(is_enabled) else "禁用"
+
if _type == "global":
sv.control_service(cmd, True, is_enabled)
+ await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
else:
+ print(_type)
if "u" in _type:
qq = _type.replace('u', '')
- sv.control_service(cmd, False, is_enabled, user=int(qq))
+ sv.control_service(cmd, False, is_enabled, user=qq)
elif "g" in _type:
group = _type.replace('g', '')
- sv.control_service(cmd, False, is_enabled, group=int(group))
+ sv.control_service(cmd, False, is_enabled, group=group)
else:
await service_control.finish("请检查输入~!")
+ await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
else:
- if isinstance(event, GroupMessageEvent):
- group = event.group_id
- sv.control_service(cmd, False, bool(_type), group=group)
- else:
- await service_control.finish("此功能仅在群聊中触发")
-
- await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
+ group = str(event.group_id)
+ is_enabled = int(_type)
+ sv.control_service(cmd, False, is_enabled, group=group)
+ status = "启用" if bool(is_enabled) else "禁用"
+ await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
+
+@service_control.handle()
+async def _serv(bot: Bot, event: PrivateMessageEvent) -> None:
+ await service_control.finish("此功能仅在群聊中触发")
diff --git a/ATRI/plugins/anime-search/__init__.py b/ATRI/plugins/anime-search/__init__.py
index 5759f19..eb3dc56 100644
--- a/ATRI/plugins/anime-search/__init__.py
+++ b/ATRI/plugins/anime-search/__init__.py
@@ -7,7 +7,7 @@ from nonebot.adapters.cqhttp.message import Message
from nonebot.typing import T_State
from ATRI.service import Service as sv
-from ATRI.rule import is_block, is_in_dormant
+from ATRI.rule import is_in_service
from ATRI.exceptions import RequestTimeOut
from ATRI.utils.request import get_bytes
@@ -17,10 +17,17 @@ from .data_source import to_simple_string
URL = "https://trace.moe/api/search?url="
+__doc__ = """
+以图搜番
+权限组:所有人
+用法:
+ /anime
+"""
+
anime_search = sv.on_command(
- cmd="/anime",
- docs="以图搜番",
- rule=is_block() & is_in_dormant()
+ cmd="anime",
+ docs=__doc__,
+ rule=is_in_service('anime')
)
@anime_search.handle()
@@ -31,7 +38,7 @@ async def _anime_search(bot: Bot,
if msg:
state["msg"] = msg
-@anime_search.got("msg", prompt="请告诉咱目标图片~!")
+@anime_search.got("msg", prompt="图呢?")
async def _(bot: Bot,
event: MessageEvent,
state: T_State) -> None:
diff --git a/ATRI/plugins/call-owner.py b/ATRI/plugins/call-owner.py
index 2e12a1e..488e5a3 100644
--- a/ATRI/plugins/call-owner.py
+++ b/ATRI/plugins/call-owner.py
@@ -6,20 +6,20 @@ from nonebot.adapters.cqhttp import (
)
from ATRI.service import Service as sv
-from ATRI.rule import is_block
from ATRI.config import Config
from ATRI.utils.apscheduler import scheduler
from ATRI.utils.list import count_list
repo_list = []
+__doc__ = """
+给维护者留言
+权限组:所有人
+用法:
+ /来杯红茶 (msg)
+"""
-
-repo = sv.on_command(
- cmd="来杯红茶",
- docs="给维护者留言",
- rule=is_block()
-)
+repo = sv.on_command(cmd="来杯红茶", docs=__doc__)
@repo.handle()
async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None:
@@ -54,17 +54,24 @@ async def _repo_(bot: Bot, event: MessageEvent, state: T_State) -> None:
)
async def _() -> None:
global repo_list
- repo_list = []
+ repo_list.clear()
+
+__doc__ = """
+重置给维护者的留言次数
+权限组:维护者
+用法:
+ /重置红茶
+"""
reset_repo = sv.on_command(
cmd="重置红茶",
- docs="重置给维护者的留言次数",
+ docs=__doc__,
permission=SUPERUSER
)
@reset_repo.handle()
async def _reset_repo(bot: Bot, event: MessageEvent) -> None:
global repo_list
- repo_list = []
+ repo_list.clear()
await reset_repo.finish("红茶重置完成~!")
diff --git a/ATRI/plugins/code-runner.py b/ATRI/plugins/code-runner.py
index 6da97b0..367a144 100644
--- a/ATRI/plugins/code-runner.py
+++ b/ATRI/plugins/code-runner.py
@@ -4,8 +4,8 @@ Idea from: https://github.com/cczu-osa/aki
import json
from nonebot.adapters.cqhttp import Bot, MessageEvent
+from ATRI.rule import is_in_service
from ATRI.service import Service as sv
-from ATRI.rule import is_block, is_in_dormant
from ATRI.utils.request import post_bytes
from ATRI.exceptions import RequestTimeOut
@@ -40,10 +40,20 @@ SUPPORTED_LANGUAGES = {
}
+__doc__ = """
+在线运行代码
+权限组:所有人
+用法:
+ /code (lang) (code)
+示例:
+ /code python
+ print('Hello world!')
+"""
+
code_runner = sv.on_command(
- cmd="/code",
- docs="在线运行代码",
- rule=is_block() & is_in_dormant()
+ cmd="code",
+ docs=__doc__,
+ rule=is_in_service('code')
)
@code_runner.handle()
diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py
index 6db80cc..ee4ea58 100644
--- a/ATRI/plugins/curse/__init__.py
+++ b/ATRI/plugins/curse/__init__.py
@@ -1,11 +1,7 @@
from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
-from ATRI.rule import (
- is_block,
- is_in_dormant,
- is_in_service
-)
+from ATRI.rule import is_in_service
from ATRI.utils.list import count_list, del_list_aim
from ATRI.utils.request import get_text
from ATRI.exceptions import RequestTimeOut
@@ -15,20 +11,25 @@ URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn"
sick_list = []
-__plugin_name__ = 'curse'
+__doc__ = """
+口臭一下
+权限组:所有人
+用法:
+ 口臭,口臭一下,骂我
+"""
-curse = sv.on_command(
- cmd="口臭一下",
- docs="口臭",
- aliases={"口臭", "骂我"},
- rule=is_block() & is_in_dormant()
- & is_in_service(__plugin_name__)
-)
+curse = sv.on_message(rule=is_in_service('口臭'))
+sv.manual_reg_service('口臭', __doc__)
@curse.handle()
async def _curse(bot: Bot, event: MessageEvent) -> None:
global sick_list
+ msg = str(event.message)
user = event.get_user_id()
+ curse_key = ['口臭', '口臭一下', '骂我']
+
+ if msg not in curse_key:
+ return
if count_list(sick_list, user) == 3:
sick_list.append(user)
diff --git a/ATRI/plugins/drifting-bottle/__init__.py b/ATRI/plugins/drifting-bottle/__init__.py
deleted file mode 100644
index 4d3182e..0000000
--- a/ATRI/plugins/drifting-bottle/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from ATRI.service import Service as sv
-from ATRI.rule import is_block, is_in_service
-
-
-# 等待撰写 \ No newline at end of file
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index 80a1c97..c48d540 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -4,10 +4,17 @@ import json
import shutil
from pathlib import Path
from random import choice
+from datetime import datetime
-from nonebot.adapters import Bot
+from nonebot.typing import T_State
+from nonebot.matcher import Matcher
+from nonebot.message import run_preprocessor
+from nonebot.exception import IgnoredException
from nonebot.adapters.cqhttp.message import Message
from nonebot.adapters.cqhttp import (
+ Bot,
+ MessageEvent,
+ GroupMessageEvent,
FriendRequestEvent,
GroupRequestEvent,
GroupIncreaseNoticeEvent,
@@ -24,12 +31,14 @@ import ATRI
from ATRI.log import logger
from ATRI.exceptions import WriteError
from ATRI.config import Config
-from ATRI.rule import is_block
from ATRI.service import Service as sv
from ATRI.utils.cqcode import coolq_code_check
PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services'
+ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
+os.makedirs(PLUGIN_INFO_DIR, exist_ok=True)
+os.makedirs(ESSENTIAL_DIR, exist_ok=True)
driver = ATRI.driver()
@@ -79,11 +88,86 @@ async def disconnect(bot) -> None:
logger.error("WebSocket 已断开,等待重连")
-ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
-os.makedirs(ESSENTIAL_DIR, exist_ok=True)
+@run_preprocessor # type: ignore
+async def _idk(matcher: Matcher,
+ bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ user = str(event.user_id)
+ if not sv.BlockSystem.auth_user(user):
+ raise IgnoredException(f'Block user: {user}')
+
+ if not sv.Dormant.is_dormant():
+ raise IgnoredException('Bot has been dormant.')
+
+ if isinstance(event, GroupMessageEvent):
+ group = str(event.group_id)
+ if not sv.BlockSystem.auth_group(group):
+ raise IgnoredException(f'Block group: {group}')
+
+@run_preprocessor # type: ignore
+async def _store_message(matcher: Matcher,
+ bot: Bot,
+ event,
+ state: T_State) -> None:
+ if isinstance(event, GroupMessageEvent):
+ group = str(event.group_id)
+
+ if event.sub_type == "normal":
+ now_time = datetime.now().strftime('%Y-%m-%d')
+ GROUP_DIR = ESSENTIAL_DIR / 'chat_history' / f'{event.group_id}'
+ os.makedirs(GROUP_DIR, exist_ok=True)
+ path = GROUP_DIR / f"{now_time}.chat.json"
+ now_time = datetime.now().strftime('%Y%m%d-%H%M%S')
+
+ try:
+ data = json.loads(path.read_bytes())
+ except:
+ data = {}
+ data[event.message_id] = {
+ "date": now_time,
+ "time": str(time.time()),
+ "post_type": str(event.post_type),
+ "sub_type": str(event.sub_type),
+ "user_id": str(event.user_id),
+ "group_id": str(event.group_id),
+ "message_type": str(event.message_type),
+ "message": str(event.message),
+ "raw_message": event.raw_message,
+ "font": str(event.font),
+ "sender": {
+ "user_id": str(event.sender.user_id),
+ "nickname": event.sender.nickname,
+ "sex": event.sender.sex,
+ "age": str(event.sender.age),
+ "card": event.sender.card,
+ "area": event.sender.area,
+ "level": event.sender.level,
+ "role": event.sender.role,
+ "title": event.sender.title
+ },
+ "to_me": event.to_me
+ }
+ try:
+ with open(path, 'w', encoding='utf-8') as r:
+ r.write(json.dumps(data, indent=4))
+ logger.debug(f"写入消息成功,id: {event.message_id}")
+ except WriteError:
+ logger.error("消息记录失败,可能是缺少文件的原因!")
+ else:
+ pass
+ else:
+ pass
+
+ if sv.BlockSystem.auth_group(group):
+ return
+ else:
+ pass
+
+
# 处理:好友请求
-request_friend_event = sv.on_request(rule=is_block())
+request_friend_event = sv.on_request()
@request_friend_event.handle()
async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
@@ -123,7 +207,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
# 处理:邀请入群,如身为管理,还附有入群请求
-request_group_event = sv.on_request(rule=is_block())
+request_group_event = sv.on_request()
@request_group_event.handle()
async def _request_group_event(bot, event: GroupRequestEvent) -> None:
@@ -197,11 +281,13 @@ group_admin_event = sv.on_notice()
@group_admin_event.handle()
async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None:
if event.is_tome():
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
- )
+ return
+
+ for superuser in Config.BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
+ )
# 处理群禁言事件
@@ -209,28 +295,30 @@ group_ban_event = sv.on_notice()
@group_ban_event.handle()
async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
- if event.is_tome():
- if event.duration:
- msg = (
- "那个..。,主人\n"
- f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n"
- f"时长...是 {event.duration} 秒"
+ if not event.is_tome():
+ return
+
+ if event.duration:
+ msg = (
+ "那个..。,主人\n"
+ f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n"
+ f"时长...是 {event.duration} 秒"
+ )
+ for superuser in Config.BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
)
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
- else:
- msg = (
- "好欸!主人\n"
- f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!"
+ else:
+ msg = (
+ "好欸!主人\n"
+ f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!"
+ )
+ for superuser in Config.BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
)
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
# 处理群红包运气王事件
@@ -260,10 +348,7 @@ recall_event = sv.on_notice()
@recall_event.handle()
async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None:
group = event.group_id
- repo = await bot.call_api(
- "get_msg",
- message_id=event.message_id
- )
+ repo = await bot.get_msg(message_id=event.message_id)
repo = str(repo["message"])
check = await coolq_code_check(repo, group=group)
if not check:
@@ -286,10 +371,7 @@ async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None:
@recall_event.handle()
async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None:
user = event.user_id
- repo = await bot.call_api(
- "get_msg",
- message_id=event.message_id
- )
+ repo = await bot.get_msg(message_id=event.message_id)
repo = str(repo["message"])
check = await coolq_code_check(repo, user)
if not check:
diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py
index f334bcf..6a50b7f 100644
--- a/ATRI/plugins/funny.py
+++ b/ATRI/plugins/funny.py
@@ -3,15 +3,10 @@ from pathlib import Path
from random import choice, randint
from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
-from nonebot.adapters.cqhttp.message import Message
from ATRI.service import Service as sv
from ATRI.utils.limit import is_too_exciting
-from ATRI.rule import (
- is_block,
- is_in_dormant,
- is_in_service
-)
+from ATRI.rule import is_in_service
__doc__ = """
@@ -21,18 +16,18 @@ __doc__ = """
来句笑话
"""
-get_laugh = sv.on_command(
- cmd="来句笑话",
- docs=__doc__,
- rule=is_block() & is_in_dormant()
- & is_in_service('来句笑话')
-)
+get_laugh = sv.on_message(rule=is_in_service('来句笑话'))
+sv.manual_reg_service('来句笑话', __doc__)
@get_laugh.handle()
async def _get_laugh(bot: Bot, event: MessageEvent) -> None:
user_name = event.sender.nickname
+ msg = str(event.message)
laugh_list = []
+ if msg != "来句笑话":
+ return
+
FILE = Path('.') / 'ATRI' / 'data' / 'database' / 'funny' / 'laugh.txt'
with open(FILE, 'r', encoding='utf-8') as r:
for line in r:
@@ -42,14 +37,19 @@ async def _get_laugh(bot: Bot, event: MessageEvent) -> None:
await get_laugh.finish(result.replace("%name", user_name))
-me_to_you = sv.on_message(
- rule=is_block() & is_in_dormant() & is_in_service('你又行了')
-)
-sv.manual_reg_service('你又行了')
+__doc__ = """
+你又行了
+权限组:所有人
+用法:
+ (被动触发)
+"""
+
+me_to_you = sv.on_message(rule=is_in_service('你又行了'))
+sv.manual_reg_service('你又行了', __doc__)
@me_to_you.handle()
async def _me_to_you(bot: Bot, event: MessageEvent) -> None:
- if randint(0, 5) == 5:
+ if randint(0, 15) == 5:
msg = str(event.message)
if "我" in msg and "CQ" not in msg:
await me_to_you.finish(msg.replace("我", "你"))
@@ -62,16 +62,18 @@ __doc__ = """
抽老婆
"""
-roll_wife = sv.on_command(
- cmd="抽老婆",
- docs=__doc__,
- rule=is_block() & is_in_dormant() & is_in_service('抽老婆')
-)
+roll_wife = sv.on_message(rule=is_in_service('抽老婆'))
+sv.manual_reg_service('抽老婆', __doc__)
@roll_wife.handle()
async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None:
user = event.user_id
group = event.group_id
+ msg = str(event.message)
+
+ if msg != "抽老婆":
+ return
+
user_name = await bot.get_group_member_info(group_id=group,
user_id=user)
user_name = user_name['nickname']
@@ -110,9 +112,10 @@ __doc__ = """
"""
fake_msg = sv.on_command(
- cmd="/fm",
+ cmd="fakemsg",
+ aliases={'fm'},
docs=__doc__,
- rule=is_block() & is_in_service('/fm') & is_in_dormant()
+ rule=is_in_service('fakemsg')
)
@fake_msg.handle()
@@ -121,7 +124,7 @@ async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None:
user = event.user_id
group = event.group_id
node = []
- check = await is_too_exciting(user, group, 2, True)
+ check = await is_too_exciting(user, group, 1, True)
if check:
for i in msg:
diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py
index ce118cd..c238b69 100644
--- a/ATRI/plugins/github.py
+++ b/ATRI/plugins/github.py
@@ -3,7 +3,6 @@ import json
from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
-from ATRI.rule import is_block, is_in_dormant
from ATRI.utils.request import get_bytes
from ATRI.exceptions import RequestTimeOut
@@ -11,7 +10,7 @@ from ATRI.exceptions import RequestTimeOut
URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}"
-github_issues = sv.on_message(rule=is_block() & is_in_dormant())
+github_issues = sv.on_message()
@github_issues.handle()
async def _github_issues(bot: Bot, event: MessageEvent) -> None:
diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py
index 5b0d1b0..fd51024 100644
--- a/ATRI/plugins/help.py
+++ b/ATRI/plugins/help.py
@@ -5,7 +5,6 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import SERVICE_DIR
from ATRI.service import Service as sv
-from ATRI.rule import is_block
SERVICE_DIR = SERVICE_DIR / 'services'
@@ -21,16 +20,15 @@ __doc__ = """
"""
help = sv.on_command(
- cmd="/h",
+ cmd="help",
+ aliases={'h', '?', '?'},
docs=__doc__,
- aliases={'/help', '.help'},
- rule=is_block()
)
@help.handle()
async def _help(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message).split(' ')
- if not msg[0]:
+ if msg[0] == "":
msg = (
"呀?找不到路了?\n"
"/h list 查看可用命令列表\n"
@@ -44,9 +42,9 @@ async def _help(bot: Bot, event: MessageEvent) -> None:
for _, _, i in os.walk(SERVICE_DIR):
for a in i:
files.append(a.replace('.json', ''))
- cmds = " ".join(map(str, files))
+ cmds = " | ".join(map(str, files))
msg = "咱能做很多事!比如:\n" + cmds
- msg0 = msg + "\n具体用法呢,/(cmd) 就好!"
+ msg0 = msg + "\n具体用法呢,/(cmd) 就好!\n没反应可能是没权限..."
await help.finish(msg0)
elif msg[0] == "info":
cmd = msg[1]
diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py
index e7cfe61..70d3280 100644
--- a/ATRI/plugins/hitokoto.py
+++ b/ATRI/plugins/hitokoto.py
@@ -4,12 +4,7 @@ from pathlib import Path
from random import choice, randint
from nonebot.adapters.cqhttp import Bot, MessageEvent
-from ATRI.rule import (
- is_block,
- is_in_dormant,
- is_in_service,
- to_bot
-)
+from ATRI.rule import is_in_service, to_bot
from ATRI.service import Service as sv
from ATRI.exceptions import LoadingError
from ATRI.utils.list import count_list, del_list_aim
@@ -34,8 +29,7 @@ hitokoto = sv.on_command(
cmd="一言",
docs=__doc__,
aliases={"抑郁一下", "网抑云"},
- rule=is_block() & is_in_dormant()
- & is_in_service('一言') & to_bot()
+ rule=is_in_service('一言') & to_bot()
)
@hitokoto.handle()
diff --git a/ATRI/plugins/key-repo/__init__.py b/ATRI/plugins/key-repo/__init__.py
index dc606b0..2f2b69f 100644
--- a/ATRI/plugins/key-repo/__init__.py
+++ b/ATRI/plugins/key-repo/__init__.py
@@ -7,7 +7,7 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
from ATRI.config import Config
from ATRI.service import Service as sv
from ATRI.utils.request import get_bytes
-from ATRI.rule import is_block, is_in_dormant, is_in_service, to_bot
+from ATRI.rule import is_in_service, to_bot
from .data_source import (
add_history,
@@ -28,10 +28,17 @@ from .data_source import (
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-keyrepo = sv.on_message(rule=is_block()
- & is_in_dormant()
- & is_in_service('keyrepo')
- & to_bot())
+__doc__ = """
+涩涩的聊天(?
+权限组:所有人
+用法:
+ @ (msg)
+补充:
+ @: at机器人
+"""
+
+keyrepo = sv.on_message(rule=is_in_service('keyrepo') & to_bot(), priority=5)
+sv.manual_reg_service('keyrepo')
@keyrepo.handle()
async def _keyrepo(bot: Bot, event: MessageEvent) -> None:
@@ -45,6 +52,7 @@ async def _keyrepo(bot: Bot, event: MessageEvent) -> None:
__doc__ = """
关键词申请/审核
+(此功能未完善)
权限组:所有人
用法:
/train add (key) (repo)
@@ -62,9 +70,8 @@ __doc__ = """
"""
train = sv.on_command(
- cmd="/train",
- docs=__doc__,
- rule=is_block()
+ cmd="train",
+ docs=__doc__
)
@train.handle()
diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py
index 0d071cb..945edf6 100644
--- a/ATRI/plugins/nsfw.py
+++ b/ATRI/plugins/nsfw.py
@@ -8,7 +8,7 @@ from ATRI.log import logger as log
from ATRI.config import Config
from ATRI.service import Service as sv
from ATRI.exceptions import RequestTimeOut
-from ATRI.rule import is_block, is_in_dormant, is_in_service
+from ATRI.rule import is_in_service
from ATRI.utils.request import get_bytes
from ATRI.utils.cqcode import coolq_code_check
@@ -28,14 +28,18 @@ async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None:
user = event.user_id
group = event.group_id
check = await coolq_code_check(msg, user, group)
+
if check:
+ if "image" not in msg:
+ return
+
+ url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0]
try:
- url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0]
data = json.loads(await get_bytes(url))
except:
log.warning('检测涩图失败,请查阅文档以获取帮助')
return
- if round(data['score'], 4) > 0.6:
+ if round(data['score'], 4) > Config.NsfwCheck.passing_rate:
score = "{:.2%}".format(round(data['score'], 4))
log.debug(f'截获涩图,得分:{score}')
await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!')
@@ -57,9 +61,9 @@ __doc__ = """
"""
nsfw_reading = sv.on_command(
- cmd="/nsfw",
+ cmd="nsfw",
docs=__doc__,
- rule=is_block() & is_in_service('/nsfw') & is_in_dormant()
+ rule=is_in_service('nsfw')
)
@nsfw_reading.handle()
@@ -73,7 +77,7 @@ async def _nsfw_r(bot: Bot,
if check and msg:
state['pic'] = msg
-@nsfw_reading.got('pic', prompt='请提供一张图片')
+@nsfw_reading.got('pic', prompt='图呢?')
async def _nsfw_reading(bot: Bot,
event: GroupMessageEvent,
state: T_State) -> None:
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index 8404da6..9d5d6a9 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -6,10 +6,6 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
from ATRI.utils.request import get_bytes
from ATRI.utils.list import count_list, del_list_aim
-from ATRI.rule import (
- is_block,
- is_in_dormant,
-)
from .data_source import dec
@@ -17,10 +13,7 @@ from .data_source import dec
temp_list = []
-bilibili_rich = sv.on_message(
- rule=is_block() & is_in_dormant()
-)
-sv.manual_reg_service("监听b站小程序")
+bilibili_rich = sv.on_message()
@bilibili_rich.handle()
async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
@@ -30,7 +23,10 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
if "qqdocurl" not in msg:
if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
+ try:
+ av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
+ except:
+ return
else:
try:
bv = re.findall(r"(BV\w+)", msg)
@@ -49,7 +45,10 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
if not bv:
if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
+ try:
+ av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
+ except:
+ return
else:
return
diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py
index 25e4e0b..61e96fe 100644
--- a/ATRI/plugins/status.py
+++ b/ATRI/plugins/status.py
@@ -3,27 +3,37 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.log import logger as log
from ATRI.service import Service as sv
-from ATRI.rule import is_block
+from ATRI.rule import is_in_service
from ATRI.exceptions import GetStatusError
from ATRI.utils.apscheduler import scheduler
from ATRI.config import Config
+__doc__ = """
+测试机器人状态
+"""
+
ping = sv.on_command(
- cmd="/ping",
+ cmd="ping",
docs="测试机器人",
- rule=is_block()
-)
+ rule=is_in_service('ping'))
@ping.handle()
async def _ping(bot: Bot, event: MessageEvent) -> None:
await ping.finish("I'm fine.")
+__doc__ = """
+检查机器人性能
+权限组:所有人
+用法:
+ /status
+"""
+
status = sv.on_command(
- cmd="/status",
- docs="检查机器人状态",
- rule=is_block()
+ cmd="status",
+ docs=__doc__,
+ rule=is_in_service('status')
)
@status.handle()
diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py
index b23a01f..af6d4e0 100644
--- a/ATRI/plugins/utils/__init__.py
+++ b/ATRI/plugins/utils/__init__.py
@@ -2,11 +2,7 @@ import re
from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
-from ATRI.rule import (
- is_block,
- is_in_dormant,
- is_in_service
-)
+from ATRI.rule import is_in_service
from .data_source import roll_dice, Encrypt
@@ -22,10 +18,9 @@ roll一下
"""
roll = sv.on_command(
- cmd="/roll",
+ cmd="roll",
docs=__doc__,
- rule=is_block() & is_in_dormant()
- & is_in_service('/roll')
+ rule=is_in_service('roll')
)
@roll.handle()
@@ -46,7 +41,7 @@ async def _(bot: Bot, event: MessageEvent, state: dict) -> None:
__doc__ = """
-加密你的信息!
+加密传输(bushi
权限组:所有人
用法:
/enc e,d msg
@@ -58,9 +53,9 @@ __doc__ = """
"""
encrypt = sv.on_command(
- cmd="/enc",
+ cmd="enc",
docs=__doc__,
- rule=is_block() & is_in_service('/enc') & is_in_dormant()
+ rule=is_in_service('enc')
)
@encrypt.handle()
diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py
index 71da581..18c0492 100644
--- a/ATRI/plugins/utils/data_source.py
+++ b/ATRI/plugins/utils/data_source.py
@@ -55,7 +55,7 @@ class Encrypt():
def _encodeByte(self, i) -> Union[str, None]:
if i > 0xFF:
- raise ValueError('ERROR! rc/nb overflow')
+ raise ValueError('ERROR! at/ri overflow')
if i > 0x7F:
i = i & 0x7F
@@ -65,7 +65,7 @@ class Encrypt():
def _encodeShort(self, i) -> str:
if i > 0xFFFF:
- raise ValueError('ERROR! rcnb overflow')
+ raise ValueError('ERROR! atri overflow')
reverse = False
if i > 0x7FFF:
@@ -93,11 +93,12 @@ class Encrypt():
if idx[0] < 0 or idx[1] < 0:
idx = [self.cn.index(c[0]), self.cb.index(c[1])]
nb = True
- raise ValueError('ERROR! rc/nb overflow')
+ raise ValueError('ERROR! at/ri overflow')
- result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1]
+ result = idx[0] * self.sb + idx[1] \
+ if nb else idx[0] * self.sc + idx[1]
if result > 0x7F:
- raise ValueError('ERROR! rc/nb overflow')
+ raise ValueError('ERROR! at/ri overflow')
return result | 0x80 if nb else 0
@@ -119,12 +120,12 @@ class Encrypt():
]
if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0:
- raise ValueError('ERROR! not rcnb')
+ raise ValueError('ERROR! not atri')
result = idx[0] * self.scnb + idx[1] * self.snb + idx[
2] * self.sb + idx[3]
if result > 0x7FFF:
- raise ValueError('ERROR! rcnb overflow')
+ raise ValueError('ERROR! atri overflow')
result |= 0x8000 if reverse else 0
return result
@@ -155,8 +156,8 @@ class Encrypt():
result = []
for i in range(0, (len(s) >> 2)):
result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8]))
- result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF
- ]))
+ result.append(bytes([
+ self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF]))
if (len(s) & 2) == 2:
result.append(bytes([self._decodeByte(s[-2:])]))
diff --git a/ATRI/rule.py b/ATRI/rule.py
index 109d644..00a4c21 100644
--- a/ATRI/rule.py
+++ b/ATRI/rule.py
@@ -1,34 +1,20 @@
from nonebot.rule import Rule
from nonebot.adapters.cqhttp import GroupMessageEvent, PokeNotifyEvent
-from .config import config
from .service import Service as sv
def is_in_service(service: str) -> Rule:
async def _is_in_service(bot, event, state) -> bool:
+ user = str(event.user_id)
if isinstance(event, GroupMessageEvent):
- return sv.auth_service(service, event.group_id)
+ return sv.auth_service(service, user, str(event.group_id))
else:
- return sv.auth_service(service, None)
+ return sv.auth_service(service, user, None)
return Rule(_is_in_service)
-def is_block() -> Rule:
- async def _is_in_banlist(bot, event, state) -> bool:
- return sv.BlockSystem.auth_user(int(event.get_user_id()))
-
- return Rule(_is_in_banlist)
-
-
-def is_in_dormant() -> Rule:
- async def _is_in_dormant(bot, event, state) -> bool:
- return sv.Dormant.is_dormant()
-
- return Rule(_is_in_dormant)
-
-
def to_bot() -> Rule:
async def _to_bot(bot, event, state) -> bool:
return event.is_tome()
diff --git a/ATRI/service.py b/ATRI/service.py
index df2e8d5..5205bd2 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -90,12 +90,12 @@ class Service:
计划搭配前端使用
"""
@staticmethod
- def manual_reg_service(service: str):
+ def manual_reg_service(service: str, docs: str = None):
file_name = service.replace('/', '') + ".json"
file = SERVICES_DIR / file_name
service_info = {
- "name": service,
- "docs": None,
+ "command": service,
+ "docs": docs,
"enabled": True,
"disable_user": {},
"disable_group": {}
@@ -104,41 +104,50 @@ class Service:
r.write(json.dumps(service_info, indent=4))
@staticmethod
- def auth_service(service: str, group: Optional[int] = None) -> bool:
+ def auth_service(service: str, user: str, group: str = None) -> bool:
data = _load_service_config(service)
- return False if group in data["disable_group"] else True
+ if user in data["disable_user"]:
+ return False
+ else:
+ if group in data["disable_group"]:
+ return False
+ else:
+ return True
@staticmethod
def control_service(service: str,
is_global: bool,
- is_enabled: bool,
- user: Optional[int] = None,
- group: Optional[int] = None) -> None:
+ is_enabled: int,
+ user: str = None,
+ group: str = None) -> None:
data = _load_service_config(service)
-
+ is_enabled = bool(is_enabled)
+
if is_global:
status = "disabled" if is_enabled else "enabled"
- data['enbaled'] = is_enabled
- log.info(f"Service: {service} has been {status}.")
+ data['enabled'] = is_enabled
+ log.info(f"\033[33mService: {service} has been {status}.\033[33m")
else:
if user:
- if is_enabled:
+ if not is_enabled:
data['disable_user'][user] = str(datetime.now())
- log.info(f"New service blocked user: {user}"
- f" | Service: {service} | Time: {datetime.now()}")
+ log.info(f"\033[33mNew service blocked user: {user}\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
else:
- del data['disable_user'][user]
- log.info(f"User: {user} has been unblock"
- f" | Service: {service} | Time: {datetime.now()}")
+ if user in data['disable_user']:
+ del data['disable_user'][user]
+ log.info(f"\033[33mUser: {user} has been unblock\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
else:
- if is_enabled:
+ if not is_enabled:
data['disable_group'][group] = str(datetime.now())
- log.info(f"New service blocked group: {group}"
- f" | Service: {service} | Time: {datetime.now()}")
+ log.info(f"\033[33mNew service blocked group: {group}\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
else:
- del data['disable_group'][group]
- log.info(f"Group: {group} has been unblock"
- f" | Service: {service} | Time: {datetime.now()}")
+ if group in data['disable_group']:
+ del data['disable_group'][group]
+ log.info(f"\033[33mGroup: {group} has been unblock\033[33m"
+ f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m")
_save_service_config(service, data)
@staticmethod
@@ -458,32 +467,31 @@ class Service:
file_name = "ban.json"
path = SERVICE_DIR / file_name
- @classmethod
- def auth_user(cls, user: int) -> bool:
+ @staticmethod
+ def auth_user(user: str) -> bool:
return False if user in _load_block_list()['user'] else True
@staticmethod
- def auth_group(group: int) -> bool:
+ def auth_group(group: str) -> bool:
return False if group in _load_block_list()['group'] else True
- @classmethod
- def control_list(cls,
- is_enabled: bool,
- user: Optional[int] = None,
- group: Optional[int] = None) -> None:
+ @staticmethod
+ def control_list(is_enabled: bool,
+ user: str = None,
+ group: str = None) -> None:
data = _load_block_list()
if user:
if is_enabled:
data['user'][user] = str(datetime.now())
- log.info(f"New blocked user: {user} | Time: {datetime.now()}")
+ log.info(f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m")
else:
del data['user'][str(user)]
- log.info(f"User {user} has been unblock.")
+ log.info(f"\033[33mUser {user} has been unblock.\033[33m")
elif group:
if is_enabled:
data['group'][group] = str(datetime.now())
- log.info(f"New blocked group: {group} | Time: {datetime.now()}")
+ log.info(f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m")
else:
del data['group'][str(group)]
- log.info(f"Group {group} has been unblock.")
+ log.info(f"\033[33mGroup {group} has been unblock.\033[33m")
_save_block_list(data)