diff options
Diffstat (limited to 'ATRI/plugins/plugin_admin/__init__.py')
-rw-r--r-- | ATRI/plugins/plugin_admin/__init__.py | 252 |
1 files changed, 0 insertions, 252 deletions
diff --git a/ATRI/plugins/plugin_admin/__init__.py b/ATRI/plugins/plugin_admin/__init__.py deleted file mode 100644 index a8bb61b..0000000 --- a/ATRI/plugins/plugin_admin/__init__.py +++ /dev/null @@ -1,252 +0,0 @@ -#!/usr/bin/env python3 -# -*- encoding: utf-8 -*- -''' -@File : __init__.py -@Time : 2020/10/11 14:37:53 -@Author : Kyomotoi -@Contact : [email protected] -@Github : https://github.com/Kyomotoi -@License : Copyright © 2018-2020 Kyomotoi, All Rights Reserved. -''' -__author__ = 'kyomotoi' - -import re -import json -import asyncio -from pathlib import Path -from random import choice, randint, sample - -from nonebot.plugin import on_command -from nonebot.typing import Bot, Event -from nonebot.permission import GROUP_ADMIN, GROUP_OWNER, SUPERUSER - -from ATRI.utils.utils_yml import load_yaml -from ATRI.utils.utils_error import errorRepo -from ATRI.utils.utils_rule import check_banlist -from ATRI.utils.utils_textcheck import PUBLIC_OPINION_PATH, Textcheck -from ATRI.utils.utils_switch import controlSwitch - -CONFIG_PATH = Path('.') / 'config.yml' -master = load_yaml(CONFIG_PATH)['bot']['superusers'] - -switch = on_command('/switch', - rule=check_banlist(), - permission=(SUPERUSER | GROUP_OWNER | GROUP_ADMIN)) - - -async def _(bot: Bot, event: Event, state: dict) -> None: - user = str(event.user_id) - group = str(event.group_id) - func = str(event.message).strip() - - SWITCH_PATH = Path('.') / 'ATRI' / 'utils' / 'utils_rule' / 'switch.json' - with open(SWITCH_PATH, 'r') as f: - data = json.load(f) - - if not func: - await switch.finish('请查看文档获取帮助(') - - funct = re.findall(r"[on|off]-(.*)", func) - - if "all-on" in func: - if int(user) in master: - await switch.finish(controlSwitch(funct[0], True)) - - else: - await switch.finish("Permission Denied") - - elif "all-off" in func: - if int(user) in master: - await switch.finish(controlSwitch(funct[0], False)) - - else: - await switch.finish("Permission Denied") - - elif "on" in func: - await switch.finish(controlSwitch(funct[0], True, group)) - - elif "off" in func: - await switch.finish(controlSwitch(funct[0], False, group)) - - else: - await switch.finish("请检查拼写是否正确嗷~~!") - - -# 舆情监控系统 -# Usage: -# - /pubopin [key] [repo] [times] [ban time(bot)] -# - /pubopin del [key] -# - /pubopin list -# Tips: -# - 参数类型: -# * key: 关键词(将使用正则匹配) -# * repo: 触发后的关键词(可选),如为图片,键入 img -# * times: 容忍次数(n>0, int) -# * ban time: bot对其失效时间(min, int) -publicOpinion = on_command("/pubopin", - rule=check_banlist(), - permission=SUPERUSER) - - -async def _(bot: Bot, event: Event, state: dict) -> None: - msg = str(event.message).strip().split(' ') - - with open(PUBLIC_OPINION_PATH, 'r') as f: - data = json.load(f) - - if msg[0] == '': - await publicOpinion.finish("请查看文档获取帮助(") - - if msg[0] == 'del': - await publicOpinion.finish(Textcheck().del_word(msg[1])) - - if msg[0] == 'list': - msg0 = "舆情检测列表如下:\n" - for w in data.keys(): - msg0 += f' {w}\n' - - if not msg[0] or not msg[1] or not msg[2] or not msg[3]: - await publicOpinion.finish("ごんめなさい...请检查格式嗷...") - - if not re.findall(r"/^\d{1,}$/", msg[2]) or not re.findall( - r"/^\d{1,}$/", msg[3]): - await publicOpinion.finish("非法字符!咱不接受除int以外的类型!!") - - if msg[1] == "img": - state["key"] = msg[0] - state["max_times"] = msg[2] - state["ban_time"] = msg[3] - - else: - await publicOpinion.finish(Textcheck().add_word( - msg[0], msg[1], int(msg[2]), int(msg[3]))) - - [email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片") -async def _(bot: Bot, event: Event, state: dict) -> None: - key = state["key"] - repo = state["repo"] - max_times = state["max_times"] - ban_time = state["ban_time"] - - if "[CQ:image" not in repo: - await publicOpinion.reject("请发送一张图片而不是图片以外的东西~!(") - - await publicOpinion.finish(Textcheck().add_word(key, repo, int(max_times), - int(ban_time))) - - -trackError = on_command("/track", permission=SUPERUSER) -file_error = Path('.') / 'ATRI' / 'data' / 'data_Error' / 'error.json' - - -async def _(bot: Bot, event: Event, state: dict) -> None: - args = str(event.message).strip() - - if args: - state['track_id'] = args - - [email protected]('track_id', prompt='请告诉咱追踪ID嗷~!不然无法获取错误堆栈呢!!') -async def _(bot: Bot, event: Event, state: dict) -> None: - track_id = state['track_id'] - - data = {} - - try: - with open(file_error, 'r') as f: - data = json.load(f) - except FileNotFoundError: - await trackError.finish(errorRepo("读取文件时错误")) - - if track_id in data: - info_error = data[track_id] - - msg0 = f"trackID: {track_id}\n" - msg0 += info_error - - await trackError.finish(msg0) - - else: - await trackError.finish("未发现该ID") - - -groupSendMessage = on_command("/groupsend", permission=SUPERUSER) - - -async def _(bot: Bot, event: Event, state: dict) -> None: - args = str(event.message).strip() - - if args: - state['msg'] = args - - [email protected]('msg', prompt='请告诉咱需要群发的内容~!') -async def _(bot: Bot, event: Event, state: dict) -> None: - msg = state['msg'] - group_list = await bot.get_group_list() - sc_list = [] - err_list = [] - - for group in group_list: - asyncio.sleep(randint(2, 10)) - try: - await bot.send_group_msg(group_id=group['group_id'], - message=msg) - sc_list.append(group['group_id']) - except: - await bot.send(event, f"在尝试推送到群[{group['group_id']}]时失败了呢...") - err_list.append(group['group_id']) - - msg0 = "" - for i in err_list: - msg0 += f" {i}\n" - - repo_msg = f"推送信息:\n{msg}" - repo_msg += "\n————————\n" - repo_msg += f"总共:{len(group_list)}\n" - repo_msg += f"成功推送:{len(sc_list)}\n" - repo_msg += f"失败[{len(err_list)}]个:\n" - repo_msg += msg0 - - await groupSendMessage.finish(repo_msg) - - -# keyRepoAddReview = on_command('关键词审核', permission=SUPERUSER) -# KEY_PATH = Path('.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'key_repo.json' -# KEY_WAITING_PATH = Path( -# '.') / 'ATRI' / 'plugins' / 'plugin_admin' / 'key_repo_waiting.json' -# with open(KEY_PATH, 'r', encoding='utf-8') as f: -# data = json.load(f) -# with open(KEY_WAITING_PATH, 'r', encoding='utf-8') as f: -# data_rev = json.load(f) - - -# @keyRepoAddReview.got('rev') -# @keyRepoAddReview.args_parser -# async def _(bot: Bot, event: Event, state: dict) -> None: -# rev = state['rev'] -# key = sample(data_rev.keys(), 1) -# await bot.send( -# event, -# f'Key: {data_rev[key]}\nRepo: {data_rev[key][0]}\nProba: {data_rev[key][1]}\nSender: {data_rev[key][2]}\nGroup: {data_rev[key][3]}\nTime: {data_rev[key][4]}' -# ) - -# if rev == '歇了': -# await keyRepoAddReview.finish("むー……ご苦労様でしたよ。") -# else: -# if rev == '通过' or rev == '过' or rev == '好' or rev == 'y': -# await bot.send(event, '好!') -# data[data_rev[key]] = [ -# data_rev[key][0], data_rev[key][1], data_rev[key][2], -# data_rev[key][3], data_rev[key][4] -# ] -# with open(KEY_PATH, 'w') as f: -# f.write(data) -# elif rev == '不行' or rev == '不' or rev == 'n': -# del data_rev[key] -# await bot.send(event, '好8') |