summaryrefslogtreecommitdiff
path: root/ATRI/plugins/plugin_admin/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/plugin_admin/__init__.py')
-rw-r--r--ATRI/plugins/plugin_admin/__init__.py164
1 files changed, 70 insertions, 94 deletions
diff --git a/ATRI/plugins/plugin_admin/__init__.py b/ATRI/plugins/plugin_admin/__init__.py
index b83da69..7dd18e7 100644
--- a/ATRI/plugins/plugin_admin/__init__.py
+++ b/ATRI/plugins/plugin_admin/__init__.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
-
'''
@File : __init__.py
@Time : 2020/10/11 14:37:53
@@ -18,13 +17,14 @@ from pathlib import Path
from random import randint
from nonebot.plugin import on_command
-from nonebot.adapters.cqhttp import Bot, Event
+from nonebot.typing import Bot, Event
from nonebot.permission import GROUP_ADMIN, GROUP_OWNER, SUPERUSER
-from utils.utils_yml import load_yaml
-from utils.utils_error import errorRepo
-from utils.utils_rule import check_banlist
-from utils.utils_switch import controlSwitch
+from ATRI.utils.utils_yml import load_yaml
+from ATRI.utils.utils_error import errorRepo
+from ATRI.utils.utils_rule import check_banlist
+from ATRI.utils.utils_textcheck import PUBLIC_OPINION_PATH, Textcheck
+from ATRI.utils.utils_switch import controlSwitch
CONFIG_PATH = Path('.') / 'config.yml'
master = load_yaml(CONFIG_PATH)['bot']['superusers']
@@ -34,26 +34,25 @@ switch = on_command('switch',
permission=(SUPERUSER | GROUP_OWNER | GROUP_ADMIN))
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
-
func = str(event.message).strip()
+ SWITCH_PATH = Path('.') / 'ATRI' / 'utils' / 'utils_rule' / 'switch.json'
+ with open(SWITCH_PATH, 'r') as f:
+ data = json.load(f)
+
if not func:
msg0 = "-==ATRI Switch Control System==-\n"
- msg0 += "┌Usage: switch on/off-{service}\n"
- msg0 += "├For SUPERUSER:\n"
- msg0 += "│ └Usage: switch all-on/off-{service}\n"
- msg0 += "└Service:\n"
- msg0 += " ├anime-setu\n"
- msg0 += " ├anime-pic-search\n"
- msg0 += " ├anime-vid-search\n"
- msg0 += " ├ai-face\n"
- msg0 += " ├pixiv-pic-search\n"
- msg0 += " ├pixiv-author-search\n"
- msg0 += " └pixiv-rank"
+ msg0 += "Usage: switch on/off-{service}\n"
+ msg0 += "* For SUPERUSER:\n"
+ msg0 += " - Usage: switch all-on/off-{service}\n"
+ msg0 += "Service:\n"
+
+ for i in data.keys():
+ msg0 += f" {i}\n"
await switch.finish(msg0)
@@ -84,128 +83,105 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
# 舆情监控系统
-publicOpinion = on_command("舆情",
- rule=check_banlist(),
- permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER)
-data_PO = Path(
- '.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'public_opinion.json'
+publicOpinion = on_command("舆情", rule=check_banlist(), permission=SUPERUSER)
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
msg = str(event.message).strip().split(' ')
+ with open(PUBLIC_OPINION_PATH, 'r') as f:
+ data = json.load(f)
+
if msg[0] == '':
msg0 = "---=====ATRI POM System=====---\n"
msg0 += "Usage:\n"
- msg0 += " - 舆情 [key] [times] [ban time(bot)] [repo]\n"
+ msg0 += " - 舆情 [key] [repo] [times] [ban time(bot)]\n"
+ msg0 += " - 舆情 del [key]\n"
+ msg0 += " - 舆情 list\n"
msg0 += "Tips:\n"
msg0 += " - 非 SUPERU 只能设置本群\n"
msg0 += " - SUPERU 需在后跟随 -a 以启用全局效果\n"
msg0 += " - 参数类型:\n"
msg0 += " * key: 关键词(将使用正则匹配)\n"
+ msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img\n"
msg0 += " * times: 容忍次数(n>0, int)\n"
- msg0 += " * ban time: bot对其失效时间(min, int)\n"
- msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img"
+ msg0 += " * ban time: bot对其失效时间(min, int)"
await publicOpinion.finish(msg0)
+ if msg[0] == 'del':
+ await publicOpinion.finish(Textcheck().del_word(msg[1]))
+
+ if msg[0] == 'list':
+ msg0 = "舆情检测列表如下:\n"
+ for w in data.keys():
+ msg0 += f' {w}\n'
+
if msg[0] and msg[1] and msg[2] and msg[3]:
pass
else:
msg0 = "请检查格式奥~!\n"
- msg0 += "舆情 [key] [times] [ban time(bot)] [repo]\n"
+ msg0 += "舆情 [key] [repo] [times] [ban time(bot)]\n"
msg0 += " * key: 关键词(将使用正则匹配)\n"
+ msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img\n"
msg0 += " * times: 容忍次数(n>0, int)\n"
- msg0 += " * ban time: bot对其失效时间(min, int)\n"
- msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img"
+ msg0 += " * ban time: bot对其失效时间(min, int)"
await publicOpinion.finish(msg0)
- key_word = msg[0]
- remind = msg[1]
- punish = msg[2]
- repo = msg[3]
+ key = msg[0]
+ repo = msg[1]
+ max_times = msg[2]
+ ban_time = msg[3]
- if key_word and remind and punish and repo:
- if re.findall(r"/^\d{1,}$/", remind) and re.findall(
- r"/^\d{1,}$/", punish):
- pass
-
- else:
+ if key and repo and max_times and ban_time:
+ if not re.findall(r"/^\d{1,}$/", max_times) and re.findall(
+ r"/^\d{1,}$/", ban_time):
await publicOpinion.finish("非法字符!请注意(times, ban time)类型为int(阿拉伯数字)"
)
else:
- await publicOpinion.finish("请键入完整信息!\n如需帮助,请键入 舆情")
+ await publicOpinion.finish("请键入完整信息!\n如需帮助,请键入:舆情")
if repo == "img":
- state["key_word"] = key_word
- state["remind"] = remind
- state["punish"] = punish
+ state["key"] = key
+ state["max_times"] = max_times
+ state["ban_time"] = ban_time
else:
- try:
- with open(data_PO, "r") as f:
- data = json.load(f)
- except FileNotFoundError:
- data = {}
-
- data[key_word] = [remind, punish, repo]
-
- with open(data_PO, "w") as f:
- f.write(json.dumps(data))
- f.close()
-
- msg0 = "舆情信息记录完成~!\n"
- msg0 += f"Keyword: {key_word}\n"
- msg0 += f"Times: {remind}\n"
- msg0 += f"Ban time: {punish}\n"
- msg0 += f"Repo: {repo}"
-
- await publicOpinion.finish(msg0)
+ await publicOpinion.finish(Textcheck().add_word(
+ key, repo, int(max_times), int(ban_time)))
[email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片") # type: ignore
[email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片")
async def _(bot: Bot, event: Event, state: dict) -> None:
- key_word = state["key_word"]
- remind = state["remind"]
- punish = state["punish"]
+ key = state["key"]
repo = state["repo"]
+ max_times = state["max_times"]
+ ban_time = state["ban_time"]
if "[CQ:image" not in repo:
await publicOpinion.reject("请发送一张图片而不是图片以外的东西~!(")
- try:
- with open(data_PO, "r") as f:
- data = json.load(f)
- except FileNotFoundError:
- data = {}
-
- data[key_word] = [remind, punish, repo]
-
- with open(data_PO, "w") as f:
- f.write(json.dumps(data))
- f.close()
-
- msg0 = "舆情信息记录完成~!\n"
- msg0 += f"Keyword: {key_word}\n"
- msg0 += f"Times: {remind}\n"
- msg0 += f"Ban time: {punish}\n"
- msg0 += f"Repo: {repo}"
-
- await publicOpinion.finish(msg0)
+ await publicOpinion.finish(Textcheck().add_word(key, repo, int(max_times),
+ int(ban_time)))
trackError = on_command("track", permission=SUPERUSER)
file_error = Path('.') / 'ATRI' / 'data' / 'data_Error' / 'error.json'
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- track_id = str(event.message).strip()
+ args = str(event.message).strip()
+
+ if args:
+ state['track_id'] = args
+
- if not track_id:
- await trackError.finish("请告诉咱追踪ID嗷~!不然无法获取错误堆栈呢!!")
[email protected]('track_id', prompt='请告诉咱追踪ID嗷~!不然无法获取错误堆栈呢!!')
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ track_id = state['track_id']
data = {}
@@ -230,7 +206,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
groupSendMessage = on_command("群发", permission=SUPERUSER)
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
args = str(event.message).strip()
@@ -238,7 +214,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
state['msg'] = args
[email protected]('msg', prompt='请告诉咱需要群发的内容~!') # type: ignore
[email protected]('msg', prompt='请告诉咱需要群发的内容~!')
async def _(bot: Bot, event: Event, state: dict) -> None:
msg = state['msg']
group_list = await bot.get_group_list()
@@ -251,7 +227,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
for group in group_list:
if group['group_id'] not in ban_group_list:
- asyncio.sleep(randint(1,5))
+ asyncio.sleep(randint(1, 5))
try:
await bot.send_group_msg(group_id=group['group_id'],
message=msg)