diff options
Diffstat (limited to 'ATRI/plugins/plugin_admin/__init__.py')
-rw-r--r-- | ATRI/plugins/plugin_admin/__init__.py | 164 |
1 files changed, 70 insertions, 94 deletions
diff --git a/ATRI/plugins/plugin_admin/__init__.py b/ATRI/plugins/plugin_admin/__init__.py index b83da69..7dd18e7 100644 --- a/ATRI/plugins/plugin_admin/__init__.py +++ b/ATRI/plugins/plugin_admin/__init__.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 # -*- encoding: utf-8 -*- - ''' @File : __init__.py @Time : 2020/10/11 14:37:53 @@ -18,13 +17,14 @@ from pathlib import Path from random import randint from nonebot.plugin import on_command -from nonebot.adapters.cqhttp import Bot, Event +from nonebot.typing import Bot, Event from nonebot.permission import GROUP_ADMIN, GROUP_OWNER, SUPERUSER -from utils.utils_yml import load_yaml -from utils.utils_error import errorRepo -from utils.utils_rule import check_banlist -from utils.utils_switch import controlSwitch +from ATRI.utils.utils_yml import load_yaml +from ATRI.utils.utils_error import errorRepo +from ATRI.utils.utils_rule import check_banlist +from ATRI.utils.utils_textcheck import PUBLIC_OPINION_PATH, Textcheck +from ATRI.utils.utils_switch import controlSwitch CONFIG_PATH = Path('.') / 'config.yml' master = load_yaml(CONFIG_PATH)['bot']['superusers'] @@ -34,26 +34,25 @@ switch = on_command('switch', permission=(SUPERUSER | GROUP_OWNER | GROUP_ADMIN)) [email protected]() # type: ignore async def _(bot: Bot, event: Event, state: dict) -> None: user = str(event.user_id) group = str(event.group_id) - func = str(event.message).strip() + SWITCH_PATH = Path('.') / 'ATRI' / 'utils' / 'utils_rule' / 'switch.json' + with open(SWITCH_PATH, 'r') as f: + data = json.load(f) + if not func: msg0 = "-==ATRI Switch Control System==-\n" - msg0 += "┌Usage: switch on/off-{service}\n" - msg0 += "├For SUPERUSER:\n" - msg0 += "│ └Usage: switch all-on/off-{service}\n" - msg0 += "└Service:\n" - msg0 += " ├anime-setu\n" - msg0 += " ├anime-pic-search\n" - msg0 += " ├anime-vid-search\n" - msg0 += " ├ai-face\n" - msg0 += " ├pixiv-pic-search\n" - msg0 += " ├pixiv-author-search\n" - msg0 += " └pixiv-rank" + msg0 += "Usage: switch on/off-{service}\n" + msg0 += "* For SUPERUSER:\n" + msg0 += " - Usage: switch all-on/off-{service}\n" + msg0 += "Service:\n" + + for i in data.keys(): + msg0 += f" {i}\n" await switch.finish(msg0) @@ -84,128 +83,105 @@ async def _(bot: Bot, event: Event, state: dict) -> None: # 舆情监控系统 -publicOpinion = on_command("舆情", - rule=check_banlist(), - permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER) -data_PO = Path( - '.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'public_opinion.json' +publicOpinion = on_command("舆情", rule=check_banlist(), permission=SUPERUSER) [email protected]() # type: ignore async def _(bot: Bot, event: Event, state: dict) -> None: msg = str(event.message).strip().split(' ') + with open(PUBLIC_OPINION_PATH, 'r') as f: + data = json.load(f) + if msg[0] == '': msg0 = "---=====ATRI POM System=====---\n" msg0 += "Usage:\n" - msg0 += " - 舆情 [key] [times] [ban time(bot)] [repo]\n" + msg0 += " - 舆情 [key] [repo] [times] [ban time(bot)]\n" + msg0 += " - 舆情 del [key]\n" + msg0 += " - 舆情 list\n" msg0 += "Tips:\n" msg0 += " - 非 SUPERU 只能设置本群\n" msg0 += " - SUPERU 需在后跟随 -a 以启用全局效果\n" msg0 += " - 参数类型:\n" msg0 += " * key: 关键词(将使用正则匹配)\n" + msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img\n" msg0 += " * times: 容忍次数(n>0, int)\n" - msg0 += " * ban time: bot对其失效时间(min, int)\n" - msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img" + msg0 += " * ban time: bot对其失效时间(min, int)" await publicOpinion.finish(msg0) + if msg[0] == 'del': + await publicOpinion.finish(Textcheck().del_word(msg[1])) + + if msg[0] == 'list': + msg0 = "舆情检测列表如下:\n" + for w in data.keys(): + msg0 += f' {w}\n' + if msg[0] and msg[1] and msg[2] and msg[3]: pass else: msg0 = "请检查格式奥~!\n" - msg0 += "舆情 [key] [times] [ban time(bot)] [repo]\n" + msg0 += "舆情 [key] [repo] [times] [ban time(bot)]\n" msg0 += " * key: 关键词(将使用正则匹配)\n" + msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img\n" msg0 += " * times: 容忍次数(n>0, int)\n" - msg0 += " * ban time: bot对其失效时间(min, int)\n" - msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img" + msg0 += " * ban time: bot对其失效时间(min, int)" await publicOpinion.finish(msg0) - key_word = msg[0] - remind = msg[1] - punish = msg[2] - repo = msg[3] + key = msg[0] + repo = msg[1] + max_times = msg[2] + ban_time = msg[3] - if key_word and remind and punish and repo: - if re.findall(r"/^\d{1,}$/", remind) and re.findall( - r"/^\d{1,}$/", punish): - pass - - else: + if key and repo and max_times and ban_time: + if not re.findall(r"/^\d{1,}$/", max_times) and re.findall( + r"/^\d{1,}$/", ban_time): await publicOpinion.finish("非法字符!请注意(times, ban time)类型为int(阿拉伯数字)" ) else: - await publicOpinion.finish("请键入完整信息!\n如需帮助,请键入 舆情") + await publicOpinion.finish("请键入完整信息!\n如需帮助,请键入:舆情") if repo == "img": - state["key_word"] = key_word - state["remind"] = remind - state["punish"] = punish + state["key"] = key + state["max_times"] = max_times + state["ban_time"] = ban_time else: - try: - with open(data_PO, "r") as f: - data = json.load(f) - except FileNotFoundError: - data = {} - - data[key_word] = [remind, punish, repo] - - with open(data_PO, "w") as f: - f.write(json.dumps(data)) - f.close() - - msg0 = "舆情信息记录完成~!\n" - msg0 += f"Keyword: {key_word}\n" - msg0 += f"Times: {remind}\n" - msg0 += f"Ban time: {punish}\n" - msg0 += f"Repo: {repo}" - - await publicOpinion.finish(msg0) + await publicOpinion.finish(Textcheck().add_word( + key, repo, int(max_times), int(ban_time))) [email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片") # type: ignore [email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片") async def _(bot: Bot, event: Event, state: dict) -> None: - key_word = state["key_word"] - remind = state["remind"] - punish = state["punish"] + key = state["key"] repo = state["repo"] + max_times = state["max_times"] + ban_time = state["ban_time"] if "[CQ:image" not in repo: await publicOpinion.reject("请发送一张图片而不是图片以外的东西~!(") - try: - with open(data_PO, "r") as f: - data = json.load(f) - except FileNotFoundError: - data = {} - - data[key_word] = [remind, punish, repo] - - with open(data_PO, "w") as f: - f.write(json.dumps(data)) - f.close() - - msg0 = "舆情信息记录完成~!\n" - msg0 += f"Keyword: {key_word}\n" - msg0 += f"Times: {remind}\n" - msg0 += f"Ban time: {punish}\n" - msg0 += f"Repo: {repo}" - - await publicOpinion.finish(msg0) + await publicOpinion.finish(Textcheck().add_word(key, repo, int(max_times), + int(ban_time))) trackError = on_command("track", permission=SUPERUSER) file_error = Path('.') / 'ATRI' / 'data' / 'data_Error' / 'error.json' [email protected]() # type: ignore async def _(bot: Bot, event: Event, state: dict) -> None: - track_id = str(event.message).strip() + args = str(event.message).strip() + + if args: + state['track_id'] = args + - if not track_id: - await trackError.finish("请告诉咱追踪ID嗷~!不然无法获取错误堆栈呢!!") [email protected]('track_id', prompt='请告诉咱追踪ID嗷~!不然无法获取错误堆栈呢!!') +async def _(bot: Bot, event: Event, state: dict) -> None: + track_id = state['track_id'] data = {} @@ -230,7 +206,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: groupSendMessage = on_command("群发", permission=SUPERUSER) [email protected]() # type: ignore async def _(bot: Bot, event: Event, state: dict) -> None: args = str(event.message).strip() @@ -238,7 +214,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: state['msg'] = args [email protected]('msg', prompt='请告诉咱需要群发的内容~!') # type: ignore [email protected]('msg', prompt='请告诉咱需要群发的内容~!') async def _(bot: Bot, event: Event, state: dict) -> None: msg = state['msg'] group_list = await bot.get_group_list() @@ -251,7 +227,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: for group in group_list: if group['group_id'] not in ban_group_list: - asyncio.sleep(randint(1,5)) + asyncio.sleep(randint(1, 5)) try: await bot.send_group_msg(group_id=group['group_id'], message=msg) |