summaryrefslogtreecommitdiff
path: root/ATRI/plugins/manege
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/manege')
-rw-r--r--ATRI/plugins/manege/__init__.py104
-rw-r--r--ATRI/plugins/manege/data_source.py65
2 files changed, 110 insertions, 59 deletions
diff --git a/ATRI/plugins/manege/__init__.py b/ATRI/plugins/manege/__init__.py
index d0337e9..305cdc5 100644
--- a/ATRI/plugins/manege/__init__.py
+++ b/ATRI/plugins/manege/__init__.py
@@ -13,6 +13,7 @@ from .data_source import Manege
block_user = Manege().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER)
+
@block_user.args_parser # type: ignore
async def _get_block_user(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -24,24 +25,27 @@ async def _get_block_user(bot: Bot, event: MessageEvent, state: T_State):
else:
state["block_user"] = msg
+
@block_user.handle()
async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["block_user"] = msg
+
@block_user.got("block_user", "哪位?GKD!")
async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State):
user_id = state["block_user"]
is_ok = Manege().block_user(user_id)
if not is_ok:
await block_user.finish("kuso!封禁失败了...")
-
+
await block_user.finish(f"用户 {user_id} 危!")
unblock_user = Manege().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER)
+
@unblock_user.args_parser # type: ignore
async def _get_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -53,24 +57,27 @@ async def _get_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
else:
state["unblock_user"] = msg
+
@unblock_user.handle()
async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["unblock_user"] = msg
+
@unblock_user.got("unblock_user", "哪位?GKD!")
async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
user_id = state["unblock_user"]
is_ok = Manege().unblock_user(user_id)
if not is_ok:
await unblock_user.finish("kuso!解封失败了...")
-
+
await unblock_user.finish(f"好欸!{user_id} 重获新生!")
block_group = Manege().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER)
+
@block_group.args_parser # type: ignore
async def _get_block_group(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -82,24 +89,27 @@ async def _get_block_group(bot: Bot, event: MessageEvent, state: T_State):
else:
state["block_group"] = msg
+
@block_group.handle()
async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["block_group"] = msg
+
@block_group.got("block_group", "哪个群?GKD!")
async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State):
group_id = state["block_group"]
is_ok = Manege().block_group(group_id)
if not is_ok:
await block_group.finish("kuso!封禁失败了...")
-
+
await block_group.finish(f"群 {group_id} 危!")
unblock_group = Manege().on_command("解封群", "对目标群进行解封", permission=SUPERUSER)
+
@unblock_group.args_parser # type: ignore
async def _get_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -111,24 +121,27 @@ async def _get_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
else:
state["unblock_group"] = msg
+
@unblock_group.handle()
async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["unblock_group"] = msg
+
@unblock_group.got("unblock_group", "哪个群?GKD!")
async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
group_id = state["unblock_group"]
is_ok = Manege().unblock_group(group_id)
if not is_ok:
await unblock_group.finish("kuso!解封失败了...")
-
+
await unblock_group.finish(f"好欸!群 {group_id} 重获新生!")
global_block_service = Manege().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER)
+
@global_block_service.args_parser # type: ignore
async def _get_global_block_service(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -140,24 +153,27 @@ async def _get_global_block_service(bot: Bot, event: MessageEvent, state: T_Stat
else:
state["global_block_service"] = msg
+
@global_block_service.handle()
async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["global_block_service"] = msg
+
@global_block_service.got("global_block_service", "阿...是哪个服务呢")
async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State):
block_service = state["global_block_service"]
is_ok = Manege().control_global_service(block_service, False)
if not is_ok:
await global_block_service.finish("kuso!禁用失败了...")
-
+
await global_block_service.finish(f"服务 {block_service} 已被禁用")
global_unblock_service = Manege().on_command("全局启用", "全局启用某服务", permission=SUPERUSER)
+
@global_unblock_service.args_parser # type: ignore
async def _get_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -169,23 +185,28 @@ async def _get_global_unblock_service(bot: Bot, event: MessageEvent, state: T_St
else:
state["global_unblock_service"] = msg
+
@global_unblock_service.handle()
async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["global_unblock_service"] = msg
+
@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢")
async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
block_service = state["global_unblock_service"]
is_ok = Manege().control_global_service(block_service, True)
if not is_ok:
await global_unblock_service.finish("kuso!启用服务失败了...")
-
+
await global_unblock_service.finish(f"服务 {block_service} 已启用")
-user_block_service = Manege().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER)
+user_block_service = Manege().on_regex(
+ r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER
+)
+
@user_block_service.handle()
async def _user_block_service(bot: Bot, event: MessageEvent):
@@ -194,15 +215,17 @@ async def _user_block_service(bot: Bot, event: MessageEvent):
reg = re.findall(pattern, msg)
aim_user = reg[0]
aim_service = reg[1]
-
+
is_ok = Manege().control_user_service(aim_service, aim_user, False)
if not is_ok:
await user_block_service.finish("禁用失败...请检查服务名是否正确")
await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}")
-
-user_unblock_service = Manege().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER)
+user_unblock_service = Manege().on_regex(
+ r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER
+)
+
@user_unblock_service.handle()
async def _user_unblock_service(bot: Bot, event: MessageEvent):
@@ -211,14 +234,17 @@ async def _user_unblock_service(bot: Bot, event: MessageEvent):
reg = re.findall(pattern, msg)
aim_user = reg[0]
aim_service = reg[1]
-
+
is_ok = Manege().control_user_service(aim_service, aim_user, True)
if not is_ok:
await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中")
await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}")
-group_block_service = Manege().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN)
+group_block_service = Manege().on_command(
+ "禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
@group_block_service.args_parser # type: ignore
async def _get_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
@@ -231,27 +257,36 @@ async def _get_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_
else:
state["group_block_service"] = msg
+
@group_block_service.handle()
-async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _ready_group_block_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
msg = str(event.message).strip()
if msg:
state["group_block_service"] = msg
+
@group_block_service.got("group_block_service", "阿...是哪个服务呢")
async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
aim_service = state["group_block_service"]
group_id = str(event.group_id)
-
+
is_ok = Manege().control_group_service(aim_service, group_id, False)
if not is_ok:
await group_block_service.finish("禁用失败...请检查服务名是否输入正确")
await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}")
-group_unblock_service = Manege().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN)
+group_unblock_service = Manege().on_command(
+ "启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
@group_unblock_service.args_parser # type: ignore
-async def _get_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _get_group_unblock_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
msg = str(event.message).strip()
quit_list = ["算了", "罢了"]
if msg in quit_list:
@@ -261,17 +296,23 @@ async def _get_group_unblock_service(bot: Bot, event: GroupMessageEvent, state:
else:
state["group_unblock_service"] = msg
+
@group_unblock_service.handle()
-async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _ready_group_unblock_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
msg = str(event.message).strip()
if msg:
state["group_unblock_service"] = msg
+
@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢")
-async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _deal_group_unblock_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
aim_service = state["group_unblock_service"]
group_id = str(event.group_id)
-
+
is_ok = Manege().control_group_service(aim_service, group_id, True)
if not is_ok:
await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中")
@@ -280,6 +321,7 @@ async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state:
get_friend_add_list = Manege().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER)
+
@get_friend_add_list.handle()
async def _get_friend_add_list(bot: Bot, event: MessageEvent):
data = Manege().load_friend_apply_list()
@@ -294,10 +336,11 @@ async def _get_friend_add_list(bot: Bot, event: MessageEvent):
msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list))
msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定"
await get_friend_add_list.finish(msg1)
-
+
approve_friend_add = Manege().on_command("同意好友", "同意好友申请", permission=SUPERUSER)
+
@approve_friend_add.args_parser # type: ignore
async def _get_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -309,12 +352,14 @@ async def _get_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State)
else:
state["approve_friend_add"] = msg
+
@approve_friend_add.handle()
async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["approve_friend_add"]
+
@approve_friend_add.got("approve_friend_add", "申请码GKD!")
async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["approve_friend_add"]
@@ -330,6 +375,7 @@ async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State
refuse_friend_add = Manege().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER)
+
@refuse_friend_add.args_parser # type: ignore
async def _get_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -341,12 +387,14 @@ async def _get_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
else:
state["refuse_friend_add"] = msg
+
@refuse_friend_add.handle()
async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["refuse_friend_add"]
+
@refuse_friend_add.got("refuse_friend_add", "申请码GKD!")
async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["refuse_friend_add"]
@@ -362,6 +410,7 @@ async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State)
get_group_invite_list = Manege().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER)
+
@get_group_invite_list.handle()
async def _get_group_invite_list(bot: Bot, event: MessageEvent):
data = Manege().load_invite_apply_list()
@@ -380,6 +429,7 @@ async def _get_group_invite_list(bot: Bot, event: MessageEvent):
approve_group_invite = Manege().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER)
+
@approve_group_invite.args_parser # type: ignore
async def _get_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -391,17 +441,21 @@ async def _get_approve_group_invite(bot: Bot, event: MessageEvent, state: T_Stat
else:
state["approve_group_invite"] = msg
+
@approve_group_invite.handle()
async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["approve_group_invite"]
+
@approve_group_invite.got("approve_group_invite", "申请码GKD!")
async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["approve_group_invite"]
try:
- await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True)
+ await bot.set_group_add_request(
+ flag=apply_code, sub_type="invite", approve=True
+ )
except BaseException:
await approve_group_invite.finish("同意失败...尝试下手动?")
data = Manege().load_invite_apply_list()
@@ -412,6 +466,7 @@ async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_Sta
refuse_group_invite = Manege().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER)
+
@refuse_group_invite.args_parser # type: ignore
async def _get_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -423,17 +478,21 @@ async def _get_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State
else:
state["refuse_group_invite"] = msg
+
@refuse_group_invite.handle()
async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["refuse_group_invite"]
+
@refuse_group_invite.got("refuse_group_invite", "申请码GKD!")
async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["refuse_group_invite"]
try:
- await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False)
+ await bot.set_group_add_request(
+ flag=apply_code, sub_type="invite", approve=False
+ )
except BaseException:
await refuse_group_invite.finish("拒绝失败...尝试下手动?")
data = Manege().load_invite_apply_list()
@@ -444,6 +503,7 @@ async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_Stat
track_error = Manege().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"})
+
@track_error.handle()
async def _track_error(bot: Bot, event: MessageEvent):
track_id = str(event.message).strip()
diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manege/data_source.py
index 933f6c6..6717f6d 100644
--- a/ATRI/plugins/manege/data_source.py
+++ b/ATRI/plugins/manege/data_source.py
@@ -9,7 +9,7 @@ from ATRI.utils import UbuntuPaste
from ATRI.exceptions import ReadFileError, load_error
-MANEGE_DIR = Path(".") / "ATRI" / "data"/ "database" / "manege"
+MANEGE_DIR = Path(".") / "ATRI" / "data" / "database" / "manege"
ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential"
os.makedirs(MANEGE_DIR, exist_ok=True)
os.makedirs(ESSENTIAL_DIR, exist_ok=True)
@@ -29,7 +29,6 @@ __doc__ = """
class Manege(Service):
-
def __init__(self):
Service.__init__(self, "管理", __doc__, True)
@@ -49,11 +48,10 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
data = json.loads(path.read_bytes())
return data
-
-
+
@staticmethod
def _save_block_user_list(data: dict) -> None:
file_name = "block_user.json"
@@ -62,10 +60,10 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
-
+
@staticmethod
def _load_block_group_list() -> dict:
"""
@@ -82,10 +80,10 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def _save_block_group_list(data: dict) -> None:
file_name = "block_group.json"
@@ -94,55 +92,51 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
-
+
@classmethod
def block_user(cls, user_id: str) -> bool:
data = cls._load_block_user_list()
now_time = datetime.now()
- data[user_id] = {
- "time": now_time
- }
+ data[user_id] = {"time": now_time}
try:
cls._save_block_user_list(data)
return True
except BaseException:
return False
-
+
@classmethod
def unblock_user(cls, user_id: str) -> bool:
data: dict = cls._load_block_user_list()
if user_id not in data:
return False
-
+
try:
data.pop(user_id)
cls._save_block_user_list(data)
return True
except BaseException:
return False
-
+
@classmethod
def block_group(cls, group_id: str) -> bool:
data = cls._load_block_group_list()
now_time = datetime.now()
- data[group_id] = {
- "time": now_time
- }
+ data[group_id] = {"time": now_time}
try:
cls._save_block_group_list(data)
return True
except BaseException:
return False
-
+
@classmethod
def unblock_group(cls, group_id: str) -> bool:
data: dict = cls._load_block_group_list()
if group_id not in data:
return False
-
+
try:
data.pop(group_id)
cls._save_block_group_list(data)
@@ -162,7 +156,7 @@ class Manege(Service):
data["enabled"] = is_enabled
ServiceTools().save_service(data, service)
return True
-
+
@staticmethod
def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool:
"""
@@ -173,7 +167,7 @@ class Manege(Service):
except BaseException:
return False
temp_list: list = data.get("disable_user", list())
-
+
if is_enabled:
try:
temp_list.remove(user_id)
@@ -184,7 +178,7 @@ class Manege(Service):
data["disable_user"] = temp_list
ServiceTools().save_service(data, service)
return True
-
+
@staticmethod
def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool:
"""
@@ -196,7 +190,7 @@ class Manege(Service):
except BaseException:
return False
temp_list: list = data.get("disable_group", list())
-
+
if is_enabled:
try:
temp_list.remove(group_id)
@@ -219,7 +213,7 @@ class Manege(Service):
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def save_friend_apply_list(data: dict) -> None:
file_name = "friend_add.json"
@@ -230,7 +224,7 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
-
+
@staticmethod
def load_invite_apply_list() -> dict:
file_name = "group_invite.json"
@@ -242,7 +236,7 @@ class Manege(Service):
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def save_invite_apply_list(data: dict) -> None:
file_name = "group_invite.json"
@@ -260,22 +254,19 @@ class Manege(Service):
data = load_error(track_id)
except ReadFileError:
return "请检查ID是否正确..."
-
+
prompt = data.get("prompt", "ignore")
time = data.get("time", "ignore")
content = data.get("content", "ignore")
-
+
msg0 = TRACK_BACK_FORMAT.format(
- track_id=track_id,
- prompt=prompt,
- time=time,
- content=content
+ track_id=track_id, prompt=prompt, time=time, content=content
)
f_data = FormData()
f_data.add_field("poster", "ATRI running log")
f_data.add_field("syntax", "text")
f_data.add_field("expiration", "day")
f_data.add_field("content", msg0)
-
+
repo = f"详细请移步此处~\n{await UbuntuPaste(data).paste()}"
- return repo \ No newline at end of file
+ return repo