summaryrefslogtreecommitdiff
path: root/ATRI/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins')
-rw-r--r--ATRI/plugins/admin.py116
-rw-r--r--ATRI/plugins/anime-search/__init__.py4
-rw-r--r--ATRI/plugins/anti-rubbish.py166
-rw-r--r--ATRI/plugins/call-owner.py8
-rw-r--r--ATRI/plugins/code-runner.py2
-rw-r--r--ATRI/plugins/curse/__init__.py2
-rw-r--r--ATRI/plugins/drifting-bottle/__init__.py5
-rw-r--r--ATRI/plugins/essential.py152
-rw-r--r--ATRI/plugins/funny.py111
-rw-r--r--ATRI/plugins/github.py1
-rw-r--r--ATRI/plugins/help.py67
-rw-r--r--ATRI/plugins/hitokoto.py15
-rw-r--r--ATRI/plugins/key-repo/__init__.py280
-rw-r--r--ATRI/plugins/key-repo/data_source.py116
-rw-r--r--ATRI/plugins/nsfw.py103
-rw-r--r--ATRI/plugins/rich/__init__.py18
-rw-r--r--ATRI/plugins/status.py8
-rw-r--r--ATRI/plugins/utils/__init__.py50
-rw-r--r--ATRI/plugins/utils/data_source.py139
19 files changed, 945 insertions, 418 deletions
diff --git a/ATRI/plugins/admin.py b/ATRI/plugins/admin.py
index 9465130..cd902e4 100644
--- a/ATRI/plugins/admin.py
+++ b/ATRI/plugins/admin.py
@@ -12,6 +12,7 @@ from nonebot.adapters.cqhttp import (
)
from nonebot.typing import T_State
+from ATRI.config import Config
from ATRI.service import Service as sv
from ATRI.exceptions import WriteError, load_error
from ATRI.utils.file import open_file
@@ -80,11 +81,12 @@ async def _chat_monitor(bot: Bot, event: GroupMessageEvent) -> None:
else:
pass
+
ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
request_friend = sv.on_command(
- name="好友申请处理",
cmd="好友申请",
+ docs="好友申请处理",
permission=SUPERUSER
)
@@ -123,8 +125,8 @@ async def _request_friend(bot: Bot, event: MessageEvent) -> None:
request_group = sv.on_command(
- name="群聊申请处理",
cmd="群聊申请",
+ docs="群聊申请处理",
permission=SUPERUSER
)
@@ -173,8 +175,8 @@ async def _request_group(bot: Bot, event: MessageEvent) -> None:
broadcast = sv.on_command(
- name="广播",
- cmd="/broadcast",
+ cmd="/bc",
+ docs="广播\n用法:/bc 广播内容",
permission=SUPERUSER
)
@@ -214,8 +216,8 @@ async def _bd(bot: Bot, event: MessageEvent, state: T_State) -> None:
track_error = sv.on_command(
- name="错误堆栈查看",
cmd="/track",
+ docs="报错堆栈查看\n用法:/track 追踪ID",
permission=SUPERUSER
)
@@ -246,8 +248,8 @@ async def _(bot: Bot, event: MessageEvent, state: T_State) -> None:
get_log = sv.on_command(
- name="获取控制台信息",
cmd="/getlog",
+ docs="获取控制台信息\n用法:/getlog 等级:info,warning,debug 行数:比如-20即最近20行",
permission=SUPERUSER
)
@@ -282,8 +284,8 @@ async def _get_log(bot: Bot, event: MessageEvent) -> None:
shutdown = sv.on_command(
- name="紧急停机",
- cmd="/shutdown",
+ cmd="/st",
+ docs="紧急停机",
permission=SUPERUSER
)
@@ -300,3 +302,101 @@ async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None:
exit(0)
else:
await shutdown.finish("再考虑下先吧 ;w;")
+
+
+__doc__ = """
+懒得和你废话,block了
+权限组:维护者
+用法:
+ /b user,group 0,1
+补充:
+ user:QQ号
+ group:QQ群号
+ 0,1:对应布尔值False, True
+ 范围为全局
+"""
+
+block = sv.on_command(
+ cmd="/b",
+ docs="懒得和你废话,block了\n用法:/b u,g 0,1",
+ permission=SUPERUSER
+)
+
+async def _block(bot: Bot, event: MessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ _type = msg[0]
+ arg = int(msg[1])
+ is_enabled = bool(int(msg[2]))
+ b_type = ""
+
+ status = "封禁" if is_enabled else "解封"
+
+ if _type == "g":
+ sv.BlockSystem.control_list(is_enabled=is_enabled, group=arg)
+ b_type = "Group"
+ elif _type == "u":
+ sv.BlockSystem.control_list(is_enabled, user=arg)
+ b_type = "User"
+ else:
+ await block.finish("请检查输入...")
+
+ await block.finish(f"已成功将[{b_type}@{arg}]{status}")
+
+
+__doc__ = """
+功能开关控制
+权限组:维护者,群管理
+用法:
+ 对于维护者:
+ /s 目标指令 u+int,g+int,global 0,1
+ 对于群管理:
+ /s 目标指令 0,1
+补充:
+ user:QQ号
+ group:QQ群号
+ global:全局
+ 0,1:对应布尔值False, True
+示例:
+ 对于维护者:
+ /s /status u123456789 0
+ 对于群管理:
+ /s /status 0
+"""
+
+service_control = sv.on_command(
+ cmd="/s",
+ docs=__doc__,
+ permission=SUPERUSER
+)
+
+@service_control.handle()
+async def _service_control(bot: Bot, event: MessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ user = event.user_id
+ cmd = msg[0]
+ _type = msg[1]
+ is_enabled = bool(msg[2])
+
+ status = "封禁" if is_enabled else "解封"
+
+ if user in Config.BotSelfConfig.superusers:
+ if _type == "global":
+ sv.control_service(cmd, True, is_enabled)
+ else:
+ if "u" in _type:
+ qq = _type.replace('u', '')
+ sv.control_service(cmd, False, is_enabled, user=int(qq))
+ elif "g" in _type:
+ group = _type.replace('g', '')
+ sv.control_service(cmd, False, is_enabled, group=int(group))
+ else:
+ await service_control.finish("请检查输入~!")
+ else:
+ if isinstance(event, GroupMessageEvent):
+ group = event.group_id
+ sv.control_service(cmd, False, bool(_type), group=group)
+ else:
+ await service_control.finish("此功能仅在群聊中触发")
+
+ await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]")
diff --git a/ATRI/plugins/anime-search/__init__.py b/ATRI/plugins/anime-search/__init__.py
index 07cc8dc..5759f19 100644
--- a/ATRI/plugins/anime-search/__init__.py
+++ b/ATRI/plugins/anime-search/__init__.py
@@ -18,8 +18,8 @@ URL = "https://trace.moe/api/search?url="
anime_search = sv.on_command(
- name="以图搜番",
cmd="/anime",
+ docs="以图搜番",
rule=is_block() & is_in_dormant()
)
@@ -31,7 +31,7 @@ async def _anime_search(bot: Bot,
if msg:
state["msg"] = msg
-@anime_search.got("msg", prompt="请发送咱一张图片~!")
+@anime_search.got("msg", prompt="请告诉咱目标图片~!")
async def _(bot: Bot,
event: MessageEvent,
state: T_State) -> None:
diff --git a/ATRI/plugins/anti-rubbish.py b/ATRI/plugins/anti-rubbish.py
deleted file mode 100644
index de8e1e1..0000000
--- a/ATRI/plugins/anti-rubbish.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import json
-from pathlib import Path
-from datetime import datetime
-from typing import Optional
-
-from nonebot.plugin import on_command, on_message
-from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
-
-from ATRI.log import logger
-from ATRI.rule import is_block
-from ATRI.config import nonebot_config
-from ATRI.utils.file import write_file
-from ATRI.utils.apscheduler import scheduler
-from ATRI.exceptions import WriteError
-
-
-RUBBISH_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'anti-rubbish'
-now_time = datetime.now().strftime('%Y%m%d-%H%M%S')
-
-
-# 365 x 24 不间断监听啥b发屎人
-# 日您🐎的,自己在厕所吃完自助餐还不够,还要分享给群友
-# 👴就搁这公示公示这些啥b
-# 每日0点如有发屎记录则全群通报
-anti_rubbish = on_message()
-
-@anti_rubbish.handle()
-async def _anti_rubbish(bot: Bot, event: GroupMessageEvent) -> None:
- msg = str(event.message).strip()
- user = int(event.user_id)
- group = event.group_id
- key_word: dict = NoobRubbish.get_keyword()
- noob_data: dict = NoobRubbish.read_noobs(group)
- noob_data.setdefault(user, {})
-
- for key in key_word.keys():
- if key in msg:
- noob_data[user].setdefault(key, 1)
- noob_data[user][key] += 1
- await write_file(NoobRubbish._get_noob(group), json.dumps(noob_data))
- logger.info(
- f"GET 吃屎人 {user}[@群{group}] 第{noob_data[user][key]}次: {msg}")
-
-
-rubbish = on_command("/rubbish", rule=is_block())
-
-async def _rubbish(bot: Bot, event: GroupMessageEvent) -> None:
- cmd = str(event.message).split(" ")
- user = int(event.user_id)
- group = event.group_id
-
- if cmd[0] == "list":
- noob_data: dict = NoobRubbish.read_noobs(group)
- noob_list = ""
- for key in noob_data.keys():
- noob_list += f"{key}\n"
-
- if not noob_list:
- await rubbish.finish("此群很干净呢~!")
- else:
- msg = (
- f"截至{now_time}\n"
- f"吃过厕所自助餐的有:\n"
- ) + noob_list
- await rubbish.finish(msg)
-
- elif cmd[0] == "read":
- try:
- user = cmd[1]
- except:
- await rubbish.finish("格式/rubbish read qq")
-
- noob_data: dict = NoobRubbish.read_noob(group, int(user))
- if not noob_data:
- await rubbish.finish("该群友很干净!")
- else:
- noob_keys = ""
- for key in noob_data.keys():
- noob_keys += f"{key}-{noob_data[key]}次\n"
- msg = (
- f"截至{now_time}\n"
- f"此群友吃的屎的种类,以及次数:\n"
- ) + noob_keys
- await rubbish.finish(msg)
-
- elif cmd[0] == "update":
- if user not in nonebot_config["superusers"]:
- await rubbish.finish("没权限呢...")
-
- key_word = cmd[1]
- data = NoobRubbish.get_keyword()
- data[key_word] = now_time
- await NoobRubbish.store_keyword(data)
- await rubbish.finish(f"勉強しました!\n[{key_word}]")
-
- elif cmd[0] == "del":
- if user not in nonebot_config["superusers"]:
- await rubbish.finish("没权限呢...")
-
- key_word = cmd[1]
- data = NoobRubbish.get_keyword()
- del data[key_word]
- await NoobRubbish.store_keyword(data)
- await rubbish.finish(f"清除~![{key_word}]")
-
- else:
- await rubbish.finish("请检查格式~!详细请查阅文档")
-
-
-# @scheduler.scheduled_job(
-# "cron",
-# hour=0,
-# misfire_grace_time=120
-# )
-# async def _():
-# group = GroupMessageEvent.group_id
-# noob_data = NoobRubbish.read_noobs(group)
-
-
-class NoobRubbish:
- @staticmethod
- def _get_noob(group: Optional[int] = None) -> Path:
- file_name = f"{now_time}.noob.json"
- GROUP_DIR = RUBBISH_DIR / f"{group}"
- path = GROUP_DIR / file_name
-
- if not GROUP_DIR.exists():
- GROUP_DIR.mkdir()
- return path
-
- @classmethod
- def read_noobs(cls, group: int) -> dict:
- try:
- data = json.loads(cls._get_noob(group).read_bytes())
- except:
- data = {}
- return data
-
- @classmethod
- def read_noob(cls, group: int, user: Optional[int]) -> dict:
- try:
- data = json.loads(cls._get_noob(group).read_bytes())
- except:
- data = {}
- data.setdefault(user, {})
- return data
-
- @staticmethod
- def get_keyword() -> dict:
- file_name = "keyword.json"
- path = RUBBISH_DIR / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
- return data
-
- @staticmethod
- async def store_keyword(data: dict) -> None:
- file_name = "keyword.json"
- path = RUBBISH_DIR / file_name
- try:
- await write_file(path, json.dumps(data))
- except WriteError:
- raise WriteError("Writing file failed!")
diff --git a/ATRI/plugins/call-owner.py b/ATRI/plugins/call-owner.py
index af8e83d..2e12a1e 100644
--- a/ATRI/plugins/call-owner.py
+++ b/ATRI/plugins/call-owner.py
@@ -7,7 +7,7 @@ from nonebot.adapters.cqhttp import (
from ATRI.service import Service as sv
from ATRI.rule import is_block
-from ATRI.config import nonebot_config
+from ATRI.config import Config
from ATRI.utils.apscheduler import scheduler
from ATRI.utils.list import count_list
@@ -16,8 +16,8 @@ repo_list = []
repo = sv.on_command(
- name="给维护者留言",
cmd="来杯红茶",
+ docs="给维护者留言",
rule=is_block()
)
@@ -38,7 +38,7 @@ async def _repo_(bot: Bot, event: MessageEvent, state: T_State) -> None:
repo_list.append(user)
- for sup in nonebot_config["superusers"]:
+ for sup in Config.BotSelfConfig.superusers:
await bot.send_private_msg(
user_id=sup,
message=f"来自用户[{user}]反馈:\n{msg}"
@@ -58,8 +58,8 @@ async def _() -> None:
reset_repo = sv.on_command(
- name="重置给维护者留言次数",
cmd="重置红茶",
+ docs="重置给维护者的留言次数",
permission=SUPERUSER
)
diff --git a/ATRI/plugins/code-runner.py b/ATRI/plugins/code-runner.py
index 731abfc..6da97b0 100644
--- a/ATRI/plugins/code-runner.py
+++ b/ATRI/plugins/code-runner.py
@@ -41,8 +41,8 @@ SUPPORTED_LANGUAGES = {
code_runner = sv.on_command(
- name="运行代码",
cmd="/code",
+ docs="在线运行代码",
rule=is_block() & is_in_dormant()
)
diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py
index 03da205..6db80cc 100644
--- a/ATRI/plugins/curse/__init__.py
+++ b/ATRI/plugins/curse/__init__.py
@@ -18,8 +18,8 @@ sick_list = []
__plugin_name__ = 'curse'
curse = sv.on_command(
- name="口臭",
cmd="口臭一下",
+ docs="口臭",
aliases={"口臭", "骂我"},
rule=is_block() & is_in_dormant()
& is_in_service(__plugin_name__)
diff --git a/ATRI/plugins/drifting-bottle/__init__.py b/ATRI/plugins/drifting-bottle/__init__.py
index 5bd514c..4d3182e 100644
--- a/ATRI/plugins/drifting-bottle/__init__.py
+++ b/ATRI/plugins/drifting-bottle/__init__.py
@@ -1,2 +1,5 @@
from ATRI.service import Service as sv
-from ATRI.rule import is_block, is_in_service \ No newline at end of file
+from ATRI.rule import is_block, is_in_service
+
+
+# 等待撰写 \ No newline at end of file
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index 020a413..80a1c97 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -23,9 +23,10 @@ from nonebot.adapters.cqhttp import (
import ATRI
from ATRI.log import logger
from ATRI.exceptions import WriteError
-from ATRI.config import nonebot_config
+from ATRI.config import Config
from ATRI.rule import is_block
from ATRI.service import Service as sv
+from ATRI.utils.cqcode import coolq_code_check
PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services'
@@ -59,7 +60,7 @@ async def shutdown() -> None:
@driver.on_bot_connect
async def connect(bot) -> None:
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
int(superuser),
"WebSocket 成功连接,数据开始传输。"
@@ -68,7 +69,7 @@ async def connect(bot) -> None:
@driver.on_bot_disconnect
async def disconnect(bot) -> None:
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
try:
await sv.NetworkPost.send_private_msg(
int(superuser),
@@ -82,7 +83,7 @@ ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential'
os.makedirs(ESSENTIAL_DIR, exist_ok=True)
# 处理:好友请求
-request_friend_event = sv.on_request("Friends request", rule=is_block())
+request_friend_event = sv.on_request(rule=is_block())
@request_friend_event.handle()
async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
@@ -108,7 +109,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
except WriteError:
raise WriteError("Writing file failed!")
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
msg = (
"主人,收到一条好友请求:\n"
f"请求人:{event.get_user_id()}\n"
@@ -122,7 +123,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None:
# 处理:邀请入群,如身为管理,还附有入群请求
-request_group_event = sv.on_request("Group request",rule=is_block())
+request_group_event = sv.on_request(rule=is_block())
@request_group_event.handle()
async def _request_group_event(bot, event: GroupRequestEvent) -> None:
@@ -150,7 +151,7 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None:
except WriteError:
raise WriteError("Writing file failed!")
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
msg = (
"主人,收到一条入群请求:\n"
f"请求人:{event.get_user_id()}\n"
@@ -164,39 +165,39 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None:
# 处理群成员变动
-group_member_event = sv.on_notice("Group member change")
+group_member_event = sv.on_notice()
@group_member_event.handle()
-async def _group_member_event(bot: Bot, event) -> None:
- if isinstance(event, GroupIncreaseNoticeEvent):
+async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None:
+ msg = (
+ "好欸!事新人!\n"
+ f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!"
+ )
+ await group_member_event.finish(msg)
+
+@group_member_event.handle()
+async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None:
+ if event.is_tome():
msg = (
- "好欸!事新人!\n"
- f"在下 {choice(list(nonebot_config['nickname']))} 哒!w!"
+ "呜呜呜,主人"
+ f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..."
)
- await group_member_event.finish(msg)
-
- elif isinstance(event, GroupDecreaseNoticeEvent):
- if event.is_tome():
- msg = (
- "呜呜呜,主人"
- f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..."
+ for superuser in Config.BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
)
- for superuser in nonebot_config["superusers"]:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
- else:
- await group_member_event.finish(f"{event.user_id} 离开了我们...")
+ else:
+ await group_member_event.finish(f"{event.user_id} 离开了我们...")
# 处理群管理事件
-group_admin_event = sv.on_notice("Group admin change")
+group_admin_event = sv.on_notice()
@group_admin_event.handle()
async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None:
if event.is_tome():
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
user_id=int(superuser),
message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
@@ -204,7 +205,7 @@ async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None:
# 处理群禁言事件
-group_ban_event = sv.on_notice("Group ban change")
+group_ban_event = sv.on_notice()
@group_ban_event.handle()
async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
@@ -215,7 +216,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n"
f"时长...是 {event.duration} 秒"
)
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
user_id=int(superuser),
message=msg
@@ -225,7 +226,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
"好欸!主人\n"
f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!"
)
- for superuser in nonebot_config["superusers"]:
+ for superuser in Config.BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
user_id=int(superuser),
message=msg
@@ -233,7 +234,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None:
# 处理群红包运气王事件
-lucky_read_bag_event = sv.on_notice("Group read bag winner")
+lucky_read_bag_event = sv.on_notice()
@lucky_read_bag_event.handle()
async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None:
@@ -245,7 +246,7 @@ async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None:
# 处理群文件上传事件
-group_file_upload_event = sv.on_notice("Group file change")
+group_file_upload_event = sv.on_notice()
@group_file_upload_event.handle()
async def _group_file_upload_event(bot,
@@ -254,51 +255,56 @@ async def _group_file_upload_event(bot,
# 处理撤回事件
-recall_event = sv.on_notice("Group member recall")
+recall_event = sv.on_notice()
@recall_event.handle()
-async def _recall_event(bot: Bot, event) -> None:
- if isinstance(event, GroupRecallNoticeEvent):
- repo = await bot.call_api(
- "get_msg",
- message_id=event.message_id
- )
- repo = str(repo["message"])
- if "CQ" in repo:
- repo = repo.replace("CQ", "QC")
+async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None:
+ group = event.group_id
+ repo = await bot.call_api(
+ "get_msg",
+ message_id=event.message_id
+ )
+ repo = str(repo["message"])
+ check = await coolq_code_check(repo, group=group)
+ if not check:
+ repo = repo.replace("CQ", "QC")
- msg = (
- "主人,咱拿到了一条撤回信息!\n"
- f"{event.user_id}@[群:{event.group_id}]\n"
- "撤回了\n"
- f"{repo}"
+ msg = (
+ "主人,咱拿到了一条撤回信息!\n"
+ f"{event.user_id}@[群:{event.group_id}]\n"
+ "撤回了\n"
+ f"{repo}"
+ )
+
+ await bot.send(event, "咱看到惹~!")
+ for superuser in Config.BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
)
- await bot.send(event, "咱看到惹~!")
- for superuser in nonebot_config["superusers"]:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
+@recall_event.handle()
+async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None:
+ user = event.user_id
+ repo = await bot.call_api(
+ "get_msg",
+ message_id=event.message_id
+ )
+ repo = str(repo["message"])
+ check = await coolq_code_check(repo, user)
+ if not check:
+ repo = repo.replace("CQ", "QC")
- elif isinstance(event, FriendRecallNoticeEvent):
- repo = await bot.call_api(
- "get_msg",
- message_id=event.message_id
- )
- repo = str(repo["message"])
- if "CQ" in repo:
- repo = repo.replace("CQ", "QC")
+ msg = (
+ "主人,咱拿到了一条撤回信息!\n"
+ f"{event.user_id}@[私聊]"
+ "撤回了\n"
+ f"{repo}"
+ )
- msg = (
- "主人,咱拿到了一条撤回信息!\n"
- f"{event.user_id}@[私聊]"
- "撤回了\n"
- f"{repo}"
+ await bot.send(event, "咱看到惹~!")
+ for superuser in Config.BotSelfConfig.superusers:
+ await sv.NetworkPost.send_private_msg(
+ user_id=int(superuser),
+ message=msg
)
-
- for superuser in nonebot_config["superusers"]:
- await sv.NetworkPost.send_private_msg(
- user_id=int(superuser),
- message=msg
- )
diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py
index 472c77c..f334bcf 100644
--- a/ATRI/plugins/funny.py
+++ b/ATRI/plugins/funny.py
@@ -1,9 +1,12 @@
+import asyncio
from pathlib import Path
from random import choice, randint
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
+from nonebot.adapters.cqhttp.message import Message
from ATRI.service import Service as sv
+from ATRI.utils.limit import is_too_exciting
from ATRI.rule import (
is_block,
is_in_dormant,
@@ -11,13 +14,18 @@ from ATRI.rule import (
)
-__plugin_name__ = "laugh"
+__doc__ = """
+看不懂的笑话
+权限组:所有人
+用法:
+ 来句笑话
+"""
get_laugh = sv.on_command(
- name="看不懂的笑话",
cmd="来句笑话",
+ docs=__doc__,
rule=is_block() & is_in_dormant()
- & is_in_service(__plugin_name__)
+ & is_in_service('来句笑话')
)
@get_laugh.handle()
@@ -34,12 +42,10 @@ async def _get_laugh(bot: Bot, event: MessageEvent) -> None:
await get_laugh.finish(result.replace("%name", user_name))
-__plugin_name__ = "wty"
-
me_to_you = sv.on_message(
- rule=is_block() & is_in_dormant() & is_in_service(__plugin_name__)
+ rule=is_block() & is_in_dormant() & is_in_service('你又行了')
)
-sv.manual_reg_service("你又彳亍了")
+sv.manual_reg_service('你又行了')
@me_to_you.handle()
async def _me_to_you(bot: Bot, event: MessageEvent) -> None:
@@ -47,3 +53,92 @@ async def _me_to_you(bot: Bot, event: MessageEvent) -> None:
msg = str(event.message)
if "我" in msg and "CQ" not in msg:
await me_to_you.finish(msg.replace("我", "你"))
+
+
+__doc__ = """
+随机选择一位群友成为我的老婆!
+权限组:所有人
+用法:
+ 抽老婆
+"""
+
+roll_wife = sv.on_command(
+ cmd="抽老婆",
+ docs=__doc__,
+ rule=is_block() & is_in_dormant() & is_in_service('抽老婆')
+)
+
+@roll_wife.handle()
+async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None:
+ user = event.user_id
+ group = event.group_id
+ user_name = await bot.get_group_member_info(group_id=group,
+ user_id=user)
+ user_name = user_name['nickname']
+ run = await is_too_exciting(user, group, 5, True)
+ if not run:
+ return
+
+ luck_list = await bot.get_group_member_list(group_id=group)
+ luck_user = choice(luck_list)
+ luck_qq = luck_user['user_id']
+ luck_user = luck_user['nickname']
+ msg = (
+ "5秒后咱将随机抽取一位群友成为\n"
+ f"{user_name} 的老婆!究竟是谁呢~?"
+ )
+ await bot.send(event, msg)
+ await asyncio.sleep(5)
+ msg = (
+ f"> {luck_user}({luck_qq})\n"
+ f"恭喜成为 {user_name} 的老婆~⭐"
+ )
+ await bot.send(event, msg)
+
+
+__doc__ = """
+伪造转发
+权限组:所有人
+用法:
+ /fm qq-name-msg...
+补充:
+ qq: QQ号
+ name: 消息中的ID
+ msg: 对应信息
+示例:
+ /fm 123456789*生草人*草 114514*仙贝*臭死了
+"""
+
+fake_msg = sv.on_command(
+ cmd="/fm",
+ docs=__doc__,
+ rule=is_block() & is_in_service('/fm') & is_in_dormant()
+)
+
+@fake_msg.handle()
+async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ user = event.user_id
+ group = event.group_id
+ node = []
+ check = await is_too_exciting(user, group, 2, True)
+
+ if check:
+ for i in msg:
+ args = i.split('*')
+ print(args)
+ qq = args[0]
+ name = args[1].replace('[', '[')
+ name = name.replace(']', ']')
+ repo = args[2].replace('[', '[')
+ repo = repo.replace(']', ']')
+ dic = {
+ "type": "node",
+ "data": {
+ "name": name,
+ "uin": qq,
+ "content": repo
+ }
+ }
+ node.append(dic)
+ await bot.send_group_forward_msg(group_id=group, messages=node)
diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py
index 167ca2f..ce118cd 100644
--- a/ATRI/plugins/github.py
+++ b/ATRI/plugins/github.py
@@ -12,7 +12,6 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}"
github_issues = sv.on_message(rule=is_block() & is_in_dormant())
-sv.manual_reg_service("GitHubIssue速览")
@github_issues.handle()
async def _github_issues(bot: Bot, event: MessageEvent) -> None:
diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py
new file mode 100644
index 0000000..8744262
--- /dev/null
+++ b/ATRI/plugins/help.py
@@ -0,0 +1,67 @@
+import os
+import json
+
+from nonebot.adapters.cqhttp import Bot, MessageEvent
+
+from ATRI.service import SERVICE_DIR
+from ATRI.service import Service as sv
+from ATRI.rule import is_block
+
+
+SERVICE_DIR = SERVICE_DIR / 'services'
+
+
+__doc__ = """
+查询命令用法
+权限组:所有人
+用法:
+ /h
+ /h list
+ /h info (cmd)
+"""
+
+help = sv.on_command(
+ cmd="/h",
+ docs=__doc__,
+ aliases={'/help', '.help'},
+ rule=is_block()
+)
+
+async def _help(bot: Bot, event: MessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ if not msg:
+ msg = (
+ "呀?找不到路了?\n"
+ "/h list 查看可用命令列表\n"
+ "/h info (cmd) 查看命令具体帮助\n"
+ "项目地址:github.com/Kyomotoi/ATRI\n"
+ "咱只能帮你这么多了qwq"
+ )
+ await help.finish(msg)
+ elif msg[0] == "list":
+ files = []
+ for _, _, i in os.walk(SERVICE_DIR):
+ for a in i:
+ files.append(a.replace('.json', ''))
+ cmds = " ".join(map(str, files))
+ msg = "咱能做很多事!比如:\n" + cmds
+ msg0 = msg + "\n具体用法呢,/(cmd) 就好!"
+ await help.finish(msg0)
+ elif msg[0] == "info":
+ cmd = msg[1]
+ data = {}
+ path = SERVICE_DIR / f"{cmd.replace('/', '')}.json"
+ try:
+ data = json.loads(path.read_bytes())
+ except:
+ await help.finish('未找到相关命令...')
+
+ msg = (
+ f"{cmd} INFO:\n"
+ f"Enabled: {data['enabled']}\n"
+ f"{data['docs']}"
+ )
+ await help.finish(msg)
+ else:
+ await help.finish('请检查输入...')
diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py
index 3754125..e7cfe61 100644
--- a/ATRI/plugins/hitokoto.py
+++ b/ATRI/plugins/hitokoto.py
@@ -19,14 +19,23 @@ HITOKOTO_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'hitokoto'
sick_list = []
-__plugin_name__ = 'hitokoto'
+__doc__ = """
+抑郁一下
+权限组:所有人
+用法:
+ @一言
+ 抑郁一下
+ 网抑云
+补充:
+ @:at Bot
+"""
hitokoto = sv.on_command(
- name="Hitokoto",
cmd="一言",
+ docs=__doc__,
aliases={"抑郁一下", "网抑云"},
rule=is_block() & is_in_dormant()
- & is_in_service(__plugin_name__) & to_bot()
+ & is_in_service('一言') & to_bot()
)
@hitokoto.handle()
diff --git a/ATRI/plugins/key-repo/__init__.py b/ATRI/plugins/key-repo/__init__.py
index 4b2241c..dc606b0 100644
--- a/ATRI/plugins/key-repo/__init__.py
+++ b/ATRI/plugins/key-repo/__init__.py
@@ -1,171 +1,185 @@
-import os
-import json
-import time
-from random import choice
-from pathlib import Path
+import string
+from datetime import datetime
+from random import choice, sample
-from nonebot.typing import T_State
-from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
-from ATRI.config import nonebot_config
+from ATRI.config import Config
from ATRI.service import Service as sv
+from ATRI.utils.request import get_bytes
from ATRI.rule import is_block, is_in_dormant, is_in_service, to_bot
-
-# 此功能未完善
+from .data_source import (
+ add_history,
+ add_key_temp,
+ load_key_data,
+ add_key,
+ load_key_history,
+ load_key_temp_data,
+ del_key_temp
+)
-KEYREPO_DIV = Path('.') / 'ATRI' / 'data' / 'database' / 'KeyRepo'
-os.makedirs(KEYREPO_DIV, exist_ok=True)
+# 此功能暂未完善:未添加关键词删除
+# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+# !屎山注意!屎山注意!屎山注意!屎山注意!
+# !请自备降压药!请自备降压药!
+# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-__plugin_name__ = "KeyRepo"
keyrepo = sv.on_message(rule=is_block()
& is_in_dormant()
- & is_in_service(__plugin_name__)
+ & is_in_service('keyrepo')
& to_bot())
@keyrepo.handle()
async def _keyrepo(bot: Bot, event: MessageEvent) -> None:
msg = str(event.get_message())
-
- file_name = "data.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- with open(path, 'w') as r:
- r.write(json.dumps({}))
- data = {}
+ data = load_key_data()
for key in data.keys():
if key in msg:
await keyrepo.finish(choice(data[key]))
+__doc__ = """
+关键词申请/审核
+权限组:所有人
+用法:
+ /train add (key) (repo)
+ 对于维护者:
+ /train list
+ /train info (key)
+ /train r (code) (0,1)
+补充:
+ key: 关键词
+ repo: 回复
+ 0,1: 对应布尔值False/True
+ code: 唯一识别码
+示例:
+ /train add hso 好涩哦
+"""
+
train = sv.on_command(
- name="调教",
cmd="/train",
+ docs=__doc__,
rule=is_block()
)
[email protected]("key", prompt="哦哦哦要开始学习了!请告诉咱知识点")
-async def _train(bot: Bot, event: MessageEvent, state: T_State) -> None:
- if "[CQ" in state["key"]:
- await train.reject("仅支持纯文本呢...")
-
[email protected]("repo", prompt="咱该如何回答呢?")
-async def _trainR(bot: Bot, event: MessageEvent, state: T_State) -> None:
- if "[CQ" in state["repo"]:
- await train.reject("仅支持纯文本呢...")
-
- if state["key"] == "-d":
- file_name = "review.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
-
- key = state["repo"]
- if key not in data:
- await train.finish("未发现该待审核的知识点呢...")
- else:
+async def _train(bot: Bot, event: GroupMessageEvent) -> None:
+ user = event.user_id
+ group = event.group_id
+
+ msg = str(event.message).split(' ')
+ _type = msg[0]
+ code = "".join(sample(string.ascii_letters + string.digits, 10))
+
+ if _type == "add":
+ key = msg[1]
+ args = msg[2]
+ if user in Config.BotSelfConfig.superusers:
+ add_key(key, args)
msg = (
- f"Key: {key}\n"
- f"Repo: {data[key]['repo']}\n"
- "已经从咱的审核列表移除!"
+ "好欸学到了新的知识!\n"
+ f"关键词:{key}\n"
+ f"回复:{args}"
)
- del data[key]
- with open(path, 'w') as r:
- r.write(json.dumps(data))
- await train.finish(msg)
- elif state["key"] == "-i":
- file_name = "review.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
- if state["repo"] not in data:
- await train.finish("未发现该知识点呢")
- key = data[state["repo"]]
-
- msg = (
- f"用户: {key['user']}\n"
- f"知识点: {state['repo']}"
- f"回复: {key['repo']}"
- f"时间: {key['time']}"
- "/train -r 知识点 y/n"
- )
- await train.finish(msg)
- elif state["key"] == "-ls":
- file_name = "review.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
- keys = ",".join(data.keys())
- msg = f"目前等待审核的有如下:\n{keys}"
- await train.finish(msg)
- elif state["key"] == "-r":
- file_name = "review.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
-
-
- key = state["key"]
- repo = state["repo"]
- user = event.get_user_id()
- if user not in nonebot_config["superusers"]:
- file_name = "review.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
-
- if key in data:
- msg = "欸欸欸,该词还在等待咱的审核,请先等先来的审核完再提交吧..."
+ data = {
+ "user": user,
+ "group": group,
+ "time": str(datetime.now()),
+ "pass": True,
+ "key": key,
+ "repo": args,
+ "feature": code
+ }
+ add_history(data)
await train.finish(msg)
else:
- data[key] = {
+ data = {
"user": user,
- "repo": repo,
- "time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
+ "group": group,
+ "time": str(datetime.now()),
+ "pass": False,
+ "key": key,
+ "repo": args,
+ "feature": code
}
- with open(path, 'r') as r:
- r.write(json.dumps(data, indent=4))
-
+ add_key_temp(data)
msg = (
- "欸欸欸这不错欸!不过,还是先等待咱审核审核,"
- "如想撤销本次学习,请发送 /train -d 知识点"
+ "感谢你的提交w,所提交的关键词将由维护者进行审核\n"
+ f"识别码:{code},你可以使用/train info 识别码\n"
+ "以查询是否通过"
)
await train.finish(msg)
-
- else:
- file_name = "data.json"
- path = KEYREPO_DIV / file_name
- try:
- data = json.loads(path.read_bytes())
- except:
- data = {}
-
- if key in data:
- repo_list: list = data[key]
- repo_list.append(repo)
- data[key] = repo_list
- msg = f"哦哦哦,{key}原来还有这样的回复,学到了~!"
- await bot.send(event, msg)
+ elif _type == "list":
+ data = load_key_temp_data()
+ node = []
+ for i in data:
+ dic = {
+ "type": "node",
+ "data": {
+ "name": "idk",
+ "uin": i['user'],
+ "content": f"Key: {i['key']}\nRepo: {i['repo']}\nTime: {i['time']}"
+ }
+ }
+ node.append(dic)
+ if not node:
+ node = [{
+ "type": "node",
+ "data": {
+ "name": "null",
+ "uin": str(user),
+ "content": "这里什么也没有呢..."
+ }
+ }]
+ await bot.send_group_forward_msg(group_id=group, messages=node)
+ elif _type == "info":
+ key = msg[1]
+ data = load_key_history()
+ for i in data:
+ if i['key'] == key:
+ msg = (
+ f"{key} 审核信息:\n"
+ f"是否通过:{i['pass']}"
+ f"结果: K: {i['key']} | R: {i['repo']}\n"
+ f"来自:{i['user']}@[群:{i['group']}]\n"
+ f"申请时间:{i['time']}"
+ )
+ await train.finish(msg)
+ else:
+ await train.finish('未找到相关信息...')
+ elif _type == "r":
+ key = msg[1]
+ args = int(msg[2])
+ data = load_key_temp_data()
+ if user in Config.BotSelfConfig.superusers:
+ if args not in [0, 1]:
+ await train.finish('请检查输入...')
+ else:
+ for i in data:
+ if bool(args):
+ if i['key'] == key:
+ msg = (
+ "好欸学到了新的知识!\n"
+ f"关键词:{i['key']}\n"
+ f"回复:{i['repo']}"
+ )
+ add_key(i['key'], i['repo'])
+ add_history(i)
+ await train.finish(msg)
+ else:
+ await train.finish('未找到相关信息...')
+ else:
+ add_history(i, False)
+ if del_key_temp(i):
+ await train.finish('已标记为不通过')
+ else:
+ await train.finish('未找到相关信息')
else:
- data[key] = [repo]
- msg = "好欸,咱学到了新的知识点!"
- await bot.send(event, msg)
-
- with open(path, 'w') as r:
- r.write(json.dumps(data))
+ await train.finish('不行哦~你的权限使得你没法这样做!')
+ else:
+ await train.finish('请检查输入...')
diff --git a/ATRI/plugins/key-repo/data_source.py b/ATRI/plugins/key-repo/data_source.py
new file mode 100644
index 0000000..3dd331d
--- /dev/null
+++ b/ATRI/plugins/key-repo/data_source.py
@@ -0,0 +1,116 @@
+import os
+import json
+from pathlib import Path
+from typing import Optional
+
+
+KEYREPO_DIV = Path('.') / 'ATRI' / 'data' / 'database' / 'KeyRepo'
+os.makedirs(KEYREPO_DIV, exist_ok=True)
+
+
+def load_key_data() -> dict:
+ file_name = "data.json"
+ path = KEYREPO_DIV / file_name
+ try:
+ data = json.loads(path.read_bytes())
+ except:
+ with open(path, 'w') as r:
+ r.write(json.dumps({}))
+ data = {}
+ return data
+
+
+def load_key_temp_data() -> list:
+ file_name = "data.temp.json"
+ path = KEYREPO_DIV / file_name
+ try:
+ data = json.loads(path.read_bytes())
+ except:
+ with open(path, 'w') as r:
+ r.write(json.dumps([]))
+ data = []
+ return data
+
+
+def load_key_history() -> list:
+ file_name = "data.history.json"
+ path = KEYREPO_DIV / file_name
+ try:
+ data = json.loads(path.read_bytes())
+ except:
+ with open(path, 'w') as r:
+ r.write(json.dumps([]))
+ data = []
+ return data
+
+
+def save_key_data(d: dict) -> None:
+ file_name = "data.json"
+ path = KEYREPO_DIV / file_name
+ with open(path, 'w') as r:
+ r.write(json.dumps(d))
+
+
+def save_key_temp_data(d: list) -> None:
+ file_name = "data.temp.json"
+ path = KEYREPO_DIV / file_name
+ with open(path, 'w') as r:
+ r.write(json.dumps(d))
+
+
+def save_key_history_data(d: list) -> None:
+ file_name = "data.history.json"
+ path = KEYREPO_DIV / file_name
+ with open(path, 'w') as r:
+ r.write(json.dumps(d))
+
+
+def add_key(key: str, repo: str) -> str:
+ data = load_key_data()
+ data_1 = data.get(key, [])
+ if repo in data_1:
+ return "该回复已存在~!"
+ data_1.append(repo)
+ data[key] = data_1
+ save_key_data(data)
+ return "成功,又学到新知识了~!"
+
+
+def add_key_temp(d: dict) -> None:
+ data: list = load_key_temp_data()
+ data.append(d)
+ save_key_temp_data(data)
+ add_history(d, False)
+
+
+def add_history(d: dict, p: bool = True) -> None:
+ d['pass'] = p
+ data: list = load_key_history()
+ data.append(d)
+ save_key_history_data(data)
+
+
+def del_key(key: str, repo: str) -> str:
+ data = load_key_data()
+ if repo == 'isALL':
+ del data[key]
+ msg = f"成功删除关键词[{key}]下所有回复..."
+ else:
+ data_1 = data.get(key, [])
+ try:
+ data_1.remove(key)
+ except KeyError:
+ raise KeyError('Find repo error.')
+ data[key] = data_1
+ msg = f"成功删除关键词[{key}]下回复:{repo}"
+ save_key_data(data)
+ return msg
+
+
+def del_key_temp(d: dict) -> bool:
+ data = load_key_temp_data()
+ if d in data:
+ data.remove(d)
+ return True
+ else:
+ return False
diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py
new file mode 100644
index 0000000..10e74fb
--- /dev/null
+++ b/ATRI/plugins/nsfw.py
@@ -0,0 +1,103 @@
+import re
+import json
+
+from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
+from nonebot.typing import T_State
+
+from ATRI.log import logger as log
+from ATRI.config import Config
+from ATRI.service import Service as sv
+from ATRI.exceptions import RequestTimeOut
+from ATRI.rule import is_block, is_in_dormant, is_in_service
+from ATRI.utils.request import get_bytes
+from ATRI.utils.cqcode import coolq_code_check
+
+
+nsfw_url = (
+ f"http://{Config.NsfwCheck.host}:"
+ f"{Config.NsfwCheck.port}/?url="
+)
+
+
+nsfw_checking = sv.on_message()
+
+@nsfw_checking.handle()
+async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None:
+ if Config.NsfwCheck.enabled:
+ msg = str(event.message)
+ user = event.user_id
+ group = event.group_id
+ check = await coolq_code_check(msg, user, group)
+ if check:
+ url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0]
+ try:
+ data = json.loads(await get_bytes(url))
+ except:
+ log.warning('检测涩图失败,请查阅文档以获取帮助')
+ return
+ if round(data['score'], 4) > 0.6:
+ score = "{:.2%}".format(round(data['score'], 4))
+ log.debug(f'截获涩图,得分:{score}')
+ await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!')
+ for sup in Config.BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}")
+ else:
+ pass
+
+
+__doc__ = """
+检测你图片的涩值
+权限组:所有人
+用法:
+ /nsfw (pic)
+补充:
+ pic: 图片
+示例:
+ /nsfw 然后Bot会向你索取图片
+"""
+
+nsfw_reading = sv.on_command(
+ cmd="/nsfw",
+ docs=__doc__,
+ rule=is_block() & is_in_service('/nsfw') & is_in_dormant()
+)
+
+@nsfw_reading.handle()
+async def _nsfw_r(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
+ user = event.user_id
+ group = event.group_id
+ msg = str(event.message).strip()
+ check = await coolq_code_check(msg, user, group)
+ if check and msg:
+ state['pic'] = msg
+
+@nsfw_reading.got('pic', prompt='请提供一张图片')
+async def _nsfw_reading(bot: Bot,
+ event: GroupMessageEvent,
+ state: T_State) -> None:
+ msg = state['pic']
+ pic = re.findall(r"url=(.*?)]", msg)
+ if not pic:
+ await nsfw_reading.reject('请发送图片而不是其它东西!!')
+
+ url = nsfw_url + pic[0]
+ try:
+ data = json.loads(await get_bytes(url))
+ except RequestTimeOut:
+ raise RequestTimeOut('Time out!')
+
+ score = round(data['score'], 4)
+ result = "{:.2%}".format(round(data['score'], 4))
+ if score > 0.9:
+ level = "hso! 我要发给主人看!"
+ for sup in Config.BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=f"{state['pic']}\n涩值: {result}")
+ elif 0.9 > score >= 0.6:
+ level = "嗯,可冲"
+ else:
+ level = "?能不能换张55完全冲不起来"
+
+ repo = f"涩值:{result}\n{level}"
+ await nsfw_reading.finish(repo) \ No newline at end of file
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index 7c8179d..8404da6 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -14,7 +14,7 @@ from ATRI.rule import (
from .data_source import dec
-waiting_list = []
+temp_list = []
bilibili_rich = sv.on_message(
@@ -24,17 +24,10 @@ sv.manual_reg_service("监听b站小程序")
@bilibili_rich.handle()
async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
- global waiting_list
+ global temp_list
msg = str(event.raw_message).replace("\\", "")
- user = event.user_id
bv = False
- if count_list(waiting_list, user) == 5:
- waiting_list = del_list_aim(waiting_list, user)
- return
-
- waiting_list.append(user)
-
if "qqdocurl" not in msg:
if "av" in msg:
av = re.findall(r"(av\d+)", msg)[0].replace('av', '')
@@ -60,6 +53,13 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
else:
return
+ if count_list(temp_list, av) == 4:
+ await bot.send(event, "你是怕别人看不到么发这么多次?")
+ temp_list = del_list_aim(temp_list, av)
+ return
+
+ temp_list.append(av)
+
try:
URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}"
except:
diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py
index c3b033c..25e4e0b 100644
--- a/ATRI/plugins/status.py
+++ b/ATRI/plugins/status.py
@@ -6,12 +6,12 @@ from ATRI.service import Service as sv
from ATRI.rule import is_block
from ATRI.exceptions import GetStatusError
from ATRI.utils.apscheduler import scheduler
-from ATRI.config import nonebot_config
+from ATRI.config import Config
ping = sv.on_command(
- name="测试机器人",
cmd="/ping",
+ docs="测试机器人",
rule=is_block()
)
@@ -21,8 +21,8 @@ async def _ping(bot: Bot, event: MessageEvent) -> None:
status = sv.on_command(
- name="检查机器人状态",
cmd="/status",
+ docs="检查机器人状态",
rule=is_block()
)
@@ -98,7 +98,7 @@ async def _():
f"* netRECV: {inteRECV}MB\n"
) + msg
- for sup in nonebot_config["superusers"]:
+ for sup in Config.BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(
user_id=sup,
message=msg0
diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py
index 10b3317..b23a01f 100644
--- a/ATRI/plugins/utils/__init__.py
+++ b/ATRI/plugins/utils/__init__.py
@@ -7,16 +7,25 @@ from ATRI.rule import (
is_in_dormant,
is_in_service
)
-from .data_source import roll_dice
+from .data_source import roll_dice, Encrypt
-__plugin_name__ = "roll"
+__doc__ = """
+roll一下
+权限组:所有人
+用法:
+ /roll (int)d(int)+...
+补充:
+ int: 阿拉伯数字
+示例:
+ /roll 1d10+10d9+4d5+2d3
+"""
roll = sv.on_command(
- name="roll一下",
cmd="/roll",
+ docs=__doc__,
rule=is_block() & is_in_dormant()
- & is_in_service(__plugin_name__)
+ & is_in_service('/roll')
)
@roll.handle()
@@ -34,3 +43,36 @@ async def _(bot: Bot, event: MessageEvent, state: dict) -> None:
await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10")
await roll.finish(roll_dice(resu))
+
+
+__doc__ = """
+加密你的信息!
+权限组:所有人
+用法:
+ /enc e,d msg
+补充:
+ e,d:对应 编码/解码
+ msg: 目标内容
+示例:
+ /enc e アトリは高性能ですから!
+"""
+
+encrypt = sv.on_command(
+ cmd="/enc",
+ docs=__doc__,
+ rule=is_block() & is_in_service('/enc') & is_in_dormant()
+)
+
+async def _encrypt(bot: Bot, event: MessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ _type = msg[0]
+ s = msg[1]
+ e = Encrypt()
+
+ if _type == "e":
+ await encrypt.finish(e.encode(s))
+ elif _type == "d":
+ await encrypt.finish(e.decode(s))
+ else:
+ await encrypt.finish('请检查输入~!')
diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py
index f41a1d1..71da581 100644
--- a/ATRI/plugins/utils/data_source.py
+++ b/ATRI/plugins/utils/data_source.py
@@ -1,5 +1,7 @@
import re
import random
+from math import floor
+from typing import Union
def roll_dice(par: str) -> str:
@@ -32,3 +34,140 @@ def roll_dice(par: str) -> str:
result = f"{par}=({proc})={result}"
return result
+
+
+class Encrypt():
+ cr = 'ĀāĂ㥹ÀÁÂÃÄÅ'
+ cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ'
+ cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr'
+ cb = 'ĨĩĪīĬĭĮįİı'
+
+ sr = len(cr)
+ sc = len(cc)
+ sn = len(cn)
+ sb = len(cb)
+ src = sr * sc
+ snb = sn * sb
+ scnb = sc * snb
+
+ def _div(self, a: int, b: int) -> int:
+ return floor(a / b)
+
+ def _encodeByte(self, i) -> Union[str, None]:
+ if i > 0xFF:
+ raise ValueError('ERROR! rc/nb overflow')
+
+ if i > 0x7F:
+ i = i & 0x7F
+ return self.cn[self._div(i, self.sb) + int(self.cb[i % self.sb])]
+
+ return self.cr[self._div(i, self.sc) + int(self.cc[i % self.sc])]
+
+ def _encodeShort(self, i) -> str:
+ if i > 0xFFFF:
+ raise ValueError('ERROR! rcnb overflow')
+
+ reverse = False
+ if i > 0x7FFF:
+ reverse = True
+ i = i & 0x7FFF
+
+ char = [
+ self._div(i, self.scnb),
+ self._div(i % self.scnb, self.snb),
+ self._div(i % self.snb, self.sb), i % self.sb
+ ]
+ char = [
+ self.cr[char[0]], self.cc[char[1]], self.cn[char[2]],
+ self.cb[char[3]]
+ ]
+
+ if reverse:
+ return char[2] + char[3] + char[0] + char[1]
+
+ return ''.join(char)
+
+ def _decodeByte(self, c) -> int:
+ nb = False
+ idx = [self.cr.index(c[0]), self.cc.index(c[1])]
+ if idx[0] < 0 or idx[1] < 0:
+ idx = [self.cn.index(c[0]), self.cb.index(c[1])]
+ nb = True
+ raise ValueError('ERROR! rc/nb overflow')
+
+ result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1]
+ if result > 0x7F:
+ raise ValueError('ERROR! rc/nb overflow')
+
+ return result | 0x80 if nb else 0
+
+ def _decodeShort(self, c) -> int:
+ reverse = c[0] not in self.cr
+ if not reverse:
+ idx = [
+ self.cr.index(c[0]),
+ self.cc.index(c[1]),
+ self.cn.index(c[2]),
+ self.cb.index(c[3])
+ ]
+ else:
+ idx = [
+ self.cr.index(c[2]),
+ self.cc.index(c[3]),
+ self.cn.index(c[0]),
+ self.cb.index(c[1])
+ ]
+
+ if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0:
+ raise ValueError('ERROR! not rcnb')
+
+ result = idx[0] * self.scnb + idx[1] * self.snb + idx[
+ 2] * self.sb + idx[3]
+ if result > 0x7FFF:
+ raise ValueError('ERROR! rcnb overflow')
+
+ result |= 0x8000 if reverse else 0
+ return result
+
+ def _encodeBytes(self, b) -> str:
+ result = []
+ for i in range(0, (len(b) >> 1)):
+ result.append(self._encodeShort((b[i * 2] << 8 | b[i * 2 + 1])))
+
+ if len(b) & 1 == 1:
+ result.append(self._encodeByte(b[-1]))
+
+ return ''.join(result)
+
+ def encode(self, s: str, encoding: str = 'utf-8'):
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ return self._encodeBytes(s.encode(encoding))
+
+ def _decodeBytes(self, s: str):
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ if len(s) & 1:
+ raise ValueError('ERROR length')
+
+ result = []
+ for i in range(0, (len(s) >> 2)):
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8]))
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF
+ ]))
+
+ if (len(s) & 2) == 2:
+ result.append(bytes([self._decodeByte(s[-2:])]))
+
+ return b''.join(result)
+
+ def decode(self, s: str, encoding: str = 'utf-8') -> str:
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ try:
+ return self._decodeBytes(s).decode(encoding)
+ except UnicodeDecodeError:
+ raise ValueError('Decoding failed')