summaryrefslogtreecommitdiff
path: root/ATRI/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins')
-rw-r--r--ATRI/plugins/anime_search.py69
-rw-r--r--ATRI/plugins/call_owner.py50
-rw-r--r--ATRI/plugins/code_runner.py101
-rw-r--r--ATRI/plugins/curse/__init__.py18
-rw-r--r--ATRI/plugins/essential.py206
-rw-r--r--ATRI/plugins/funny.py129
-rw-r--r--ATRI/plugins/github.py15
-rw-r--r--ATRI/plugins/help.py22
-rw-r--r--ATRI/plugins/hitokoto.py21
-rw-r--r--ATRI/plugins/manage/__init__.py3
-rw-r--r--ATRI/plugins/manage/modules/block.py124
-rw-r--r--ATRI/plugins/manage/modules/broadcast.py42
-rw-r--r--ATRI/plugins/manage/modules/debug.py97
-rw-r--r--ATRI/plugins/manage/modules/dormant.py12
-rw-r--r--ATRI/plugins/manage/modules/request.py102
-rw-r--r--ATRI/plugins/manage/modules/service.py160
-rw-r--r--ATRI/plugins/manage/modules/shutdown.py10
-rw-r--r--ATRI/plugins/nsfw.py86
-rw-r--r--ATRI/plugins/rich/__init__.py31
-rw-r--r--ATRI/plugins/rich/data_source.py10
-rw-r--r--ATRI/plugins/saucenao/__init__.py60
-rw-r--r--ATRI/plugins/saucenao/data_source.py26
-rw-r--r--ATRI/plugins/setu/__init__.py8
-rw-r--r--ATRI/plugins/setu/modules/data_source.py178
-rw-r--r--ATRI/plugins/setu/modules/main_setu.py193
-rw-r--r--ATRI/plugins/setu/modules/scheduler.py19
-rw-r--r--ATRI/plugins/setu/modules/store.py138
-rw-r--r--ATRI/plugins/status.py77
-rw-r--r--ATRI/plugins/utils/__init__.py89
-rw-r--r--ATRI/plugins/utils/data_source.py111
-rw-r--r--ATRI/plugins/wife/__init__.py129
-rw-r--r--ATRI/plugins/wife/data_source.py90
32 files changed, 1741 insertions, 685 deletions
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py
index 1c7e5a3..68a5add 100644
--- a/ATRI/plugins/anime_search.py
+++ b/ATRI/plugins/anime_search.py
@@ -8,7 +8,7 @@ from nonebot.typing import T_State
from ATRI.service import Service as sv
from ATRI.rule import is_in_service
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
from ATRI.utils.request import get_bytes
from ATRI.utils.translate import to_simple_string
from ATRI.utils.ub_paste import paste
@@ -24,40 +24,45 @@ __doc__ = """
以图搜番 (pic)
"""
-anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番"))
-
+anime_search = sv.on_command(
+ cmd="以图搜番",
+ docs=__doc__,
+ rule=is_in_service('以图搜番')
+)
@anime_search.args_parser # type: ignore
async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ["算了", "罢了", "不搜了", "取消"]
+ quit_list = ['算了', '罢了', '不搜了', '取消']
if msg in quit_list:
- await anime_search.finish("好吧...")
+ await anime_search.finish('好吧...')
if not msg:
- await anime_search.reject("图呢?")
+ await anime_search.reject('图呢?')
else:
- state["pic_anime"] = msg
-
+ state['pic_anime'] = msg
@anime_search.handle()
-async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _anime_search(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["pic_anime"] = msg
-
+ state['pic_anime'] = msg
-@anime_search.got("pic_anime", prompt="图呢?")
-async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = state["pic_anime"]
+@anime_search.got('pic_anime', prompt='图呢?')
+async def _deal_search(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ msg = state['pic_anime']
img = re.findall(r"url=(.*?)]", msg)
if not img:
await anime_search.reject("请发送图片而不是其它东西!!")
-
+
try:
req = await get_bytes(URL + img[0])
- except RequestTimeOut:
- raise RequestTimeOut("Request failed!")
-
+ except RequestError:
+ raise RequestError("Request failed!")
+
data = json.loads(req)["docs"]
try:
d = dict()
@@ -67,24 +72,28 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
else:
m = data[i]["at"] / 60
s = data[i]["at"] % 60
-
+
if not data[i]["episode"]:
n = 1
else:
n = data[i]["episode"]
-
+
d[to_simple_string(data[i]["title_chinese"])] = [
data[i]["similarity"],
f"第{n}集",
- f"{int(m)}分{int(s)}秒处",
+ f"{int(m)}分{int(s)}秒处"
]
except Exception as err:
raise Exception(f"Invalid data.\n{err}")
-
- result = sorted(d.items(), key=lambda x: x[1], reverse=True)
-
+
+ result = sorted(
+ d.items(),
+ key=lambda x:x[1],
+ reverse=True
+ )
+
t = 0
-
+
msg0 = f"> {event.sender.nickname}"
for i in result:
t += 1
@@ -95,15 +104,15 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
f"Name: {i[0]}\n"
f"Time: {i[1][1]} {i[1][2]}"
)
-
+
if len(result) == 2:
await anime_search.finish(Message(msg0))
else:
data = FormData()
- data.add_field("poster", "ATRI running log")
- data.add_field("syntax", "text")
- data.add_field("expiration", "day")
- data.add_field("content", msg0)
+ data.add_field('poster', 'ATRI running log')
+ data.add_field('syntax', 'text')
+ data.add_field('expiration', 'day')
+ data.add_field('content', msg0)
repo = f"> {event.sender.nickname}\n"
repo = repo + f"详细请移步此处~\n{await paste(data)}"
diff --git a/ATRI/plugins/call_owner.py b/ATRI/plugins/call_owner.py
index d282824..3fb5a01 100644
--- a/ATRI/plugins/call_owner.py
+++ b/ATRI/plugins/call_owner.py
@@ -1,9 +1,12 @@
from nonebot.permission import SUPERUSER
from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp import (
+ Bot,
+ MessageEvent
+)
from ATRI.service import Service as sv
-from ATRI.config import Config
+from ATRI.config import BotSelfConfig
from ATRI.utils.apscheduler import scheduler
from ATRI.utils.list import count_list
@@ -18,44 +21,48 @@ __doc__ = """
repo = sv.on_command(cmd="来杯红茶", docs=__doc__)
-
@repo.args_parser # type: ignore
async def _repo_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
if msg == "算了":
- await repo.finish("好吧")
-
+ await repo.finish('好吧')
+
if not msg:
- await repo.reject("话呢?")
+ await repo.reject('话呢?')
else:
- state["msg_repo"] = msg
-
+ state['msg_repo'] = msg
@repo.handle()
async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["msg_repo"] = msg
-
+ state['msg_repo'] = msg
[email protected]("msg_repo", prompt="请告诉咱需要反馈的内容~!")
[email protected]('msg_repo', prompt="请告诉咱需要反馈的内容~!")
async def _repo_deal(bot: Bot, event: MessageEvent, state: T_State) -> None:
global repo_list
- msg = state["msg_repo"]
+ msg = state['msg_repo']
user = event.user_id
-
+
if count_list(repo_list, user) == 5:
await repo.finish("吾辈已经喝了五杯红茶啦!明天再来吧。")
-
+
repo_list.append(user)
- for sup in Config.BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}")
-
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(
+ user_id=sup,
+ message=f"来自用户[{user}]反馈:\n{msg}"
+ )
+
await repo.finish("吾辈的心愿已由咱转告给咱的维护者了~!")
[email protected]_job("cron", hour=0, misfire_grace_time=60)
+ "cron",
+ hour=0,
+ misfire_grace_time=60
+)
async def _() -> None:
global repo_list
repo_list.clear()
@@ -68,8 +75,11 @@ __doc__ = """
/重置红茶
"""
-reset_repo = sv.on_command(cmd="重置红茶", docs=__doc__, permission=SUPERUSER)
-
+reset_repo = sv.on_command(
+ cmd="重置红茶",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@reset_repo.handle()
async def _reset_repo(bot: Bot, event: MessageEvent) -> None:
diff --git a/ATRI/plugins/code_runner.py b/ATRI/plugins/code_runner.py
index d34872f..3c2b196 100644
--- a/ATRI/plugins/code_runner.py
+++ b/ATRI/plugins/code_runner.py
@@ -7,36 +7,36 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.rule import is_in_service
from ATRI.service import Service as sv
from ATRI.utils.request import post_bytes
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
-RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest"
+RUN_API_URL_FORMAT = 'https://glot.io/run/{}?version=latest'
SUPPORTED_LANGUAGES = {
- "assembly": {"ext": "asm"},
- "bash": {"ext": "sh"},
- "c": {"ext": "c"},
- "clojure": {"ext": "clj"},
- "coffeescript": {"ext": "coffe"},
- "cpp": {"ext": "cpp"},
- "csharp": {"ext": "cs"},
- "erlang": {"ext": "erl"},
- "fsharp": {"ext": "fs"},
- "go": {"ext": "go"},
- "groovy": {"ext": "groovy"},
- "haskell": {"ext": "hs"},
- "java": {"ext": "java", "name": "Main"},
- "javascript": {"ext": "js"},
- "julia": {"ext": "jl"},
- "kotlin": {"ext": "kt"},
- "lua": {"ext": "lua"},
- "perl": {"ext": "pl"},
- "php": {"ext": "php"},
- "python": {"ext": "py"},
- "ruby": {"ext": "rb"},
- "rust": {"ext": "rs"},
- "scala": {"ext": "scala"},
- "swift": {"ext": "swift"},
- "typescript": {"ext": "ts"},
+ 'assembly': {'ext': 'asm'},
+ 'bash': {'ext': 'sh'},
+ 'c': {'ext': 'c'},
+ 'clojure': {'ext': 'clj'},
+ 'coffeescript': {'ext': 'coffe'},
+ 'cpp': {'ext': 'cpp'},
+ 'csharp': {'ext': 'cs'},
+ 'erlang': {'ext': 'erl'},
+ 'fsharp': {'ext': 'fs'},
+ 'go': {'ext': 'go'},
+ 'groovy': {'ext': 'groovy'},
+ 'haskell': {'ext': 'hs'},
+ 'java': {'ext': 'java', 'name': 'Main'},
+ 'javascript': {'ext': 'js'},
+ 'julia': {'ext': 'jl'},
+ 'kotlin': {'ext': 'kt'},
+ 'lua': {'ext': 'lua'},
+ 'perl': {'ext': 'pl'},
+ 'php': {'ext': 'php'},
+ 'python': {'ext': 'py'},
+ 'ruby': {'ext': 'rb'},
+ 'rust': {'ext': 'rs'},
+ 'scala': {'ext': 'scala'},
+ 'swift': {'ext': 'swift'},
+ 'typescript': {'ext': 'ts'},
}
@@ -50,55 +50,54 @@ __doc__ = """
print('Hello world!')
"""
-code_runner = sv.on_command(cmd="/code", docs=__doc__, rule=is_in_service("code"))
-
+code_runner = sv.on_command(
+ cmd="/code",
+ docs=__doc__,
+ rule=is_in_service('code')
+)
@code_runner.handle()
async def _code_runner(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message).split("\n")
-
+
if msg[0] == "list":
msg0 = "咱现在支持的语言如下:\n"
msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys()))
-
+
await code_runner.finish(msg0)
elif not msg[0]:
await code_runner.finish("请键入/help以获取更多支持...")
-
+
laug = msg[0].replace("\r", "")
if laug not in SUPPORTED_LANGUAGES:
await code_runner.finish("该语言暂不支持...")
-
+
del msg[0]
code = "\n".join(map(str, msg))
try:
req = await post_bytes(
RUN_API_URL_FORMAT.format(laug),
json={
- "files": [
- {
- "name": (
- SUPPORTED_LANGUAGES[laug].get("name", "main")
- + f".{SUPPORTED_LANGUAGES[laug]['ext']}"
- ),
- "content": code,
- }
- ],
+ "files": [{
+ "name": (SUPPORTED_LANGUAGES[laug].get("name", "main") +
+ f".{SUPPORTED_LANGUAGES[laug]['ext']}"),
+ "content": code
+ }],
"stdin": "",
- "command": "",
- },
+ "command": ""
+ }
)
- except RequestTimeOut:
- raise RequestTimeOut("Failed to request!")
-
+ except RequestError:
+ raise RequestError("Failed to request!")
+
payload = json.loads(req)
sent = False
- for k in ["stdout", "stderr", "error"]:
+ for k in ['stdout', 'stderr', 'error']:
v = payload.get(k)
lines = v.splitlines()
lines, remained_lines = lines[:10], lines[10:]
- out = "\n".join(lines)
- out, remained_out = out[: 60 * 10], out[60 * 10 :]
+ out = '\n'.join(lines)
+ out, remained_out = out[:60 * 10], out[60 * 10:]
if remained_lines or remained_out:
out += f"\n(太多了太多了...)"
@@ -106,6 +105,6 @@ async def _code_runner(bot: Bot, event: MessageEvent) -> None:
if out:
await bot.send(event, f"{k}:\n\n{out}")
sent = True
-
+
if not sent:
await code_runner.finish("Running success! Nothing print.")
diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py
index acfaa9b..a646eaa 100644
--- a/ATRI/plugins/curse/__init__.py
+++ b/ATRI/plugins/curse/__init__.py
@@ -4,7 +4,7 @@ from ATRI.service import Service as sv
from ATRI.rule import is_in_service
from ATRI.utils.list import count_list, del_list_aim
from ATRI.utils.request import get_text
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn"
@@ -19,17 +19,23 @@ __doc__ = """
"""
curse = sv.on_command(
- cmd="口臭", docs=__doc__, aliases={"口臭一下,骂我"}, rule=is_in_service("口臭")
+ cmd='口臭',
+ docs=__doc__,
+ aliases={'口臭一下,骂我'},
+ rule=is_in_service('口臭')
)
-
@curse.handle()
async def _curse(bot: Bot, event: MessageEvent) -> None:
global sick_list
user = event.get_user_id()
if count_list(sick_list, user) == 3:
sick_list.append(user)
- repo = "不是??你这么想被咱骂的嘛??" "被咱骂就这么舒服的吗?!" "该......你该不会是.....M吧!"
+ repo = (
+ "不是??你这么想被咱骂的嘛??"
+ "被咱骂就这么舒服的吗?!"
+ "该......你该不会是.....M吧!"
+ )
await curse.finish(repo)
elif count_list(sick_list, user) == 6:
sick_list = del_list_aim(sick_list, user)
@@ -38,5 +44,5 @@ async def _curse(bot: Bot, event: MessageEvent) -> None:
sick_list.append(user)
try:
await curse.finish(await get_text(URL))
- except RequestTimeOut:
- raise RequestTimeOut("Time out!")
+ except RequestError:
+ raise RequestError("Time out!")
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index 7f14882..72090e3 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -24,19 +24,19 @@ from nonebot.adapters.cqhttp import (
LuckyKingNotifyEvent,
GroupUploadNoticeEvent,
GroupRecallNoticeEvent,
- FriendRecallNoticeEvent,
+ FriendRecallNoticeEvent
)
import ATRI
from ATRI.log import logger
from ATRI.exceptions import WriteError
-from ATRI.config import Config
+from ATRI.config import BotSelfConfig
from ATRI.service import Service as sv
from ATRI.utils.cqcode import coolq_code_check
-PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services"
-ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential"
+PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services'
+ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
os.makedirs(PLUGIN_INFO_DIR, exist_ok=True)
os.makedirs(ESSENTIAL_DIR, exist_ok=True)
@@ -57,57 +57,71 @@ async def shutdown() -> None:
shutil.rmtree(PLUGIN_INFO_DIR)
logger.debug("成功!")
except Exception:
- repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除")
+ repo = (
+ '清理插件信息失败',
+ '请前往 ATRI/data/service/services 下',
+ '将 services 整个文件夹删除'
+ )
time.sleep(10)
raise Exception(repo)
@driver.on_bot_connect
async def connect(bot) -> None:
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。")
+ for superuser in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ int(superuser),
+ "WebSocket 成功连接,数据开始传输。"
+ )
@driver.on_bot_disconnect
async def disconnect(bot) -> None:
- for superuser in Config.BotSelfConfig.superusers:
+ for superuser in BotSelfConfig.superusers:
try:
- await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...")
+ await sv.NetworkPost.send_private_msg(
+ int(superuser),
+ "WebSocket 貌似断开了呢..."
+ )
except:
logger.error("WebSocket 已断开,等待重连")
@run_preprocessor # type: ignore
-async def _check_block(
- matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State
-) -> None:
+async def _check_block(matcher: Matcher,
+ bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
user = str(event.user_id)
try:
msg = str(event.message)
except:
msg = ""
if not sv.BlockSystem.auth_user(user):
- raise IgnoredException(f"Block user: {user}")
-
+ raise IgnoredException(f'Block user: {user}')
+
if not sv.Dormant.is_dormant():
if "/dormant" not in msg:
- raise IgnoredException("Bot has been dormant.")
-
+ raise IgnoredException('Bot has been dormant.')
+
if isinstance(event, GroupMessageEvent):
group = str(event.group_id)
if not sv.BlockSystem.auth_group(group):
- raise IgnoredException(f"Block group: {group}")
+ raise IgnoredException(f'Block group: {group}')
@run_preprocessor # type: ignore
-async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None:
+async def _store_message(matcher: Matcher,
+ bot: Bot,
+ event,
+ state: T_State) -> None:
if isinstance(event, GroupMessageEvent):
if event.sub_type == "normal":
- now_time = datetime.now().strftime("%Y-%m-%d")
- GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}"
+ now_time = datetime.now().strftime('%Y-%m-%d')
+ GROUP_DIR = ESSENTIAL_DIR / 'chat_history' / f'{event.group_id}'
os.makedirs(GROUP_DIR, exist_ok=True)
path = GROUP_DIR / f"{now_time}.chat.json"
- now_time = datetime.now().strftime("%Y%m%d-%H%M%S")
+ now_time = datetime.now().strftime('%Y%m%d-%H%M%S')
try:
data = json.loads(path.read_bytes())
@@ -133,12 +147,12 @@ async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> N
"area": event.sender.area,
"level": event.sender.level,
"role": event.sender.role,
- "title": event.sender.title,
+ "title": event.sender.title
},
- "to_me": str(event.to_me),
+ "to_me": str(event.to_me)
}
try:
- with open(path, "w", encoding="utf-8") as r:
+ with open(path, 'w', encoding='utf-8') as r:
r.write(json.dumps(data, indent=4))
logger.debug(f"写入消息成功,id: {event.message_id}")
except WriteError:
@@ -154,44 +168,52 @@ async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> N
# 处理:好友请求
request_friend_event = sv.on_request()
-
@request_friend_event.handle()
async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
file_name = "request_friend.json"
path = ESSENTIAL_DIR / file_name
path.parent.mkdir(exist_ok=True, parents=True)
-
+
try:
data = json.loads(path.read_bytes())
except:
data = dict()
- data[event.flag] = {"user_id": event.user_id, "comment": event.comment}
+ data[event.flag] = {
+ "user_id": event.user_id,
+ "comment": event.comment
+ }
try:
- with open(path, "w", encoding="utf-8") as r:
- r.write(json.dumps(data, indent=4))
+ with open(path, 'w', encoding='utf-8') as r:
+ r.write(
+ json.dumps(
+ data, indent=4
+ )
+ )
except WriteError:
raise WriteError("Writing file failed!")
-
- for superuser in Config.BotSelfConfig.superusers:
+
+ for superuser in BotSelfConfig.superusers:
msg = (
"主人,收到一条好友请求:\n"
f"请求人:{event.get_user_id()}\n"
f"申请信息:{event.comment}\n"
f"申请码:{event.flag}"
)
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
# 处理:邀请入群,如身为管理,还附有入群请求
request_group_event = sv.on_request()
-
@request_group_event.handle()
async def _request_group_event(bot, event: GroupRequestEvent) -> None:
file_name = "request_group.json"
path = ESSENTIAL_DIR / file_name
path.parent.mkdir(exist_ok=True, parents=True)
-
+
try:
data = json.loads(path.read_bytes())
except:
@@ -200,42 +222,56 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None:
"user_id": event.user_id,
"group_id": event.group_id,
"sub_type": event.sub_type,
- "comment": event.comment,
+ "comment": event.comment
}
try:
- with open(path, "w", encoding="utf-8") as r:
- r.write(json.dumps(data, indent=4))
+ with open(path, 'w', encoding='utf-8') as r:
+ r.write(
+ json.dumps(
+ data, indent=4
+ )
+ )
except WriteError:
raise WriteError("Writing file failed!")
-
- for superuser in Config.BotSelfConfig.superusers:
+
+ for superuser in BotSelfConfig.superusers:
msg = (
"主人,收到一条入群请求:\n"
f"请求人:{event.get_user_id()}\n"
f"申请信息:{event.comment}\n"
f"申请码:{event.flag}"
)
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
-
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
+
# 处理群成员变动
group_member_event = sv.on_notice()
-
@group_member_event.handle()
async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None:
- msg = "好欸!事新人!\n" f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!"
+ msg = (
+ "好欸!事新人!\n"
+ f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!"
+ )
await group_member_event.finish(msg)
-
@group_member_event.handle()
async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None:
if event.is_tome():
if event.user_id != event.self_id:
return
- msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..."
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+ msg = (
+ "呜呜呜,主人"
+ f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..."
+ )
+ for superuser in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
else:
await group_member_event.finish("阿!有人离开了我们...")
@@ -243,82 +279,98 @@ async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None:
# 处理群管理事件
group_admin_event = sv.on_notice()
-
@group_admin_event.handle()
async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None:
if not event.is_tome():
return
-
- for superuser in Config.BotSelfConfig.superusers:
+
+ for superuser in BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
- user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
+ user_id=int(superuser),
+ message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
)
# 处理群禁言事件
group_ban_event = sv.on_notice()
-
@group_ban_event.handle()
async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
if not event.is_tome():
return
-
+
if event.duration:
msg = (
"那个..。,主人\n"
f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n"
f"时长...是 {event.duration} 秒"
)
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+ for superuser in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
else:
- msg = "好欸!主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!"
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+ msg = (
+ "好欸!主人\n"
+ f"咱在群 {event.group_id} 的口球被 {event.operator_id} 解除了!"
+ )
+ for superuser in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
# 处理群红包运气王事件
lucky_read_bag_event = sv.on_notice()
-
@lucky_read_bag_event.handle()
async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None:
- msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!"
+ msg = (
+ "8行,这可忍?"
+ f"gkd [CQ:at,qq={event.user_id}] 发一个!"
+ )
await lucky_read_bag_event.finish(Message(msg))
# 处理群文件上传事件
group_file_upload_event = sv.on_notice()
-
@group_file_upload_event.handle()
-async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None:
+async def _group_file_upload_event(bot,
+ event: GroupUploadNoticeEvent) -> None:
await group_file_upload_event.finish("让我康康传了啥好东西")
# 处理撤回事件
recall_event = sv.on_notice()
-
@recall_event.handle()
async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None:
try:
repo = await bot.get_msg(message_id=event.message_id)
except:
return
-
+
group = event.group_id
repo = str(repo["message"])
check = await coolq_code_check(repo, group=group)
if not check:
repo = repo.replace("CQ", "QC")
- msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}"
-
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+ msg = (
+ "主人,咱拿到了一条撤回信息!\n"
+ f"{event.user_id}@[群:{event.group_id}]\n"
+ "撤回了\n"
+ f"{repo}"
+ )
+ for superuser in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
@recall_event.handle()
async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None:
@@ -326,15 +378,23 @@ async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None:
repo = await bot.get_msg(message_id=event.message_id)
except:
return
-
+
user = event.user_id
repo = str(repo["message"])
check = await coolq_code_check(repo, user)
if not check:
repo = repo.replace("CQ", "QC")
-
- msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}"
+
+ msg = (
+ "主人,咱拿到了一条撤回信息!\n"
+ f"{event.user_id}@[私聊]"
+ "撤回了\n"
+ f"{repo}"
+ )
await bot.send(event, "咱看到惹~!")
- for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+ for superuser in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
+ )
diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py
index 72e5abb..39b9f28 100644
--- a/ATRI/plugins/funny.py
+++ b/ATRI/plugins/funny.py
@@ -12,7 +12,7 @@ from ATRI.utils.limit import is_too_exciting
from ATRI.rule import is_in_service
from ATRI.utils.request import post_bytes
from ATRI.utils.translate import to_simple_string
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
__doc__ = """
@@ -22,25 +22,27 @@ __doc__ = """
来句笑话
"""
-get_laugh = sv.on_command(cmd="来句笑话", docs=__doc__, rule=is_in_service("来句笑话"))
-
+get_laugh = sv.on_command(
+ cmd='来句笑话',
+ docs=__doc__,
+ rule=is_in_service('来句笑话')
+)
@get_laugh.handle()
async def _get_laugh(bot: Bot, event: MessageEvent) -> None:
user_name = event.sender.nickname
laugh_list = []
-
- FILE = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt"
- with open(FILE, "r", encoding="utf-8") as r:
+
+ FILE = Path('.') / 'ATRI' / 'data' / 'database' / 'funny' / 'laugh.txt'
+ with open(FILE, 'r', encoding='utf-8') as r:
for line in r:
- laugh_list.append(line.strip("\n"))
-
+ laugh_list.append(line.strip('\n'))
+
result = choice(laugh_list)
await get_laugh.finish(result.replace("%name", user_name))
-me_to_you = sv.on_message()
-
+me_to_you = sv.on_message(priority=5)
@me_to_you.handle()
async def _me_to_you(bot: Bot, event: MessageEvent) -> None:
@@ -51,38 +53,6 @@ async def _me_to_you(bot: Bot, event: MessageEvent) -> None:
__doc__ = """
-随机选择一位群友成为我的老婆!
-权限组:所有人
-用法:
- 抽老婆
-"""
-
-roll_wife = sv.on_command(cmd="抽老婆", docs=__doc__, rule=is_in_service("抽老婆"))
-
-
-@roll_wife.handle()
-async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None:
- user = event.user_id
- group = event.group_id
-
- user_name = await bot.get_group_member_info(group_id=group, user_id=user)
- user_name = user_name["nickname"]
- run = await is_too_exciting(user, group, 5, True)
- if not run:
- return
-
- luck_list = await bot.get_group_member_list(group_id=group)
- luck_user = choice(luck_list)
- luck_qq = luck_user["user_id"]
- luck_user = luck_user["nickname"]
- msg = "5秒后咱将随机抽取一位群友成为\n" f"{user_name} 的老婆!究竟是谁呢~?"
- await bot.send(event, msg)
- await asyncio.sleep(5)
- msg = f"> {luck_user}({luck_qq})\n" f"恭喜成为 {user_name} 的老婆~⭐"
- await bot.send(event, msg)
-
-
-__doc__ = """
伪造转发
权限组:所有人
用法:
@@ -95,34 +65,43 @@ __doc__ = """
/fakemsg 123456789*生草人*草 114514*仙贝*臭死了
"""
-fake_msg = sv.on_command(cmd="/fakemsg", docs=__doc__, rule=is_in_service("fakemsg"))
-
+fake_msg = sv.on_command(
+ cmd="/fakemsg",
+ docs=__doc__,
+ rule=is_in_service('fakemsg')
+)
@fake_msg.handle()
async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None:
- msg = str(event.message).split(" ")
+ msg = str(event.message).split(' ')
user = event.user_id
group = event.group_id
- node = []
- check = await is_too_exciting(user, group, 1, True)
-
+ node = list()
+ check = is_too_exciting(user, 1, seconds=600)
+
if check:
for i in msg:
- args = i.split("*")
+ args = i.split('*')
qq = args[0]
- name = args[1].replace("[", "[")
- name = name.replace("]", "]")
- repo = args[2].replace("[", "[")
- repo = repo.replace("]", "]")
- dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}}
+ name = args[1].replace('[', '[')
+ name = name.replace(']', ']')
+ repo = args[2].replace('[', '[')
+ repo = repo.replace(']', ']')
+ dic = {
+ "type": "node",
+ "data": {
+ "name": name,
+ "uin": qq,
+ "content": repo
+ }
+ }
node.append(dic)
await bot.send_group_forward_msg(group_id=group, messages=node)
EAT_URL = "https://wtf.hiigara.net/api/run/{}"
-eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service("今天吃什么"))
-
+eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service('今天吃什么'))
@eat_wat.handle()
async def _eat(bot: Bot, event: MessageEvent) -> None:
@@ -131,32 +110,32 @@ async def _eat(bot: Bot, event: MessageEvent) -> None:
user_n = event.sender.nickname
arg = re.findall(r"[今|明|后|大后]天(.*?)吃什么", msg)[0]
nd = re.match(r"[今|明|后|大后][天]", msg)[0]
-
+
if arg == "中午":
a = f"LdS4K6/{randint(0, 999999)}"
url = EAT_URL.format(a)
params = {"event": "ManualRun"}
try:
data = json.loads(await post_bytes(url, params))
- except RequestTimeOut:
- raise RequestTimeOut("Request failed!")
-
- text = to_simple_string(data["text"]).replace("今天", nd)
+ except RequestError:
+ raise RequestError('Request failed!')
+
+ text = to_simple_string(data['text']).replace('今天', nd)
get_a = re.search(r"非常(.*?)的", text)[0]
- result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, "")
-
+ result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, '')
+
elif arg == "晚上":
a = f"KaTMS/{randint(0, 999999)}"
url = EAT_URL.format(a)
params = {"event": "ManualRun"}
try:
data = json.loads(await post_bytes(url, params))
- except RequestTimeOut:
- raise RequestTimeOut("Request failed!")
-
- text = to_simple_string(data["text"]).replace("今天", "")
+ except RequestError:
+ raise RequestError('Request failed!')
+
+ text = to_simple_string(data['text']).replace('今天', '')
result = f"> {MessageSegment.at(user)}\n" + text
-
+
else:
rd = randint(1, 10)
if rd == 5:
@@ -167,13 +146,11 @@ async def _eat(bot: Bot, event: MessageEvent) -> None:
params = {"event": "ManualRun"}
try:
data = json.loads(await post_bytes(url, params))
- except RequestTimeOut:
- raise RequestTimeOut("Request failed!")
-
- text = to_simple_string(data["text"]).replace("今天", nd)
+ except RequestError:
+ raise RequestError('Request failed!')
+
+ text = to_simple_string(data['text']).replace('今天', nd)
get_a = re.match(r"(.*?)的智商", text)[0]
- result = f"> {MessageSegment.at(user)}\n" + text.replace(
- get_a, f"{user_n}的智商"
- )
-
+ result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, f'{user_n}的智商')
+
await eat_wat.finish(Message(result))
diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py
index e7c0d80..27c5328 100644
--- a/ATRI/plugins/github.py
+++ b/ATRI/plugins/github.py
@@ -4,7 +4,7 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
from ATRI.utils.request import get_bytes
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}"
@@ -12,7 +12,6 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}"
github_issues = sv.on_message()
-
@github_issues.handle()
async def _github_issues(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message)
@@ -20,19 +19,21 @@ async def _github_issues(bot: Bot, event: MessageEvent) -> None:
need_info = re.findall(patt, msg)
if not need_info:
return
-
+
for i in need_info:
need_info = list(i)
owner = need_info[0]
repo = need_info[1]
issue_number = need_info[2]
- url = URL.format(owner=owner, repo=repo, issue_number=issue_number)
-
+ url = URL.format(owner=owner,
+ repo=repo,
+ issue_number=issue_number)
+
try:
data = await get_bytes(url)
- except RequestTimeOut:
+ except RequestError:
return
-
+
data = json.loads(data)
msg0 = (
f"{repo}: #{issue_number} {data['state']}\n"
diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py
index d2754b1..32d3f65 100644
--- a/ATRI/plugins/help.py
+++ b/ATRI/plugins/help.py
@@ -7,7 +7,7 @@ from ATRI.service import SERVICE_DIR
from ATRI.service import Service as sv
-SERVICE_DIR = SERVICE_DIR / "services"
+SERVICE_DIR = SERVICE_DIR / 'services'
__doc__ = """
@@ -19,12 +19,14 @@ __doc__ = """
/help info (cmd)
"""
-help = sv.on_command(cmd="/help", docs=__doc__)
-
+help = sv.on_command(
+ cmd="/help",
+ docs=__doc__
+)
@help.handle()
async def _help(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(" ")
+ msg = str(event.message).split(' ')
if msg[0] == "":
msg = (
"呀?找不到路了?\n"
@@ -39,7 +41,7 @@ async def _help(bot: Bot, event: MessageEvent) -> None:
for _, _, i in os.walk(SERVICE_DIR):
for a in i:
f = SERVICE_DIR / a
- files.append(json.loads(f.read_bytes())["command"])
+ files.append(json.loads(f.read_bytes())['command'])
cmds = " | ".join(map(str, files))
msg = "咱能做很多事!比如:\n" + cmds
msg0 = msg + "\n没反应可能是没权限...或者为探测类型...不属于可直接触发命令..."
@@ -51,9 +53,13 @@ async def _help(bot: Bot, event: MessageEvent) -> None:
try:
data = json.loads(path.read_bytes())
except:
- await help.finish("未找到相关命令...")
+ await help.finish('未找到相关命令...')
- msg = f"{cmd} INFO:\n" f"Enabled: {data['enabled']}\n" f"{data['docs']}"
+ msg = (
+ f"{cmd} INFO:\n"
+ f"Enabled: {data['enabled']}\n"
+ f"{data['docs']}"
+ )
await help.finish(msg)
else:
- await help.finish("请检查输入...")
+ await help.finish('请检查输入...')
diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py
index 1078ca3..09ff2c4 100644
--- a/ATRI/plugins/hitokoto.py
+++ b/ATRI/plugins/hitokoto.py
@@ -4,14 +4,14 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.rule import is_in_service, to_bot
from ATRI.service import Service as sv
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
from ATRI.utils.list import count_list, del_list_aim
from ATRI.utils.request import get_bytes
URL = [
"https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/a.json",
"https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/b.json",
- "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json",
+ "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json"
]
sick_list = []
@@ -24,10 +24,12 @@ __doc__ = """
"""
hitokoto = sv.on_command(
- cmd="一言", aliases={"抑郁一下", "网抑云"}, docs=__doc__, rule=is_in_service("一言") & to_bot()
+ cmd='一言',
+ aliases={'抑郁一下', '网抑云'},
+ docs=__doc__,
+ rule=is_in_service('一言') & to_bot()
)
-
@hitokoto.handle()
async def _hitokoto(bot: Bot, event: MessageEvent) -> None:
global sick_list
@@ -39,13 +41,16 @@ async def _hitokoto(bot: Bot, event: MessageEvent) -> None:
await hitokoto.finish("额......需要咱安慰一下嘛~?")
elif count_list(sick_list, user) == 6:
sick_list = del_list_aim(sick_list, user)
- msg = "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" "我...我会守在你身边的!...嗯..一定"
+ msg = (
+ "如果心里感到难受就赶快去睡觉!别再憋自己了!\n"
+ "我...我会守在你身边的!...嗯..一定"
+ )
await hitokoto.finish(msg)
else:
sick_list.append(user)
url = choice(URL)
try:
data = json.loads(await get_bytes(url))
- except RequestTimeOut:
- raise RequestTimeOut("Request failed!")
- await hitokoto.finish(data[randint(1, len(data) - 1)]["hitokoto"])
+ except RequestError:
+ raise RequestError("Request failed!")
+ await hitokoto.finish(data[randint(1, len(data) - 1)]['hitokoto'])
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
index b316020..d9f1ffe 100644
--- a/ATRI/plugins/manage/__init__.py
+++ b/ATRI/plugins/manage/__init__.py
@@ -4,4 +4,5 @@ from pathlib import Path
_sub_plugins = set()
-_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve()))
+_sub_plugins |= nonebot.load_plugins(
+ str((Path(__file__).parent / 'modules').resolve()))
diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py
index 0c2fcec..7b982d4 100644
--- a/ATRI/plugins/manage/modules/block.py
+++ b/ATRI/plugins/manage/modules/block.py
@@ -12,24 +12,36 @@ __doc__ = """
封禁用户 QQ号
"""
-block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER)
-
+block_user = sv.on_command(
+ cmd="封禁用户",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@block_user.args_parser # type: ignore
-async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _block_user_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
+ cancel = ['算了', '罢了']
if msg in cancel:
- await block_user.finish("好吧...")
+ await block_user.finish('好吧...')
if not msg:
- await block_user.reject("是谁呢?!GKD!")
+ await block_user.reject('是谁呢?!GKD!')
else:
- state["noob"] = msg
-
+ state['noob'] = msg
@block_user.handle()
async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
- noob = state["noob"]
+ msg = str(event.message).strip()
+ if msg:
+ state['noob'] = msg
+
+@block_user.got('noob', prompt='是谁呢?!GKD!')
+async def _deal_block_user(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ noob = state['noob']
sv.BlockSystem.control_list(True, user=noob)
msg = f"用户[{noob}]已被封禁(;′⌒`)"
await block_user.finish(msg)
@@ -42,24 +54,36 @@ __doc__ = """
解封用户 QQ号
"""
-unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER)
-
+unblock_user = sv.on_command(
+ cmd="解封用户",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@unblock_user.args_parser # type: ignore
-async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _unblock_user_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
+ cancel = ['算了', '罢了']
if msg in cancel:
- await unblock_user.finish("好吧...")
+ await unblock_user.finish('好吧...')
if not msg:
- await unblock_user.reject("要原谅谁呢...")
+ await unblock_user.reject('要原谅谁呢...')
else:
- state["forgive"] = msg
-
+ state['forgive'] = msg
@unblock_user.handle()
async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
- forgive = state["forgive"]
+ msg = str(event.message).strip()
+ if msg:
+ state['forgive'] = msg
+
+@unblock_user.got('forgive', prompt='要原谅谁呢...')
+async def _deal_unblock_user(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ forgive = state['forgive']
sv.BlockSystem.control_list(False, user=forgive)
msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ"
await unblock_user.finish(msg)
@@ -72,24 +96,36 @@ __doc__ = """
封禁群 群号
"""
-block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER)
-
+block_group = sv.on_command(
+ cmd="封禁群",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@block_group.args_parser # type: ignore
-async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _block_group_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
+ cancel = ['算了', '罢了']
if msg in cancel:
- await block_user.finish("好吧...")
+ await block_user.finish('好吧...')
if not msg:
- await block_user.reject("是哪个群?!GKD!")
+ await block_user.reject('是哪个群?!GKD!')
else:
- state["noob_g"] = msg
-
+ state['noob_g'] = msg
@block_group.handle()
async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
- noob_g = state["noob_g"]
+ msg = str(event.message).strip()
+ if msg:
+ state['noob_g'] = msg
+
+@block_group.got('noob_g', prompt='是哪个群?!GKD!')
+async def _deal_block_group(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ noob_g = state['noob_g']
sv.BlockSystem.control_list(True, group=noob_g)
msg = f"群[{noob_g}]已被封禁(;′⌒`)"
await block_user.finish(msg)
@@ -102,24 +138,38 @@ __doc__ = """
解封 群号
"""
-unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER)
-
+unblock_group = sv.on_command(
+ cmd="解封群",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@unblock_group.args_parser # type: ignore
-async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _unblock_group_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
+ cancel = ['算了', '罢了']
if msg in cancel:
- await block_user.finish("好吧...")
+ await block_user.finish('好吧...')
if not msg:
- await block_user.reject("要原谅哪个群呢...")
+ await block_user.reject('要原谅哪个群呢...')
else:
- state["forgive_g"] = msg
-
+ state['forgive_g'] = msg
@unblock_group.handle()
-async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
- forgive_g = state["forgive_g"]
+async def _unblock_group(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['forgive_g'] = msg
+
+@unblock_group.got('forgive_g', prompt='要原谅哪个群呢...')
+async def _deal_unblock_group(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ forgive_g = state['forgive_g']
sv.BlockSystem.control_list(False, group=forgive_g)
msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ"
await unblock_user.finish(msg)
diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py
index 7f7816d..5086fcf 100644
--- a/ATRI/plugins/manage/modules/broadcast.py
+++ b/ATRI/plugins/manage/modules/broadcast.py
@@ -15,46 +15,52 @@ __doc__ = """
广播 内容
"""
-broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER)
-
+broadcast = sv.on_command(
+ cmd="广播",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@broadcast.args_parser # type: ignore
-async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _broadcast_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message)
- quit_list = ["算了", "罢了", "取消"]
+ quit_list = ['算了', '罢了', '取消']
if msg in quit_list:
- await broadcast.finish("好吧...")
+ await broadcast.finish('好吧...')
if not msg:
- await broadcast.reject("想群发啥呢0w0")
+ await broadcast.reject('想群发啥呢0w0')
else:
- state["msg"] = msg
-
+ state['msg'] = msg
@broadcast.handle()
async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["msg"] = msg
-
-
[email protected]("msg", prompt="请告诉咱需要群发的内容~!")
-async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = state["msg"]
+ state['msg'] = msg
+
[email protected]('msg', prompt='请告诉咱需要群发的内容~!')
+async def _deal_broadcast(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ msg = state['msg']
group_list = await bot.get_group_list()
succ_list = []
err_list = []
-
+
for group in group_list:
await asyncio.sleep(randint(0, 2))
try:
- await bot.send_group_msg(group_id=group["group_id"], message=msg)
+ await bot.send_group_msg(group_id=group["group_id"],
+ message=msg)
except BaseException:
err_list.append(group["group_id"])
-
+
msg0 = ""
for i in err_list:
msg0 += f" {i}\n"
-
+
repo_msg = (
f"推送消息:\n{msg}\n"
"————————\n"
diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py
index 7bd3992..259e791 100644
--- a/ATRI/plugins/manage/modules/debug.py
+++ b/ATRI/plugins/manage/modules/debug.py
@@ -11,7 +11,7 @@ from ATRI.utils.ub_paste import paste
from ATRI.exceptions import load_error
-level_list = ["info", "warning", "error", "debug"]
+level_list = ['info', 'warning', 'error', 'debug']
__doc__ = """
@@ -25,47 +25,48 @@ __doc__ = """
get_console = sv.on_command(
cmd="获取log",
- aliases={"获取LOG", "获取控制台", "获取控制台信息"},
+ aliases={'获取LOG', '获取控制台', '获取控制台信息'},
docs=__doc__,
- permission=SUPERUSER,
+ permission=SUPERUSER
)
-
@get_console.handle()
async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
+ msg = str(event.message).split(' ')
if msg:
- state["level"] = msg
+ state['level'] = msg[0]
+ try:
+ state['line'] = msg[1]
+ except Exception:
+ pass
-
-@get_console.got("level", prompt="需要获取的等级是?")
+@get_console.got('level', prompt='需要获取的等级是?')
async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None:
- quit_list = ["算了", "罢了", "不了"]
- if state["level"] in quit_list:
- await get_console.finish("好吧...")
-
+ quit_list = ['算了', '罢了', '不了']
+ if state['level'] in quit_list:
+ await get_console.finish('好吧...')
-@get_console.got("line", prompt="需要获取的行数是?")
+@get_console.got('line', prompt='需要获取的行数是?')
async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None:
- level = state["level"]
- line = state["line"]
+ level = state['level']
+ line = state['line']
repo = str()
-
+
path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log"
- logs = await open_file(path, "readlines")
-
+ logs = await open_file(path, 'readlines')
+
try:
- content = logs[int(line) :] # type: ignore
+ content = logs[int(line):] # type: ignore
repo = "\n".join(content).replace("ATRI", "ATRI")
except IndexError:
- await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore
-
+ await get_console.finish(f'行数错误...max: {len(logs)}') # type: ignore
+
data = FormData()
- data.add_field("poster", "ATRI running log")
- data.add_field("syntax", "text")
- data.add_field("expiration", "day")
- data.add_field("content", repo)
-
+ data.add_field('poster', 'ATRI running log')
+ data.add_field('syntax', 'text')
+ data.add_field('expiration', 'day')
+ data.add_field('content', repo)
+
msg0 = f"> {event.sender.nickname}\n"
msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}"
await track_error.finish(msg0)
@@ -79,39 +80,41 @@ __doc__ = """
"""
track_error = sv.on_command(
- cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER
+ cmd="track",
+ aliases={'追踪'},
+ docs=__doc__,
+ permission=SUPERUSER
)
-
@track_error.args_parser # type: ignore
-async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _track_error_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
+ cancel = ['算了', '罢了']
if msg in cancel:
- await track_error.finish("好吧...")
+ await track_error.finish('好吧...')
if not msg:
- await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ")
+ await track_error.reject('欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ')
else:
- state["track"] = msg
-
+ state['track'] = msg
@track_error.handle()
async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["track"] = msg
-
+ state['track'] = msg
-@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ")
+@track_error.got('track', prompt='欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ')
async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None:
- track_id = state["track"]
+ track_id = state['track']
data = dict()
-
+
try:
data = load_error(track_id)
except BaseException:
- await track_error.finish("未发现对应ID呢...(⇀‸↼‶)")
-
+ await track_error.finish('未发现对应ID呢...(⇀‸↼‶)')
+
msg = (
f"ID: [{track_id}]\n"
f"Time: [{data['time']}]\n"
@@ -119,13 +122,13 @@ async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None:
"——————\n"
f"{data['content']}"
)
-
+
data = FormData()
- data.add_field("poster", track_id)
- data.add_field("syntax", "text")
- data.add_field("expiration", "day")
- data.add_field("content", msg)
-
+ data.add_field('poster', track_id)
+ data.add_field('syntax', 'text')
+ data.add_field('expiration', 'day')
+ data.add_field('content', msg)
+
msg0 = f"> {event.sender.nickname}\n"
msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}"
await track_error.finish(msg0)
diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py
index cd72d47..d5ae321 100644
--- a/ATRI/plugins/manage/modules/dormant.py
+++ b/ATRI/plugins/manage/modules/dormant.py
@@ -13,10 +13,12 @@ __doc__ = """
"""
dormant_enabled = sv.on_command(
- cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER
+ cmd='休眠',
+ docs=__doc__,
+ rule=to_bot(),
+ permission=SUPERUSER
)
-
@dormant_enabled.handle()
async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None:
sv.Dormant.control_dormant(True)
@@ -32,10 +34,12 @@ __doc__ = """
"""
dormant_disabled = sv.on_command(
- cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER
+ cmd='苏醒',
+ docs=__doc__,
+ rule=to_bot(),
+ permission=SUPERUSER
)
-
@dormant_disabled.handle()
async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None:
sv.Dormant.control_dormant(False)
diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py
index 762e918..a09b721 100644
--- a/ATRI/plugins/manage/modules/request.py
+++ b/ATRI/plugins/manage/modules/request.py
@@ -9,7 +9,7 @@ from ATRI.service import Service as sv
from ATRI.exceptions import ReadFileError, FormatError
-ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential"
+ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
__doc__ = """
@@ -19,13 +19,16 @@ __doc__ = """
查看申请列表
"""
-req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER)
-
+req_list = sv.on_command(
+ cmd="申请列表",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@req_list.handle()
async def _req_list(bot: Bot, event: MessageEvent) -> None:
- path_f = ESSENTIAL_DIR / "request_friend.json"
- path_g = ESSENTIAL_DIR / "request_group.json"
+ path_f = ESSENTIAL_DIR / 'request_friend.json'
+ path_g = ESSENTIAL_DIR / 'request_group.json'
data_f, data_g = dict()
try:
data_f = json.loads(path_f.read_bytes())
@@ -35,7 +38,7 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None:
data_g = json.loads(path_g.read_bytes())
except ReadFileError:
msg_g = "[读取文件失败]"
-
+
msg_f = str()
for i in data_f.keys():
msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n"
@@ -43,65 +46,82 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None:
msg_g = str()
for i in data_g.keys():
msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n"
-
- msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}"
+
+ msg = (
+ "好友/群申请列表如下:\n"
+ "· 好友:\n"
+ f"{msg_f}"
+ "· 群:\n"
+ f"{msg_g}"
+ )
await req_list.finish(msg)
-req_deal = sv.on_regex(r"[同意|拒绝][好友|群]申请", permission=SUPERUSER)
-
+req_deal = sv.on_regex(
+ r'(同意|拒绝)(好友|群)申请',
+ permission=SUPERUSER
+)
@req_deal.handle()
async def _req_deal(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(" ")
- arg = re.findall(r"[同意|拒绝][好友|群]申请", msg[0])
+ msg = str(event.message).split(' ')
+ arg = re.findall(r'(同意|拒绝)(好友|群)申请', msg[0])[0]
app = arg[0]
_type = arg[1]
-
+
if not msg[1]:
- await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)")
-
- path = ESSENTIAL_DIR / "request_group.json"
- data_g = dict()
- try:
- data_g = json.loads(path.read_bytes())
- except FileExistsError:
- await req_deal.finish("读取群数据失败,可能并没有请求...")
-
+ await req_deal.finish(f'正确用法!速看\n{app}{_type}申请 (reqid)')
+
reqid = msg[1]
if app == "同意":
if _type == "好友":
try:
- await bot.set_friend_add_request(flag=reqid, approve=True)
- await req_deal.finish("完成~!已同意申请")
+ await bot.set_friend_add_request(flag=reqid,
+ approve=True)
+ await req_deal.finish('完成~!已同意申请')
except FormatError:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
elif _type == "群":
+ path = ESSENTIAL_DIR / "request_group.json"
+ data_g = dict()
try:
- await bot.set_group_add_request(
- flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True
- )
- await req_deal.finish("完成~!已同意申请")
+ data_g = json.loads(path.read_bytes())
+ except FileExistsError:
+ await req_deal.finish("读取群数据失败,可能并没有请求...")
+
+ try:
+ await bot.set_group_add_request(flag=reqid,
+ sub_type=data_g[reqid]['sub_type'],
+ approve=True)
+ await req_deal.finish('完成~!已同意申请')
except FormatError:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
else:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
elif app == "拒绝":
if _type == "好友":
try:
- await bot.set_friend_add_request(flag=reqid, approve=False)
- await req_deal.finish("完成~!已拒绝申请")
+ await bot.set_friend_add_request(flag=reqid,
+ approve=False)
+ await req_deal.finish('完成~!已拒绝申请')
except FormatError:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
elif _type == "群":
+ path = ESSENTIAL_DIR / "request_group.json"
+ data_g = dict()
+ try:
+ data_g = json.loads(path.read_bytes())
+ except FileExistsError:
+ await req_deal.finish("读取群数据失败,可能并没有请求...")
+
try:
- await bot.set_group_add_request(
- flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False
- )
- await req_deal.finish("完成~!已拒绝申请")
+ await bot.set_group_add_request(flag=reqid,
+ sub_type=data_g[reqid]['sub_type'],
+ approve=False)
+ await req_deal.finish('完成~!已拒绝申请')
except FormatError:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
else:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
else:
- await req_deal.finish("请检查输入的值是否正确——!")
+ await req_deal.finish('请检查输入的值是否正确——!')
diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py
index a3624df..6489ad6 100644
--- a/ATRI/plugins/manage/modules/service.py
+++ b/ATRI/plugins/manage/modules/service.py
@@ -5,7 +5,7 @@ from nonebot.adapters.cqhttp import (
Bot,
MessageEvent,
GroupMessageEvent,
- PrivateMessageEvent,
+ PrivateMessageEvent
)
from ATRI.service import Service as sv
@@ -21,40 +21,46 @@ __doc__ = """
"""
cur_service_ena = sv.on_command(
- cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER
+ cmd="启用功能",
+ docs=__doc__,
+ permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER
)
-
@cur_service_ena.args_parser # type: ignore
-async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+async def _cur_ena_load(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
+ quit_list = ['算了', '罢了', '取消']
if msg in quit_list:
- await cur_service_ena.finish("好吧...")
+ await cur_service_ena.finish('好吧...')
if not msg:
- await cur_service_ena.reject("请告诉咱目标命令!")
+ await cur_service_ena.reject('请告诉咱目标命令!')
else:
- state["service_e"] = msg
-
+ state['service_e'] = msg
@cur_service_ena.handle()
-async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+async def _cur_ena(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["service_e"] = msg
+ state['service_e'] = msg
-
-@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!")
-async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
- cmd = state["service_e"]
+@cur_service_ena.got('service_e', prompt='请告诉咱目标命令!')
+async def _deal_cur_ena(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
+ cmd = state['service_e']
group = str(event.group_id)
sv.control_service(cmd, False, True, group=group)
- await cur_service_ena.finish(f"成功!本群已启用:{cmd}")
-
+ await cur_service_ena.finish(f'成功!本群已启用:{cmd}')
@cur_service_ena.handle()
-async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None:
- await cur_service_ena.finish("只能在群聊中决定哦...")
+async def _refuse_cur_ena(bot: Bot,
+ event: PrivateMessageEvent,
+ state: T_State) -> None:
+ await cur_service_ena.finish('只能在群聊中决定哦...')
__doc__ = """
@@ -67,40 +73,46 @@ __doc__ = """
"""
cur_service_dis = sv.on_command(
- cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER
+ cmd="禁用功能",
+ docs=__doc__,
+ permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER
)
-
@cur_service_dis.args_parser # type: ignore
-async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+async def _cur_dis_load(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
+ quit_list = ['算了', '罢了', '取消']
if msg in quit_list:
- await cur_service_dis.finish("好吧...")
+ await cur_service_dis.finish('好吧...')
if not msg:
- await cur_service_dis.reject("请告诉咱目标命令!")
+ await cur_service_dis.reject('请告诉咱目标命令!')
else:
- state["service_d"] = msg
-
+ state['service_d'] = msg
@cur_service_dis.handle()
-async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+async def _cur_dis(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["service_d"] = msg
+ state['service_d'] = msg
-
-@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!")
-async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
- cmd = state["service_d"]
+@cur_service_dis.got('service_d', prompt='请告诉咱目标命令!')
+async def _deal_cur_dis(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
+ cmd = state['service_d']
group = str(event.group_id)
sv.control_service(cmd, False, False, group=group)
- await cur_service_dis.finish(f"成功!本群已禁用:{cmd}")
-
+ await cur_service_dis.finish(f'成功!本群已禁用:{cmd}')
@cur_service_dis.handle()
-async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None:
- await cur_service_dis.finish("只能在群聊中决定哦...")
+async def _refuse_cur_dis(bot: Bot,
+ event: PrivateMessageEvent,
+ state: T_State) -> None:
+ await cur_service_dis.finish('只能在群聊中决定哦...')
__doc__ = """
@@ -112,33 +124,40 @@ __doc__ = """
全局启用 以图搜图
"""
-glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER)
-
+glo_service_ena = sv.on_command(
+ cmd="全局启用",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@glo_service_ena.args_parser # type: ignore
-async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _glo_ena_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
+ quit_list = ['算了', '罢了', '取消']
if msg in quit_list:
- await glo_service_ena.finish("好吧...")
+ await glo_service_ena.finish('好吧...')
if not msg:
- await glo_service_ena.reject("请告诉咱目标命令!")
+ await glo_service_ena.reject('请告诉咱目标命令!')
else:
- state["service_e_g"] = msg
-
+ state['service_e_g'] = msg
@glo_service_ena.handle()
-async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _glo_ena(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["service_e_g"] = msg
-
+ state['service_e_g'] = msg
-@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!")
-async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None:
- cmd = state["service_e_g"]
+@glo_service_ena.got('service_e_g', prompt='请告诉咱目标命令!')
+async def _deal_glo_ena(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ cmd = state['service_e_g']
sv.control_service(cmd, True, True)
- await glo_service_ena.finish(f"成功!已全局启用:{cmd}")
+ await glo_service_ena.finish(f'成功!已全局启用:{cmd}')
__doc__ = """
@@ -150,30 +169,37 @@ __doc__ = """
全局禁用 以图搜图
"""
-glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER)
-
+glo_service_dis = sv.on_command(
+ cmd="全局禁用",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@glo_service_dis.args_parser # type: ignore
-async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _glo_dis_load(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ["算了", "罢了", "取消"]
+ quit_list = ['算了', '罢了', '取消']
if msg in quit_list:
- await glo_service_dis.finish("好吧...")
+ await glo_service_dis.finish('好吧...')
if not msg:
- await glo_service_dis.reject("请告诉咱目标命令!")
+ await glo_service_dis.reject('请告诉咱目标命令!')
else:
- state["service_d_g"] = msg
-
+ state['service_d_g'] = msg
@glo_service_dis.handle()
-async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _glo_dis(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["service_d_g"] = msg
-
+ state['service_d_g'] = msg
-@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!")
-async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None:
- cmd = state["service_d_g"]
+@glo_service_dis.got('service_d_g', prompt='请告诉咱目标命令!')
+async def _deal_glo_dis(bot: Bot,
+ event: MessageEvent,
+ state: T_State) -> None:
+ cmd = state['service_d_g']
sv.control_service(cmd, True, False)
- await glo_service_dis.finish(f"成功!已全局禁用:{cmd}")
+ await glo_service_dis.finish(f'成功!已全局禁用:{cmd}')
diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py
index 78f23e8..11b2b1b 100644
--- a/ATRI/plugins/manage/modules/shutdown.py
+++ b/ATRI/plugins/manage/modules/shutdown.py
@@ -12,8 +12,11 @@ __doc__ = """
@ 关机
"""
-shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER)
-
+shutdown = sv.on_command(
+ cmd="关机",
+ docs=__doc__,
+ permission=SUPERUSER
+)
@shutdown.handle()
async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
@@ -21,10 +24,9 @@ async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
if msg:
state["msg"] = msg
-
@shutdown.got("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)")
async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
- t = ["y", "Y", "是"]
+ t = ['y', 'Y', '是']
if state["msg"] in t:
await bot.send(event, "咱还会醒来的,一定")
exit(0)
diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py
index 076ce03..34ea61d 100644
--- a/ATRI/plugins/nsfw.py
+++ b/ATRI/plugins/nsfw.py
@@ -5,46 +5,43 @@ from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
from nonebot.typing import T_State
from ATRI.log import logger as log
-from ATRI.config import Config
+from ATRI.config import BotSelfConfig, NsfwCheck
from ATRI.service import Service as sv
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
from ATRI.rule import is_in_service
from ATRI.utils.request import get_bytes
from ATRI.utils.cqcode import coolq_code_check
-nsfw_url = f"http://{Config.NsfwCheck.host}:" f"{Config.NsfwCheck.port}/?url="
+nsfw_url = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url="
nsfw_checking = sv.on_message()
-
@nsfw_checking.handle()
async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None:
- if Config.NsfwCheck.enabled:
+ if NsfwCheck.enabled:
msg = str(event.message)
user = event.user_id
group = event.group_id
check = await coolq_code_check(msg, user, group)
-
+
if check:
if "image" not in msg:
return
-
+
url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0]
try:
data = json.loads(await get_bytes(url))
except:
- log.warning("检测涩图失败,请查阅文档以获取帮助")
+ log.warning('检测涩图失败,请查阅文档以获取帮助')
return
- if round(data["score"], 4) > Config.NsfwCheck.passing_rate:
- score = "{:.2%}".format(round(data["score"], 4))
- log.debug(f"截获涩图,得分:{score}")
- await bot.send(event, f"好涩哦!涩值:{score}\n不行了咱要发给主人看!")
- for sup in Config.BotSelfConfig.superusers:
- await bot.send_private_msg(
- user_id=sup, message=f"{msg}\n涩值: {score}"
- )
+ if round(data['score'], 4)*100 >= NsfwCheck.passing_rate:
+ score = "{:.2%}".format(round(data['score'], 4))
+ log.debug(f'截获涩图,得分:{score}')
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}")
+ await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!')
else:
pass
@@ -60,57 +57,60 @@ __doc__ = """
/nsfw 然后Bot会向你索取图片
"""
-nsfw_reading = sv.on_command(cmd="/nsfw", docs=__doc__, rule=is_in_service("nsfw"))
-
+nsfw_reading = sv.on_command(
+ cmd="/nsfw",
+ docs=__doc__,
+ rule=is_in_service('nsfw')
+)
@nsfw_reading.args_parser # type: ignore
async def _nsfw(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ["算了", "罢了", "不搜了"]
+ quit_list = ['算了', '罢了', '不搜了']
if msg in quit_list:
- await nsfw_reading.finish("好吧")
-
+ await nsfw_reading.finish('好吧')
+
if not msg:
- await nsfw_reading.reject("图呢?")
+ await nsfw_reading.reject('图呢?')
else:
- state["pic_nsfw"] = msg
-
+ state['pic_nsfw'] = msg
@nsfw_reading.handle()
-async def _nsfw_r(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+async def _nsfw_r(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
user = event.user_id
group = event.group_id
msg = str(event.message).strip()
check = await coolq_code_check(msg, user, group)
if check and msg:
- state["pic_nsfw"] = msg
-
+ state['pic_nsfw'] = msg
-@nsfw_reading.got("pic_nsfw", prompt="图呢?")
-async def _nsfw_reading(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
- msg = state["pic_nsfw"]
+@nsfw_reading.got('pic_nsfw', prompt='图呢?')
+async def _nsfw_reading(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
+ msg = state['pic_nsfw']
pic = re.findall(r"url=(.*?)]", msg)
if not pic:
- await nsfw_reading.reject("请发送图片而不是其它东西!!")
-
+ await nsfw_reading.reject('请发送图片而不是其它东西!!')
+
url = nsfw_url + pic[0]
try:
data = json.loads(await get_bytes(url))
- except RequestTimeOut:
- raise RequestTimeOut("Time out!")
-
- score = round(data["score"], 4)
- result = "{:.2%}".format(round(data["score"], 4))
- if score > 0.9:
+ except RequestError:
+ raise RequestError('Time out!')
+
+ score = round(data['score'], 4)
+ result = "{:.2%}".format(round(data['score'], 4))
+ if score >= 0.9:
level = "hso! 我要发给主人看!"
- for sup in Config.BotSelfConfig.superusers:
- await bot.send_private_msg(
- user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}"
- )
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}")
elif 0.9 > score >= 0.6:
level = "嗯,可冲"
else:
level = "?能不能换张55完全冲不起来"
-
+
repo = f"涩值:{result}\n{level}"
await nsfw_reading.finish(repo)
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index 14b9534..ce5d422 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -8,7 +8,7 @@ from nonebot.adapters.cqhttp.message import MessageSegment
from ATRI.service import Service as sv
from ATRI.utils.request import get_bytes
-from ATRI.utils.list import count_list, del_list_aim
+from ATRI.utils.limit import is_too_exciting
from .data_source import dec
@@ -16,23 +16,22 @@ from .data_source import dec
temp_list = []
img_url = [
"https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png",
- "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg",
+ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg"
]
bilibili_rich = sv.on_message()
-
@bilibili_rich.handle()
async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
global temp_list
try:
msg = str(event.raw_message).replace("\\", "")
bv = False
-
+
if "qqdocurl" not in msg:
if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace("av", "")
+ av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
else:
bv = re.findall(r"(BV\w+)", msg)
av = str(dec(bv[0]))
@@ -44,29 +43,29 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
async with session.get(url=bv_url) as r:
bv = re.findall(r"(BV\w+)", str(r.url))
av = dec(bv[0])
-
+
if not bv:
if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace("av", "")
+ av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
else:
return
-
- if count_list(temp_list, av) == 4:
- await bot.send(event, "你是怕别人看不到么发这么多次?")
- temp_list = del_list_aim(temp_list, av)
+
+ user = event.user_id
+ check = is_too_exciting(user, 1, 10)
+ if not check:
return
-
- temp_list.append(av)
-
+
URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}"
- data = json.loads(await get_bytes(URL))["data"]
+ data = json.loads(await get_bytes(URL))['data']
repo = (
f"{data['bvid']} INFO:\n"
f"Title: {data['title']}\n"
f"Link: {data['short_link']}\n"
"にまねげぴのTencent rich!"
)
- await bot.send(event, MessageSegment.image(file=choice(img_url)))
+ await bot.send(
+ event,
+ MessageSegment.image(file=choice(img_url)))
await bilibili_rich.finish(repo)
except BaseException:
return
diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py
index 59474ff..32ac219 100644
--- a/ATRI/plugins/rich/data_source.py
+++ b/ATRI/plugins/rich/data_source.py
@@ -1,4 +1,4 @@
-table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF"
+table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
tr = {}
for i in range(58):
tr[table[i]] = i
@@ -10,13 +10,13 @@ add = 8728348608
def dec(x) -> int:
r = 0
for i in range(6):
- r += tr[x[s[i]]] * 58 ** i
+ r += tr[x[s[i]]] * 58**i
return (r - add) ^ xor
def enc(x) -> str:
x = (x ^ xor) + add
- r = list("BV1 4 1 7 ")
+ r = list('BV1 4 1 7 ')
for i in range(6):
- r[s[i]] = table[x // 58 ** i % 58]
- return "".join(r)
+ r[s[i]] = table[x // 58**i % 58]
+ return ''.join(r)
diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py
index b4b1497..b1ab9ab 100644
--- a/ATRI/plugins/saucenao/__init__.py
+++ b/ATRI/plugins/saucenao/__init__.py
@@ -6,10 +6,10 @@ from nonebot.adapters.cqhttp import Bot, MessageEvent
from nonebot.adapters.cqhttp.message import Message, MessageSegment
from nonebot.typing import T_State
-from ATRI.config import Config
+from ATRI.config import SauceNAO
from ATRI.service import Service as sv
from ATRI.rule import is_in_service
-from ATRI.exceptions import RequestTimeOut
+from ATRI.exceptions import RequestError
from .data_source import SauceNao
@@ -21,53 +21,55 @@ __doc__ = """
以图搜图 (pic)
"""
-saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图"))
-
+saucenao = sv.on_command(
+ cmd='以图搜图',
+ docs=__doc__,
+ rule=is_in_service('以图搜图')
+)
@saucenao.args_parser # type: ignore
-async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _load_saucenao(bot: Bot, event: MessageEvent,
+ state: T_State) -> None:
msg = str(event.message)
- quit_list = ["算了", "罢了", "不搜了"]
+ quit_list = ['算了', '罢了', '不搜了']
if msg in quit_list:
- await saucenao.finish("好吧...")
-
+ await saucenao.finish('好吧...')
+
if not msg:
- await saucenao.reject("图呢?")
+ await saucenao.reject('图呢?')
else:
- state["pic_sau"] = msg
-
+ state['pic_sau'] = msg
@saucenao.handle()
async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state["pic_sau"] = msg
+ state['pic_sau'] = msg
-
[email protected]("pic_sau", prompt="图呢?")
[email protected]('pic_sau', prompt='图呢?')
async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = state["pic_sau"]
- img = re.findall(r"url=(.*?)]", msg)
+ msg = state['pic_sau']
+ img = re.findall(r'url=(.*?)]', msg)
if not img:
- await saucenao.finish("请发送图片而不是其他东西!!")
-
+ await saucenao.finish('请发送图片而不是其他东西!!')
+
try:
- task = SauceNao(api_key=Config.SauceNAO.key)
+ task = SauceNao(api_key=SauceNAO.key)
data = json.loads(await task.search(img[0]))
- except RequestTimeOut:
- raise RequestTimeOut("Request failed!")
-
- res = data["results"]
+ except RequestError:
+ raise RequestError('Request failed!')
+
+ res = data['results']
result = list()
for i in range(0, 3):
data = res[i]
-
+
_result = dict()
- _result["similarity"] = data["header"]["similarity"]
- _result["index_name"] = data["header"]["index_name"]
- _result["url"] = choice(data["data"].get("ext_urls", ["None"]))
+ _result['similarity'] = data['header']['similarity']
+ _result['index_name'] = data['header']['index_name']
+ _result['url'] = choice(data['data'].get('ext_urls', ['None']))
result.append(_result)
-
+
msg0 = f"> {MessageSegment.at(event.user_id)}"
for i in result:
msg0 = msg0 + (
@@ -76,5 +78,5 @@ async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
f"Name: {i['index_name']}\n"
f"URL: {i['url'].replace('https://', '')}"
)
-
+
await saucenao.finish(Message(msg0))
diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py
index efe8fe1..cd88554 100644
--- a/ATRI/plugins/saucenao/data_source.py
+++ b/ATRI/plugins/saucenao/data_source.py
@@ -5,19 +5,23 @@ URL = "https://saucenao.com/search.php"
class SauceNao:
- def __init__(
- self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5
- ) -> None:
+ def __init__(self,
+ api_key: str,
+ output_type=2,
+ testmode=1,
+ dbmaski=32768,
+ db=5,
+ numres=5) -> None:
params = dict()
- params["api_key"] = api_key
- params["output_type"] = output_type
- params["testmode"] = testmode
- params["dbmaski"] = dbmaski
- params["db"] = db
- params["numres"] = numres
+ params['api_key'] = api_key
+ params['output_type'] = output_type
+ params['testmode'] = testmode
+ params['dbmaski'] = dbmaski
+ params['db'] = db
+ params['numres'] = numres
self.params = params
-
+
async def search(self, url: str):
- self.params["url"] = url
+ self.params['url'] = url
res = await post_bytes(url=URL, params=self.params)
return res
diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py
new file mode 100644
index 0000000..d9f1ffe
--- /dev/null
+++ b/ATRI/plugins/setu/__init__.py
@@ -0,0 +1,8 @@
+import nonebot
+from pathlib import Path
+
+
+_sub_plugins = set()
+
+_sub_plugins |= nonebot.load_plugins(
+ str((Path(__file__).parent / 'modules').resolve()))
diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py
new file mode 100644
index 0000000..72e2270
--- /dev/null
+++ b/ATRI/plugins/setu/modules/data_source.py
@@ -0,0 +1,178 @@
+import os
+import json
+import string
+import aiosqlite
+from aiosqlite.core import Connection
+from pathlib import Path
+from random import sample, choice
+from aiohttp import ClientSession
+from nonebot.adapters.cqhttp.message import MessageSegment, Message
+
+from ATRI.log import logger as log
+from ATRI.config import NsfwCheck
+from ATRI.exceptions import RequestError, WriteError
+from ATRI.utils.request import get_bytes
+from ATRI.utils.img import compress_image
+
+
+TEMP_DIR: Path = Path('.') / 'ATRI' / 'data' / 'temp' / 'setu'
+SETU_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'setu'
+os.makedirs(TEMP_DIR, exist_ok=True)
+os.makedirs(SETU_DIR, exist_ok=True)
+NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url="
+SIZE_REDUCE: bool = True
+
+
+class Hso:
+ @staticmethod
+ async def nsfw_check(url: str) -> float:
+ url = NSFW_URL + url
+ try:
+ data = json.loads(await get_bytes(url))
+ except RequestError:
+ raise RequestError('Request failed!')
+ return round(data['score'], 4)
+
+ @staticmethod
+ async def _comp_setu(url: str) -> str:
+ temp_id = ''.join(sample(string.ascii_letters + string.digits, 8))
+ file = TEMP_DIR / f'{temp_id}.png'
+
+ try:
+ async with ClientSession() as session:
+ async with session.get(url) as r:
+ data = await r.read()
+ except RequestError:
+ raise RequestError('Request img failed!')
+
+ try:
+ with open(file, 'wb') as r:
+ r.write(data)
+ except WriteError:
+ raise WriteError('Writing img failed!')
+
+ return compress_image(os.path.abspath(file))
+
+ @classmethod
+ async def setu(cls, data: dict) -> str:
+ pid = data['pid']
+ title = data['title']
+ if SIZE_REDUCE:
+ img = MessageSegment.image(
+ 'file:///' + await cls._comp_setu(data['url']), proxy=False)
+ else:
+ img = MessageSegment.image(data['url'], proxy=False)
+
+ msg = (
+ f"Pid: {pid}\n"
+ f"Title: {title}\n"
+ f"{img}"
+ )
+ return msg
+
+ @classmethod
+ async def acc_setu(cls, d: list) -> str:
+ data: dict = choice(d)
+
+ for i in data['tags']:
+ if i['name'] == "R-18":
+ return "太涩了不方便发w"
+
+ pid = data['id']
+ title = data['title']
+ try:
+ pic = data['meta_single_page']['original_image_url'] \
+ .replace('pximg.net', 'pixiv.cat')
+ except Exception:
+ pic = choice(data['meta_pages'])['original']['image_urls'] \
+ .replace('pximg.net', 'pixiv.cat')
+ if SIZE_REDUCE:
+ img = MessageSegment.image(
+ 'file:///' + await cls._comp_setu(pic), proxy=False)
+ else:
+ img = MessageSegment.image(pic, proxy=False)
+
+ msg = (
+ f"Pid: {pid}\n"
+ f"Title: {title}\n"
+ f"{img}"
+ )
+ return msg
+
+
+class SetuData:
+ SETU_DATA = SETU_DIR / 'setu.db'
+
+ @classmethod
+ async def _check_database(cls) -> bool:
+ if not cls.SETU_DATA.exists():
+ log.warning(f'未发现数据库\n-> {cls.SETU_DATA}\n将开始创建')
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ cur = await db.cursor()
+ await cur.execute(
+ """
+ CREATE TABLE setu(
+ pid PID, title TITLE, tags TAGS,
+ user_id USER_ID, user_name USER_NAME,
+ user_account USER_ACCOUNT, url URL,
+ UNIQUE(
+ pid, title, tags, user_id,
+ user_name, user_account, url
+ )
+ );
+ """
+ )
+ await db.commit()
+ log.warning(f'...创建数据库\n-> {cls.SETU_DATA}\n完成!')
+ return True
+ return True
+
+ @classmethod
+ async def add_data(cls, d: dict) -> None:
+ data = (
+ d['pid'], d['title'], d['tags'], d['user_id'],
+ d['user_name'], d['user_account'], d['url']
+ )
+
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ await db.execute(
+ """
+ INSERT INTO setu(
+ pid, title, tags, user_id,
+ user_name, user_account, url
+ ) VALUES(
+ ?, ?, ?, ?, ?, ?, ?
+ );
+ """,
+ data
+ )
+ await db.commit()
+
+ @classmethod
+ async def del_data(cls, pid: int) -> None:
+ if not isinstance(pid, int): # 防注入
+ raise ValueError('Please provide int.')
+
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};")
+ await db.commit()
+
+ @classmethod
+ async def count(cls):
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ async with db.execute("SELECT * FROM setu") as cursor:
+ return len(await cursor.fetchall()) # type: ignore
+
+ @classmethod
+ async def get_setu(cls):
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ async with db.execute("SELECT * FROM setu ORDER BY RANDOM() limit 1;") as cursor:
+ return await cursor.fetchall()
diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py
new file mode 100644
index 0000000..649cd6a
--- /dev/null
+++ b/ATRI/plugins/setu/modules/main_setu.py
@@ -0,0 +1,193 @@
+import re
+import json
+from random import choice, random
+
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp.message import Message
+
+from ATRI.service import Service as sv
+from ATRI.rule import is_in_service
+from ATRI.utils.request import get_bytes, post_bytes
+from ATRI.utils.limit import is_too_exciting
+from ATRI.config import Setu, BotSelfConfig
+from ATRI.exceptions import RequestError
+
+from .data_source import Hso, SIZE_REDUCE, SetuData
+
+
+LOLICON_URL: str = "https://api.lolicon.app/setu/"
+PIXIV_URL:str = "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word="
+R18_ENABLED: int = 0
+USE_LOCAL_DATA: bool = False
+MIX_LOCAL_DATA: bool = False
+
+
+setu = sv.on_regex(r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]",
+ rule=is_in_service('setu'))
+
+async def _setu(bot: Bot, event: MessageEvent) -> None:
+ user = event.user_id
+ check = is_too_exciting(user, 3, hours=1)
+ if not check:
+ return
+
+ await bot.send(event, "别急,在找了!")
+ params = {
+ "apikey": Setu.key,
+ "r18": str(R18_ENABLED),
+ "size1200": "true"
+ }
+ try:
+ data = json.loads(await post_bytes(LOLICON_URL, params))['data'][0]
+ except RequestError:
+ raise RequestError('Request failed!')
+
+ check = await Hso.nsfw_check(data['url'])
+ score = "{:.2%}".format(check, 4)
+
+ if not MIX_LOCAL_DATA:
+ if USE_LOCAL_DATA:
+ data = (await SetuData.get_setu())[0] # type: ignore
+ data = {
+ "pid": data[0],
+ "title": data[1],
+ "url": data[6]
+ }
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ if check >= 0.9:
+ if random() <= 0.2:
+ repo = (
+ "我找到图了,但我发给主人了❤\n"
+ f"涩值:{score}"
+ )
+ await bot.send(event, repo)
+ msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ if random() <= 0.5:
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ data = (await SetuData.get_setu())[0] # type: ignore
+ data = {
+ "pid": data[0],
+ "title": data[1],
+ "url": data[6]
+ }
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+
+
+key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service('setu'))
+
+@key_setu.handle()
+async def _key_setu(bot: Bot, event: MessageEvent) -> None:
+ user = event.user_id
+ check = is_too_exciting(user, 10, hours=1)
+ if not check:
+ await setu.finish('休息一下吧❤')
+
+ await bot.send(event, "别急,在找了!")
+ msg = str(event.message).strip()
+ tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0]
+ URL = PIXIV_URL + tag
+
+ try:
+ data = json.loads(await get_bytes(URL))['illusts']
+ except RequestError:
+ raise RequestError('Request msg failed!')
+
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.acc_setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.acc_setu(data)))
+
+
+setu_config = sv.on_command(cmd='涩图设置', permission=SUPERUSER)
+
+@setu_config.handle()
+async def _setu_config(bot: Bot, event: MessageEvent) -> None:
+ global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA
+ msg = str(event.message).split(' ')
+ if msg[0] == "":
+ repo = (
+ "可用设置如下:\n"
+ "启用/禁用r18\n"
+ "启用/禁用压缩\n"
+ "启用/禁用本地涩图\n"
+ "启用/禁用混合本地涩图"
+ )
+ await setu_config.finish(repo)
+ elif msg[0] == "启用r18":
+ R18_ENABLED = 1
+ await setu_config.finish('已启用r18')
+ elif msg[0] == "禁用r18":
+ R18_ENABLED = 0
+ await setu_config.finish('已禁用r18')
+ elif msg[0] == "启用压缩":
+ SIZE_REDUCE = True
+ await setu_config.finish('已启用图片压缩')
+ elif msg[0] == "禁用压缩":
+ SIZE_REDUCE = False
+ await setu_config.finish('已禁用图片压缩')
+ elif msg[0] == "启用本地涩图":
+ USE_LOCAL_DATA = True
+ await setu_config.finish('已启用本地涩图')
+ elif msg[0] == "禁用本地涩图":
+ USE_LOCAL_DATA = False
+ await setu_config.finish('已禁用本地涩图')
+ elif msg[0] == "启用混合本地涩图":
+ MIX_LOCAL_DATA = True
+ await setu_config.finish('启用混合本地涩图')
+ elif msg[0] == "禁用混合本地涩图":
+ MIX_LOCAL_DATA = False
+ await setu_config.finish('禁用混合本地涩图')
+ else:
+ await setu_config.finish('阿!请检查拼写')
+
+
+not_get_se = sv.on_command("不够涩")
+
+@not_get_se.handle()
+async def _not_se(bot: Bot, event: MessageEvent) -> None:
+ user = event.user_id
+ check = is_too_exciting(user, 1, 120)
+ if check:
+ msg = choice([
+ "那你来发",
+ "那你来发❤"
+ ])
+ await not_get_se.finish(msg)
diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py
new file mode 100644
index 0000000..3030881
--- /dev/null
+++ b/ATRI/plugins/setu/modules/scheduler.py
@@ -0,0 +1,19 @@
+import shutil
+from ATRI.log import logger as log
+from ATRI.utils.apscheduler import scheduler
+
+from .data_source import TEMP_DIR
+
+
+ 'interval',
+ days=7,
+ misfire_grace_time=10
+)
+async def clear_temp():
+ log.info('正在清除涩图缓存')
+ try:
+ shutil.rmtree(TEMP_DIR)
+ log.info('清除缓存成功!')
+ except Exception:
+ log.warn('清除图片缓存失败!')
diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py
new file mode 100644
index 0000000..6f383f4
--- /dev/null
+++ b/ATRI/plugins/setu/modules/store.py
@@ -0,0 +1,138 @@
+import json
+from random import choice
+
+from nonebot.typing import T_State
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp.message import Message, MessageSegment
+
+from ATRI.service import Service as sv
+from ATRI.utils.request import get_bytes
+from ATRI.exceptions import RequestError
+
+from .data_source import SetuData
+
+
+API_URL: str = "https://api.kyomotoi.moe/api/pixiv/illust?id="
+
+
+__doc__ = """
+为本地添加涩图!
+权限组:维护者
+用法:
+ 添加涩图 (pid)
+补充:
+ pid: Pixiv 作品id
+"""
+
+
+add_setu = sv.on_command(
+ cmd="添加涩图",
+ docs=__doc__,
+ permission=SUPERUSER
+)
+
+@add_setu.args_parser # type: ignore
+async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ cancel = ['算了', '罢了']
+ if msg in cancel:
+ await add_setu.finish('好吧...')
+ if not msg:
+ await add_setu.reject('涩图(pid)速发!')
+ else:
+ state['setu_add'] = msg
+
+@add_setu.handle()
+async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['setu_add'] = msg
+
+@add_setu.got('setu_add', prompt='涩图(pid)速发!')
+async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ pid = state['setu_add']
+
+ URL = API_URL + pid
+ try:
+ data = json.loads(await get_bytes(URL))['illust']
+ except RequestError:
+ raise RequestError('Request failed!')
+
+ try:
+ pic = data['meta_single_page']['original_image_url'] \
+ .replace('pximg.net', 'pixiv.cat')
+ except Exception:
+ pic = choice(data['meta_pages'])['image_urls']['original'] \
+ .replace('pximg.net', 'pixiv.cat')
+
+ d = {
+ "pid": pid,
+ "title": data['title'],
+ "tags": str(data['tags']),
+ "user_id": data['user']['id'],
+ "user_name": data['user']['name'],
+ "user_account": data['user']['account'],
+ "url": pic
+ }
+ await SetuData.add_data(d)
+
+ show_img = data['image_urls']['medium'].replace('pximg.net', 'pixiv.cat')
+ msg = (
+ "好欸!是新涩图:\n"
+ f"Pid: {pid}\n"
+ f"Title: {data['title']}\n"
+ f"{MessageSegment.image(show_img)}"
+ )
+ await add_setu.finish(Message(msg))
+
+
+__doc__ = """
+删除涩图!
+权限组:维护者
+用法:
+ 删除涩图 (pid)
+补充:
+ pid: Pixiv 作品id
+"""
+
+
+del_setu = sv.on_command(
+ cmd="删除涩图",
+ docs=__doc__,
+ permission=SUPERUSER
+)
+
+@del_setu.args_parser # type: ignore
+async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ cancel = ['算了', '罢了']
+ if msg in cancel:
+ await add_setu.finish('好吧...')
+ if not msg:
+ await add_setu.reject('涩图(pid)速发!')
+ else:
+ state['setu_del'] = msg
+
+@del_setu.handle()
+async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['setu_del'] = msg
+
+@del_setu.got('setu_del', prompt='涩图(pid)速发!')
+async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ pid = int(state['setu_del'])
+ await SetuData.del_data(pid)
+ await del_setu.finish(f'涩图({pid})已删除...')
+
+
+count_setu = sv.on_command(
+ cmd="涩图总量",
+ permission=SUPERUSER
+)
+
+@count_setu.handle()
+async def _count_setu(bot: Bot, event: MessageEvent) -> None:
+ msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!"
+ await count_setu.finish(msg)
diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py
index bd5d9c4..efdd412 100644
--- a/ATRI/plugins/status.py
+++ b/ATRI/plugins/status.py
@@ -1,4 +1,6 @@
+import time
import psutil
+from datetime import datetime
from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.log import logger as log
@@ -6,7 +8,7 @@ from ATRI.service import Service as sv
from ATRI.rule import is_in_service
from ATRI.exceptions import GetStatusError
from ATRI.utils.apscheduler import scheduler
-from ATRI.config import Config
+from ATRI.config import BotSelfConfig
__doc__ = """
@@ -16,8 +18,10 @@ __doc__ = """
/ping
"""
-ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping"))
-
+ping = sv.on_command(
+ cmd="/ping",
+ docs="测试机器人",
+ rule=is_in_service('ping'))
@ping.handle()
async def _ping(bot: Bot, event: MessageEvent) -> None:
@@ -31,8 +35,11 @@ __doc__ = """
/status
"""
-status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status"))
-
+status = sv.on_command(
+ cmd="/status",
+ docs=__doc__,
+ rule=is_in_service('status')
+)
@status.handle()
async def _status(bot: Bot, event: MessageEvent) -> None:
@@ -40,22 +47,28 @@ async def _status(bot: Bot, event: MessageEvent) -> None:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
- inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
+ inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ now = time.time()
+ boot = psutil.boot_time()
+ up_time = str(
+ datetime.utcfromtimestamp(now).replace(microsecond=0)
+ - datetime.utcfromtimestamp(boot).replace(microsecond=0)
+ )
except GetStatusError:
raise GetStatusError("Failed to get status.")
-
+
msg = "アトリは、高性能ですから!"
-
- if cpu > 80: # type: ignore
+
+ if cpu > 90: # type: ignore
msg = "咱感觉有些头晕..."
- if mem > 80:
+ if mem > 90:
msg = "咱感觉有点头晕并且有点累..."
- elif mem > 80:
+ elif mem > 90:
msg = "咱感觉有点累..."
- elif disk > 80:
+ elif disk > 90:
msg = "咱感觉身体要被塞满了..."
-
+
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
@@ -63,36 +76,41 @@ async def _status(bot: Bot, event: MessageEvent) -> None:
f"* DISK: {disk}%\n"
f"* netSENT: {inteSENT}MB\n"
f"* netRECV: {inteRECV}MB\n"
+ f"* Runtime: {up_time}\n"
) + msg
-
+
await status.finish(msg0)
[email protected]_job("interval", minutes=5, misfire_grace_time=10)
+ 'interval',
+ minutes=5,
+ misfire_grace_time=10
+)
async def _():
log.info("开始自检")
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
- inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
+ inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
except GetStatusError:
raise GetStatusError("Failed to get status.")
-
- msg = ""
- if cpu > 80: # type: ignore
+
+ msg = str()
+ if cpu > 90: # type: ignore
msg = "咱感觉有些头晕..."
- if mem > 80:
+ if mem > 90:
msg = "咱感觉有点头晕并且有点累..."
- elif mem > 80:
+ elif mem > 90:
msg = "咱感觉有点累..."
- elif disk > 80:
+ elif disk > 90:
msg = "咱感觉身体要被塞满了..."
else:
log.info("运作正常")
return
-
+
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
@@ -101,6 +119,9 @@ async def _():
f"* netSENT: {inteSENT}MB\n"
f"* netRECV: {inteRECV}MB\n"
) + msg
-
- for sup in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0)
+
+ for sup in BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=sup,
+ message=msg0
+ )
diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py
index ff28e59..fa59690 100644
--- a/ATRI/plugins/utils/__init__.py
+++ b/ATRI/plugins/utils/__init__.py
@@ -1,9 +1,12 @@
import re
+from random import random
+
+from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
from ATRI.rule import is_in_service
-from .data_source import roll_dice, Encrypt
+from .data_source import roll_dice, Encrypt, Yinglish
__doc__ = """
@@ -17,24 +20,37 @@ roll一下
/roll 1d10+10d9+4d5+2d3
"""
-roll = sv.on_command(cmd="/roll", docs=__doc__, rule=is_in_service("roll"))
-
+roll = sv.on_command(
+ cmd="/roll",
+ docs=__doc__,
+ rule=is_in_service('roll')
+)
+
[email protected]_parser # type: ignore
+async def _load_roll(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ quit_list = ['算了', '罢了', '取消']
+ if msg in quit_list:
+ await roll.finish('好吧...')
+ if not msg:
+ await roll.reject('点呢?(1d10+...)')
+ else:
+ state['resu'] = msg
@roll.handle()
-async def _roll(bot: Bot, event: MessageEvent, state: dict) -> None:
+async def _roll(bot: Bot, event: MessageEvent, state: T_State) -> None:
args = str(event.message).strip()
if args:
- state["resu"] = args
-
+ state['resu'] = args
@roll.got("resu", prompt="roll 参数不能为空~!\ndemo:1d10 或 2d10+2d10")
-async def _(bot: Bot, event: MessageEvent, state: dict) -> None:
- resu = state["resu"]
- match = re.match(r"^([\dd+\s]+?)$", resu)
-
+async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ resu = state['resu']
+ match = re.match(r'^([\dd+\s]+?)$', resu)
+
if not match:
await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10")
-
+
await roll.finish(roll_dice(resu))
@@ -50,19 +66,60 @@ __doc__ = """
/enc e アトリは高性能ですから!
"""
-encrypt = sv.on_command(cmd="/enc", docs=__doc__, rule=is_in_service("enc"))
-
+encrypt = sv.on_command(
+ cmd="/enc",
+ docs=__doc__,
+ rule=is_in_service('enc')
+)
@encrypt.handle()
async def _encrypt(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(" ")
+ msg = str(event.message).split(' ')
_type = msg[0]
s = msg[1]
e = Encrypt()
-
+
if _type == "e":
await encrypt.finish(e.encode(s))
elif _type == "d":
await encrypt.finish(e.decode(s))
else:
- await encrypt.finish("请检查输入~!")
+ await encrypt.finish('请检查输入~!')
+
+
+__doc__ = """
+涩批一下!
+权限组:所有人
+用法:
+ 涩批一下 (msg)
+"""
+
+sepi = sv.on_command(
+ cmd="涩批一下",
+ docs=__doc__,
+ rule=is_in_service('涩批一下')
+)
+
+async def _load_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ quit_list = ['算了', '罢了', '取消']
+ if msg in quit_list:
+ await sepi.finish('好吧...')
+ if not msg:
+ await sepi.reject('话呢?')
+ else:
+ state['sepi_msg'] = msg
+
+async def _sepi(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['sepi_msg'] = msg
+
[email protected]('sepi_msg', prompt='话呢?')
+async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = state['sepi_msg']
+ if len(msg) < 4:
+ await sepi.finish('这么短?涩不起来!')
+ await sepi.finish(Yinglish.deal(msg, random())) \ No newline at end of file
diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py
index 4ef46a3..98b2b37 100644
--- a/ATRI/plugins/utils/data_source.py
+++ b/ATRI/plugins/utils/data_source.py
@@ -1,46 +1,47 @@
import re
-import random
from math import floor
-from typing import Union
+import jieba.posseg as pseg
+from typing import Union, Optional
+from random import random, choice, randint
def roll_dice(par: str) -> str:
result = 0
- proc = ""
+ proc = ''
proc_list = []
p = par.split("+")
-
+
for i in p:
args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i)
args = list(args[0])
-
+
args[0] = args[0] or 1
if int(args[0]) >= 5000 or int(args[2]) >= 5000:
return "阿...好大......"
-
+
for a in range(1, int(args[0]) + 1):
- rd = random.randint(1, int(args[2]))
+ rd = randint(1, int(args[2]))
result = result + rd
-
+
if len(proc_list) <= 10:
proc_list.append(rd)
-
+
if len(proc_list) <= 10:
proc += "+".join(map(str, proc_list))
elif len(proc_list) > 10:
proc += "太长了不展示了就酱w"
else:
proc += str(result)
-
+
result = f"{par}=({proc})={result}"
return result
class Encrypt:
- cr = "ĀāĂ㥹ÀÁÂÃÄÅ"
- cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ"
- cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr"
- cb = "ĨĩĪīĬĭĮįİı"
+ cr = 'ĀāĂ㥹ÀÁÂÃÄÅ'
+ cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ'
+ cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr'
+ cb = 'ĨĩĪīĬĭĮįİı'
sr = len(cr)
sc = len(cc)
@@ -55,7 +56,7 @@ class Encrypt:
def _encodeByte(self, i) -> Union[str, None]:
if i > 0xFF:
- raise ValueError("ERROR! at/ri overflow")
+ raise ValueError('ERROR! at/ri overflow')
if i > 0x7F:
i = i & 0x7F
@@ -65,7 +66,7 @@ class Encrypt:
def _encodeShort(self, i) -> str:
if i > 0xFFFF:
- raise ValueError("ERROR! atri overflow")
+ raise ValueError('ERROR! atri overflow')
reverse = False
if i > 0x7FFF:
@@ -75,15 +76,17 @@ class Encrypt:
char = [
self._div(i, self.scnb),
self._div(i % self.scnb, self.snb),
- self._div(i % self.snb, self.sb),
- i % self.sb,
+ self._div(i % self.snb, self.sb), i % self.sb
+ ]
+ char = [
+ self.cr[char[0]], self.cc[char[1]], self.cn[char[2]],
+ self.cb[char[3]]
]
- char = [self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], self.cb[char[3]]]
if reverse:
return char[2] + char[3] + char[0] + char[1]
- return "".join(char)
+ return ''.join(char)
def _decodeByte(self, c) -> int:
nb = False
@@ -91,11 +94,12 @@ class Encrypt:
if idx[0] < 0 or idx[1] < 0:
idx = [self.cn.index(c[0]), self.cb.index(c[1])]
nb = True
- raise ValueError("ERROR! at/ri overflow")
+ raise ValueError('ERROR! at/ri overflow')
- result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1]
+ result = idx[0] * self.sb + idx[1] \
+ if nb else idx[0] * self.sc + idx[1]
if result > 0x7F:
- raise ValueError("ERROR! at/ri overflow")
+ raise ValueError('ERROR! at/ri overflow')
return result | 0x80 if nb else 0
@@ -106,22 +110,23 @@ class Encrypt:
self.cr.index(c[0]),
self.cc.index(c[1]),
self.cn.index(c[2]),
- self.cb.index(c[3]),
+ self.cb.index(c[3])
]
else:
idx = [
self.cr.index(c[2]),
self.cc.index(c[3]),
self.cn.index(c[0]),
- self.cb.index(c[1]),
+ self.cb.index(c[1])
]
if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0:
- raise ValueError("ERROR! not atri")
+ raise ValueError('ERROR! not atri')
- result = idx[0] * self.scnb + idx[1] * self.snb + idx[2] * self.sb + idx[3]
+ result = idx[0] * self.scnb + idx[1] * self.snb + idx[
+ 2] * self.sb + idx[3]
if result > 0x7FFF:
- raise ValueError("ERROR! atri overflow")
+ raise ValueError('ERROR! atri overflow')
result |= 0x8000 if reverse else 0
return result
@@ -134,36 +139,64 @@ class Encrypt:
if len(b) & 1 == 1:
result.append(self._encodeByte(b[-1]))
- return "".join(result)
+ return ''.join(result)
- def encode(self, s: str, encoding: str = "utf-8"):
+ def encode(self, s: str, encoding: str = 'utf-8'):
if not isinstance(s, str):
- raise ValueError("Please enter str instead of other")
+ raise ValueError('Please enter str instead of other')
return self._encodeBytes(s.encode(encoding))
def _decodeBytes(self, s: str):
if not isinstance(s, str):
- raise ValueError("Please enter str instead of other")
+ raise ValueError('Please enter str instead of other')
if len(s) & 1:
- raise ValueError("ERROR length")
+ raise ValueError('ERROR length')
result = []
for i in range(0, (len(s) >> 2)):
- result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) >> 8]))
- result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) & 0xFF]))
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8]))
+ result.append(bytes([
+ self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF]))
if (len(s) & 2) == 2:
result.append(bytes([self._decodeByte(s[-2:])]))
- return b"".join(result)
+ return b''.join(result)
- def decode(self, s: str, encoding: str = "utf-8") -> str:
+ def decode(self, s: str, encoding: str = 'utf-8') -> str:
if not isinstance(s, str):
- raise ValueError("Please enter str instead of other")
+ raise ValueError('Please enter str instead of other')
try:
return self._decodeBytes(s).decode(encoding)
except UnicodeDecodeError:
- raise ValueError("Decoding failed")
+ raise ValueError('Decoding failed')
+
+
+class Yinglish:
+ @staticmethod
+ def _to_ying(x, y, ying) -> str:
+ if random() > ying:
+ return x
+ if x in [',', '。']:
+ return str(choice(['..', '...', '....', '......']))
+ if x in ['!', '!']:
+ return "❤"
+ if len(x) > 1 and random() < 0.5:
+ return str(choice([
+ f"{x[0]}..{x}", f"{x[0]}...{x}",
+ f"{x[0]}....{x}", f"{x[0]}......{x}"
+ ]))
+ else:
+ if y == "n" and random() < 0.5:
+ x = "〇" * len(x)
+ return str(choice([
+ f"...{x}", f"....{x}",
+ f".....{x}", f"......{x}"
+ ]))
+
+ @classmethod
+ def deal(cls, text, ying: Optional[float] = 0.5) -> str:
+ return "".join([cls._to_ying(x, y, ying) for x, y in pseg.cut(text)])
diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py
new file mode 100644
index 0000000..3f89214
--- /dev/null
+++ b/ATRI/plugins/wife/__init__.py
@@ -0,0 +1,129 @@
+import re
+import asyncio
+from random import choice
+
+from nonebot.typing import T_State
+from nonebot.adapters.cqhttp import (
+ Bot,
+ MessageEvent,
+ GroupMessageEvent,
+ PrivateMessageEvent
+)
+
+from ATRI.service import Service as sv
+from ATRI.rule import is_in_service
+from ATRI.utils.limit import is_too_exciting
+
+from .data_source import Tsuma
+
+
+__doc__ = """
+好欸!是老婆!
+权限组:所有人
+用法:
+ 抽老婆 # 获取一位老婆
+ 查老婆 # 查询老婆,如+at对象可查询对方
+ 我要离婚 # 离婚...
+"""
+
+roll_wife = sv.on_command(
+ cmd='抽老婆',
+ docs=__doc__,
+ rule=is_in_service('抽老婆')
+)
+
+@roll_wife.handle()
+async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None:
+ user = event.user_id
+ gender = event.sender.sex
+ group = event.group_id
+ user_name = await bot.get_group_member_info(group_id=group,
+ user_id=user)
+ user_name = user_name['nickname']
+ run = is_too_exciting(user, 1, seconds=5)
+ if not run:
+ return
+
+ check_repo, if_h = Tsuma.check_tsuma(str(user))
+ if if_h:
+ await roll_wife.finish(check_repo)
+
+ msg = (
+ "5秒后咱将随机抽取一位群友成为\n"
+ f"{user_name} 的老婆!究竟是谁呢~?"
+ )
+ await bot.send(event, msg)
+ await asyncio.sleep(5)
+
+ async def get_luck_user():
+ luck_list = await bot.get_group_member_list(group_id=group)
+ return choice(luck_list)
+
+ while True:
+ luck_user = await get_luck_user()
+ luck_qq = luck_user['user_id']
+ if user != luck_qq:
+ break
+
+ luck_gender = luck_user['sex']
+ luck_user = luck_user['nickname']
+ d = {
+ "nickname": user_name,
+ "gender": gender,
+ "lassie": {
+ "nickname": luck_user,
+ "qq": luck_qq,
+ "gender": luck_gender
+ }
+ }
+
+ if str(luck_qq) == str(event.self_id):
+ Tsuma.got_tsuma(str(user), d)
+ msg = "老婆竟是我自己~❤"
+ else:
+ msg = Tsuma.got_tsuma(str(user), d)
+
+ await roll_wife.finish(msg)
+
+@roll_wife.handle()
+async def _no_pr(bot: Bot, event: PrivateMessageEvent) -> None:
+ await roll_wife.finish('对8起...该功能只对群聊开放(')
+
+
+inquire_wife = sv.on_command(
+ cmd="查老婆",
+ rule=is_in_service('抽老婆')
+)
+
+@inquire_wife.handle()
+async def _inq_wife(bot: Bot, event: MessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ if msg[0] == "":
+ user = str(event.user_id)
+ await inquire_wife.finish(Tsuma.inquire_tsuma(user))
+ else:
+ aim = re.findall(r"qq=(.*?)]", msg[0])[0]
+ await inquire_wife.finish(Tsuma.inquire_tsuma(aim).replace('你', 'ta'))
+
+
+want_divorce = sv.on_command(
+ cmd="我要离婚",
+ rule=is_in_service('抽老婆')
+)
+
+@want_divorce.handle()
+async def _want_div(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['is_d'] = msg
+
+@want_divorce.got('is_d', prompt="你确定吗?(是/否)")
+async def _deal_div(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = state['is_d']
+ user = str(event.user_id)
+ name = event.sender.nickname
+
+ if msg in ['是', '确定']:
+ await want_divorce.finish(Tsuma.divorce(user))
+ else:
+ await want_divorce.finish(f'({name})回心转意了!')
diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py
new file mode 100644
index 0000000..f01985a
--- /dev/null
+++ b/ATRI/plugins/wife/data_source.py
@@ -0,0 +1,90 @@
+import os
+import json
+from pathlib import Path
+
+
+WIFE_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'wife'
+MERRY_LIST_PATH = WIFE_DIR / 'merry_list.json'
+os.makedirs(WIFE_DIR, exist_ok=True)
+
+
+class Tsuma:
+ @staticmethod
+ def _load_tsuma() -> dict:
+ try:
+ return json.loads(MERRY_LIST_PATH.read_bytes())
+ except FileNotFoundError:
+ with open(MERRY_LIST_PATH, 'w') as r:
+ r.write(json.dumps({}, indent=4))
+ return dict()
+
+ @staticmethod
+ def _store_tsuma(data: dict) -> None:
+ with open(MERRY_LIST_PATH, 'w') as r:
+ r.write(json.dumps(data, indent=4))
+
+ @classmethod
+ def check_tsuma(cls, user: str):
+ data = cls._load_tsuma()
+ if user in data:
+ msg = (
+ "阿,你已经有老婆惹!"
+ f"ta是:{data[user]['lassie']['nickname']}"
+ )
+ return msg, True
+ else:
+ return "悲——你还没老婆...", False
+
+ @classmethod
+ def inquire_tsuma(cls, user: str) -> str:
+ data = cls._load_tsuma()
+ if user in data:
+ return f"你的老婆是:{data[user]['lassie']['nickname']} 哦~❤"
+ else:
+ return "悲——你还没老婆..."
+
+ @classmethod
+ def got_tsuma(cls, user: str, d: dict) -> str:
+ check_repo, if_h = cls.check_tsuma(user) # 防止出现多人同时操作导致 NTR 事件
+ if if_h:
+ return check_repo
+ else:
+ data = cls._load_tsuma()
+ data[user] = {
+ "nickname": d['nickname'],
+ "gender": d['gender'],
+ "lassie": {
+ "nickname": d['lassie']['nickname'],
+ "qq": d['lassie']['qq'],
+ "gender": d['lassie']['gender']
+ }
+ }
+ cls._store_tsuma(data)
+
+ data[d['lassie']['qq']] = {
+ "nickname": d['lassie']['nickname'],
+ "gender": d['lassie']['gender'],
+ "lassie": {
+ "nickname": d['nickname'],
+ "qq": user,
+ "gender": d['gender']
+ }
+ }
+ cls._store_tsuma(data)
+
+ msg = (
+ f"> {d['lassie']['nickname']}({d['lassie']['qq']})\n"
+ f"恭喜成为 {d['nickname']} 的老婆~⭐"
+ )
+ return msg
+
+ @classmethod
+ def divorce(cls, user: str) -> str:
+ data = cls._load_tsuma()
+ if not user in data:
+ return "悲——你还没老婆。。"
+
+ msg = f"悲——,({data[user]['nickname']})抛弃了({data[user]['lassie']['nickname']})"
+ del data[user]
+ cls._store_tsuma(data)
+ return msg