summaryrefslogtreecommitdiff
path: root/ATRI/plugins/manage
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/manage')
8 files changed, 231 insertions, 304 deletions
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
index d9f1ffe..b316020 100644
--- a/ATRI/plugins/manage/__init__.py
+++ b/ATRI/plugins/manage/__init__.py
@@ -4,5 +4,4 @@ from pathlib import Path
_sub_plugins = set()
-_sub_plugins |= nonebot.load_plugins(
- str((Path(__file__).parent / 'modules').resolve()))
+_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve()))
diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py
index 7b982d4..b439ba1 100644
--- a/ATRI/plugins/manage/modules/block.py
+++ b/ATRI/plugins/manage/modules/block.py
@@ -12,36 +12,31 @@ __doc__ = """
封禁用户 QQ号
"""
-block_user = sv.on_command(
- cmd="封禁用户",
- docs=__doc__,
- permission=SUPERUSER
-)
+block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER)
+
@block_user.args_parser # type: ignore
-async def _block_user_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await block_user.finish('好吧...')
+ await block_user.finish("好吧...")
if not msg:
- await block_user.reject('是谁呢?!GKD!')
+ await block_user.reject("是谁呢?!GKD!")
else:
- state['noob'] = msg
+ state["noob"] = msg
+
@block_user.handle()
async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['noob'] = msg
+ state["noob"] = msg
+
-@block_user.got('noob', prompt='是谁呢?!GKD!')
-async def _deal_block_user(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- noob = state['noob']
+@block_user.got("noob", prompt="是谁呢?!GKD!")
+async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ noob = state["noob"]
sv.BlockSystem.control_list(True, user=noob)
msg = f"用户[{noob}]已被封禁(;′⌒`)"
await block_user.finish(msg)
@@ -54,36 +49,31 @@ __doc__ = """
解封用户 QQ号
"""
-unblock_user = sv.on_command(
- cmd="解封用户",
- docs=__doc__,
- permission=SUPERUSER
-)
+unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER)
+
@unblock_user.args_parser # type: ignore
-async def _unblock_user_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await unblock_user.finish('好吧...')
+ await unblock_user.finish("好吧...")
if not msg:
- await unblock_user.reject('要原谅谁呢...')
+ await unblock_user.reject("要原谅谁呢...")
else:
- state['forgive'] = msg
+ state["forgive"] = msg
+
@unblock_user.handle()
async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['forgive'] = msg
+ state["forgive"] = msg
+
-@unblock_user.got('forgive', prompt='要原谅谁呢...')
-async def _deal_unblock_user(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- forgive = state['forgive']
+@unblock_user.got("forgive", prompt="要原谅谁呢...")
+async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ forgive = state["forgive"]
sv.BlockSystem.control_list(False, user=forgive)
msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ"
await unblock_user.finish(msg)
@@ -96,36 +86,31 @@ __doc__ = """
封禁群 群号
"""
-block_group = sv.on_command(
- cmd="封禁群",
- docs=__doc__,
- permission=SUPERUSER
-)
+block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER)
+
@block_group.args_parser # type: ignore
-async def _block_group_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await block_user.finish('好吧...')
+ await block_user.finish("好吧...")
if not msg:
- await block_user.reject('是哪个群?!GKD!')
+ await block_user.reject("是哪个群?!GKD!")
else:
- state['noob_g'] = msg
+ state["noob_g"] = msg
+
@block_group.handle()
async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['noob_g'] = msg
+ state["noob_g"] = msg
+
-@block_group.got('noob_g', prompt='是哪个群?!GKD!')
-async def _deal_block_group(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- noob_g = state['noob_g']
+@block_group.got("noob_g", prompt="是哪个群?!GKD!")
+async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ noob_g = state["noob_g"]
sv.BlockSystem.control_list(True, group=noob_g)
msg = f"群[{noob_g}]已被封禁(;′⌒`)"
await block_user.finish(msg)
@@ -138,38 +123,31 @@ __doc__ = """
解封 群号
"""
-unblock_group = sv.on_command(
- cmd="解封群",
- docs=__doc__,
- permission=SUPERUSER
-)
+unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER)
+
@unblock_group.args_parser # type: ignore
-async def _unblock_group_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await block_user.finish('好吧...')
+ await block_user.finish("好吧...")
if not msg:
- await block_user.reject('要原谅哪个群呢...')
+ await block_user.reject("要原谅哪个群呢...")
else:
- state['forgive_g'] = msg
+ state["forgive_g"] = msg
+
@unblock_group.handle()
-async def _unblock_group(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['forgive_g'] = msg
+ state["forgive_g"] = msg
+
-@unblock_group.got('forgive_g', prompt='要原谅哪个群呢...')
-async def _deal_unblock_group(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- forgive_g = state['forgive_g']
+@unblock_group.got("forgive_g", prompt="要原谅哪个群呢...")
+async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ forgive_g = state["forgive_g"]
sv.BlockSystem.control_list(False, group=forgive_g)
msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ"
await unblock_user.finish(msg)
diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py
index 5086fcf..7f7816d 100644
--- a/ATRI/plugins/manage/modules/broadcast.py
+++ b/ATRI/plugins/manage/modules/broadcast.py
@@ -15,52 +15,46 @@ __doc__ = """
广播 内容
"""
-broadcast = sv.on_command(
- cmd="广播",
- docs=__doc__,
- permission=SUPERUSER
-)
+broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER)
+
@broadcast.args_parser # type: ignore
-async def _broadcast_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await broadcast.finish('好吧...')
+ await broadcast.finish("好吧...")
if not msg:
- await broadcast.reject('想群发啥呢0w0')
+ await broadcast.reject("想群发啥呢0w0")
else:
- state['msg'] = msg
+ state["msg"] = msg
+
@broadcast.handle()
async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['msg'] = msg
-
-@broadcast.got('msg', prompt='请告诉咱需要群发的内容~!')
-async def _deal_broadcast(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- msg = state['msg']
+ state["msg"] = msg
+
+
+@broadcast.got("msg", prompt="请告诉咱需要群发的内容~!")
+async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = state["msg"]
group_list = await bot.get_group_list()
succ_list = []
err_list = []
-
+
for group in group_list:
await asyncio.sleep(randint(0, 2))
try:
- await bot.send_group_msg(group_id=group["group_id"],
- message=msg)
+ await bot.send_group_msg(group_id=group["group_id"], message=msg)
except BaseException:
err_list.append(group["group_id"])
-
+
msg0 = ""
for i in err_list:
msg0 += f" {i}\n"
-
+
repo_msg = (
f"推送消息:\n{msg}\n"
"————————\n"
diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py
index 259e791..fa21c5b 100644
--- a/ATRI/plugins/manage/modules/debug.py
+++ b/ATRI/plugins/manage/modules/debug.py
@@ -11,7 +11,7 @@ from ATRI.utils.ub_paste import paste
from ATRI.exceptions import load_error
-level_list = ['info', 'warning', 'error', 'debug']
+level_list = ["info", "warning", "error", "debug"]
__doc__ = """
@@ -25,48 +25,51 @@ __doc__ = """
get_console = sv.on_command(
cmd="获取log",
- aliases={'获取LOG', '获取控制台', '获取控制台信息'},
+ aliases={"获取LOG", "获取控制台", "获取控制台信息"},
docs=__doc__,
- permission=SUPERUSER
+ permission=SUPERUSER,
)
+
@get_console.handle()
async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).split(' ')
+ msg = str(event.message).split(" ")
if msg:
- state['level'] = msg[0]
+ state["level"] = msg[0]
try:
- state['line'] = msg[1]
+ state["line"] = msg[1]
except Exception:
pass
-@get_console.got('level', prompt='需要获取的等级是?')
+
+@get_console.got("level", prompt="需要获取的等级是?")
async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None:
- quit_list = ['算了', '罢了', '不了']
- if state['level'] in quit_list:
- await get_console.finish('好吧...')
+ quit_list = ["算了", "罢了", "不了"]
+ if state["level"] in quit_list:
+ await get_console.finish("好吧...")
-@get_console.got('line', prompt='需要获取的行数是?')
+
+@get_console.got("line", prompt="需要获取的行数是?")
async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None:
- level = state['level']
- line = state['line']
+ level = state["level"]
+ line = state["line"]
repo = str()
-
+
path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log"
- logs = await open_file(path, 'readlines')
-
+ logs = await open_file(path, "readlines")
+
try:
- content = logs[int(line):] # type: ignore
+ content = logs[int(line) :] # type: ignore
repo = "\n".join(content).replace("ATRI", "ATRI")
except IndexError:
- await get_console.finish(f'行数错误...max: {len(logs)}') # type: ignore
-
+ await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore
+
data = FormData()
- data.add_field('poster', 'ATRI running log')
- data.add_field('syntax', 'text')
- data.add_field('expiration', 'day')
- data.add_field('content', repo)
-
+ data.add_field("poster", "ATRI running log")
+ data.add_field("syntax", "text")
+ data.add_field("expiration", "day")
+ data.add_field("content", repo)
+
msg0 = f"> {event.sender.nickname}\n"
msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}"
await track_error.finish(msg0)
@@ -80,41 +83,39 @@ __doc__ = """
"""
track_error = sv.on_command(
- cmd="track",
- aliases={'追踪'},
- docs=__doc__,
- permission=SUPERUSER
+ cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER
)
+
@track_error.args_parser # type: ignore
-async def _track_error_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await track_error.finish('好吧...')
+ await track_error.finish("好吧...")
if not msg:
- await track_error.reject('欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ')
+ await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ")
else:
- state['track'] = msg
+ state["track"] = msg
+
@track_error.handle()
async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['track'] = msg
+ state["track"] = msg
-@track_error.got('track', prompt='欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ')
+
+@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ")
async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None:
- track_id = state['track']
+ track_id = state["track"]
data = dict()
-
+
try:
data = load_error(track_id)
except BaseException:
- await track_error.finish('未发现对应ID呢...(⇀‸↼‶)')
-
+ await track_error.finish("未发现对应ID呢...(⇀‸↼‶)")
+
msg = (
f"ID: [{track_id}]\n"
f"Time: [{data['time']}]\n"
@@ -122,13 +123,13 @@ async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None:
"——————\n"
f"{data['content']}"
)
-
+
data = FormData()
- data.add_field('poster', track_id)
- data.add_field('syntax', 'text')
- data.add_field('expiration', 'day')
- data.add_field('content', msg)
-
+ data.add_field("poster", track_id)
+ data.add_field("syntax", "text")
+ data.add_field("expiration", "day")
+ data.add_field("content", msg)
+
msg0 = f"> {event.sender.nickname}\n"
msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}"
await track_error.finish(msg0)
diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py
index d5ae321..cd72d47 100644
--- a/ATRI/plugins/manage/modules/dormant.py
+++ b/ATRI/plugins/manage/modules/dormant.py
@@ -13,12 +13,10 @@ __doc__ = """
"""
dormant_enabled = sv.on_command(
- cmd='休眠',
- docs=__doc__,
- rule=to_bot(),
- permission=SUPERUSER
+ cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER
)
+
@dormant_enabled.handle()
async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None:
sv.Dormant.control_dormant(True)
@@ -34,12 +32,10 @@ __doc__ = """
"""
dormant_disabled = sv.on_command(
- cmd='苏醒',
- docs=__doc__,
- rule=to_bot(),
- permission=SUPERUSER
+ cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER
)
+
@dormant_disabled.handle()
async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None:
sv.Dormant.control_dormant(False)
diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py
index a09b721..26100b1 100644
--- a/ATRI/plugins/manage/modules/request.py
+++ b/ATRI/plugins/manage/modules/request.py
@@ -9,7 +9,7 @@ from ATRI.service import Service as sv
from ATRI.exceptions import ReadFileError, FormatError
-ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
+ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential"
__doc__ = """
@@ -19,16 +19,13 @@ __doc__ = """
查看申请列表
"""
-req_list = sv.on_command(
- cmd="申请列表",
- docs=__doc__,
- permission=SUPERUSER
-)
+req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER)
+
@req_list.handle()
async def _req_list(bot: Bot, event: MessageEvent) -> None:
- path_f = ESSENTIAL_DIR / 'request_friend.json'
- path_g = ESSENTIAL_DIR / 'request_group.json'
+ path_f = ESSENTIAL_DIR / "request_friend.json"
+ path_g = ESSENTIAL_DIR / "request_group.json"
data_f, data_g = dict()
try:
data_f = json.loads(path_f.read_bytes())
@@ -38,7 +35,7 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None:
data_g = json.loads(path_g.read_bytes())
except ReadFileError:
msg_g = "[读取文件失败]"
-
+
msg_f = str()
for i in data_f.keys():
msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n"
@@ -46,41 +43,32 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None:
msg_g = str()
for i in data_g.keys():
msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n"
-
- msg = (
- "好友/群申请列表如下:\n"
- "· 好友:\n"
- f"{msg_f}"
- "· 群:\n"
- f"{msg_g}"
- )
+
+ msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}"
await req_list.finish(msg)
-req_deal = sv.on_regex(
- r'(同意|拒绝)(好友|群)申请',
- permission=SUPERUSER
-)
+req_deal = sv.on_regex(r"(同意|拒绝)(好友|群)申请", permission=SUPERUSER)
+
@req_deal.handle()
async def _req_deal(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(' ')
- arg = re.findall(r'(同意|拒绝)(好友|群)申请', msg[0])[0]
+ msg = str(event.message).split(" ")
+ arg = re.findall(r"(同意|拒绝)(好友|群)申请", msg[0])[0]
app = arg[0]
_type = arg[1]
-
+
if not msg[1]:
- await req_deal.finish(f'正确用法!速看\n{app}{_type}申请 (reqid)')
-
+ await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)")
+
reqid = msg[1]
if app == "同意":
if _type == "好友":
try:
- await bot.set_friend_add_request(flag=reqid,
- approve=True)
- await req_deal.finish('完成~!已同意申请')
+ await bot.set_friend_add_request(flag=reqid, approve=True)
+ await req_deal.finish("完成~!已同意申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
elif _type == "群":
path = ESSENTIAL_DIR / "request_group.json"
data_g = dict()
@@ -88,24 +76,23 @@ async def _req_deal(bot: Bot, event: MessageEvent) -> None:
data_g = json.loads(path.read_bytes())
except FileExistsError:
await req_deal.finish("读取群数据失败,可能并没有请求...")
-
+
try:
- await bot.set_group_add_request(flag=reqid,
- sub_type=data_g[reqid]['sub_type'],
- approve=True)
- await req_deal.finish('完成~!已同意申请')
+ await bot.set_group_add_request(
+ flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True
+ )
+ await req_deal.finish("完成~!已同意申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
else:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
elif app == "拒绝":
if _type == "好友":
try:
- await bot.set_friend_add_request(flag=reqid,
- approve=False)
- await req_deal.finish('完成~!已拒绝申请')
+ await bot.set_friend_add_request(flag=reqid, approve=False)
+ await req_deal.finish("完成~!已拒绝申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
elif _type == "群":
path = ESSENTIAL_DIR / "request_group.json"
data_g = dict()
@@ -113,15 +100,15 @@ async def _req_deal(bot: Bot, event: MessageEvent) -> None:
data_g = json.loads(path.read_bytes())
except FileExistsError:
await req_deal.finish("读取群数据失败,可能并没有请求...")
-
+
try:
- await bot.set_group_add_request(flag=reqid,
- sub_type=data_g[reqid]['sub_type'],
- approve=False)
- await req_deal.finish('完成~!已拒绝申请')
+ await bot.set_group_add_request(
+ flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False
+ )
+ await req_deal.finish("完成~!已拒绝申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
else:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
else:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py
index 6489ad6..a3624df 100644
--- a/ATRI/plugins/manage/modules/service.py
+++ b/ATRI/plugins/manage/modules/service.py
@@ -5,7 +5,7 @@ from nonebot.adapters.cqhttp import (
Bot,
MessageEvent,
GroupMessageEvent,
- PrivateMessageEvent
+ PrivateMessageEvent,
)
from ATRI.service import Service as sv
@@ -21,46 +21,40 @@ __doc__ = """
"""
cur_service_ena = sv.on_command(
- cmd="启用功能",
- docs=__doc__,
- permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER
+ cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER
)
+
@cur_service_ena.args_parser # type: ignore
-async def _cur_ena_load(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await cur_service_ena.finish('好吧...')
+ await cur_service_ena.finish("好吧...")
if not msg:
- await cur_service_ena.reject('请告诉咱目标命令!')
+ await cur_service_ena.reject("请告诉咱目标命令!")
else:
- state['service_e'] = msg
+ state["service_e"] = msg
+
@cur_service_ena.handle()
-async def _cur_ena(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_e'] = msg
+ state["service_e"] = msg
-@cur_service_ena.got('service_e', prompt='请告诉咱目标命令!')
-async def _deal_cur_ena(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
- cmd = state['service_e']
+
+@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!")
+async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+ cmd = state["service_e"]
group = str(event.group_id)
sv.control_service(cmd, False, True, group=group)
- await cur_service_ena.finish(f'成功!本群已启用:{cmd}')
+ await cur_service_ena.finish(f"成功!本群已启用:{cmd}")
+
@cur_service_ena.handle()
-async def _refuse_cur_ena(bot: Bot,
- event: PrivateMessageEvent,
- state: T_State) -> None:
- await cur_service_ena.finish('只能在群聊中决定哦...')
+async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None:
+ await cur_service_ena.finish("只能在群聊中决定哦...")
__doc__ = """
@@ -73,46 +67,40 @@ __doc__ = """
"""
cur_service_dis = sv.on_command(
- cmd="禁用功能",
- docs=__doc__,
- permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER
+ cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER
)
+
@cur_service_dis.args_parser # type: ignore
-async def _cur_dis_load(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await cur_service_dis.finish('好吧...')
+ await cur_service_dis.finish("好吧...")
if not msg:
- await cur_service_dis.reject('请告诉咱目标命令!')
+ await cur_service_dis.reject("请告诉咱目标命令!")
else:
- state['service_d'] = msg
+ state["service_d"] = msg
+
@cur_service_dis.handle()
-async def _cur_dis(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_d'] = msg
+ state["service_d"] = msg
-@cur_service_dis.got('service_d', prompt='请告诉咱目标命令!')
-async def _deal_cur_dis(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
- cmd = state['service_d']
+
+@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!")
+async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+ cmd = state["service_d"]
group = str(event.group_id)
sv.control_service(cmd, False, False, group=group)
- await cur_service_dis.finish(f'成功!本群已禁用:{cmd}')
+ await cur_service_dis.finish(f"成功!本群已禁用:{cmd}")
+
@cur_service_dis.handle()
-async def _refuse_cur_dis(bot: Bot,
- event: PrivateMessageEvent,
- state: T_State) -> None:
- await cur_service_dis.finish('只能在群聊中决定哦...')
+async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None:
+ await cur_service_dis.finish("只能在群聊中决定哦...")
__doc__ = """
@@ -124,40 +112,33 @@ __doc__ = """
全局启用 以图搜图
"""
-glo_service_ena = sv.on_command(
- cmd="全局启用",
- docs=__doc__,
- permission=SUPERUSER
-)
+glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER)
+
@glo_service_ena.args_parser # type: ignore
-async def _glo_ena_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await glo_service_ena.finish('好吧...')
+ await glo_service_ena.finish("好吧...")
if not msg:
- await glo_service_ena.reject('请告诉咱目标命令!')
+ await glo_service_ena.reject("请告诉咱目标命令!")
else:
- state['service_e_g'] = msg
+ state["service_e_g"] = msg
+
@glo_service_ena.handle()
-async def _glo_ena(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_e_g'] = msg
+ state["service_e_g"] = msg
+
-@glo_service_ena.got('service_e_g', prompt='请告诉咱目标命令!')
-async def _deal_glo_ena(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- cmd = state['service_e_g']
+@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!")
+async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ cmd = state["service_e_g"]
sv.control_service(cmd, True, True)
- await glo_service_ena.finish(f'成功!已全局启用:{cmd}')
+ await glo_service_ena.finish(f"成功!已全局启用:{cmd}")
__doc__ = """
@@ -169,37 +150,30 @@ __doc__ = """
全局禁用 以图搜图
"""
-glo_service_dis = sv.on_command(
- cmd="全局禁用",
- docs=__doc__,
- permission=SUPERUSER
-)
+glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER)
+
@glo_service_dis.args_parser # type: ignore
-async def _glo_dis_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await glo_service_dis.finish('好吧...')
+ await glo_service_dis.finish("好吧...")
if not msg:
- await glo_service_dis.reject('请告诉咱目标命令!')
+ await glo_service_dis.reject("请告诉咱目标命令!")
else:
- state['service_d_g'] = msg
+ state["service_d_g"] = msg
+
@glo_service_dis.handle()
-async def _glo_dis(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_d_g'] = msg
+ state["service_d_g"] = msg
+
-@glo_service_dis.got('service_d_g', prompt='请告诉咱目标命令!')
-async def _deal_glo_dis(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- cmd = state['service_d_g']
+@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!")
+async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ cmd = state["service_d_g"]
sv.control_service(cmd, True, False)
- await glo_service_dis.finish(f'成功!已全局禁用:{cmd}')
+ await glo_service_dis.finish(f"成功!已全局禁用:{cmd}")
diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py
index 11b2b1b..78f23e8 100644
--- a/ATRI/plugins/manage/modules/shutdown.py
+++ b/ATRI/plugins/manage/modules/shutdown.py
@@ -12,11 +12,8 @@ __doc__ = """
@ 关机
"""
-shutdown = sv.on_command(
- cmd="关机",
- docs=__doc__,
- permission=SUPERUSER
-)
+shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER)
+
@shutdown.handle()
async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
@@ -24,9 +21,10 @@ async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
if msg:
state["msg"] = msg
+
@shutdown.got("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)")
async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
- t = ['y', 'Y', '是']
+ t = ["y", "Y", "是"]
if state["msg"] in t:
await bot.send(event, "咱还会醒来的,一定")
exit(0)