summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2020-11-07 13:54:57 +0800
committerKyomotoi <[email protected]>2020-11-07 13:54:57 +0800
commit7cae371b51a14c626ce184987eea2392e15430b9 (patch)
treea35aa21a64dad59a8dc91270d78f781dbed8953d /ATRI
parent11e4632aaf2be56c776dbc4e9f0ad5065bb60b5f (diff)
downloadATRI-7cae371b51a14c626ce184987eea2392e15430b9.tar.gz
ATRI-7cae371b51a14c626ce184987eea2392e15430b9.tar.bz2
ATRI-7cae371b51a14c626ce184987eea2392e15430b9.zip
[Update]
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/__init__.py81
-rw-r--r--ATRI/data/data_HTML/api/data/times/api_all.json1
-rw-r--r--ATRI/data/data_HTML/api/data/times/api_index.json1
-rw-r--r--ATRI/data/data_Log/message_private.json2
-rw-r--r--ATRI/plugins/plugin_admin/__init__.py296
-rw-r--r--ATRI/plugins/plugin_anime/__init__.py305
-rw-r--r--ATRI/plugins/plugin_anime/body.py16
-rw-r--r--ATRI/plugins/plugin_chat/__init__.py192
-rw-r--r--ATRI/plugins/plugin_pixiv/__init__.py140
-rw-r--r--ATRI/plugins/plugin_rich/__init__.py182
-rw-r--r--ATRI/plugins/plugin_sqlite/__init__.py45
-rw-r--r--ATRI/plugins/plugin_status/__init__.py150
-rw-r--r--ATRI/plugins/plugin_test/__init__.py23
-rw-r--r--ATRI/plugins/plugin_utils/__init__.py63
14 files changed, 740 insertions, 757 deletions
diff --git a/ATRI/__init__.py b/ATRI/__init__.py
deleted file mode 100644
index fd5b8f6..0000000
--- a/ATRI/__init__.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-'''
-@File : __init__.py
-@Time : 2020/10/11 14:42:47
-@Author : Kyomotoi
-@Contact : [email protected]
-@Github : https://github.com/Kyomotoi
-@License : Copyright © 2018-2020 Kyomotoi, All Rights Reserved.
-'''
-__author__ = 'kyomotoi'
-
-import json
-from pathlib import Path
-from nonebot.log import logger
-
-from utils.utils_switch import controlSwitch
-
-file_key = Path('.') / 'key.json'
-
-try:
- with open(file_key, 'r') as f:
- data_key = json.load(f)
-except:
- data_key = {}
- data_key["API"]["LoliconAPI"] = ""
- data_key["API"]["FaceplusAPI"] = ""
- data_key["API"]["FaceplusSECRET"] = ""
- data_key["API"]["SauceNaoKEY"] = ""
-
- data_key["config"]["SUPERUSERS"] = [0]
-
-if data_key["API"]["LoliconAPI"]:
- logger.info("Succeeded to load key: LoliconAPI")
-else:
- logger.error("Key: LoliconAPI Can't find! URL: https://api.lolicon.app/#/setu")
- key_LoliconAPI = input("Please enter: (Enter KEY or enter 'pass' to pass)")
- if key_LoliconAPI == "pass":
- logger.error("Pass! Now func(setu) use local content.")
- data_key["API"]["LoliconAPI"] = key_LoliconAPI
-
-if data_key["API"]["FaceplusAPI"]:
- logger.info("Succeeded to load key: FaceplusAPI")
-else:
- logger.error("Key: FaceplusAPI Can't find! URL: https://www.faceplusplus.com.cn/")
- key_FaceplusAPI = input("Please enter: (Enter KEY or enter 'pass' to pass)")
- if key_FaceplusAPI == "pass":
- logger.error("Pass! This func(aichangeface) has been closed NOW!")
- controlSwitch("ai-face", False)
- data_key["API"]["FaceplusAPI"] = key_FaceplusAPI
-
-if data_key["API"]["FaceplusSECRET"]:
- logger.info("Succeeded to load secret: FaceplusSECRET")
-else:
- logger.error("Secret: FaceplusSECRET Can't find! URL: https://www.faceplusplus.com.cn/")
- secret_FaceplusSECRET = input("Please enter: (Enter SECRET or enter 'pass' to pass)")
- if secret_FaceplusSECRET == "pass":
- logger.error("Pass! This func(ai_change_face) has been closed NOW!")
- controlSwitch("ai-face", False)
- data_key["API"]["FaceplusSECRET"] = secret_FaceplusSECRET
-
-if data_key["API"]["SauceNaoKEY"]:
- logger.info("Succeeded to load key: SauceNaoKEY")
-else:
- logger.error("Key: SauceNaoKEY Can't find! URL: https://saucenao.com/")
- key_SauceNaoKEY = input("Please enter: (Enter KEY or enter 'pass' to pass)")
- if key_SauceNaoKEY == "pass":
- logger.error("Pass! This func(anime_img_search) has been closed NOW!")
- controlSwitch("anime-pic-search", False)
- data_key["API"]["SauceNaoKEY"] = key_SauceNaoKEY
-
-with open(file_key, 'w') as f:
- f.write(json.dumps(data_key))
- f.close()
-
-key_LoliconAPI = data_key["API"]["LoliconAPI"]
-key_FaceplusAPI = data_key["API"]["FaceplusAPI"]
-secret_FaceplusSECRET = data_key["API"]["FaceplusSECRET"]
-key_SauceNaoKEY = data_key["API"]["SauceNaoKEY"]
-
-config_SUPERUSERS = data_key["config"]["SUPERUSERS"] \ No newline at end of file
diff --git a/ATRI/data/data_HTML/api/data/times/api_all.json b/ATRI/data/data_HTML/api/data/times/api_all.json
index e69de29..9e26dfe 100644
--- a/ATRI/data/data_HTML/api/data/times/api_all.json
+++ b/ATRI/data/data_HTML/api/data/times/api_all.json
@@ -0,0 +1 @@
+{} \ No newline at end of file
diff --git a/ATRI/data/data_HTML/api/data/times/api_index.json b/ATRI/data/data_HTML/api/data/times/api_index.json
index e69de29..9e26dfe 100644
--- a/ATRI/data/data_HTML/api/data/times/api_index.json
+++ b/ATRI/data/data_HTML/api/data/times/api_index.json
@@ -0,0 +1 @@
+{} \ No newline at end of file
diff --git a/ATRI/data/data_Log/message_private.json b/ATRI/data/data_Log/message_private.json
index b1e2825..9e26dfe 100644
--- a/ATRI/data/data_Log/message_private.json
+++ b/ATRI/data/data_Log/message_private.json
@@ -1 +1 @@
-{"-1805088803": {"message": "1", "user_id": "1172294279"}, "-479217845": {"message": "1", "user_id": "1172294279"}, "2053564145": {"message": "123", "user_id": "1172294279"}, "224515687": {"message": "[CQ:image,file=40827d6aa9554b3478072caac9f8216d.image,url=http://c2cpicdw.qpic.cn/offpic_new/1172294279//1172294279-3531381872-40827D6AA9554B3478072CAAC9F8216D/0?term=2]", "user_id": "1172294279"}} \ No newline at end of file
+{} \ No newline at end of file
diff --git a/ATRI/plugins/plugin_admin/__init__.py b/ATRI/plugins/plugin_admin/__init__.py
index 571d83d..55a14fb 100644
--- a/ATRI/plugins/plugin_admin/__init__.py
+++ b/ATRI/plugins/plugin_admin/__init__.py
@@ -19,149 +19,154 @@ from nonebot.plugin import on_command
from nonebot.adapters.cqhttp import Bot, Event
from nonebot.permission import GROUP_ADMIN, GROUP_OWNER, SUPERUSER
-from utils.utils_banList import banList
+from utils.utils_yml import load_yaml
+from utils.utils_rule import check_banlist
from utils.utils_switch import controlSwitch
-import ATRI
+CONFIG_PATH = Path('.') / 'config.yml'
+master = load_yaml(CONFIG_PATH)['bot']['superusers']
+switch = on_command('switch',
+ rule=check_banlist(),
+ permission=(SUPERUSER | GROUP_OWNER | GROUP_ADMIN))
-master = ATRI.config_SUPERUSERS
-
-switch = on_command('switch', permission=(SUPERUSER|GROUP_OWNER|GROUP_ADMIN))
-
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
- if banList(user, group):
- func = str(event.message).strip()
-
- if func:
- pass
+ func = str(event.message).strip()
+
+ if func:
+ pass
+ else:
+ msg0 = "-==ATRI Switch Control System==-\n"
+ msg0 += "┌Usage: switch on/off-{service}\n"
+ msg0 += "├For SUPERUSER:\n"
+ msg0 += "│ └Usage: switch all-on/off-{service}\n"
+ msg0 += "└Service:\n"
+ msg0 += " ├anime-setu\n"
+ msg0 += " ├anime-pic-search\n"
+ msg0 += " ├anime-vid-search\n"
+ msg0 += " ├ai-face\n"
+ msg0 += " ├pixiv-pic-search\n"
+ msg0 += " ├pixiv-author-search\n"
+ msg0 += " └pixiv-rank"
+
+ await switch.finish(msg0)
+
+ funct = re.findall(r"[on|off]-(.*)", func)
+
+ if "all-on" in func:
+ if int(user) in master:
+ await switch.finish(controlSwitch(funct[0], True))
+
else:
- msg0 = "-==ATRI Switch Control System==-\n"
- msg0 += "┌Usage: switch on/off-{service}\n"
- msg0 += "├For SUPERUSER:\n"
- msg0 += "│ └Usage: switch all-on/off-{service}\n"
- msg0 += "└Service:\n"
- msg0 += " ├anime-setu\n"
- msg0 += " ├anime-pic-search\n"
- msg0 += " ├anime-vid-search\n"
- msg0 += " ├ai-face\n"
- msg0 += " ├pixiv-pic-search\n"
- msg0 += " ├pixiv-author-search\n"
- msg0 += " └pixiv-rank"
-
- await switch.finish(msg0)
-
- funct = re.findall(r"[on|off]-(.*)", func)
-
- if "all-on" in func:
- if int(user) in master:
- await switch.finish(controlSwitch(funct[0], True))
-
- else:
- await switch.finish("You don't have enough permissions do THIS!")
-
- elif "all-off" in func:
- if int(user) in master:
- await switch.finish(controlSwitch(funct[0], False))
-
- else:
- await switch.finish("You don't have enough permissions do THIS!")
-
- elif "on" in func:
- await switch.finish(controlSwitch(funct[0], True, group))
-
- elif "off" in func:
- await switch.finish(controlSwitch(funct[0], False, group))
+ await switch.finish("Permission Denied")
+
+ elif "all-off" in func:
+ if int(user) in master:
+ await switch.finish(controlSwitch(funct[0], False))
else:
- await switch.finish("请检查拼写是否正确嗷~~!")
+ await switch.finish("Permission Denied")
+
+ elif "on" in func:
+ await switch.finish(controlSwitch(funct[0], True, group))
+
+ elif "off" in func:
+ await switch.finish(controlSwitch(funct[0], False, group))
+
+ else:
+ await switch.finish("请检查拼写是否正确嗷~~!")
# 舆情监控系统
-publicOpinion = on_command("舆情", permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER)
-data_PO = Path('.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'public_opinion.json'
+publicOpinion = on_command("舆情",
+ rule=check_banlist(),
+ permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER)
+data_PO = Path(
+ '.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'public_opinion.json'
+
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
msg = str(event.message).strip().split(' ')
- if banList(user, group):
- if msg[0] == '':
- msg0 = "---=====ATRI POM System=====---\n"
- msg0 += "Usage:\n"
- msg0 += " - 舆情 [key] [times] [ban time(bot)] [repo]\n"
- msg0 += "Tips:\n"
- msg0 += " - 非 SUPERU 只能设置本群\n"
- msg0 += " - SUPERU 需在后跟随 -a 以启用全局效果\n"
- msg0 += " - 参数类型:\n"
- msg0 += " * key: 关键词(将使用正则匹配)\n"
- msg0 += " * times: 容忍次数(n>0, int)\n"
- msg0 += " * ban time: bot对其失效时间(min, int)\n"
- msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img"
-
- await publicOpinion.finish(msg0)
-
- if msg[0] and msg[1] and msg[2] and msg[3]:
+ if msg[0] == '':
+ msg0 = "---=====ATRI POM System=====---\n"
+ msg0 += "Usage:\n"
+ msg0 += " - 舆情 [key] [times] [ban time(bot)] [repo]\n"
+ msg0 += "Tips:\n"
+ msg0 += " - 非 SUPERU 只能设置本群\n"
+ msg0 += " - SUPERU 需在后跟随 -a 以启用全局效果\n"
+ msg0 += " - 参数类型:\n"
+ msg0 += " * key: 关键词(将使用正则匹配)\n"
+ msg0 += " * times: 容忍次数(n>0, int)\n"
+ msg0 += " * ban time: bot对其失效时间(min, int)\n"
+ msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img"
+
+ await publicOpinion.finish(msg0)
+
+ if msg[0] and msg[1] and msg[2] and msg[3]:
+ pass
+ else:
+ msg0 = "请检查格式奥~!\n"
+ msg0 += "舆情 [key] [times] [ban time(bot)] [repo]\n"
+ msg0 += " * key: 关键词(将使用正则匹配)\n"
+ msg0 += " * times: 容忍次数(n>0, int)\n"
+ msg0 += " * ban time: bot对其失效时间(min, int)\n"
+ msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img"
+ await publicOpinion.finish(msg0)
+
+ key_word = msg[0]
+ remind = msg[1]
+ punish = msg[2]
+ repo = msg[3]
+
+ if key_word and remind and punish and repo:
+ if re.findall(r"/^\d{1,}$/", remind) and re.findall(
+ r"/^\d{1,}$/", punish):
pass
- else:
- msg0 = "请检查格式奥~!\n"
- msg0 += "舆情 [key] [times] [ban time(bot)] [repo]\n"
- msg0 += " * key: 关键词(将使用正则匹配)\n"
- msg0 += " * times: 容忍次数(n>0, int)\n"
- msg0 += " * ban time: bot对其失效时间(min, int)\n"
- msg0 += " * repo: 触发后的关键词(可选),如为图片,键入 img"
- await publicOpinion.finish(msg0)
-
- key_word = msg[0]
- remind = msg[1]
- punish = msg[2]
- repo = msg[3]
-
- if key_word and remind and punish and repo:
- if re.findall(r"/^\d{1,}$/", remind) and re.findall(r"/^\d{1,}$/", punish):
- pass
- else:
- await publicOpinion.finish("非法字符!请注意(times, ban time)类型为int(阿拉伯数字)")
-
else:
- await publicOpinion.finish("请键入完整信息!\n如需帮助,请键入 舆情")
-
- if repo == "img":
- state["key_word"] = key_word
- state["remind"] = remind
- state["punish"] = punish
-
- else:
- try:
- with open(data_PO, "r") as f:
- data = json.load(f)
- except:
- data = {}
+ await publicOpinion.finish("非法字符!请注意(times, ban time)类型为int(阿拉伯数字)"
+ )
+
+ else:
+ await publicOpinion.finish("请键入完整信息!\n如需帮助,请键入 舆情")
+
+ if repo == "img":
+ state["key_word"] = key_word
+ state["remind"] = remind
+ state["punish"] = punish
+
+ else:
+ try:
+ with open(data_PO, "r") as f:
+ data = json.load(f)
+ except:
+ data = {}
+
+ data[key_word] = [remind, punish, repo]
- data[key_word] = [remind, punish, repo]
+ with open(data_PO, "w") as f:
+ f.write(json.dumps(data))
+ f.close()
- with open(data_PO, "w") as f:
- f.write(json.dumps(data))
- f.close()
-
- msg0 = "舆情信息记录完成~!\n"
- msg0 += f"Keyword: {key_word}\n"
- msg0 += f"Times: {remind}\n"
- msg0 += f"Ban time: {punish}\n"
- msg0 += f"Repo: {repo}"
+ msg0 = "舆情信息记录完成~!\n"
+ msg0 += f"Keyword: {key_word}\n"
+ msg0 += f"Times: {remind}\n"
+ msg0 += f"Ban time: {punish}\n"
+ msg0 += f"Repo: {repo}"
- await publicOpinion.finish(msg0)
+ await publicOpinion.finish(msg0)
[email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片") # type: ignore
[email protected]("repo", prompt="检测到 repo 类型为 img,请发送一张图片") # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
key_word = state["key_word"]
remind = state["remind"]
@@ -170,7 +175,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
if "[CQ:image" not in repo:
await publicOpinion.reject("请发送一张图片而不是图片以外的东西~!(")
-
+
try:
with open(data_PO, "r") as f:
data = json.load(f)
@@ -195,13 +200,14 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
trackError = on_command('track', permission=SUPERUSER)
file_error = Path('.') / 'ATRI' / 'data' / 'data_Error' / 'error.json'
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
track_id = str(event.message).strip()
if not track_id:
await trackError.finish("请告诉咱追踪ID嗷~!不然无法获取错误堆栈呢!!")
-
+
data = {}
try:
@@ -209,14 +215,62 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
data = json.load(f)
except:
await trackError.finish(errorRepo("读取文件时错误"))
-
+
if track_id in data:
info_error = data[track_id]
-
+
msg0 = f"trackID: {track_id}\n"
- msg0 =+ info_error
+ msg0 = +info_error
await trackError.finish(msg0)
-
+
else:
await trackError.finish("未发现该ID")
+
+
+groupSendMessage = on_command("群发", permission=SUPERUSER)
+
+
[email protected]() # type: ignore
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ args = str(event.message)
+
+ if args:
+ state['args'] = args
+
+
[email protected]('args', prompt='请告诉咱需要群发的内容~!') # type: ignore
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ msg = state['args']
+ group_list = await bot.get_group_list()
+ sc_list = []
+ err_list = []
+
+ with open(Path('.') / 'utils' / 'utils_rule' / 'ban_list_group.json',
+ 'r') as f:
+ ban_group_list = json.load(f)
+
+ for group in group_list:
+ if group['group_id'] not in ban_group_list:
+
+ try:
+ await bot.send_group_msg(group_id=group['group_id'],
+ message=msg)
+ sc_list.append(group['group_id'])
+ except:
+ await bot.send(event, f"在尝试推送到群[{group['group_id']}]时失败了呢...")
+ err_list.append(group['group_id'])
+ pass
+
+ msg0 = ""
+ for i in err_list:
+ msg0 += f" {i}\n"
+
+ repo_msg = f"推送信息:\n{msg}"
+ repo_msg += f"\n————————\n"
+ repo_msg += f"总共:{len(group_list)}\n"
+ repo_msg += f"成功推送:{len(sc_list)}\n"
+ repo_msg += f"失败[{len(err_list)}]个:\n"
+ repo_msg += msg0
+
+ await groupSendMessage.finish(repo_msg)
diff --git a/ATRI/plugins/plugin_anime/__init__.py b/ATRI/plugins/plugin_anime/__init__.py
index ffe2be2..d01b032 100644
--- a/ATRI/plugins/plugin_anime/__init__.py
+++ b/ATRI/plugins/plugin_anime/__init__.py
@@ -21,24 +21,27 @@ from nonebot.permission import SUPERUSER
from nonebot.adapters.cqhttp import Bot, Event
from nonebot.plugin import on_message, on_command, on_regex
-from utils.utils_banList import banList
+from utils.utils_yml import load_yaml
from utils.utils_error import errorRepo
from utils.utils_history import getMessage
-from utils.utils_switch import checkSwitch
from utils.utils_translate import toSimpleString
+from utils.utils_rule import check_banlist, check_switch
from utils.utils_request import aio_get_bytes, request_get
from utils.utils_img import compress_image, aio_download_pics
from .body import resultRepo
-import ATRI
+CONFIG_PATH = Path('.') / 'config.yml'
+config = load_yaml(CONFIG_PATH)
plugin_name_0 = "anime-pic-search"
-key_SauceNAO = ATRI.key_SauceNaoKEY
+key_SauceNAO = config['api']['SauceNaoKEY']
-SaucenaoSearch = on_command('以图搜图')
+SaucenaoSearch = on_command('以图搜图',
+ rule=check_banlist() & check_switch(plugin_name_0))
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
@@ -46,16 +49,13 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
state["user"] = user
state["group"] = group
- if banList(user, group):
- if checkSwitch(plugin_name_0, group):
- img = str(event.message).strip()
-
- if img:
- state["img_url"] = img
- else:
- await SaucenaoSearch.finish(f"Service-{plugin_name_0} has been closed.")
+ img = str(event.message).strip()
+
+ if img:
+ state["img_url"] = img
[email protected]("img_url", prompt="请发送一张目标图片") # type: ignore
+
[email protected]("img_url", prompt="请发送一张目标图片") # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
img = state["img_url"]
img = re.findall(r"(http://.*?)]", img)
@@ -64,56 +64,54 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
pass
else:
await SaucenaoSearch.reject("请发送一张目标图片,而非文字或其他非图片成分( -'`-; )")
-
+
await bot.send(event, "别急!正在找图!")
- await SaucenaoSearch.finish(resultRepo(state['user'], key_SauceNAO, img[0]))
+ await SaucenaoSearch.finish(resultRepo(state['user'], key_SauceNAO,
+ img[0]))
SaucenaoSearch_repo = on_message()
-@SaucenaoSearch_repo.handle() # type: ignore
+
+@SaucenaoSearch_repo.handle() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
group = str(event.group_id)
msg = str(event.message)
- if banList(user, group):
- if checkSwitch(plugin_name_0, group):
- if "[CQ:reply" in msg:
- if "搜图" in msg or "识图" in msg:
- if group == "None":
- await SaucenaoSearch_repo.finish("ごめんなさい...\n该功能只对群聊开放哦~~")
-
- try:
- repo_info = re.findall(r"CQ:reply,id=([0-9]\S+)]", msg)
- msg_id = repo_info[0]
- except Exception:
- logger.warning(f"Get message_id ERROR!")
- await SaucenaoSearch_repo.finish(errorRepo('定位消息内容失败'))
- return
-
- aim = getMessage(msg_id)[f"{msg_id}"]["message"]
- img = img = re.findall(r"(http://.*?)]", aim)
-
- if len(img):
- pass
- else:
- await SaucenaoSearch_repo.finish('这消息内貌似没图片呢...')
-
- await bot.send(event, "别急!正在找图!")
-
- await SaucenaoSearch.finish(resultRepo(state['user'], key_SauceNAO, img[0]))
-
- else:
- await SaucenaoSearch.finish(f"Service-{plugin_name_0} has been closed.")
+ if "[CQ:reply" in msg:
+ if "搜图" in msg or "识图" in msg:
+ if group == "None":
+ await SaucenaoSearch_repo.finish("ごめんなさい...\n该功能只对群聊开放哦~~")
+
+ try:
+ repo_info = re.findall(r"CQ:reply,id=([0-9]\S+)]", msg)
+ msg_id = repo_info[0]
+ except Exception:
+ logger.error(f"Get message_id ERROR!")
+ await SaucenaoSearch_repo.finish(errorRepo('定位消息内容失败'))
+ return
+
+ aim = getMessage(msg_id)[f"{msg_id}"]["message"]
+ img = img = re.findall(r"(http://.*?)]", aim)
+
+ if len(img):
+ pass
+ else:
+ await SaucenaoSearch_repo.finish('这消息内貌似没图片呢...')
+
+ await bot.send(event, "别急!正在找图!")
+
+ await SaucenaoSearch.finish(
+ resultRepo(state['user'], key_SauceNAO, img[0]))
plugin_name_1 = "anime-vid-search"
+AnimeSearch = on_command('以图搜番',
+ rule=check_banlist() & check_switch(plugin_name_1))
-AnimeSearch = on_command('以图搜番')
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
@@ -121,16 +119,13 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
state["user"] = user
state["group"] = group
- if banList(user, group):
- if checkSwitch(plugin_name_1, group):
- img = str(event.message).strip()
+ img = str(event.message).strip()
+
+ if img:
+ state["img_url"] = img
- if img:
- state["img_url"] = img
- else:
- await AnimeSearch.finish(f"Service-{plugin_name_1} has been closed.")
[email protected]("img_url", prompt="请发送一张目标图片") # type: ignore
[email protected]("img_url", prompt="请发送一张目标图片") # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
img = state["img_url"]
img = re.findall(r"(http://.*?)]", img)
@@ -139,134 +134,142 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
pass
else:
await SaucenaoSearch.reject("请发送一张目标图片,而非文字或其他非图片成分( -'`-; )")
-
+
await bot.send(event, "别急!正在搜索!")
-
+ req = None
+
URL = f'https://trace.moe/api/search?url={img[0]}'
try:
req = await aio_get_bytes(URL)
except:
await AnimeSearch.finish(errorRepo("请求数据失败"))
-
- data = json.loads(req.decode()) # type: ignore
+
+ data = json.loads(req.decode())
try:
d = {}
for i in range(len(data['docs'])):
if data['docs'][i]['title_chinese'] in d:
- d[data['docs'][i]['title_chinese']][0] += data['docs'][i]['similarity']
-
+ d[data['docs'][i]
+ ['title_chinese']][0] += data['docs'][i]['similarity']
+
else:
- m = data['docs'][i]['at']/60
- s = data['docs'][i]['at']%60
+ m = data['docs'][i]['at'] / 60
+ s = data['docs'][i]['at'] % 60
if data['docs'][i]['episode'] == '':
n = 1
-
+
else:
n = data['docs'][i]['episode']
-
- d[toSimpleString(data['docs'][i]['title_chinese'])] = [data['docs'][i]['similarity'],f'第{n}集',f'{int(m)}分{int(s)}秒处']
+
+ d[toSimpleString(data['docs'][i]['title_chinese'])] = [
+ data['docs'][i]['similarity'], f'第{n}集',
+ f'{int(m)}分{int(s)}秒处'
+ ]
except:
await AnimeSearch.finish(errorRepo("处理数据失败"))
-
+
result = sorted(
- d.items(), # type: ignore
- key=lambda x:x[1],
+ d.items(), # type: ignore
+ key=lambda x: x[1],
reverse=True)
-
+
t = 0
- msg0 = f'[CQ:at,qq={state["user"]}]\n根据所提供的图片按照相似度找到{len(d)}个结果:' # type: ignore
+ msg0 = f'[CQ:at,qq={state["user"]}]\n根据所提供的图片按照相似度找到{len(d)}个结果:' # type: ignore
for i in result:
- t +=1
+ t += 1
lk = ('%.2f%%' % (i[1][0] * 100))
- msg = (f'\n——————————\n({t})\n相似度:{lk}\n动漫名:《{i[0]}》\n时间点:{i[1][1]} {i[1][2]}')
+ msg = (
+ f'\n——————————\n({t})\n相似度:{lk}\n动漫名:《{i[0]}》\n时间点:{i[1][1]} {i[1][2]}'
+ )
msg0 += msg
-
+
await AnimeSearch.finish(msg0)
plugin_name_2 = "anime-setu"
-key_LoliconAPI = ATRI.key_LoliconAPI
+key_LoliconAPI = config['api']['LoliconAPI']
setu_type = 1 # setu-type: 1(local), 2(url: https://api.lolicon.app/#/setu) default: 1(local)
-setu = on_regex(r"来[点丶张份副个幅][涩色瑟][图圖]|[涩色瑟][图圖]来|[涩色瑟][图圖][gkd|GKD|搞快点]|[gkd|GKD|搞快点][涩色瑟][图圖]")
+setu = on_regex(
+ r"来[点丶张份副个幅][涩色瑟][图圖]|[涩色瑟][图圖]来|[涩色瑟][图圖][gkd|GKD|搞快点]|[gkd|GKD|搞快点][涩色瑟][图圖]",
+ rule=check_banlist() & check_switch(plugin_name_1))
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _setu(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
group = str(event.group_id)
- if banList(user, group):
- if checkSwitch(plugin_name_2, group):
-
- res = randint(1,5)
-
- if setu_type == 1:
-
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / 'nearR18.db')
- cur = con.cursor()
- msg = cur.execute('SELECT * FROM nearR18 ORDER BY RANDOM() limit 1;')
-
- for i in msg:
- pid = i[0]
- title = i[1]
- img = i[7]
-
- msg0 = f"setu info:\n"
- msg0 += f"Title: {title}\n"
- msg0 += f"Pid: {pid}\n"
- msg0 += f"[CQ:image,file=file:///{compress_image(await aio_download_pics(img))}]"
-
- if 1 <= res < 5:
- await setu.finish(msg0)
-
- elif res == 5:
- await bot.send(event, "我找到涩图了!但我发给主人了\nο(=•ω<=)ρ⌒☆")
-
- await bot.send_private_msg(
- user_id=ATRI.config_SUPERUSERS,
- message=f"主人,从群{group}来的涩图!热乎着!\nTitle: {title}\nPid: {pid}\n[CQ:image,file=file:///{compress_image(await aio_download_pics(img))}]"
- )
-
- else:
- params = {
- "apikey": key_LoliconAPI,
- "r18": "0",
- "num": "1"
- }
-
- data = {}
-
- try:
- data = json.loads(request_get('https://api.lolicon.app/setu/', params))
- except Exception:
- await setu.finish(errorRepo("请求数据失败,也可能为接口调用次数达上限"))
-
- msg0 = f"setu info:\n"
- msg0 += f'Title: {data["data"][0]["title"]}\n'
- msg0 += f'Pid: {data["data"][0]["pid"]}\n'
- msg0 += f'[CQ:image,file=file:///{compress_image(await aio_download_pics(data["data"][0]["url"]))}]'
-
- if 1 <= res < 5:
- await setu.finish(msg0)
-
- elif res == 5:
- await bot.send(event, "我找到涩图了!但我发给主人了\nο(=•ω<=)ρ⌒☆")
+ res = randint(1, 5)
+ if setu_type == 1:
+
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' /
+ 'nearR18.db')
+ cur = con.cursor()
+ msg = cur.execute('SELECT * FROM nearR18 ORDER BY RANDOM() limit 1;')
+
+ for i in msg:
+ pid = i[0]
+ title = i[1]
+ img = i[7]
+
+ msg0 = f"setu info:\n"
+ msg0 += f"Title: {title}\n"
+ msg0 += f"Pid: {pid}\n"
+ msg0 += f"[CQ:image,file=file:///{compress_image(await aio_download_pics(img))}]"
+
+ if 1 <= res < 5:
+ await setu.finish(msg0)
+
+ elif res == 5:
+ await bot.send(event, "我找到涩图了!但我发给主人了\nο(=•ω<=)ρ⌒☆")
+
+ for sup in config['bot']['superusers']:
await bot.send_private_msg(
- user_id=ATRI.config_SUPERUSERS,
- message=f'主人,从群{group}来的涩图!热乎着!\nTitle: {data["data"][0]["title"]}\nPid: {data["data"][0]["pid"]}\n[CQ:image,file=file:///{compress_image(await aio_download_pics(data["data"][0]["url"]))}]'
+ user_id=sup,
+ message=
+ f"主人,从群{group}来的涩图!热乎着!\nTitle: {title}\nPid: {pid}\n[CQ:image,file=file:///{compress_image(await aio_download_pics(img))}]"
)
-
- else:
- await setu.finish(f"Service-{plugin_name_2} has been closed.")
+
+ else:
+ params = {"apikey": key_LoliconAPI, "r18": "0", "num": "1"}
+
+ data = {}
+
+ try:
+ data = json.loads(
+ request_get('https://api.lolicon.app/setu/', params))
+ except Exception:
+ await setu.finish(errorRepo("请求数据失败,也可能为接口调用次数达上限"))
+
+ msg0 = f"setu info:\n"
+ msg0 += f'Title: {data["data"][0]["title"]}\n'
+ msg0 += f'Pid: {data["data"][0]["pid"]}\n'
+ msg0 += f'[CQ:image,file=file:///{compress_image(await aio_download_pics(data["data"][0]["url"]))}]'
+
+ if 1 <= res < 5:
+ await setu.finish(msg0)
+
+ elif res == 5:
+ await bot.send(event, "我找到涩图了!但我发给主人了\nο(=•ω<=)ρ⌒☆")
+
+ for sup in config['bot']['superusers']:
+ await bot.send_private_msg(
+ user_id=sup,
+ message=
+ f'主人,从群{group}来的涩图!热乎着!\nTitle: {data["data"][0]["title"]}\nPid: {data["data"][0]["pid"]}\n[CQ:image,file=file:///{compress_image(await aio_download_pics(data["data"][0]["url"]))}]'
+ )
+
setuType = on_command("setu-type", permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
global setu_type
msg = str(event.message).strip()
@@ -282,16 +285,16 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
msg0 += " └url"
await setuType.finish(msg0)
-
+
if msg == "local":
setu_type = 1
-
+
elif msg == "url":
setu_type = 2
-
+
else:
await setuType.finish("请检查类型是否输入正确嗷!")
-
+
await setuType.finish("Type conversion completed!")
@@ -321,14 +324,14 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
# pid = i[0]
# title = i[1]
# img = i[7]
-
+
# msg0 = f"setu info:\n"
# msg0 += f"Title: {title}\n"
# msg0 += f"Pid: {pid}\n"
# msg0 += f"[CQ:image,file=file:///{compress_image(await aio_download_pics(img))}]"
-
+
# await setu.finish(msg0)
-
+
# else:
# params = {
# "apikey": key_LoliconAPI,
@@ -342,7 +345,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
# data = json.loads(request_get('https://api.lolicon.app/setu/', params))
# except Exception:
# await setu.finish(errorRepo("请求数据失败,也可能为接口调用次数达上限"))
-
+
# msg0 = f"setu info:\n"
# msg0 += f'Title: {data["data"][0]["title"]}\n'
# msg0 += f'Pid: {data["data"][0]["pid"]}\n'
diff --git a/ATRI/plugins/plugin_anime/body.py b/ATRI/plugins/plugin_anime/body.py
index 00ee4ec..a68dc48 100644
--- a/ATRI/plugins/plugin_anime/body.py
+++ b/ATRI/plugins/plugin_anime/body.py
@@ -15,9 +15,16 @@ import json
from utils.utils_error import errorRepo
from utils.utils_request import request_get
-class SauceNAO:
- def __init__(self, api_key, output_type=2, testmode=0, dbmask=None, dbmaski=32768, db=5, numres=1):
+class SauceNAO:
+ def __init__(self,
+ api_key,
+ output_type=2,
+ testmode=0,
+ dbmask=None,
+ dbmaski=32768,
+ db=5,
+ numres=1):
api = 'https://saucenao.com/search.php'
self.api = api
params = dict()
@@ -34,6 +41,7 @@ class SauceNAO:
self.params['url'] = url
return request_get(self.api, self.params)
+
def resultRepo(user: str, key: str, img_url: str):
try:
task = SauceNAO(key)
@@ -57,6 +65,6 @@ def resultRepo(user: str, key: str, img_url: str):
msg0 += f"Pic URL: https://pixiv.cat/{data['data'].get('pixiv_id', None)}.jpg"
if float(data['header'].get('similarity', 0)) < 65:
- msg0 += '注:相似率小于65%不一定正确'
-
+ msg0 += '\n注:相似率小于65%不一定正确'
+
return msg0 \ No newline at end of file
diff --git a/ATRI/plugins/plugin_chat/__init__.py b/ATRI/plugins/plugin_chat/__init__.py
index d30bd2e..0a718b0 100644
--- a/ATRI/plugins/plugin_chat/__init__.py
+++ b/ATRI/plugins/plugin_chat/__init__.py
@@ -20,16 +20,20 @@ from nonebot.adapters.cqhttp import Bot, Event
from nonebot.plugin import on_command, on_message, on_notice
from utils.utils_times import countX
+from utils.utils_yml import load_yaml
from utils.utils_error import errorRepo
-from utils.utils_banList import banList
+from utils.utils_rule import check_banlist
from utils.utils_history import saveMessage
from utils.utils_request import request_api_text
+CONFIG_PATH = Path('.') / 'config.yml'
+config = load_yaml(CONFIG_PATH)['bot']
# 收集 bot 所在群的聊天记录
MessageSave = on_message()
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
@@ -40,148 +44,146 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
saveMessage(message_id, message, user)
else:
saveMessage(message_id, message, user, group)
-
- logger.opt(colors=True).info(f"[<yellow>{group}</yellow>]-U: (<blue>{user}</blue>) | Message: (<green>{message}</green>) Saved successfully")
+
+ logger.opt(colors=True).info(
+ f"[<yellow>{group}</yellow>]-U: (<blue>{user}</blue>) | Message: (<green>{message}</green>) Saved successfully"
+ )
# Call bot
-callMe = on_message()
+callMe = on_message(rule=check_banlist())
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
- group = str(event.group_id)
+ msg = str(event.message)
+
+ if msg in config['nickname']:
+ await callMe.finish("叫咱有啥事吗w")
- if banList(user, group):
- msg = str(event.message)
+ elif "萝卜子" in msg:
+ await bot.send(event, "萝卜子是对咱的蔑称!!")
- if "ATRI" == msg or "亚托莉" == msg or "アトリ" == msg:
- await callMe.finish("叫我有啥事吗w")
-
- elif "萝卜子" in msg:
- await bot.send(event, "萝卜子是对咱的蔑称!!")
-
- else:
- pass
+ else:
+ pass
# 戳 一 戳
-pokehah = on_command("戳一戳", rule=to_me())
+pokehah = on_command("戳一戳", rule=to_me() & check_banlist())
[email protected]() # type: ignore
-async def _poke(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
- group = str(event.group_id)
- if banList(user, group):
- msg = choice(
- [
- "你再戳!",
- "?再戳试试?",
- "别戳了别戳了再戳就坏了555",
- "我爪巴爪巴,球球别再戳了",
- "你戳你🐎呢?!",
- "那...那里...那里不能戳...绝对...",
- "(。´・ω・)ん?",
- "有事恁叫我,别天天一个劲戳戳戳!",
- "欸很烦欸!你戳🔨呢",
- "?"
- ])
[email protected]() # type: ignore
+async def _poke(bot: Bot, event: Event, state: dict) -> None:
+ msg = choice([
+ "你再戳!", "?再戳试试?", "别戳了别戳了再戳就坏了555", "我爪巴爪巴,球球别再戳了", "你戳你🐎呢?!",
+ "那...那里...那里不能戳...绝对...", "(。´・ω・)ん?", "有事恁叫我,别天天一个劲戳戳戳!", "欸很烦欸!你戳🔨呢",
+ "?"
+ ])
- await pokehah.finish(msg)
+ await pokehah.finish(msg)
async def poke_(bot: Bot, event: Event, state: dict) -> bool:
- return (event.detail_type == "notify" and event.raw_event["sub_type"] == "poke" and
- event.sub_type == "notice" and int(event.self_id) == event.raw_event["target_id"])
+ return (event.detail_type == "notify"
+ and event.raw_event["sub_type"] == "poke" # type: ignore
+ and event.sub_type == "notice" and int(
+ event.self_id) == event.raw_event["target_id"] # type: ignore
+ )
+
poke = on_notice(poke_, block=True)
poke.handle()(_poke)
-
# 处理 进 / 退 群事件
-groupEvent = on_notice()
+groupEvent = on_notice(rule=check_banlist())
[email protected]() # type: ignore
-async def _(bot: Bot, event: Event, state: dict) -> None:
- group = str(event.group_id)
- if banList(group):
- if event.raw_event["notice_type"] == "group_increase":
- await groupEvent.finish(f'好欸!事新人[CQ:at,qq={event.raw_event["user_id"]}]')
- await groupEvent.finish(f"在下 ATRI,你可以叫我 亚托莉 或 アトリ !~w")
[email protected]() # type: ignore
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ if event.raw_event["notice_type"] == "group_increase": # type: ignore
+ await groupEvent.finish(
+ f'好欸!事新人[CQ:at,qq={event.raw_event["user_id"]}]' # type: ignore
+ ) # type: ignore
+ await groupEvent.finish(f"在下 ATRI,你可以叫我 亚托莉 或 アトリ !~w")
- elif event.raw_event["notice_type"] == "group_decrease":
- await groupEvent.finish(f'[{event.raw_event["operator_id"]}] 离开了我们...')
+ elif event.raw_event[ # type: ignore
+ "notice_type"] == "group_decrease":
+ await groupEvent.finish(
+ f'[{event.raw_event["operator_id"]}] 离开了我们...' # type: ignore
+ )
# 舆情监听系统
listenPublicOpinion = on_message()
-file_PO = Path('.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'public_opinion.json'
+file_PO = Path(
+ '.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'public_opinion.json'
+
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
with open(file_PO, 'r') as f:
data = json.load(f)
# 口臭一下
-fxxkMe = on_command('口臭一下', aliases={'口臭', '骂我'}, rule=to_me())
+fxxkMe = on_command('口臭一下',
+ aliases={'口臭', '骂我'},
+ rule=to_me() & check_banlist())
list_M = []
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
- group = str(event.group_id)
global list_M
- if banList(user, group):
- if countX(list_M, user) >= 3:
- await fxxkMe.finish("不是??你这么想被咱骂的嘛??被咱骂就这么舒服的吗?!该......你该不会是.....M吧!")
-
- elif countX(list_M, user) >= 6:
- await fxxkMe.finish("给我适可而止阿!?")
- list_M = list(set(list_M))
+ if countX(list_M, user) >= 3:
+ await fxxkMe.finish("不是??你这么想被咱骂的嘛??被咱骂就这么舒服的吗?!该......你该不会是.....M吧!")
+
+ elif countX(list_M, user) >= 6:
+ await fxxkMe.finish("给我适可而止阿!?")
+ list_M = list(set(list_M))
+
+ else:
+ list_M.append(user)
+ URL = "https://nmsl.shadiao.app/api.php?level=min&lang=zh_cn"
+ msg = ""
- else:
- list_M.append(user)
- URL = "https://nmsl.shadiao.app/api.php?level=min&lang=zh_cn"
- msg = ""
+ try:
+ msg = request_api_text(URL)
+ except:
+ await fxxkMe.finish(errorRepo("请求错误"))
- try:
- msg = request_api_text(URL)
- except:
- await fxxkMe.finish(errorRepo("请求错误"))
-
- await fxxkMe.finish(msg)
+ await fxxkMe.finish(msg)
# Hitokoto
-hitokoto = on_command('一言', aliases={'抑郁一下', '网抑云'}, rule=to_me())
+hitokoto = on_command('一言',
+ aliases={'抑郁一下', '网抑云'},
+ rule=to_me() & check_banlist())
list_Y = []
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
- group = str(event.group_id)
global list_Y
- if banList(user, group):
- if countX(list_Y, user) >= 3:
- await hitokoto.finish("额......需要咱安慰一下嘛~?")
-
- elif countX(list_Y, user) >= 6:
- await hitokoto.finish("如果心里感到难受就赶快去睡觉奥!别再憋自己了!")
- list_Y = list(set(list_Y))
-
- else:
- list_Y.append(user)
- URL = "https://api.imjad.cn/hitokoto/?cat=a&charset=utf-8&length=50&encode=json&fun=sync&source="
- info = {}
-
- try:
- info = json.loads(request_api_text(URL))
- except:
- await hitokoto.finish(errorRepo("请求错误"))
-
- await hitokoto.finish(info["hitokoto"])
+ if countX(list_Y, user) >= 3:
+ await hitokoto.finish("额......需要咱安慰一下嘛~?")
+
+ elif countX(list_Y, user) >= 6:
+ await hitokoto.finish("如果心里感到难受就赶快去睡觉奥!别再憋自己了!")
+ list_Y = list(set(list_Y))
+
+ else:
+ list_Y.append(user)
+ URL = "https://api.imjad.cn/hitokoto/?cat=a&charset=utf-8&length=50&encode=json&fun=sync&source="
+ info = {}
+
+ try:
+ info = json.loads(request_api_text(URL))
+ except:
+ await hitokoto.finish(errorRepo("请求错误"))
+
+ await hitokoto.finish(info["hitokoto"])
diff --git a/ATRI/plugins/plugin_pixiv/__init__.py b/ATRI/plugins/plugin_pixiv/__init__.py
index 545e0f7..e8a66df 100644
--- a/ATRI/plugins/plugin_pixiv/__init__.py
+++ b/ATRI/plugins/plugin_pixiv/__init__.py
@@ -17,15 +17,15 @@ from nonebot.plugin import on_command
from nonebot.adapters.cqhttp import Bot, Event
from utils.utils_error import errorRepo
-from utils.utils_banList import banList
-from utils.utils_switch import checkSwitch
from utils.utils_request import request_get
-
+from utils.utils_rule import check_banlist, check_switch
plugin_name_0 = "pixiv-pic-search"
-pixivSearchIMG = on_command('p站搜图')
+pixivSearchIMG = on_command('p站搜图',
+ rule=check_banlist() & check_switch(plugin_name_0))
+
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
@@ -33,16 +33,13 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
state["user"] = user
state["group"] = group
- if banList(user, group):
- if checkSwitch(plugin_name_0, group):
- pid = str(event.message).strip()
+ pid = str(event.message).strip()
- if pid:
- state["pid"] = pid
- else:
- await pixivSearchIMG.finish(f"Service-{plugin_name_0} has been closed.")
+ if pid:
+ state["pid"] = pid
[email protected]("pid", prompt="请发送目标PID码") # type: ignore
+
[email protected]("pid", prompt="请发送目标PID码") # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
pid = state["pid"]
pid = re.findall(r"\d+", pid)
@@ -61,7 +58,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
data = json.loads(request_get(URL))
except:
await pixivSearchIMG.finish(errorRepo("请求数据失败"))
-
+
msg0 = f'[CQ:at,qq={state["user"]}]\n'
msg0 += f"Search result:\n"
msg0 += f"Pid: {pid}\n"
@@ -77,9 +74,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
plugin_name_1 = "pixiv-author-search"
-pixivSearchAuthor = on_command("p站画师")
+pixivSearchAuthor = on_command("p站画师",
+ rule=check_banlist()
+ & check_switch(plugin_name_1))
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
group = str(event.group_id)
@@ -87,16 +87,13 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
state["user"] = user
state["group"] = group
- if banList(user, group):
- if checkSwitch(plugin_name_1, group):
- author_id = str(event.message).strip()
+ author_id = str(event.message).strip()
+
+ if author_id:
+ state["author_id"] = author_id
- if author_id:
- state["author_id"] = author_id
- else:
- await pixivSearchAuthor.finish(f"Service-{plugin_name_1} has been closed.")
[email protected]("author_id", prompt="请发送目标画师id") # type: ignore
[email protected]("author_id", prompt="请发送目标画师id") # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
author_id = state["author_id"]
author_id = re.findall(r"\d+", author_id)
@@ -105,7 +102,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
pass
else:
await pixivSearchAuthor.reject("请发送纯阿拉伯数字的画师id")
-
+
await bot.send(event, f"别急!在搜索了!\n将展示画师[{author_id}]的前三项作品")
URL = f"https://api.imjad.cn/pixiv/v1/?type=member_illust&id={author_id}"
@@ -116,18 +113,14 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
except:
await pixivSearchAuthor.finish(errorRepo("请求网络失败"))
- for i in range(0,3):
+ for i in range(0, 3):
pid = data["response"][i]["id"]
IMG = f"https://pixiv.cat/{author_id}.jpg"
data[i] = [f"{pid}", f"{IMG}"]
-
+
msg0 = f'[CQ:at,qq={state["user"]}]\n'
-
- result = sorted(
- data.items(),
- key=lambda x:x[1],
- reverse=True
- )
+
+ result = sorted(data.items(), key=lambda x: x[1], reverse=True)
t = 0
@@ -137,55 +130,46 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
msg += f"({t})\n"
msg += f"Pid: {i[1][0]}\n{i[1][1]}"
msg0 += msg
-
+
await pixivSearchAuthor.finish(msg0)
-plugin_name_2 = "pixiv_rank"
-pixivRank = on_command("p站排行榜")
+plugin_name_2 = "pixiv-rank"
+pixivRank = on_command("p站排行榜",
+ rule=check_banlist() & check_switch(plugin_name_2))
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
- group = str(event.group_id)
- if banList(user, group):
- if checkSwitch(plugin_name_2, group):
-
- await bot.send(event, "正在获取P站每日排行榜前五作品")
-
- URL = "https://api.imjad.cn/pixiv/v1/?type=rank"
- data = {}
-
- try:
- data = json.loads(request_get(URL))
- except:
- await pixivRank.finish(errorRepo("网络请求失败"))
-
- for i in range(0,5):
- pid = data["response"][0]["works"][i]["work"]["id"]
- IMG = f"https://pixiv.cat/{pid}.jpg"
- data[i] = [f"{pid}", f"{IMG}"]
-
- msg0 = f"[CQ:at,qq={user}]"
-
- result = sorted(
- data.items(),
- key=lambda x:x[1],
- reverse=True
- )
-
- t = 0
-
- for i in result:
- t += 1
- msg = "\n---------------\n"
- msg += f"({t})\n"
- msg += f"Pid: {i[1][0]}"
- msg += f"{i[1][1]}"
- msg0 += msg
-
- await pixivRank.finish(msg0)
-
- else:
- await pixivRank.finish(f"Service-{plugin_name_2} has been closed.") \ No newline at end of file
+ await bot.send(event, "正在获取P站每日排行榜前五作品")
+
+ URL = "https://api.imjad.cn/pixiv/v1/?type=rank"
+ data = {}
+
+ try:
+ data = json.loads(request_get(URL))
+ except:
+ await pixivRank.finish(errorRepo("网络请求失败"))
+
+ for i in range(0, 5):
+ pid = data["response"][0]["works"][i]["work"]["id"]
+ IMG = f"https://pixiv.cat/{pid}.jpg"
+ data[i] = [f"{pid}", f"{IMG}"]
+
+ msg0 = f"[CQ:at,qq={user}]"
+
+ result = sorted(data.items(), key=lambda x: x[1], reverse=True)
+
+ t = 0
+
+ for i in result:
+ t += 1
+ msg = "\n---------------\n"
+ msg += f"({t})\n"
+ msg += f"Pid: {i[1][0]}"
+ msg += f"{i[1][1]}"
+ msg0 += msg
+
+ await pixivRank.finish(msg0)
diff --git a/ATRI/plugins/plugin_rich/__init__.py b/ATRI/plugins/plugin_rich/__init__.py
index 99026a8..51b86e8 100644
--- a/ATRI/plugins/plugin_rich/__init__.py
+++ b/ATRI/plugins/plugin_rich/__init__.py
@@ -18,11 +18,10 @@ from nonebot.log import logger
from nonebot.plugin import on_message
from nonebot.adapters.cqhttp import Bot, Event
-from utils.utils_banList import banList
+from utils.utils_times import countX
+from utils.utils_rule import check_banlist
from utils.utils_request import request_get
-
-from .body import dec, enc
-
+from .body import dec
BILI_REPORT_FORMAT = """[{aid}] Info:
Title: {title}
@@ -33,64 +32,64 @@ Link:
{aid_link}
{bid_link}"""
-bilibiliRich = on_message()
+bilibiliRich = on_message(rule=check_banlist())
+b_list = []
+
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
- group = str(event.group_id)
+ global b_list
+ user = event.user_id
+ msg = str(event.message)
+
+ # 防刷屏机制:回复次数达到五次自动忽略下一次
+ if countX(b_list, user) == 5:
+ return
+
+ if "qqdocurl" not in msg:
+ try:
+ bv = re.findall(r"(BV\w+)", msg)
+ except:
+ return
+ else:
+ bvURL = re.findall(r"(........b23...\S+\=)", msg)
- if banList(user, group):
- msg = str(event.message)
+ try:
+ r = requests.get(bvURL[0], stream=True, allow_redirects=True)
+ except:
+ logger.waring("Get BV ERROR. (Request ERROR)")
+ return
- if "qqdocurl" not in msg:
- try:
- bv = re.findall(r"(BV\w+)", msg)
- except:
- return
- else:
- bvURL = re.findall(r"(........b23...\S+\=)", msg)
+ bv = re.findall(r"(BV\w+)", r.url)
- try:
- r = requests.get(bvURL[0], stream=True, allow_redirects = True)
- except:
- logger.waring("Get BV ERROR. (Request ERROR)")
- return
+ if bv:
+ aid = str(dec(bv[0]))
+ ad = 'av' + aid
+ URL = f'https://api.imjad.cn/bilibili/v2/?aid={aid}'
- bv = re.findall(r"(BV\w+)", r.url)
-
- if bv:
- aid = str(dec(bv[0]))
- ad = 'av' + aid
- URL = f'https://api.imjad.cn/bilibili/v2/?aid={aid}'
-
- try:
- res = request_get(URL)
- except:
- logger.waring("Request ERROR")
- return
-
- data = json.loads(res)
- msg = BILI_REPORT_FORMAT.format(
- title = data["data"]["title"],
-
- view = data["data"]["stat"]["view"],
- coin = data["data"]["stat"]["coin"],
- share = data["data"]["stat"]["share"],
- like = data["data"]["stat"]["like"],
-
- bid = data["data"]["bvid"],
- bid_link = data["data"]["short_link"],
-
- aid = ad,
- aid_link = f'https://b23.tv/{ad}'
- )
-
- await bilibiliRich.finish(msg)
-
- else:
+ try:
+ res = request_get(URL)
+ except:
+ logger.waring("Request ERROR")
return
+ data = json.loads(res)
+ msg = BILI_REPORT_FORMAT.format(title=data["data"]["title"],
+ view=data["data"]["stat"]["view"],
+ coin=data["data"]["stat"]["coin"],
+ share=data["data"]["stat"]["share"],
+ like=data["data"]["stat"]["like"],
+ bid=data["data"]["bvid"],
+ bid_link=data["data"]["short_link"],
+ aid=ad,
+ aid_link=f'https://b23.tv/{ad}')
+
+ b_list.append(user)
+ await bilibiliRich.finish(msg)
+
+ else:
+ return
+
CLOUDMUSIC_REPORT_FORMAT = """Status: {status}
Song id: {id}
@@ -98,40 +97,45 @@ Br: {br}
Download: {url}
MD5: {md5}"""
-cloudmusicRich = on_message()
+cloudmusicRich = on_message(rule=check_banlist())
+c_list = []
+
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
- group = str(event.group_id)
-
- if banList(user, group):
- msg = str(event.message)
-
- if "music.163.com" in msg:
- music_id = re.findall(r"song\S+\/|id=\S+\&", msg)
-
- if music_id:
- music_id = str(music_id[0])
- music_id = re.findall(r"-?[1-9]\d*", music_id)
- URL = f'https://api.imjad.cn/cloudmusic/?type=song&id={music_id[0]}&br=320000'
-
- try:
- res = request_get(URL)
- except:
- logger.waring("Request ERROR")
- return
-
- data = json.loads(res)
- msg = CLOUDMUSIC_REPORT_FORMAT.format(
- status = data["code"],
- id = data["data"][0]["id"],
- br = data["data"][0]["br"],
- url = data["data"][0]["url"],
- md5 = data["data"][0]["md5"],
- )
-
- await cloudmusicRich.finish(msg)
-
- else:
- return
+ global c_list
+ user = event.user_id
+ msg = str(event.message)
+
+ # 防刷屏机制:回复次数达到五次自动忽略下一次
+ if countX(b_list, user) == 5:
+ return
+
+ if "music.163.com" in msg:
+ music_id = re.findall(r"song\S+\/|id=\S+\&", msg)
+
+ if music_id:
+ music_id = str(music_id[0])
+ music_id = re.findall(r"-?[1-9]\d*", music_id)
+ URL = f'https://api.imjad.cn/cloudmusic/?type=song&id={music_id[0]}&br=320000'
+
+ try:
+ res = request_get(URL)
+ except:
+ logger.waring("Request ERROR")
+ return
+
+ data = json.loads(res)
+ msg = CLOUDMUSIC_REPORT_FORMAT.format(
+ status=data["code"],
+ id=data["data"][0]["id"],
+ br=data["data"][0]["br"],
+ url=data["data"][0]["url"],
+ md5=data["data"][0]["md5"],
+ )
+
+ c_list.append(user)
+ await cloudmusicRich.finish(msg)
+
+ else:
+ return
diff --git a/ATRI/plugins/plugin_sqlite/__init__.py b/ATRI/plugins/plugin_sqlite/__init__.py
index 1663026..9108b34 100644
--- a/ATRI/plugins/plugin_sqlite/__init__.py
+++ b/ATRI/plugins/plugin_sqlite/__init__.py
@@ -21,10 +21,10 @@ from nonebot.adapters.cqhttp import Bot, Event
from utils.utils_error import errorRepo
from utils.utils_request import aio_get_bytes
-
SetuData = on_command('setu', permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
msg0 = "-==ATRI Setu Data System==-\n"
msg0 += "Upload:\n"
@@ -37,13 +37,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
UploadSetu = on_command('setu-upload', permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
msg = str(event.message).strip().split(' ')
- if msg[0] and msg[1]:
- pass
- else:
+ if not msg[0] and msg[1]:
msg0 = "请检查格式奥~!\n"
msg0 += "setu-upload [type] [pid]\n"
msg0 += "type: normal, nearR18, r18"
@@ -64,7 +63,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
info = json.loads(await aio_get_bytes(URL))
except:
await UploadSetu.finish(errorRepo("网络请求出错"))
-
+
info = info["response"][0]
title = info["title"]
tags = info["tags"]
@@ -74,7 +73,8 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
user_link = f'https://www.pixiv.net/users/' + f'{u_id}'
img = f'https://pixiv.cat/{pid}.jpg'
- data_setu = (f'{pid}', f'{title}', f'{tags}', f'{account}', f'{name}', f'{u_id}', f'{user_link}', f'{img}')
+ data_setu = (f'{pid}', f'{title}', f'{tags}', f'{account}', f'{name}',
+ f'{u_id}', f'{user_link}', f'{img}')
if s_type == "nearr18":
s_type = "nearR18"
@@ -87,16 +87,23 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
print('数据文件存在!')
else:
await DeleteSetu.finish("数据库都不在添加🔨!?罢了我现创一个")
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / f'{s_type}.db')
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' /
+ f'{s_type}.db')
cur = con.cursor()
- cur.execute(f'CREATE TABLE {s_type}(pid PID, title TITLE, tags TAGS, account ACCOUNT, name NAME, u_id UID, user_link USERLINK, img IMG, UNIQUE(pid, title, tags, account, name, u_id, user_link, img))')
+ cur.execute(
+ f'CREATE TABLE {s_type}(pid PID, title TITLE, tags TAGS, account ACCOUNT, name NAME, u_id UID, user_link USERLINK, img IMG, UNIQUE(pid, title, tags, account, name, u_id, user_link, img))'
+ )
con.commit()
cur.close()
await bot.send(event, '完成')
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / f'{s_type}.db')
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / f'{s_type}.db')
cur = con.cursor()
- cur.execute(f'INSERT INTO {s_type}(pid, title, tags, account, name, u_id, user_link, img) VALUES(?, ?, ?, ?, ?, ?, ?, ?)', data_setu)
+ cur.execute(
+ f'INSERT INTO {s_type}(pid, title, tags, account, name, u_id, user_link, img) VALUES(?, ?, ?, ?, ?, ?, ?, ?)',
+ data_setu)
con.commit()
cur.close()
@@ -105,13 +112,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
DeleteSetu = on_command('setu-delete', permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
msg = str(event.message).strip().split(' ')
-
- if msg[0] and msg[1]:
- pass
- else:
+
+ if not msg[0] and msg[1]:
msg0 = "请检查格式奥~!\n"
msg0 += "setu-delete [type] [pid]\n"
msg0 += "type: normal, nearR18, r18"
@@ -136,8 +142,9 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
print('数据文件存在!')
else:
await DeleteSetu.finish("数据库都不在删🔨!?")
-
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / f'{s_type}.db')
+
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / f'{s_type}.db')
cur = con.cursor()
cur.execute(f'DELETE FROM {s_type} WHERE pid = {pid}')
con.commit()
diff --git a/ATRI/plugins/plugin_status/__init__.py b/ATRI/plugins/plugin_status/__init__.py
index 0d89d9d..bc4fd13 100644
--- a/ATRI/plugins/plugin_status/__init__.py
+++ b/ATRI/plugins/plugin_status/__init__.py
@@ -17,82 +17,84 @@ from pathlib import Path
from nonebot.plugin import on_command
from nonebot.adapters.cqhttp import Bot, Event
-from utils.utils_banList import banList
from utils.utils_error import errorRepo
+from utils.utils_rule import check_banlist
+status_info = on_command('status', rule=check_banlist())
-status_info = on_command('status')
-@status_info.handle() # type: ignore
+@status_info.handle() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
- group = str(event.group_id)
-
- if banList(user, group):
- msg = str(event.message).strip()
-
- if msg:
- pass
- else:
- msg0 = "States parameter:\n"
- msg0 += "├info\n"
- msg0 += "└sqlite\n"
- msg0 += "* DEMO: status info"
-
- await status_info.finish(msg0)
-
- if msg == "info":
- try:
- cpu = psutil.cpu_percent(interval=1)
- memory = psutil.virtual_memory().percent
- disk = psutil.disk_usage('/').percent
- inteSENT = psutil.net_io_counters().bytes_sent # type: ignore
- inteRECV = psutil.net_io_counters().bytes_recv # type: ignore
- except:
- await status_info.finish(errorRepo("读取系统状态失败"))
-
- status = "アトリは、高性能ですから!"
-
- if cpu > 80: # type: ignore
- status = 'ATRI感觉头有点晕...'
- if memory > 80: # type: ignore
- status = 'ATRI感觉有点头晕并且有点累...'
- elif disk > 80: # type: ignore
- status = 'ATRI感觉身体要被塞满了...'
-
- msg0 = "ATRI status-info:\n"
- msg0 += f"* CPU: {cpu}%\n" # type: ignore
- msg0 += f"* MEM: {memory}%\n" # type: ignore
- msg0 += f"* Disk {disk}%\n" # type: ignore
- msg0 += f"* BytesSENT: {inteSENT}\n" # type: ignore
- msg0 += f"* BytesRECV: {inteRECV}\n" # type: ignore
- msg0 += status
-
- await status_info.finish(msg0)
-
- elif msg == "sqlite":
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / 'normal.db') # setu-normal
- cur = con.cursor()
- cur.execute("select * from normal")
- data_normal = len(cur.fetchall())
- con.close()
-
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / 'nearR18.db') # setu-nearR18
- cur = con.cursor()
- cur.execute("select * from nearR18")
- data_nearR18 = len(cur.fetchall())
- con.close()
-
- con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' / 'r18.db') # setu-r18
- cur = con.cursor()
- cur.execute("select * from r18")
- data_r18 = len(cur.fetchall())
- con.close()
-
- msg0 = "ATRI status-sqlite:\n"
- msg0 += "Setu:\n"
- msg0 += f"├normal: {data_normal}\n"
- msg0 += f"├nearR18: {data_nearR18}\n"
- msg0 += f"└R18: {data_r18}"
-
- await status_info.finish(msg0) \ No newline at end of file
+ msg = str(event.message).strip()
+
+ if msg:
+ pass
+ else:
+ msg0 = "States parameter:\n"
+ msg0 += "├info\n"
+ msg0 += "└sqlite\n"
+ msg0 += "* DEMO: status info"
+
+ await status_info.finish(msg0)
+
+ if msg == "info":
+ try:
+ cpu = psutil.cpu_percent(interval=1)
+ memory = psutil.virtual_memory().percent
+ disk = psutil.disk_usage('/').percent
+ inteSENT = psutil.net_io_counters().bytes_sent # type: ignore
+ inteRECV = psutil.net_io_counters().bytes_recv # type: ignore
+ except:
+ await status_info.finish(errorRepo("读取系统状态失败"))
+
+ status = "アトリは、高性能ですから!"
+
+ if cpu > 80: # type: ignore
+ status = 'ATRI感觉头有点晕...'
+ if memory > 80: # type: ignore
+ status = 'ATRI感觉有点头晕并且有点累...'
+ elif disk > 80: # type: ignore
+ status = 'ATRI感觉身体要被塞满了...'
+
+ msg0 = "ATRI status-info:\n"
+ msg0 += f"* CPU: {cpu}%\n" # type: ignore
+ msg0 += f"* MEM: {memory}%\n" # type: ignore
+ msg0 += f"* Disk {disk}%\n" # type: ignore
+ msg0 += f"* BytesSENT: {inteSENT}\n" # type: ignore
+ msg0 += f"* BytesRECV: {inteRECV}\n" # type: ignore
+ msg0 += status
+
+ await status_info.finish(msg0)
+
+ elif msg == "sqlite":
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' /
+ 'normal.db') # setu-normal
+ cur = con.cursor()
+ cur.execute("select * from normal")
+ data_normal = len(cur.fetchall())
+ con.close()
+
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' /
+ 'nearR18.db') # setu-nearR18
+ cur = con.cursor()
+ cur.execute("select * from nearR18")
+ data_nearR18 = len(cur.fetchall())
+ con.close()
+
+ con = sqlite3.connect(
+ Path('.') / 'ATRI' / 'data' / 'data_Sqlite' / 'setu' /
+ 'r18.db') # setu-r18
+ cur = con.cursor()
+ cur.execute("select * from r18")
+ data_r18 = len(cur.fetchall())
+ con.close()
+
+ msg0 = "ATRI status-sqlite:\n"
+ msg0 += "Setu:\n"
+ msg0 += f"├normal: {data_normal}\n"
+ msg0 += f"├nearR18: {data_nearR18}\n"
+ msg0 += f"└R18: {data_r18}"
+
+ await status_info.finish(msg0)
diff --git a/ATRI/plugins/plugin_test/__init__.py b/ATRI/plugins/plugin_test/__init__.py
index 8ff0a28..6455d95 100644
--- a/ATRI/plugins/plugin_test/__init__.py
+++ b/ATRI/plugins/plugin_test/__init__.py
@@ -20,35 +20,32 @@ from nonebot.plugin import on_command, on_message
from nonebot.permission import SUPERUSER
from nonebot.adapters.cqhttp import Bot, Event
-
# 此目录下均为功能测试!
-
testRecord = on_command('测试语音', permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- await testRecord.finish(f"[CQ:record,file=file:///{os.path.abspath(Path('.') / 'ATRI' / 'plugins' / 'plugin_test' / 'test.mp3')}]")
+ await testRecord.finish(
+ f"[CQ:record,file=file:///{os.path.abspath(Path('.') / 'ATRI' / 'plugins' / 'plugin_test' / 'test.mp3')}]"
+ )
testGroupList = on_command('获取群列表', permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
group_list = await bot.get_group_list()
group = sample(group_list, 1)
print(group[0]['group_id'], type(group[0]['group_id']))
+
testBot = on_command('获取bot', permission=SUPERUSER)
[email protected]() # type: ignore
+
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
test_bot = nonebot.get_bots()
print(test_bot, type(test_bot.keys()))
-
-testPrivate = on_message()
-
[email protected]() # type: ignore
-async def _(bot: Bot, event: Event, state: dict) -> None:
- if event.user_id == "1172294279":
- await bot.send(event, "123") \ No newline at end of file
diff --git a/ATRI/plugins/plugin_utils/__init__.py b/ATRI/plugins/plugin_utils/__init__.py
index eeae31d..a35bb41 100644
--- a/ATRI/plugins/plugin_utils/__init__.py
+++ b/ATRI/plugins/plugin_utils/__init__.py
@@ -23,11 +23,11 @@ from datetime import datetime, timedelta
from nonebot.plugin import on_command
from nonebot.adapters.cqhttp import Bot, Event
-from utils.utils_banList import banList
-
+from utils.utils_rule import check_banlist, check_switch
file = Path('.') / 'ATRI' / 'data' / 'data_IDcard' / 'main.bin'
+
def infoID() -> Tuple[Dict[str, List[str]], Dict[str, str]]:
with PyZipFile(os.path.abspath(file), "r") as zipFile:
with zipFile.open("name.json", "r") as f:
@@ -36,23 +36,24 @@ def infoID() -> Tuple[Dict[str, List[str]], Dict[str, str]]:
area = json.loads(f.read().decode())
return name, area
+
NAME, AREA = infoID()
-BIRTH_BEGIN = datetime(*[1980, 10, 10]) # type: ignore
-BIRTH_END = datetime(*[2002, 10, 10]) # type: ignore
+BIRTH_BEGIN = datetime(*[1980, 10, 10]) # type: ignore
+BIRTH_END = datetime(*[2002, 10, 10]) # type: ignore
+
def numberID(area: int, sex: int, birth: int) -> str:
def checkSum(fullCode: str) -> int or str:
assert len(fullCode) == 17
- checkSum = sum(
- [((1 << (17 - i)) % 11) * int(fullCode[i]) for i in range(0, 17)]
- )
+ checkSum = sum([((1 << (17 - i)) % 11) * int(fullCode[i])
+ for i in range(0, 17)])
checkDigit = (12 - (checkSum % 11)) % 11
if checkDigit < 10:
return checkDigit
else:
return "X"
-
+
orderCode = str(random.randint(10, 99))
sexCode = str(random.randrange(sex, 10, step=2))
fullCode = str(area) + str(birth) + str(orderCode) + str(sexCode)
@@ -61,28 +62,28 @@ def numberID(area: int, sex: int, birth: int) -> str:
plugin_name = "one-key-adult"
-generateID = on_command("我要转大人,一天打25小时游戏")
+generateID = on_command("我要转大人,一天打25小时游戏",
+ rule=check_banlist() & check_switch(plugin_name))
+
[email protected]() # type: ignore
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
- user = str(event.user_id)
- group = str(event.group_id)
-
- if banList(user, group):
- id_card_area = int(random.choice(list(AREA.keys())))
- id_card_area_name = AREA[str(id_card_area)]
- id_card_year_old = timedelta(days=random.randint(0, (BIRTH_END - BIRTH_BEGIN).days) + 1)
- id_card_birth_day = strftime("%Y%m%d", (BIRTH_BEGIN + id_card_year_old).timetuple())
- id_card_sex = random.choice([0, 1])
- id_card_name = random.choice(NAME[{0: "female", 1: "male"}[id_card_sex]])
- id_card_id = numberID(id_card_area, id_card_sex, id_card_birth_day)
-
- msg0 = "恭喜,你已经成大人了!\n"
- msg0 += "这是你一天25h游戏的通行证:\n"
- msg0 += f"NumberID: {id_card_id}\n"
- msg0 += f"Sex: {'男' if id_card_sex == 1 else '女'}\n"
- msg0 += f"Name: {id_card_name} || Address: {id_card_area_name}\n"
- msg0 += "注: 1、以上信息根据国家公开标准生成,非真实信息。\n"
- msg0 += " 2、不适用于网易和腾讯。"
-
- await generateID.finish(msg0)
+ id_card_area = int(random.choice(list(AREA.keys())))
+ id_card_area_name = AREA[str(id_card_area)]
+ id_card_year_old = timedelta(
+ days=random.randint(0, (BIRTH_END - BIRTH_BEGIN).days) + 1)
+ id_card_birth_day = strftime("%Y%m%d",
+ (BIRTH_BEGIN + id_card_year_old).timetuple())
+ id_card_sex = random.choice([0, 1])
+ id_card_name = random.choice(NAME[{0: "female", 1: "male"}[id_card_sex]])
+ id_card_id = numberID(id_card_area, id_card_sex, id_card_birth_day)
+
+ msg0 = "恭喜,你已经成大人了!\n"
+ msg0 += "这是你一天25h游戏的通行证:\n"
+ msg0 += f"NumberID: {id_card_id}\n"
+ msg0 += f"Sex: {'男' if id_card_sex == 1 else '女'}\n"
+ msg0 += f"Name: {id_card_name} || Address: {id_card_area_name}\n"
+ msg0 += "注: 1、以上信息根据国家公开标准生成,非真实信息。\n"
+ msg0 += " 2、不适用于网易和腾讯。"
+
+ await generateID.finish(msg0)