summaryrefslogtreecommitdiff
path: root/ATRI/plugins/admin.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/admin.py')
-rw-r--r--ATRI/plugins/admin.py439
1 files changed, 0 insertions, 439 deletions
diff --git a/ATRI/plugins/admin.py b/ATRI/plugins/admin.py
deleted file mode 100644
index 85ed805..0000000
--- a/ATRI/plugins/admin.py
+++ /dev/null
@@ -1,439 +0,0 @@
-import json
-import asyncio
-from random import randint
-from pathlib import Path
-
-from nonebot.permission import SUPERUSER
-from nonebot.adapters.cqhttp.permission import GROUP_ADMIN, GROUP_OWNER
-from nonebot.adapters.cqhttp import (
- Bot,
- MessageEvent,
- GroupMessageEvent,
- PrivateMessageEvent
-)
-from nonebot.typing import T_State
-
-from ATRI.config import Config
-from ATRI.service import Service as sv
-from ATRI.exceptions import load_error
-from ATRI.utils.file import open_file
-
-
-ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
-
-
-__doc__ = """
-好友申请处理
-权限组:维护者
-用法:
- /friendreq list
- /friendreq (y/n) reqid
-补充:
- reqid: 申请码
-"""
-
-request_friend = sv.on_command(
- cmd="/friendreq",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-@request_friend.handle()
-async def _request_friend(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(" ")
- key = msg[0]
- data = {}
- path = ESSENTIAL_DIR / "request_friend.json"
- try:
- data = json.loads(path.read_bytes())
- except:
- await request_friend.finish("读取数据失败,可能并没有请求...")
-
- if key == "list":
- msg0 = ""
- for i in data.keys():
- msg0 += f"{i} | {data[i]['user_id']} | {data[i]['comment']}\n"
-
- msg = "好友申请列表如下:\n"
- msg += msg0
- await request_friend.finish(msg)
-
- elif key == "y":
- arg = msg[1]
- await bot.set_friend_add_request(flag=arg, approve=True)
- await request_friend.finish(f"完成~!已同意 {data[arg]['user_id']} 的申请")
-
- elif key == "n":
- arg = msg[1]
- await bot.set_friend_add_request(flag=arg, approve=False)
- await request_friend.finish(f"完成~!已拒绝 {data[arg]['user_id']} 的申请")
-
- else:
- await request_friend.finish("阿...请检查输入——!")
-
-
-__doc__ = """
-群聊申请处理
-权限组:维护者
-用法:
- /groupreq list
- /groupreq (y/n) reqid
-补充:
- reqid: 申请码
-"""
-
-request_group = sv.on_command(
- cmd="/groupreq",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-@request_group.handle()
-async def _request_group(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(" ")
- key = msg[0]
- data = {}
- path = ESSENTIAL_DIR / "request_group.json"
- try:
- data = json.loads(path.read_bytes())
- except FileExistsError:
- await request_friend.finish("读取数据失败,可能并没有请求...")
-
- if key == "list":
- msg0 = ""
- for i in data.keys():
- msg0 += f"{i} | {data[i]['sub_type']} | {data[i]['user_id']} | {data[i]['comment']}\n"
-
- msg = "群申请列表如下:\n"
- msg += msg0
- await request_friend.finish(msg)
-
- elif key == "y":
- arg = msg[1]
- try:
- await bot.set_group_add_request(flag=arg,
- sub_type=data[arg]['sub_type'],
- approve=False)
- await request_friend.finish(f"完成~!已同意 {data[arg]['user_id']} 的申请")
- except:
- await request_friend.finish("请检查输入的值是否正确——!")
-
- elif key == "n":
- arg = msg[1]
- try:
- await bot.set_group_add_request(flag=arg,
- sub_type=data[arg]['sub_type'],
- approve=False)
- await request_friend.finish(f"完成~!已拒绝 {data[arg]['user_id']} 的申请")
- except:
- await request_friend.finish("请检查输入的值是否正确——!")
-
- else:
- await request_friend.finish("阿...请检查输入——!")
-
-
-__doc__ = """
-广播
-权限组:维护者
-用法:
- /bc 内容
-"""
-
-broadcast = sv.on_command(
- cmd="/bc",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- if msg:
- state["msg"] = msg
-
[email protected]("msg", prompt="请告诉咱需要群发的内容~!")
-async def _bd(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = state["msg"]
- group_list = await bot.get_group_list()
- succ_list = []
- err_list = []
-
- for group in group_list:
- await asyncio.sleep(randint(0, 2))
- try:
- await bot.send_group_msg(group_id=group["group_id"],
- message=msg)
- except:
- err_list.append(group["group_id"])
-
- msg0 = ""
- for i in err_list:
- msg0 += f" {i}\n"
-
- repo_msg = (
- f"推送消息:\n{msg}\n"
- "————————\n"
- f"总共:{len(group_list)}\n"
- f"成功推送:{len(succ_list)}\n"
- f"失败[{len(err_list)}]个:\n"
- ) + msg0
-
- await broadcast.finish(repo_msg)
-
-
-__doc__ = """
-错误堆栈查看
-权限组:维护者
-用法:
- /track 追踪ID
-"""
-
-track_error = sv.on_command(
- cmd="/track",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-@track_error.handle()
-async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- if msg:
- state["msg"] = msg
-
-@track_error.got("msg", prompt="请告诉咱追踪ID!")
-async def _(bot: Bot, event: MessageEvent, state: T_State) -> None:
- track_id = state["msg"]
- data = {}
-
- try:
- data = load_error(track_id)
- except:
- await track_error.finish("Ignore track ID!")
-
- msg0 = (
- f"ID: {track_id}\n"
- f"Time: {data['time']}\n"
- f"Prompt: {data['prompt']}\n"
- f"{data['content']}"
- )
-
- await track_error.finish(msg0)
-
-
-__doc__ = """
-获取控制台信息
-权限组:维护者
-用法:
- /getlog level line
-补充:
- level: 等级(info, warning, error, debug)
- line: 行数(最近20行:-20)
-"""
-
-get_log = sv.on_command(
- cmd="/getlog",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-@get_log.handle()
-async def _get_log(bot: Bot, event: GroupMessageEvent) -> None:
- user = str(event.user_id)
- group = event.group_id
- node = []
- msg = str(event.message).split(" ")
- try:
- rows = msg[1]
- except:
- await get_log.finish("格式/gl level rows")
-
- if msg[0] == "info":
- level = "info"
- elif msg[0] == "warning":
- level = "warning"
- elif msg[0] == "error":
- level = "error"
- elif msg[0] == "debug":
- level = "debug"
- else:
- await get_log.finish("格式/gl level rows")
-
- path = LOGGER_DIR / level / f"{NOW_TIME}-INFO.log" # type: ignore
- logs = await open_file(path, "readlines")
-
- try:
- content = logs[int(rows):] # type: ignore
- repo = "\n".join(content).replace("ATRI", "ATRI")
- node = [{
- "type": "node",
- "data": {"name": "ERROR REPO", "uin": user, "content": repo}
- }]
- except IndexError:
- await get_log.finish(f"行数错误...max: {len(logs)}") # type: ignore
-
- await bot.send_group_forward_msg(group_id=group, messages=node)
-
-
-__doc__ = """
-紧急停机
-权限组:维护者
-用法:
- /down
-"""
-
-shutdown = sv.on_command(
- cmd="/down",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- if msg:
- state["msg"] = msg
-
[email protected]("msg", prompt="WARNING,此项操作将强行终止bot运行,是否继续(y/n)")
-async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
- if state["msg"] == "y":
- await bot.send(event, "咱还会醒来的,一定")
- exit(0)
- else:
- await shutdown.finish("再考虑下先吧 ;w;")
-
-
-__doc__ = """
-懒得和你废话,block了
-权限组:维护者
-用法:
- /block (u,g) (int) (0,1)
-补充:
- u:QQ
- g:QQ群
- int: 对应号码
- 0,1:对应布尔值False, True
- 范围为全局
-示例:
- /block u 114514 1
- 执行对QQ号为114514的封禁
-"""
-
-block = sv.on_command(
- cmd="/block",
- docs=__doc__,
- permission=SUPERUSER
-)
-
-async def _block(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(' ')
- _type = msg[0]
- arg = msg[1]
- is_enabled = bool(int(msg[2]))
- b_type = ""
-
- status = "封禁" if is_enabled else "解封"
-
- if _type == "g":
- sv.BlockSystem.control_list(is_enabled=is_enabled, group=arg)
- b_type = "Group"
- elif _type == "u":
- sv.BlockSystem.control_list(is_enabled, user=arg)
- b_type = "User"
- else:
- await block.finish("请检查输入...")
-
- await block.finish(f"已成功将[{b_type}@{arg}]{status}")
-
-
-__doc__ = """
-功能开关控制
-权限组:维护者,群管理
-用法:
- 对于维护者:
- /service 目标指令 u+uid,g+gid,global 0,1
- 对于群管理:
- /service 目标指令 0,1
-补充:
- user:QQ号
- group:QQ群号
- global:全局
- 0,1:对应布尔值False, True
-示例:
- 对于维护者:
- /service /status u123456789 1
- 对于群管理:
- /service /status 1
-"""
-
-service_control = sv.on_command(
- cmd='/service',
- docs=__doc__,
- permission=SUPERUSER|GROUP_OWNER|GROUP_ADMIN
-)
-
-@service_control.handle()
-async def _service_control(bot: Bot, event: GroupMessageEvent) -> None:
- msg = str(event.message).split(' ')
- user = str(event.user_id)
- cmd = msg[0]
- _type = msg[1]
-
- if msg[0] == "":
- await service_control.finish('请检查输入~!')
-
- if user in Config.BotSelfConfig.superusers:
- is_enabled = int(msg[2])
- status = "启用" if bool(is_enabled) else "禁用"
-
- if _type == "global":
- sv.control_service(cmd, True, is_enabled)
- await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
- else:
- print(_type)
- if "u" in _type:
- qq = _type.replace('u', '')
- sv.control_service(cmd, False, is_enabled, user=qq)
- elif "g" in _type:
- group = _type.replace('g', '')
- sv.control_service(cmd, False, is_enabled, group=group)
- else:
- await service_control.finish("请检查输入~!")
- await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
- else:
- group = str(event.group_id)
- is_enabled = int(_type)
- sv.control_service(cmd, False, is_enabled, group=group)
- status = "启用" if bool(is_enabled) else "禁用"
- await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
-
-@service_control.handle()
-async def _serv(bot: Bot, event: PrivateMessageEvent) -> None:
- await service_control.finish("此功能仅在群聊中触发")
-
-
-__doc__ = """
-休眠bot,不处理任何信息
-权限组:维护者
-用法:
- /dormant (0,1)
-补充:
- 0,1: 对应布尔值(False,True)
-"""
-
-dormant = sv.on_command(
- cmd='/dormant',
- docs=__doc__,
- permission=SUPERUSER
-)
-
-async def _dormant(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).strip()
- if msg == "1":
- sv.Dormant.control_dormant(True)
- stat = "已进入休眠状态...期间咱不会回应任何人的消息哦..."
- else:
- sv.Dormant.control_dormant(False)
- stat = "唔...回复精神力!"
- await dormant.finish(stat)