summaryrefslogtreecommitdiff
path: root/ATRI/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins')
-rw-r--r--ATRI/plugins/anime_search.py65
-rw-r--r--ATRI/plugins/call_owner.py46
-rw-r--r--ATRI/plugins/code_runner.py95
-rw-r--r--ATRI/plugins/curse/__init__.py12
-rw-r--r--ATRI/plugins/essential.py182
-rw-r--r--ATRI/plugins/funny.py113
-rw-r--r--ATRI/plugins/github.py11
-rw-r--r--ATRI/plugins/help.py22
-rw-r--r--ATRI/plugins/hitokoto.py15
-rw-r--r--ATRI/plugins/manage/__init__.py3
-rw-r--r--ATRI/plugins/manage/modules/block.py92
-rw-r--r--ATRI/plugins/manage/modules/broadcast.py42
-rw-r--r--ATRI/plugins/manage/modules/debug.py91
-rw-r--r--ATRI/plugins/manage/modules/dormant.py12
-rw-r--r--ATRI/plugins/manage/modules/request.py83
-rw-r--r--ATRI/plugins/manage/modules/service.py160
-rw-r--r--ATRI/plugins/manage/modules/shutdown.py10
-rw-r--r--ATRI/plugins/nsfw.py75
-rw-r--r--ATRI/plugins/rich/__init__.py23
-rw-r--r--ATRI/plugins/rich/data_source.py10
-rw-r--r--ATRI/plugins/saucenao/__init__.py52
-rw-r--r--ATRI/plugins/saucenao/data_source.py26
-rw-r--r--ATRI/plugins/status.py46
-rw-r--r--ATRI/plugins/utils/__init__.py31
-rw-r--r--ATRI/plugins/utils/data_source.py79
25 files changed, 590 insertions, 806 deletions
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py
index a62ed5e..1c7e5a3 100644
--- a/ATRI/plugins/anime_search.py
+++ b/ATRI/plugins/anime_search.py
@@ -24,45 +24,40 @@ __doc__ = """
以图搜番 (pic)
"""
-anime_search = sv.on_command(
- cmd="以图搜番",
- docs=__doc__,
- rule=is_in_service('以图搜番')
-)
+anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番"))
+
@anime_search.args_parser # type: ignore
async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ['算了', '罢了', '不搜了', '取消']
+ quit_list = ["算了", "罢了", "不搜了", "取消"]
if msg in quit_list:
- await anime_search.finish('好吧...')
+ await anime_search.finish("好吧...")
if not msg:
- await anime_search.reject('图呢?')
+ await anime_search.reject("图呢?")
else:
- state['pic_anime'] = msg
+ state["pic_anime"] = msg
+
@anime_search.handle()
-async def _anime_search(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['pic_anime'] = msg
+ state["pic_anime"] = msg
+
-@anime_search.got('pic_anime', prompt='图呢?')
-async def _deal_search(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- msg = state['pic_anime']
+@anime_search.got("pic_anime", prompt="图呢?")
+async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = state["pic_anime"]
img = re.findall(r"url=(.*?)]", msg)
if not img:
await anime_search.reject("请发送图片而不是其它东西!!")
-
+
try:
req = await get_bytes(URL + img[0])
except RequestTimeOut:
raise RequestTimeOut("Request failed!")
-
+
data = json.loads(req)["docs"]
try:
d = dict()
@@ -72,28 +67,24 @@ async def _deal_search(bot: Bot,
else:
m = data[i]["at"] / 60
s = data[i]["at"] % 60
-
+
if not data[i]["episode"]:
n = 1
else:
n = data[i]["episode"]
-
+
d[to_simple_string(data[i]["title_chinese"])] = [
data[i]["similarity"],
f"第{n}集",
- f"{int(m)}分{int(s)}秒处"
+ f"{int(m)}分{int(s)}秒处",
]
except Exception as err:
raise Exception(f"Invalid data.\n{err}")
-
- result = sorted(
- d.items(),
- key=lambda x:x[1],
- reverse=True
- )
-
+
+ result = sorted(d.items(), key=lambda x: x[1], reverse=True)
+
t = 0
-
+
msg0 = f"> {event.sender.nickname}"
for i in result:
t += 1
@@ -104,16 +95,16 @@ async def _deal_search(bot: Bot,
f"Name: {i[0]}\n"
f"Time: {i[1][1]} {i[1][2]}"
)
-
+
if len(result) == 2:
await anime_search.finish(Message(msg0))
else:
data = FormData()
- data.add_field('poster', 'ATRI running log')
- data.add_field('syntax', 'text')
- data.add_field('expiration', 'day')
- data.add_field('content', msg0)
+ data.add_field("poster", "ATRI running log")
+ data.add_field("syntax", "text")
+ data.add_field("expiration", "day")
+ data.add_field("content", msg0)
repo = f"> {event.sender.nickname}\n"
repo = repo + f"详细请移步此处~\n{await paste(data)}"
- await anime_search.finish(repo) \ No newline at end of file
+ await anime_search.finish(repo)
diff --git a/ATRI/plugins/call_owner.py b/ATRI/plugins/call_owner.py
index cd79d1c..d282824 100644
--- a/ATRI/plugins/call_owner.py
+++ b/ATRI/plugins/call_owner.py
@@ -1,9 +1,6 @@
from nonebot.permission import SUPERUSER
from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import (
- Bot,
- MessageEvent
-)
+from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.service import Service as sv
from ATRI.config import Config
@@ -21,48 +18,44 @@ __doc__ = """
repo = sv.on_command(cmd="来杯红茶", docs=__doc__)
+
@repo.args_parser # type: ignore
async def _repo_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
if msg == "算了":
- await repo.finish('好吧')
-
+ await repo.finish("好吧")
+
if not msg:
- await repo.reject('话呢?')
+ await repo.reject("话呢?")
else:
- state['msg_repo'] = msg
+ state["msg_repo"] = msg
+
@repo.handle()
async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['msg_repo'] = msg
+ state["msg_repo"] = msg
+
[email protected]('msg_repo', prompt="请告诉咱需要反馈的内容~!")
[email protected]("msg_repo", prompt="请告诉咱需要反馈的内容~!")
async def _repo_deal(bot: Bot, event: MessageEvent, state: T_State) -> None:
global repo_list
- msg = state['msg_repo']
+ msg = state["msg_repo"]
user = event.user_id
-
+
if count_list(repo_list, user) == 5:
await repo.finish("吾辈已经喝了五杯红茶啦!明天再来吧。")
-
+
repo_list.append(user)
for sup in Config.BotSelfConfig.superusers:
- await bot.send_private_msg(
- user_id=sup,
- message=f"来自用户[{user}]反馈:\n{msg}"
- )
-
+ await bot.send_private_msg(user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}")
+
await repo.finish("吾辈的心愿已由咱转告给咱的维护者了~!")
- "cron",
- hour=0,
- misfire_grace_time=60
-)
[email protected]_job("cron", hour=0, misfire_grace_time=60)
async def _() -> None:
global repo_list
repo_list.clear()
@@ -75,11 +68,8 @@ __doc__ = """
/重置红茶
"""
-reset_repo = sv.on_command(
- cmd="重置红茶",
- docs=__doc__,
- permission=SUPERUSER
-)
+reset_repo = sv.on_command(cmd="重置红茶", docs=__doc__, permission=SUPERUSER)
+
@reset_repo.handle()
async def _reset_repo(bot: Bot, event: MessageEvent) -> None:
diff --git a/ATRI/plugins/code_runner.py b/ATRI/plugins/code_runner.py
index b8f1106..d34872f 100644
--- a/ATRI/plugins/code_runner.py
+++ b/ATRI/plugins/code_runner.py
@@ -10,33 +10,33 @@ from ATRI.utils.request import post_bytes
from ATRI.exceptions import RequestTimeOut
-RUN_API_URL_FORMAT = 'https://glot.io/run/{}?version=latest'
+RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest"
SUPPORTED_LANGUAGES = {
- 'assembly': {'ext': 'asm'},
- 'bash': {'ext': 'sh'},
- 'c': {'ext': 'c'},
- 'clojure': {'ext': 'clj'},
- 'coffeescript': {'ext': 'coffe'},
- 'cpp': {'ext': 'cpp'},
- 'csharp': {'ext': 'cs'},
- 'erlang': {'ext': 'erl'},
- 'fsharp': {'ext': 'fs'},
- 'go': {'ext': 'go'},
- 'groovy': {'ext': 'groovy'},
- 'haskell': {'ext': 'hs'},
- 'java': {'ext': 'java', 'name': 'Main'},
- 'javascript': {'ext': 'js'},
- 'julia': {'ext': 'jl'},
- 'kotlin': {'ext': 'kt'},
- 'lua': {'ext': 'lua'},
- 'perl': {'ext': 'pl'},
- 'php': {'ext': 'php'},
- 'python': {'ext': 'py'},
- 'ruby': {'ext': 'rb'},
- 'rust': {'ext': 'rs'},
- 'scala': {'ext': 'scala'},
- 'swift': {'ext': 'swift'},
- 'typescript': {'ext': 'ts'},
+ "assembly": {"ext": "asm"},
+ "bash": {"ext": "sh"},
+ "c": {"ext": "c"},
+ "clojure": {"ext": "clj"},
+ "coffeescript": {"ext": "coffe"},
+ "cpp": {"ext": "cpp"},
+ "csharp": {"ext": "cs"},
+ "erlang": {"ext": "erl"},
+ "fsharp": {"ext": "fs"},
+ "go": {"ext": "go"},
+ "groovy": {"ext": "groovy"},
+ "haskell": {"ext": "hs"},
+ "java": {"ext": "java", "name": "Main"},
+ "javascript": {"ext": "js"},
+ "julia": {"ext": "jl"},
+ "kotlin": {"ext": "kt"},
+ "lua": {"ext": "lua"},
+ "perl": {"ext": "pl"},
+ "php": {"ext": "php"},
+ "python": {"ext": "py"},
+ "ruby": {"ext": "rb"},
+ "rust": {"ext": "rs"},
+ "scala": {"ext": "scala"},
+ "swift": {"ext": "swift"},
+ "typescript": {"ext": "ts"},
}
@@ -50,54 +50,55 @@ __doc__ = """
print('Hello world!')
"""
-code_runner = sv.on_command(
- cmd="/code",
- docs=__doc__,
- rule=is_in_service('code')
-)
+code_runner = sv.on_command(cmd="/code", docs=__doc__, rule=is_in_service("code"))
+
@code_runner.handle()
async def _code_runner(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message).split("\n")
-
+
if msg[0] == "list":
msg0 = "咱现在支持的语言如下:\n"
msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys()))
-
+
await code_runner.finish(msg0)
elif not msg[0]:
await code_runner.finish("请键入/help以获取更多支持...")
-
+
laug = msg[0].replace("\r", "")
if laug not in SUPPORTED_LANGUAGES:
await code_runner.finish("该语言暂不支持...")
-
+
del msg[0]
code = "\n".join(map(str, msg))
try:
req = await post_bytes(
RUN_API_URL_FORMAT.format(laug),
json={
- "files": [{
- "name": (SUPPORTED_LANGUAGES[laug].get("name", "main") +
- f".{SUPPORTED_LANGUAGES[laug]['ext']}"),
- "content": code
- }],
+ "files": [
+ {
+ "name": (
+ SUPPORTED_LANGUAGES[laug].get("name", "main")
+ + f".{SUPPORTED_LANGUAGES[laug]['ext']}"
+ ),
+ "content": code,
+ }
+ ],
"stdin": "",
- "command": ""
- }
+ "command": "",
+ },
)
except RequestTimeOut:
raise RequestTimeOut("Failed to request!")
-
+
payload = json.loads(req)
sent = False
- for k in ['stdout', 'stderr', 'error']:
+ for k in ["stdout", "stderr", "error"]:
v = payload.get(k)
lines = v.splitlines()
lines, remained_lines = lines[:10], lines[10:]
- out = '\n'.join(lines)
- out, remained_out = out[:60 * 10], out[60 * 10:]
+ out = "\n".join(lines)
+ out, remained_out = out[: 60 * 10], out[60 * 10 :]
if remained_lines or remained_out:
out += f"\n(太多了太多了...)"
@@ -105,6 +106,6 @@ async def _code_runner(bot: Bot, event: MessageEvent) -> None:
if out:
await bot.send(event, f"{k}:\n\n{out}")
sent = True
-
+
if not sent:
await code_runner.finish("Running success! Nothing print.")
diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py
index 5085dbf..acfaa9b 100644
--- a/ATRI/plugins/curse/__init__.py
+++ b/ATRI/plugins/curse/__init__.py
@@ -19,23 +19,17 @@ __doc__ = """
"""
curse = sv.on_command(
- cmd='口臭',
- docs=__doc__,
- aliases={'口臭一下,骂我'},
- rule=is_in_service('口臭')
+ cmd="口臭", docs=__doc__, aliases={"口臭一下,骂我"}, rule=is_in_service("口臭")
)
+
@curse.handle()
async def _curse(bot: Bot, event: MessageEvent) -> None:
global sick_list
user = event.get_user_id()
if count_list(sick_list, user) == 3:
sick_list.append(user)
- repo = (
- "不是??你这么想被咱骂的嘛??"
- "被咱骂就这么舒服的吗?!"
- "该......你该不会是.....M吧!"
- )
+ repo = "不是??你这么想被咱骂的嘛??" "被咱骂就这么舒服的吗?!" "该......你该不会是.....M吧!"
await curse.finish(repo)
elif count_list(sick_list, user) == 6:
sick_list = del_list_aim(sick_list, user)
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index 3ebaa36..7f14882 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -24,7 +24,7 @@ from nonebot.adapters.cqhttp import (
LuckyKingNotifyEvent,
GroupUploadNoticeEvent,
GroupRecallNoticeEvent,
- FriendRecallNoticeEvent
+ FriendRecallNoticeEvent,
)
import ATRI
@@ -35,8 +35,8 @@ from ATRI.service import Service as sv
from ATRI.utils.cqcode import coolq_code_check
-PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services'
-ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
+PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services"
+ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential"
os.makedirs(PLUGIN_INFO_DIR, exist_ok=True)
os.makedirs(ESSENTIAL_DIR, exist_ok=True)
@@ -57,11 +57,7 @@ async def shutdown() -> None:
shutil.rmtree(PLUGIN_INFO_DIR)
logger.debug("成功!")
except Exception:
- repo = (
- '清理插件信息失败',
- '请前往 ATRI/data/service/services 下',
- '将 services 整个文件夹删除'
- )
+ repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除")
time.sleep(10)
raise Exception(repo)
@@ -69,59 +65,49 @@ async def shutdown() -> None:
@driver.on_bot_connect
async def connect(bot) -> None:
for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- int(superuser),
- "WebSocket 成功连接,数据开始传输。"
- )
+ await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。")
@driver.on_bot_disconnect
async def disconnect(bot) -> None:
for superuser in Config.BotSelfConfig.superusers:
try:
- await sv.NetworkPost.send_private_msg(
- int(superuser),
- "WebSocket 貌似断开了呢..."
- )
+ await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...")
except:
logger.error("WebSocket 已断开,等待重连")
@run_preprocessor # type: ignore
-async def _check_block(matcher: Matcher,
- bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _check_block(
+ matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State
+) -> None:
user = str(event.user_id)
try:
msg = str(event.message)
except:
msg = ""
if not sv.BlockSystem.auth_user(user):
- raise IgnoredException(f'Block user: {user}')
-
+ raise IgnoredException(f"Block user: {user}")
+
if not sv.Dormant.is_dormant():
if "/dormant" not in msg:
- raise IgnoredException('Bot has been dormant.')
-
+ raise IgnoredException("Bot has been dormant.")
+
if isinstance(event, GroupMessageEvent):
group = str(event.group_id)
if not sv.BlockSystem.auth_group(group):
- raise IgnoredException(f'Block group: {group}')
+ raise IgnoredException(f"Block group: {group}")
@run_preprocessor # type: ignore
-async def _store_message(matcher: Matcher,
- bot: Bot,
- event,
- state: T_State) -> None:
+async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None:
if isinstance(event, GroupMessageEvent):
if event.sub_type == "normal":
- now_time = datetime.now().strftime('%Y-%m-%d')
- GROUP_DIR = ESSENTIAL_DIR / 'chat_history' / f'{event.group_id}'
+ now_time = datetime.now().strftime("%Y-%m-%d")
+ GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}"
os.makedirs(GROUP_DIR, exist_ok=True)
path = GROUP_DIR / f"{now_time}.chat.json"
- now_time = datetime.now().strftime('%Y%m%d-%H%M%S')
+ now_time = datetime.now().strftime("%Y%m%d-%H%M%S")
try:
data = json.loads(path.read_bytes())
@@ -147,12 +133,12 @@ async def _store_message(matcher: Matcher,
"area": event.sender.area,
"level": event.sender.level,
"role": event.sender.role,
- "title": event.sender.title
+ "title": event.sender.title,
},
- "to_me": str(event.to_me)
+ "to_me": str(event.to_me),
}
try:
- with open(path, 'w', encoding='utf-8') as r:
+ with open(path, "w", encoding="utf-8") as r:
r.write(json.dumps(data, indent=4))
logger.debug(f"写入消息成功,id: {event.message_id}")
except WriteError:
@@ -168,30 +154,24 @@ async def _store_message(matcher: Matcher,
# 处理:好友请求
request_friend_event = sv.on_request()
+
@request_friend_event.handle()
async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
file_name = "request_friend.json"
path = ESSENTIAL_DIR / file_name
path.parent.mkdir(exist_ok=True, parents=True)
-
+
try:
data = json.loads(path.read_bytes())
except:
data = dict()
- data[event.flag] = {
- "user_id": event.user_id,
- "comment": event.comment
- }
+ data[event.flag] = {"user_id": event.user_id, "comment": event.comment}
try:
- with open(path, 'w', encoding='utf-8') as r:
- r.write(
- json.dumps(
- data, indent=4
- )
- )
+ with open(path, "w", encoding="utf-8") as r:
+ r.write(json.dumps(data, indent=4))
except WriteError:
raise WriteError("Writing file failed!")
-
+
for superuser in Config.BotSelfConfig.superusers:
msg = (
"主人,收到一条好友请求:\n"
@@ -199,21 +179,19 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
f"申请信息:{event.comment}\n"
f"申请码:{event.flag}"
)
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
# 处理:邀请入群,如身为管理,还附有入群请求
request_group_event = sv.on_request()
+
@request_group_event.handle()
async def _request_group_event(bot, event: GroupRequestEvent) -> None:
file_name = "request_group.json"
path = ESSENTIAL_DIR / file_name
path.parent.mkdir(exist_ok=True, parents=True)
-
+
try:
data = json.loads(path.read_bytes())
except:
@@ -222,18 +200,14 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None:
"user_id": event.user_id,
"group_id": event.group_id,
"sub_type": event.sub_type,
- "comment": event.comment
+ "comment": event.comment,
}
try:
- with open(path, 'w', encoding='utf-8') as r:
- r.write(
- json.dumps(
- data, indent=4
- )
- )
+ with open(path, "w", encoding="utf-8") as r:
+ r.write(json.dumps(data, indent=4))
except WriteError:
raise WriteError("Writing file failed!")
-
+
for superuser in Config.BotSelfConfig.superusers:
msg = (
"主人,收到一条入群请求:\n"
@@ -241,37 +215,27 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None:
f"申请信息:{event.comment}\n"
f"申请码:{event.flag}"
)
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
-
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+
# 处理群成员变动
group_member_event = sv.on_notice()
+
@group_member_event.handle()
async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None:
- msg = (
- "好欸!事新人!\n"
- f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!"
- )
+ msg = "好欸!事新人!\n" f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!"
await group_member_event.finish(msg)
+
@group_member_event.handle()
async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None:
if event.is_tome():
if event.user_id != event.self_id:
return
- msg = (
- "呜呜呜,主人"
- f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..."
- )
+ msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..."
for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
else:
await group_member_event.finish("阿!有人离开了我们...")
@@ -279,26 +243,27 @@ async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None:
# 处理群管理事件
group_admin_event = sv.on_notice()
+
@group_admin_event.handle()
async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None:
if not event.is_tome():
return
-
+
for superuser in Config.BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
+ user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
)
# 处理群禁言事件
group_ban_event = sv.on_notice()
+
@group_ban_event.handle()
async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
if not event.is_tome():
return
-
+
if event.duration:
msg = (
"那个..。,主人\n"
@@ -306,71 +271,54 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
f"时长...是 {event.duration} 秒"
)
for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
else:
- msg = (
- "好欸!主人\n"
- f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!"
- )
+ msg = "好欸!主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!"
for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
# 处理群红包运气王事件
lucky_read_bag_event = sv.on_notice()
+
@lucky_read_bag_event.handle()
async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None:
- msg = (
- "8行,这可忍?"
- f"gkd [CQ:at,qq={event.user_id}] 发一个!"
- )
+ msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!"
await lucky_read_bag_event.finish(Message(msg))
# 处理群文件上传事件
group_file_upload_event = sv.on_notice()
+
@group_file_upload_event.handle()
-async def _group_file_upload_event(bot,
- event: GroupUploadNoticeEvent) -> None:
+async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None:
await group_file_upload_event.finish("让我康康传了啥好东西")
# 处理撤回事件
recall_event = sv.on_notice()
+
@recall_event.handle()
async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None:
try:
repo = await bot.get_msg(message_id=event.message_id)
except:
return
-
+
group = event.group_id
repo = str(repo["message"])
check = await coolq_code_check(repo, group=group)
if not check:
repo = repo.replace("CQ", "QC")
- msg = (
- "主人,咱拿到了一条撤回信息!\n"
- f"{event.user_id}@[群:{event.group_id}]\n"
- "撤回了\n"
- f"{repo}"
- )
+ msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}"
for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
+
@recall_event.handle()
async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None:
@@ -378,23 +326,15 @@ async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None:
repo = await bot.get_msg(message_id=event.message_id)
except:
return
-
+
user = event.user_id
repo = str(repo["message"])
check = await coolq_code_check(repo, user)
if not check:
repo = repo.replace("CQ", "QC")
- msg = (
- "主人,咱拿到了一条撤回信息!\n"
- f"{event.user_id}@[私聊]"
- "撤回了\n"
- f"{repo}"
- )
+ msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}"
await bot.send(event, "咱看到惹~!")
for superuser in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+ await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg)
diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py
index 03b844a..72e5abb 100644
--- a/ATRI/plugins/funny.py
+++ b/ATRI/plugins/funny.py
@@ -22,28 +22,26 @@ __doc__ = """
来句笑话
"""
-get_laugh = sv.on_command(
- cmd='来句笑话',
- docs=__doc__,
- rule=is_in_service('来句笑话')
-)
+get_laugh = sv.on_command(cmd="来句笑话", docs=__doc__, rule=is_in_service("来句笑话"))
+
@get_laugh.handle()
async def _get_laugh(bot: Bot, event: MessageEvent) -> None:
user_name = event.sender.nickname
laugh_list = []
-
- FILE = Path('.') / 'ATRI' / 'data' / 'database' / 'funny' / 'laugh.txt'
- with open(FILE, 'r', encoding='utf-8') as r:
+
+ FILE = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt"
+ with open(FILE, "r", encoding="utf-8") as r:
for line in r:
- laugh_list.append(line.strip('\n'))
-
+ laugh_list.append(line.strip("\n"))
+
result = choice(laugh_list)
await get_laugh.finish(result.replace("%name", user_name))
me_to_you = sv.on_message()
+
@me_to_you.handle()
async def _me_to_you(bot: Bot, event: MessageEvent) -> None:
if randint(0, 15) == 5:
@@ -59,38 +57,28 @@ __doc__ = """
抽老婆
"""
-roll_wife = sv.on_command(
- cmd='抽老婆',
- docs=__doc__,
- rule=is_in_service('抽老婆')
-)
+roll_wife = sv.on_command(cmd="抽老婆", docs=__doc__, rule=is_in_service("抽老婆"))
+
@roll_wife.handle()
async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None:
user = event.user_id
group = event.group_id
-
- user_name = await bot.get_group_member_info(group_id=group,
- user_id=user)
- user_name = user_name['nickname']
+
+ user_name = await bot.get_group_member_info(group_id=group, user_id=user)
+ user_name = user_name["nickname"]
run = await is_too_exciting(user, group, 5, True)
if not run:
return
-
+
luck_list = await bot.get_group_member_list(group_id=group)
luck_user = choice(luck_list)
- luck_qq = luck_user['user_id']
- luck_user = luck_user['nickname']
- msg = (
- "5秒后咱将随机抽取一位群友成为\n"
- f"{user_name} 的老婆!究竟是谁呢~?"
- )
+ luck_qq = luck_user["user_id"]
+ luck_user = luck_user["nickname"]
+ msg = "5秒后咱将随机抽取一位群友成为\n" f"{user_name} 的老婆!究竟是谁呢~?"
await bot.send(event, msg)
await asyncio.sleep(5)
- msg = (
- f"> {luck_user}({luck_qq})\n"
- f"恭喜成为 {user_name} 的老婆~⭐"
- )
+ msg = f"> {luck_user}({luck_qq})\n" f"恭喜成为 {user_name} 的老婆~⭐"
await bot.send(event, msg)
@@ -107,43 +95,34 @@ __doc__ = """
/fakemsg 123456789*生草人*草 114514*仙贝*臭死了
"""
-fake_msg = sv.on_command(
- cmd="/fakemsg",
- docs=__doc__,
- rule=is_in_service('fakemsg')
-)
+fake_msg = sv.on_command(cmd="/fakemsg", docs=__doc__, rule=is_in_service("fakemsg"))
+
@fake_msg.handle()
async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None:
- msg = str(event.message).split(' ')
+ msg = str(event.message).split(" ")
user = event.user_id
group = event.group_id
node = []
check = await is_too_exciting(user, group, 1, True)
-
+
if check:
for i in msg:
- args = i.split('*')
+ args = i.split("*")
qq = args[0]
- name = args[1].replace('[', '[')
- name = name.replace(']', ']')
- repo = args[2].replace('[', '[')
- repo = repo.replace(']', ']')
- dic = {
- "type": "node",
- "data": {
- "name": name,
- "uin": qq,
- "content": repo
- }
- }
+ name = args[1].replace("[", "[")
+ name = name.replace("]", "]")
+ repo = args[2].replace("[", "[")
+ repo = repo.replace("]", "]")
+ dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}}
node.append(dic)
await bot.send_group_forward_msg(group_id=group, messages=node)
EAT_URL = "https://wtf.hiigara.net/api/run/{}"
-eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service('今天吃什么'))
+eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service("今天吃什么"))
+
@eat_wat.handle()
async def _eat(bot: Bot, event: MessageEvent) -> None:
@@ -152,7 +131,7 @@ async def _eat(bot: Bot, event: MessageEvent) -> None:
user_n = event.sender.nickname
arg = re.findall(r"[今|明|后|大后]天(.*?)吃什么", msg)[0]
nd = re.match(r"[今|明|后|大后][天]", msg)[0]
-
+
if arg == "中午":
a = f"LdS4K6/{randint(0, 999999)}"
url = EAT_URL.format(a)
@@ -160,12 +139,12 @@ async def _eat(bot: Bot, event: MessageEvent) -> None:
try:
data = json.loads(await post_bytes(url, params))
except RequestTimeOut:
- raise RequestTimeOut('Request failed!')
-
- text = to_simple_string(data['text']).replace('今天', nd)
+ raise RequestTimeOut("Request failed!")
+
+ text = to_simple_string(data["text"]).replace("今天", nd)
get_a = re.search(r"非常(.*?)的", text)[0]
- result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, '')
-
+ result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, "")
+
elif arg == "晚上":
a = f"KaTMS/{randint(0, 999999)}"
url = EAT_URL.format(a)
@@ -173,11 +152,11 @@ async def _eat(bot: Bot, event: MessageEvent) -> None:
try:
data = json.loads(await post_bytes(url, params))
except RequestTimeOut:
- raise RequestTimeOut('Request failed!')
-
- text = to_simple_string(data['text']).replace('今天', '')
+ raise RequestTimeOut("Request failed!")
+
+ text = to_simple_string(data["text"]).replace("今天", "")
result = f"> {MessageSegment.at(user)}\n" + text
-
+
else:
rd = randint(1, 10)
if rd == 5:
@@ -189,10 +168,12 @@ async def _eat(bot: Bot, event: MessageEvent) -> None:
try:
data = json.loads(await post_bytes(url, params))
except RequestTimeOut:
- raise RequestTimeOut('Request failed!')
-
- text = to_simple_string(data['text']).replace('今天', nd)
+ raise RequestTimeOut("Request failed!")
+
+ text = to_simple_string(data["text"]).replace("今天", nd)
get_a = re.match(r"(.*?)的智商", text)[0]
- result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, f'{user_n}的智商')
-
+ result = f"> {MessageSegment.at(user)}\n" + text.replace(
+ get_a, f"{user_n}的智商"
+ )
+
await eat_wat.finish(Message(result))
diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py
index c238b69..e7c0d80 100644
--- a/ATRI/plugins/github.py
+++ b/ATRI/plugins/github.py
@@ -12,6 +12,7 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}"
github_issues = sv.on_message()
+
@github_issues.handle()
async def _github_issues(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message)
@@ -19,21 +20,19 @@ async def _github_issues(bot: Bot, event: MessageEvent) -> None:
need_info = re.findall(patt, msg)
if not need_info:
return
-
+
for i in need_info:
need_info = list(i)
owner = need_info[0]
repo = need_info[1]
issue_number = need_info[2]
- url = URL.format(owner=owner,
- repo=repo,
- issue_number=issue_number)
-
+ url = URL.format(owner=owner, repo=repo, issue_number=issue_number)
+
try:
data = await get_bytes(url)
except RequestTimeOut:
return
-
+
data = json.loads(data)
msg0 = (
f"{repo}: #{issue_number} {data['state']}\n"
diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py
index 32d3f65..d2754b1 100644
--- a/ATRI/plugins/help.py
+++ b/ATRI/plugins/help.py
@@ -7,7 +7,7 @@ from ATRI.service import SERVICE_DIR
from ATRI.service import Service as sv
-SERVICE_DIR = SERVICE_DIR / 'services'
+SERVICE_DIR = SERVICE_DIR / "services"
__doc__ = """
@@ -19,14 +19,12 @@ __doc__ = """
/help info (cmd)
"""
-help = sv.on_command(
- cmd="/help",
- docs=__doc__
-)
+help = sv.on_command(cmd="/help", docs=__doc__)
+
@help.handle()
async def _help(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(' ')
+ msg = str(event.message).split(" ")
if msg[0] == "":
msg = (
"呀?找不到路了?\n"
@@ -41,7 +39,7 @@ async def _help(bot: Bot, event: MessageEvent) -> None:
for _, _, i in os.walk(SERVICE_DIR):
for a in i:
f = SERVICE_DIR / a
- files.append(json.loads(f.read_bytes())['command'])
+ files.append(json.loads(f.read_bytes())["command"])
cmds = " | ".join(map(str, files))
msg = "咱能做很多事!比如:\n" + cmds
msg0 = msg + "\n没反应可能是没权限...或者为探测类型...不属于可直接触发命令..."
@@ -53,13 +51,9 @@ async def _help(bot: Bot, event: MessageEvent) -> None:
try:
data = json.loads(path.read_bytes())
except:
- await help.finish('未找到相关命令...')
+ await help.finish("未找到相关命令...")
- msg = (
- f"{cmd} INFO:\n"
- f"Enabled: {data['enabled']}\n"
- f"{data['docs']}"
- )
+ msg = f"{cmd} INFO:\n" f"Enabled: {data['enabled']}\n" f"{data['docs']}"
await help.finish(msg)
else:
- await help.finish('请检查输入...')
+ await help.finish("请检查输入...")
diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py
index 2b8d81f..1078ca3 100644
--- a/ATRI/plugins/hitokoto.py
+++ b/ATRI/plugins/hitokoto.py
@@ -11,7 +11,7 @@ from ATRI.utils.request import get_bytes
URL = [
"https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/a.json",
"https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/b.json",
- "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json"
+ "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json",
]
sick_list = []
@@ -24,12 +24,10 @@ __doc__ = """
"""
hitokoto = sv.on_command(
- cmd='一言',
- aliases={'抑郁一下', '网抑云'},
- docs=__doc__,
- rule=is_in_service('一言') & to_bot()
+ cmd="一言", aliases={"抑郁一下", "网抑云"}, docs=__doc__, rule=is_in_service("一言") & to_bot()
)
+
@hitokoto.handle()
async def _hitokoto(bot: Bot, event: MessageEvent) -> None:
global sick_list
@@ -41,10 +39,7 @@ async def _hitokoto(bot: Bot, event: MessageEvent) -> None:
await hitokoto.finish("额......需要咱安慰一下嘛~?")
elif count_list(sick_list, user) == 6:
sick_list = del_list_aim(sick_list, user)
- msg = (
- "如果心里感到难受就赶快去睡觉!别再憋自己了!\n"
- "我...我会守在你身边的!...嗯..一定"
- )
+ msg = "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" "我...我会守在你身边的!...嗯..一定"
await hitokoto.finish(msg)
else:
sick_list.append(user)
@@ -53,4 +48,4 @@ async def _hitokoto(bot: Bot, event: MessageEvent) -> None:
data = json.loads(await get_bytes(url))
except RequestTimeOut:
raise RequestTimeOut("Request failed!")
- await hitokoto.finish(data[randint(1, len(data) - 1)]['hitokoto'])
+ await hitokoto.finish(data[randint(1, len(data) - 1)]["hitokoto"])
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
index d9f1ffe..b316020 100644
--- a/ATRI/plugins/manage/__init__.py
+++ b/ATRI/plugins/manage/__init__.py
@@ -4,5 +4,4 @@ from pathlib import Path
_sub_plugins = set()
-_sub_plugins |= nonebot.load_plugins(
- str((Path(__file__).parent / 'modules').resolve()))
+_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve()))
diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py
index 65a6a86..0c2fcec 100644
--- a/ATRI/plugins/manage/modules/block.py
+++ b/ATRI/plugins/manage/modules/block.py
@@ -12,28 +12,24 @@ __doc__ = """
封禁用户 QQ号
"""
-block_user = sv.on_command(
- cmd="封禁用户",
- docs=__doc__,
- permission=SUPERUSER
-)
+block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER)
+
@block_user.args_parser # type: ignore
-async def _block_user_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await block_user.finish('好吧...')
+ await block_user.finish("好吧...")
if not msg:
- await block_user.reject('是谁呢?!GKD!')
+ await block_user.reject("是谁呢?!GKD!")
else:
- state['noob'] = msg
+ state["noob"] = msg
+
@block_user.handle()
async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
- noob = state['noob']
+ noob = state["noob"]
sv.BlockSystem.control_list(True, user=noob)
msg = f"用户[{noob}]已被封禁(;′⌒`)"
await block_user.finish(msg)
@@ -46,28 +42,24 @@ __doc__ = """
解封用户 QQ号
"""
-unblock_user = sv.on_command(
- cmd="解封用户",
- docs=__doc__,
- permission=SUPERUSER
-)
+unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER)
+
@unblock_user.args_parser # type: ignore
-async def _unblock_user_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await unblock_user.finish('好吧...')
+ await unblock_user.finish("好吧...")
if not msg:
- await unblock_user.reject('要原谅谁呢...')
+ await unblock_user.reject("要原谅谁呢...")
else:
- state['forgive'] = msg
+ state["forgive"] = msg
+
@unblock_user.handle()
async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None:
- forgive = state['forgive']
+ forgive = state["forgive"]
sv.BlockSystem.control_list(False, user=forgive)
msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ"
await unblock_user.finish(msg)
@@ -80,28 +72,24 @@ __doc__ = """
封禁群 群号
"""
-block_group = sv.on_command(
- cmd="封禁群",
- docs=__doc__,
- permission=SUPERUSER
-)
+block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER)
+
@block_group.args_parser # type: ignore
-async def _block_group_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await block_user.finish('好吧...')
+ await block_user.finish("好吧...")
if not msg:
- await block_user.reject('是哪个群?!GKD!')
+ await block_user.reject("是哪个群?!GKD!")
else:
- state['noob_g'] = msg
+ state["noob_g"] = msg
+
@block_group.handle()
async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
- noob_g = state['noob_g']
+ noob_g = state["noob_g"]
sv.BlockSystem.control_list(True, group=noob_g)
msg = f"群[{noob_g}]已被封禁(;′⌒`)"
await block_user.finish(msg)
@@ -114,30 +102,24 @@ __doc__ = """
解封 群号
"""
-unblock_group = sv.on_command(
- cmd="解封群",
- docs=__doc__,
- permission=SUPERUSER
-)
+unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER)
+
@unblock_group.args_parser # type: ignore
-async def _unblock_group_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await block_user.finish('好吧...')
+ await block_user.finish("好吧...")
if not msg:
- await block_user.reject('要原谅哪个群呢...')
+ await block_user.reject("要原谅哪个群呢...")
else:
- state['forgive_g'] = msg
+ state["forgive_g"] = msg
+
@unblock_group.handle()
-async def _unblock_group(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- forgive_g = state['forgive_g']
+async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ forgive_g = state["forgive_g"]
sv.BlockSystem.control_list(False, group=forgive_g)
msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ"
await unblock_user.finish(msg)
diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py
index 5086fcf..7f7816d 100644
--- a/ATRI/plugins/manage/modules/broadcast.py
+++ b/ATRI/plugins/manage/modules/broadcast.py
@@ -15,52 +15,46 @@ __doc__ = """
广播 内容
"""
-broadcast = sv.on_command(
- cmd="广播",
- docs=__doc__,
- permission=SUPERUSER
-)
+broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER)
+
@broadcast.args_parser # type: ignore
-async def _broadcast_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await broadcast.finish('好吧...')
+ await broadcast.finish("好吧...")
if not msg:
- await broadcast.reject('想群发啥呢0w0')
+ await broadcast.reject("想群发啥呢0w0")
else:
- state['msg'] = msg
+ state["msg"] = msg
+
@broadcast.handle()
async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['msg'] = msg
-
[email protected]('msg', prompt='请告诉咱需要群发的内容~!')
-async def _deal_broadcast(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- msg = state['msg']
+ state["msg"] = msg
+
+
[email protected]("msg", prompt="请告诉咱需要群发的内容~!")
+async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = state["msg"]
group_list = await bot.get_group_list()
succ_list = []
err_list = []
-
+
for group in group_list:
await asyncio.sleep(randint(0, 2))
try:
- await bot.send_group_msg(group_id=group["group_id"],
- message=msg)
+ await bot.send_group_msg(group_id=group["group_id"], message=msg)
except BaseException:
err_list.append(group["group_id"])
-
+
msg0 = ""
for i in err_list:
msg0 += f" {i}\n"
-
+
repo_msg = (
f"推送消息:\n{msg}\n"
"————————\n"
diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py
index 53b75b7..7bd3992 100644
--- a/ATRI/plugins/manage/modules/debug.py
+++ b/ATRI/plugins/manage/modules/debug.py
@@ -11,7 +11,7 @@ from ATRI.utils.ub_paste import paste
from ATRI.exceptions import load_error
-level_list = ['info', 'warning', 'error', 'debug']
+level_list = ["info", "warning", "error", "debug"]
__doc__ = """
@@ -25,44 +25,47 @@ __doc__ = """
get_console = sv.on_command(
cmd="获取log",
- aliases={'获取LOG', '获取控制台', '获取控制台信息'},
+ aliases={"获取LOG", "获取控制台", "获取控制台信息"},
docs=__doc__,
- permission=SUPERUSER
+ permission=SUPERUSER,
)
+
@get_console.handle()
async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['level'] = msg
+ state["level"] = msg
+
-@get_console.got('level', prompt='需要获取的等级是?')
+@get_console.got("level", prompt="需要获取的等级是?")
async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None:
- quit_list = ['算了', '罢了', '不了']
- if state['level'] in quit_list:
- await get_console.finish('好吧...')
+ quit_list = ["算了", "罢了", "不了"]
+ if state["level"] in quit_list:
+ await get_console.finish("好吧...")
-@get_console.got('line', prompt='需要获取的行数是?')
+
+@get_console.got("line", prompt="需要获取的行数是?")
async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None:
- level = state['level']
- line = state['line']
+ level = state["level"]
+ line = state["line"]
repo = str()
-
+
path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log"
- logs = await open_file(path, 'readlines')
-
+ logs = await open_file(path, "readlines")
+
try:
- content = logs[int(line):] # type: ignore
+ content = logs[int(line) :] # type: ignore
repo = "\n".join(content).replace("ATRI", "ATRI")
except IndexError:
- await get_console.finish(f'行数错误...max: {len(logs)}') # type: ignore
-
+ await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore
+
data = FormData()
- data.add_field('poster', 'ATRI running log')
- data.add_field('syntax', 'text')
- data.add_field('expiration', 'day')
- data.add_field('content', repo)
-
+ data.add_field("poster", "ATRI running log")
+ data.add_field("syntax", "text")
+ data.add_field("expiration", "day")
+ data.add_field("content", repo)
+
msg0 = f"> {event.sender.nickname}\n"
msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}"
await track_error.finish(msg0)
@@ -76,41 +79,39 @@ __doc__ = """
"""
track_error = sv.on_command(
- cmd="track",
- aliases={'追踪'},
- docs=__doc__,
- permission=SUPERUSER
+ cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER
)
+
@track_error.args_parser # type: ignore
-async def _track_error_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await track_error.finish('好吧...')
+ await track_error.finish("好吧...")
if not msg:
- await track_error.reject('欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ')
+ await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ")
else:
- state['track'] = msg
+ state["track"] = msg
+
@track_error.handle()
async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['track'] = msg
+ state["track"] = msg
-@track_error.got('track', prompt='欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ')
+
+@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ")
async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None:
- track_id = state['track']
+ track_id = state["track"]
data = dict()
-
+
try:
data = load_error(track_id)
except BaseException:
- await track_error.finish('未发现对应ID呢...(⇀‸↼‶)')
-
+ await track_error.finish("未发现对应ID呢...(⇀‸↼‶)")
+
msg = (
f"ID: [{track_id}]\n"
f"Time: [{data['time']}]\n"
@@ -118,13 +119,13 @@ async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None:
"——————\n"
f"{data['content']}"
)
-
+
data = FormData()
- data.add_field('poster', track_id)
- data.add_field('syntax', 'text')
- data.add_field('expiration', 'day')
- data.add_field('content', msg)
-
+ data.add_field("poster", track_id)
+ data.add_field("syntax", "text")
+ data.add_field("expiration", "day")
+ data.add_field("content", msg)
+
msg0 = f"> {event.sender.nickname}\n"
msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}"
await track_error.finish(msg0)
diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py
index d5ae321..cd72d47 100644
--- a/ATRI/plugins/manage/modules/dormant.py
+++ b/ATRI/plugins/manage/modules/dormant.py
@@ -13,12 +13,10 @@ __doc__ = """
"""
dormant_enabled = sv.on_command(
- cmd='休眠',
- docs=__doc__,
- rule=to_bot(),
- permission=SUPERUSER
+ cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER
)
+
@dormant_enabled.handle()
async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None:
sv.Dormant.control_dormant(True)
@@ -34,12 +32,10 @@ __doc__ = """
"""
dormant_disabled = sv.on_command(
- cmd='苏醒',
- docs=__doc__,
- rule=to_bot(),
- permission=SUPERUSER
+ cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER
)
+
@dormant_disabled.handle()
async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None:
sv.Dormant.control_dormant(False)
diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py
index 843a94e..762e918 100644
--- a/ATRI/plugins/manage/modules/request.py
+++ b/ATRI/plugins/manage/modules/request.py
@@ -9,7 +9,7 @@ from ATRI.service import Service as sv
from ATRI.exceptions import ReadFileError, FormatError
-ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
+ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential"
__doc__ = """
@@ -19,16 +19,13 @@ __doc__ = """
查看申请列表
"""
-req_list = sv.on_command(
- cmd="申请列表",
- docs=__doc__,
- permission=SUPERUSER
-)
+req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER)
+
@req_list.handle()
async def _req_list(bot: Bot, event: MessageEvent) -> None:
- path_f = ESSENTIAL_DIR / 'request_friend.json'
- path_g = ESSENTIAL_DIR / 'request_group.json'
+ path_f = ESSENTIAL_DIR / "request_friend.json"
+ path_g = ESSENTIAL_DIR / "request_group.json"
data_f, data_g = dict()
try:
data_f = json.loads(path_f.read_bytes())
@@ -38,7 +35,7 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None:
data_g = json.loads(path_g.read_bytes())
except ReadFileError:
msg_g = "[读取文件失败]"
-
+
msg_f = str()
for i in data_f.keys():
msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n"
@@ -46,75 +43,65 @@ async def _req_list(bot: Bot, event: MessageEvent) -> None:
msg_g = str()
for i in data_g.keys():
msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n"
-
- msg = (
- "好友/群申请列表如下:\n"
- "· 好友:\n"
- f"{msg_f}"
- "· 群:\n"
- f"{msg_g}"
- )
+
+ msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}"
await req_list.finish(msg)
-req_deal = sv.on_regex(
- r'[同意|拒绝][好友|群]申请',
- permission=SUPERUSER
-)
+req_deal = sv.on_regex(r"[同意|拒绝][好友|群]申请", permission=SUPERUSER)
+
@req_deal.handle()
async def _req_deal(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(' ')
- arg = re.findall(r'[同意|拒绝][好友|群]申请', msg[0])
+ msg = str(event.message).split(" ")
+ arg = re.findall(r"[同意|拒绝][好友|群]申请", msg[0])
app = arg[0]
_type = arg[1]
-
+
if not msg[1]:
- await req_deal.finish(f'正确用法!速看\n{app}{_type}申请 (reqid)')
-
+ await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)")
+
path = ESSENTIAL_DIR / "request_group.json"
data_g = dict()
try:
data_g = json.loads(path.read_bytes())
except FileExistsError:
await req_deal.finish("读取群数据失败,可能并没有请求...")
-
+
reqid = msg[1]
if app == "同意":
if _type == "好友":
try:
- await bot.set_friend_add_request(flag=reqid,
- approve=True)
- await req_deal.finish('完成~!已同意申请')
+ await bot.set_friend_add_request(flag=reqid, approve=True)
+ await req_deal.finish("完成~!已同意申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
elif _type == "群":
try:
- await bot.set_group_add_request(flag=reqid,
- sub_type=data_g[reqid]['sub_type'],
- approve=True)
- await req_deal.finish('完成~!已同意申请')
+ await bot.set_group_add_request(
+ flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True
+ )
+ await req_deal.finish("完成~!已同意申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
else:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
elif app == "拒绝":
if _type == "好友":
try:
- await bot.set_friend_add_request(flag=reqid,
- approve=False)
- await req_deal.finish('完成~!已拒绝申请')
+ await bot.set_friend_add_request(flag=reqid, approve=False)
+ await req_deal.finish("完成~!已拒绝申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
elif _type == "群":
try:
- await bot.set_group_add_request(flag=reqid,
- sub_type=data_g[reqid]['sub_type'],
- approve=False)
- await req_deal.finish('完成~!已拒绝申请')
+ await bot.set_group_add_request(
+ flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False
+ )
+ await req_deal.finish("完成~!已拒绝申请")
except FormatError:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
else:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
else:
- await req_deal.finish('请检查输入的值是否正确——!')
+ await req_deal.finish("请检查输入的值是否正确——!")
diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py
index 6489ad6..a3624df 100644
--- a/ATRI/plugins/manage/modules/service.py
+++ b/ATRI/plugins/manage/modules/service.py
@@ -5,7 +5,7 @@ from nonebot.adapters.cqhttp import (
Bot,
MessageEvent,
GroupMessageEvent,
- PrivateMessageEvent
+ PrivateMessageEvent,
)
from ATRI.service import Service as sv
@@ -21,46 +21,40 @@ __doc__ = """
"""
cur_service_ena = sv.on_command(
- cmd="启用功能",
- docs=__doc__,
- permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER
+ cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER
)
+
@cur_service_ena.args_parser # type: ignore
-async def _cur_ena_load(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await cur_service_ena.finish('好吧...')
+ await cur_service_ena.finish("好吧...")
if not msg:
- await cur_service_ena.reject('请告诉咱目标命令!')
+ await cur_service_ena.reject("请告诉咱目标命令!")
else:
- state['service_e'] = msg
+ state["service_e"] = msg
+
@cur_service_ena.handle()
-async def _cur_ena(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_e'] = msg
+ state["service_e"] = msg
-@cur_service_ena.got('service_e', prompt='请告诉咱目标命令!')
-async def _deal_cur_ena(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
- cmd = state['service_e']
+
+@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!")
+async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+ cmd = state["service_e"]
group = str(event.group_id)
sv.control_service(cmd, False, True, group=group)
- await cur_service_ena.finish(f'成功!本群已启用:{cmd}')
+ await cur_service_ena.finish(f"成功!本群已启用:{cmd}")
+
@cur_service_ena.handle()
-async def _refuse_cur_ena(bot: Bot,
- event: PrivateMessageEvent,
- state: T_State) -> None:
- await cur_service_ena.finish('只能在群聊中决定哦...')
+async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None:
+ await cur_service_ena.finish("只能在群聊中决定哦...")
__doc__ = """
@@ -73,46 +67,40 @@ __doc__ = """
"""
cur_service_dis = sv.on_command(
- cmd="禁用功能",
- docs=__doc__,
- permission=SUPERUSER|GROUP_ADMIN|GROUP_OWNER
+ cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER
)
+
@cur_service_dis.args_parser # type: ignore
-async def _cur_dis_load(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await cur_service_dis.finish('好吧...')
+ await cur_service_dis.finish("好吧...")
if not msg:
- await cur_service_dis.reject('请告诉咱目标命令!')
+ await cur_service_dis.reject("请告诉咱目标命令!")
else:
- state['service_d'] = msg
+ state["service_d"] = msg
+
@cur_service_dis.handle()
-async def _cur_dis(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_d'] = msg
+ state["service_d"] = msg
-@cur_service_dis.got('service_d', prompt='请告诉咱目标命令!')
-async def _deal_cur_dis(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
- cmd = state['service_d']
+
+@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!")
+async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+ cmd = state["service_d"]
group = str(event.group_id)
sv.control_service(cmd, False, False, group=group)
- await cur_service_dis.finish(f'成功!本群已禁用:{cmd}')
+ await cur_service_dis.finish(f"成功!本群已禁用:{cmd}")
+
@cur_service_dis.handle()
-async def _refuse_cur_dis(bot: Bot,
- event: PrivateMessageEvent,
- state: T_State) -> None:
- await cur_service_dis.finish('只能在群聊中决定哦...')
+async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None:
+ await cur_service_dis.finish("只能在群聊中决定哦...")
__doc__ = """
@@ -124,40 +112,33 @@ __doc__ = """
全局启用 以图搜图
"""
-glo_service_ena = sv.on_command(
- cmd="全局启用",
- docs=__doc__,
- permission=SUPERUSER
-)
+glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER)
+
@glo_service_ena.args_parser # type: ignore
-async def _glo_ena_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await glo_service_ena.finish('好吧...')
+ await glo_service_ena.finish("好吧...")
if not msg:
- await glo_service_ena.reject('请告诉咱目标命令!')
+ await glo_service_ena.reject("请告诉咱目标命令!")
else:
- state['service_e_g'] = msg
+ state["service_e_g"] = msg
+
@glo_service_ena.handle()
-async def _glo_ena(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_e_g'] = msg
+ state["service_e_g"] = msg
+
-@glo_service_ena.got('service_e_g', prompt='请告诉咱目标命令!')
-async def _deal_glo_ena(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- cmd = state['service_e_g']
+@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!")
+async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ cmd = state["service_e_g"]
sv.control_service(cmd, True, True)
- await glo_service_ena.finish(f'成功!已全局启用:{cmd}')
+ await glo_service_ena.finish(f"成功!已全局启用:{cmd}")
__doc__ = """
@@ -169,37 +150,30 @@ __doc__ = """
全局禁用 以图搜图
"""
-glo_service_dis = sv.on_command(
- cmd="全局禁用",
- docs=__doc__,
- permission=SUPERUSER
-)
+glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER)
+
@glo_service_dis.args_parser # type: ignore
-async def _glo_dis_load(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- quit_list = ['算了', '罢了', '取消']
+ quit_list = ["算了", "罢了", "取消"]
if msg in quit_list:
- await glo_service_dis.finish('好吧...')
+ await glo_service_dis.finish("好吧...")
if not msg:
- await glo_service_dis.reject('请告诉咱目标命令!')
+ await glo_service_dis.reject("请告诉咱目标命令!")
else:
- state['service_d_g'] = msg
+ state["service_d_g"] = msg
+
@glo_service_dis.handle()
-async def _glo_dis(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
+async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['service_d_g'] = msg
+ state["service_d_g"] = msg
+
-@glo_service_dis.got('service_d_g', prompt='请告诉咱目标命令!')
-async def _deal_glo_dis(bot: Bot,
- event: MessageEvent,
- state: T_State) -> None:
- cmd = state['service_d_g']
+@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!")
+async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ cmd = state["service_d_g"]
sv.control_service(cmd, True, False)
- await glo_service_dis.finish(f'成功!已全局禁用:{cmd}')
+ await glo_service_dis.finish(f"成功!已全局禁用:{cmd}")
diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py
index 11b2b1b..78f23e8 100644
--- a/ATRI/plugins/manage/modules/shutdown.py
+++ b/ATRI/plugins/manage/modules/shutdown.py
@@ -12,11 +12,8 @@ __doc__ = """
@ 关机
"""
-shutdown = sv.on_command(
- cmd="关机",
- docs=__doc__,
- permission=SUPERUSER
-)
+shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER)
+
@shutdown.handle()
async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
@@ -24,9 +21,10 @@ async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
if msg:
state["msg"] = msg
+
@shutdown.got("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)")
async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
- t = ['y', 'Y', '是']
+ t = ["y", "Y", "是"]
if state["msg"] in t:
await bot.send(event, "咱还会醒来的,一定")
exit(0)
diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py
index fe5bf78..076ce03 100644
--- a/ATRI/plugins/nsfw.py
+++ b/ATRI/plugins/nsfw.py
@@ -13,14 +13,12 @@ from ATRI.utils.request import get_bytes
from ATRI.utils.cqcode import coolq_code_check
-nsfw_url = (
- f"http://{Config.NsfwCheck.host}:"
- f"{Config.NsfwCheck.port}/?url="
-)
+nsfw_url = f"http://{Config.NsfwCheck.host}:" f"{Config.NsfwCheck.port}/?url="
nsfw_checking = sv.on_message()
+
@nsfw_checking.handle()
async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None:
if Config.NsfwCheck.enabled:
@@ -28,23 +26,25 @@ async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None:
user = event.user_id
group = event.group_id
check = await coolq_code_check(msg, user, group)
-
+
if check:
if "image" not in msg:
return
-
+
url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0]
try:
data = json.loads(await get_bytes(url))
except:
- log.warning('检测涩图失败,请查阅文档以获取帮助')
+ log.warning("检测涩图失败,请查阅文档以获取帮助")
return
- if round(data['score'], 4) > Config.NsfwCheck.passing_rate:
- score = "{:.2%}".format(round(data['score'], 4))
- log.debug(f'截获涩图,得分:{score}')
- await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!')
+ if round(data["score"], 4) > Config.NsfwCheck.passing_rate:
+ score = "{:.2%}".format(round(data["score"], 4))
+ log.debug(f"截获涩图,得分:{score}")
+ await bot.send(event, f"好涩哦!涩值:{score}\n不行了咱要发给主人看!")
for sup in Config.BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}")
+ await bot.send_private_msg(
+ user_id=sup, message=f"{msg}\n涩值: {score}"
+ )
else:
pass
@@ -60,60 +60,57 @@ __doc__ = """
/nsfw 然后Bot会向你索取图片
"""
-nsfw_reading = sv.on_command(
- cmd="/nsfw",
- docs=__doc__,
- rule=is_in_service('nsfw')
-)
+nsfw_reading = sv.on_command(cmd="/nsfw", docs=__doc__, rule=is_in_service("nsfw"))
+
@nsfw_reading.args_parser # type: ignore
async def _nsfw(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ['算了', '罢了', '不搜了']
+ quit_list = ["算了", "罢了", "不搜了"]
if msg in quit_list:
- await nsfw_reading.finish('好吧')
-
+ await nsfw_reading.finish("好吧")
+
if not msg:
- await nsfw_reading.reject('图呢?')
+ await nsfw_reading.reject("图呢?")
else:
- state['pic_nsfw'] = msg
+ state["pic_nsfw"] = msg
+
@nsfw_reading.handle()
-async def _nsfw_r(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
+async def _nsfw_r(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
user = event.user_id
group = event.group_id
msg = str(event.message).strip()
check = await coolq_code_check(msg, user, group)
if check and msg:
- state['pic_nsfw'] = msg
+ state["pic_nsfw"] = msg
+
-@nsfw_reading.got('pic_nsfw', prompt='图呢?')
-async def _nsfw_reading(bot: Bot,
- event: GroupMessageEvent,
- state: T_State) -> None:
- msg = state['pic_nsfw']
+@nsfw_reading.got("pic_nsfw", prompt="图呢?")
+async def _nsfw_reading(bot: Bot, event: GroupMessageEvent, state: T_State) -> None:
+ msg = state["pic_nsfw"]
pic = re.findall(r"url=(.*?)]", msg)
if not pic:
- await nsfw_reading.reject('请发送图片而不是其它东西!!')
-
+ await nsfw_reading.reject("请发送图片而不是其它东西!!")
+
url = nsfw_url + pic[0]
try:
data = json.loads(await get_bytes(url))
except RequestTimeOut:
- raise RequestTimeOut('Time out!')
-
- score = round(data['score'], 4)
- result = "{:.2%}".format(round(data['score'], 4))
+ raise RequestTimeOut("Time out!")
+
+ score = round(data["score"], 4)
+ result = "{:.2%}".format(round(data["score"], 4))
if score > 0.9:
level = "hso! 我要发给主人看!"
for sup in Config.BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}")
+ await bot.send_private_msg(
+ user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}"
+ )
elif 0.9 > score >= 0.6:
level = "嗯,可冲"
else:
level = "?能不能换张55完全冲不起来"
-
+
repo = f"涩值:{result}\n{level}"
await nsfw_reading.finish(repo)
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index cd4c5bd..14b9534 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -16,22 +16,23 @@ from .data_source import dec
temp_list = []
img_url = [
"https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png",
- "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg"
+ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg",
]
bilibili_rich = sv.on_message()
+
@bilibili_rich.handle()
async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
global temp_list
try:
msg = str(event.raw_message).replace("\\", "")
bv = False
-
+
if "qqdocurl" not in msg:
if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
+ av = re.findall(r"(av\d+)", msg)[0].replace("av", "")
else:
bv = re.findall(r"(BV\w+)", msg)
av = str(dec(bv[0]))
@@ -43,31 +44,29 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
async with session.get(url=bv_url) as r:
bv = re.findall(r"(BV\w+)", str(r.url))
av = dec(bv[0])
-
+
if not bv:
if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
+ av = re.findall(r"(av\d+)", msg)[0].replace("av", "")
else:
return
-
+
if count_list(temp_list, av) == 4:
await bot.send(event, "你是怕别人看不到么发这么多次?")
temp_list = del_list_aim(temp_list, av)
return
-
+
temp_list.append(av)
-
+
URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}"
- data = json.loads(await get_bytes(URL))['data']
+ data = json.loads(await get_bytes(URL))["data"]
repo = (
f"{data['bvid']} INFO:\n"
f"Title: {data['title']}\n"
f"Link: {data['short_link']}\n"
"にまねげぴのTencent rich!"
)
- await bot.send(
- event,
- MessageSegment.image(file=choice(img_url)))
+ await bot.send(event, MessageSegment.image(file=choice(img_url)))
await bilibili_rich.finish(repo)
except BaseException:
return
diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py
index 32ac219..59474ff 100644
--- a/ATRI/plugins/rich/data_source.py
+++ b/ATRI/plugins/rich/data_source.py
@@ -1,4 +1,4 @@
-table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
+table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF"
tr = {}
for i in range(58):
tr[table[i]] = i
@@ -10,13 +10,13 @@ add = 8728348608
def dec(x) -> int:
r = 0
for i in range(6):
- r += tr[x[s[i]]] * 58**i
+ r += tr[x[s[i]]] * 58 ** i
return (r - add) ^ xor
def enc(x) -> str:
x = (x ^ xor) + add
- r = list('BV1 4 1 7 ')
+ r = list("BV1 4 1 7 ")
for i in range(6):
- r[s[i]] = table[x // 58**i % 58]
- return ''.join(r)
+ r[s[i]] = table[x // 58 ** i % 58]
+ return "".join(r)
diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py
index f158cf9..b4b1497 100644
--- a/ATRI/plugins/saucenao/__init__.py
+++ b/ATRI/plugins/saucenao/__init__.py
@@ -21,55 +21,53 @@ __doc__ = """
以图搜图 (pic)
"""
-saucenao = sv.on_command(
- cmd='以图搜图',
- docs=__doc__,
- rule=is_in_service('以图搜图')
-)
+saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图"))
+
@saucenao.args_parser # type: ignore
-async def _load_saucenao(bot: Bot, event: MessageEvent,
- state: T_State) -> None:
+async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
- quit_list = ['算了', '罢了', '不搜了']
+ quit_list = ["算了", "罢了", "不搜了"]
if msg in quit_list:
- await saucenao.finish('好吧...')
-
+ await saucenao.finish("好吧...")
+
if not msg:
- await saucenao.reject('图呢?')
+ await saucenao.reject("图呢?")
else:
- state['pic_sau'] = msg
+ state["pic_sau"] = msg
+
@saucenao.handle()
async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['pic_sau'] = msg
+ state["pic_sau"] = msg
[email protected]('pic_sau', prompt='图呢?')
+
[email protected]("pic_sau", prompt="图呢?")
async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = state['pic_sau']
- img = re.findall(r'url=(.*?)]', msg)
+ msg = state["pic_sau"]
+ img = re.findall(r"url=(.*?)]", msg)
if not img:
- await saucenao.finish('请发送图片而不是其他东西!!')
-
+ await saucenao.finish("请发送图片而不是其他东西!!")
+
try:
task = SauceNao(api_key=Config.SauceNAO.key)
data = json.loads(await task.search(img[0]))
except RequestTimeOut:
- raise RequestTimeOut('Request failed!')
-
- res = data['results']
+ raise RequestTimeOut("Request failed!")
+
+ res = data["results"]
result = list()
for i in range(0, 3):
data = res[i]
-
+
_result = dict()
- _result['similarity'] = data['header']['similarity']
- _result['index_name'] = data['header']['index_name']
- _result['url'] = choice(data['data'].get('ext_urls', ['None']))
+ _result["similarity"] = data["header"]["similarity"]
+ _result["index_name"] = data["header"]["index_name"]
+ _result["url"] = choice(data["data"].get("ext_urls", ["None"]))
result.append(_result)
-
+
msg0 = f"> {MessageSegment.at(event.user_id)}"
for i in result:
msg0 = msg0 + (
@@ -78,5 +76,5 @@ async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
f"Name: {i['index_name']}\n"
f"URL: {i['url'].replace('https://', '')}"
)
-
+
await saucenao.finish(Message(msg0))
diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py
index cd88554..efe8fe1 100644
--- a/ATRI/plugins/saucenao/data_source.py
+++ b/ATRI/plugins/saucenao/data_source.py
@@ -5,23 +5,19 @@ URL = "https://saucenao.com/search.php"
class SauceNao:
- def __init__(self,
- api_key: str,
- output_type=2,
- testmode=1,
- dbmaski=32768,
- db=5,
- numres=5) -> None:
+ def __init__(
+ self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5
+ ) -> None:
params = dict()
- params['api_key'] = api_key
- params['output_type'] = output_type
- params['testmode'] = testmode
- params['dbmaski'] = dbmaski
- params['db'] = db
- params['numres'] = numres
+ params["api_key"] = api_key
+ params["output_type"] = output_type
+ params["testmode"] = testmode
+ params["dbmaski"] = dbmaski
+ params["db"] = db
+ params["numres"] = numres
self.params = params
-
+
async def search(self, url: str):
- self.params['url'] = url
+ self.params["url"] = url
res = await post_bytes(url=URL, params=self.params)
return res
diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py
index 5e48fc3..bd5d9c4 100644
--- a/ATRI/plugins/status.py
+++ b/ATRI/plugins/status.py
@@ -16,10 +16,8 @@ __doc__ = """
/ping
"""
-ping = sv.on_command(
- cmd="/ping",
- docs="测试机器人",
- rule=is_in_service('ping'))
+ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping"))
+
@ping.handle()
async def _ping(bot: Bot, event: MessageEvent) -> None:
@@ -33,11 +31,8 @@ __doc__ = """
/status
"""
-status = sv.on_command(
- cmd="/status",
- docs=__doc__,
- rule=is_in_service('status')
-)
+status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status"))
+
@status.handle()
async def _status(bot: Bot, event: MessageEvent) -> None:
@@ -45,13 +40,13 @@ async def _status(bot: Bot, event: MessageEvent) -> None:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
- inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
+ inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
except GetStatusError:
raise GetStatusError("Failed to get status.")
-
+
msg = "アトリは、高性能ですから!"
-
+
if cpu > 80: # type: ignore
msg = "咱感觉有些头晕..."
if mem > 80:
@@ -60,7 +55,7 @@ async def _status(bot: Bot, event: MessageEvent) -> None:
msg = "咱感觉有点累..."
elif disk > 80:
msg = "咱感觉身体要被塞满了..."
-
+
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
@@ -69,26 +64,22 @@ async def _status(bot: Bot, event: MessageEvent) -> None:
f"* netSENT: {inteSENT}MB\n"
f"* netRECV: {inteRECV}MB\n"
) + msg
-
+
await status.finish(msg0)
- 'interval',
- minutes=5,
- misfire_grace_time=10
-)
[email protected]_job("interval", minutes=5, misfire_grace_time=10)
async def _():
log.info("开始自检")
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
- inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
+ inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
except GetStatusError:
raise GetStatusError("Failed to get status.")
-
+
msg = ""
if cpu > 80: # type: ignore
msg = "咱感觉有些头晕..."
@@ -101,7 +92,7 @@ async def _():
else:
log.info("运作正常")
return
-
+
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
@@ -110,9 +101,6 @@ async def _():
f"* netSENT: {inteSENT}MB\n"
f"* netRECV: {inteRECV}MB\n"
) + msg
-
+
for sup in Config.BotSelfConfig.superusers:
- await sv.NetworkPost.send_private_msg(
- user_id=sup,
- message=msg0
- )
+ await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0)
diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py
index 5d1860a..ff28e59 100644
--- a/ATRI/plugins/utils/__init__.py
+++ b/ATRI/plugins/utils/__init__.py
@@ -17,26 +17,24 @@ roll一下
/roll 1d10+10d9+4d5+2d3
"""
-roll = sv.on_command(
- cmd="/roll",
- docs=__doc__,
- rule=is_in_service('roll')
-)
+roll = sv.on_command(cmd="/roll", docs=__doc__, rule=is_in_service("roll"))
+
@roll.handle()
async def _roll(bot: Bot, event: MessageEvent, state: dict) -> None:
args = str(event.message).strip()
if args:
- state['resu'] = args
+ state["resu"] = args
+
@roll.got("resu", prompt="roll 参数不能为空~!\ndemo:1d10 或 2d10+2d10")
async def _(bot: Bot, event: MessageEvent, state: dict) -> None:
- resu = state['resu']
- match = re.match(r'^([\dd+\s]+?)$', resu)
-
+ resu = state["resu"]
+ match = re.match(r"^([\dd+\s]+?)$", resu)
+
if not match:
await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10")
-
+
await roll.finish(roll_dice(resu))
@@ -52,22 +50,19 @@ __doc__ = """
/enc e アトリは高性能ですから!
"""
-encrypt = sv.on_command(
- cmd="/enc",
- docs=__doc__,
- rule=is_in_service('enc')
-)
+encrypt = sv.on_command(cmd="/enc", docs=__doc__, rule=is_in_service("enc"))
+
@encrypt.handle()
async def _encrypt(bot: Bot, event: MessageEvent) -> None:
- msg = str(event.message).split(' ')
+ msg = str(event.message).split(" ")
_type = msg[0]
s = msg[1]
e = Encrypt()
-
+
if _type == "e":
await encrypt.finish(e.encode(s))
elif _type == "d":
await encrypt.finish(e.decode(s))
else:
- await encrypt.finish('请检查输入~!')
+ await encrypt.finish("请检查输入~!")
diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py
index 18c0492..4ef46a3 100644
--- a/ATRI/plugins/utils/data_source.py
+++ b/ATRI/plugins/utils/data_source.py
@@ -6,41 +6,41 @@ from typing import Union
def roll_dice(par: str) -> str:
result = 0
- proc = ''
+ proc = ""
proc_list = []
p = par.split("+")
-
+
for i in p:
args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i)
args = list(args[0])
-
+
args[0] = args[0] or 1
if int(args[0]) >= 5000 or int(args[2]) >= 5000:
return "阿...好大......"
-
+
for a in range(1, int(args[0]) + 1):
rd = random.randint(1, int(args[2]))
result = result + rd
-
+
if len(proc_list) <= 10:
proc_list.append(rd)
-
+
if len(proc_list) <= 10:
proc += "+".join(map(str, proc_list))
elif len(proc_list) > 10:
proc += "太长了不展示了就酱w"
else:
proc += str(result)
-
+
result = f"{par}=({proc})={result}"
return result
-class Encrypt():
- cr = 'ĀāĂ㥹ÀÁÂÃÄÅ'
- cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ'
- cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr'
- cb = 'ĨĩĪīĬĭĮįİı'
+class Encrypt:
+ cr = "ĀāĂ㥹ÀÁÂÃÄÅ"
+ cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ"
+ cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr"
+ cb = "ĨĩĪīĬĭĮįİı"
sr = len(cr)
sc = len(cc)
@@ -55,7 +55,7 @@ class Encrypt():
def _encodeByte(self, i) -> Union[str, None]:
if i > 0xFF:
- raise ValueError('ERROR! at/ri overflow')
+ raise ValueError("ERROR! at/ri overflow")
if i > 0x7F:
i = i & 0x7F
@@ -65,7 +65,7 @@ class Encrypt():
def _encodeShort(self, i) -> str:
if i > 0xFFFF:
- raise ValueError('ERROR! atri overflow')
+ raise ValueError("ERROR! atri overflow")
reverse = False
if i > 0x7FFF:
@@ -75,17 +75,15 @@ class Encrypt():
char = [
self._div(i, self.scnb),
self._div(i % self.scnb, self.snb),
- self._div(i % self.snb, self.sb), i % self.sb
- ]
- char = [
- self.cr[char[0]], self.cc[char[1]], self.cn[char[2]],
- self.cb[char[3]]
+ self._div(i % self.snb, self.sb),
+ i % self.sb,
]
+ char = [self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], self.cb[char[3]]]
if reverse:
return char[2] + char[3] + char[0] + char[1]
- return ''.join(char)
+ return "".join(char)
def _decodeByte(self, c) -> int:
nb = False
@@ -93,12 +91,11 @@ class Encrypt():
if idx[0] < 0 or idx[1] < 0:
idx = [self.cn.index(c[0]), self.cb.index(c[1])]
nb = True
- raise ValueError('ERROR! at/ri overflow')
+ raise ValueError("ERROR! at/ri overflow")
- result = idx[0] * self.sb + idx[1] \
- if nb else idx[0] * self.sc + idx[1]
+ result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1]
if result > 0x7F:
- raise ValueError('ERROR! at/ri overflow')
+ raise ValueError("ERROR! at/ri overflow")
return result | 0x80 if nb else 0
@@ -109,23 +106,22 @@ class Encrypt():
self.cr.index(c[0]),
self.cc.index(c[1]),
self.cn.index(c[2]),
- self.cb.index(c[3])
+ self.cb.index(c[3]),
]
else:
idx = [
self.cr.index(c[2]),
self.cc.index(c[3]),
self.cn.index(c[0]),
- self.cb.index(c[1])
+ self.cb.index(c[1]),
]
if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0:
- raise ValueError('ERROR! not atri')
+ raise ValueError("ERROR! not atri")
- result = idx[0] * self.scnb + idx[1] * self.snb + idx[
- 2] * self.sb + idx[3]
+ result = idx[0] * self.scnb + idx[1] * self.snb + idx[2] * self.sb + idx[3]
if result > 0x7FFF:
- raise ValueError('ERROR! atri overflow')
+ raise ValueError("ERROR! atri overflow")
result |= 0x8000 if reverse else 0
return result
@@ -138,37 +134,36 @@ class Encrypt():
if len(b) & 1 == 1:
result.append(self._encodeByte(b[-1]))
- return ''.join(result)
+ return "".join(result)
- def encode(self, s: str, encoding: str = 'utf-8'):
+ def encode(self, s: str, encoding: str = "utf-8"):
if not isinstance(s, str):
- raise ValueError('Please enter str instead of other')
+ raise ValueError("Please enter str instead of other")
return self._encodeBytes(s.encode(encoding))
def _decodeBytes(self, s: str):
if not isinstance(s, str):
- raise ValueError('Please enter str instead of other')
+ raise ValueError("Please enter str instead of other")
if len(s) & 1:
- raise ValueError('ERROR length')
+ raise ValueError("ERROR length")
result = []
for i in range(0, (len(s) >> 2)):
- result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8]))
- result.append(bytes([
- self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF]))
+ result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) >> 8]))
+ result.append(bytes([self._decodeShort(s[i * 4 : i * 4 + 4]) & 0xFF]))
if (len(s) & 2) == 2:
result.append(bytes([self._decodeByte(s[-2:])]))
- return b''.join(result)
+ return b"".join(result)
- def decode(self, s: str, encoding: str = 'utf-8') -> str:
+ def decode(self, s: str, encoding: str = "utf-8") -> str:
if not isinstance(s, str):
- raise ValueError('Please enter str instead of other')
+ raise ValueError("Please enter str instead of other")
try:
return self._decodeBytes(s).decode(encoding)
except UnicodeDecodeError:
- raise ValueError('Decoding failed')
+ raise ValueError("Decoding failed")