diff options
Diffstat (limited to 'ATRI/plugins')
-rw-r--r-- | ATRI/plugins/admin.py | 116 | ||||
-rw-r--r-- | ATRI/plugins/anime-search/__init__.py | 4 | ||||
-rw-r--r-- | ATRI/plugins/anti-rubbish.py | 166 | ||||
-rw-r--r-- | ATRI/plugins/call-owner.py | 8 | ||||
-rw-r--r-- | ATRI/plugins/code-runner.py | 2 | ||||
-rw-r--r-- | ATRI/plugins/curse/__init__.py | 2 | ||||
-rw-r--r-- | ATRI/plugins/drifting-bottle/__init__.py | 5 | ||||
-rw-r--r-- | ATRI/plugins/essential.py | 152 | ||||
-rw-r--r-- | ATRI/plugins/funny.py | 111 | ||||
-rw-r--r-- | ATRI/plugins/github.py | 1 | ||||
-rw-r--r-- | ATRI/plugins/help.py | 67 | ||||
-rw-r--r-- | ATRI/plugins/hitokoto.py | 15 | ||||
-rw-r--r-- | ATRI/plugins/key-repo/__init__.py | 280 | ||||
-rw-r--r-- | ATRI/plugins/key-repo/data_source.py | 116 | ||||
-rw-r--r-- | ATRI/plugins/nsfw.py | 103 | ||||
-rw-r--r-- | ATRI/plugins/rich/__init__.py | 18 | ||||
-rw-r--r-- | ATRI/plugins/status.py | 8 | ||||
-rw-r--r-- | ATRI/plugins/utils/__init__.py | 50 | ||||
-rw-r--r-- | ATRI/plugins/utils/data_source.py | 139 |
19 files changed, 945 insertions, 418 deletions
diff --git a/ATRI/plugins/admin.py b/ATRI/plugins/admin.py index 9465130..cd902e4 100644 --- a/ATRI/plugins/admin.py +++ b/ATRI/plugins/admin.py @@ -12,6 +12,7 @@ from nonebot.adapters.cqhttp import ( ) from nonebot.typing import T_State +from ATRI.config import Config from ATRI.service import Service as sv from ATRI.exceptions import WriteError, load_error from ATRI.utils.file import open_file @@ -80,11 +81,12 @@ async def _chat_monitor(bot: Bot, event: GroupMessageEvent) -> None: else: pass + ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' request_friend = sv.on_command( - name="好友申请处理", cmd="好友申请", + docs="好友申请处理", permission=SUPERUSER ) @@ -123,8 +125,8 @@ async def _request_friend(bot: Bot, event: MessageEvent) -> None: request_group = sv.on_command( - name="群聊申请处理", cmd="群聊申请", + docs="群聊申请处理", permission=SUPERUSER ) @@ -173,8 +175,8 @@ async def _request_group(bot: Bot, event: MessageEvent) -> None: broadcast = sv.on_command( - name="广播", - cmd="/broadcast", + cmd="/bc", + docs="广播\n用法:/bc 广播内容", permission=SUPERUSER ) @@ -214,8 +216,8 @@ async def _bd(bot: Bot, event: MessageEvent, state: T_State) -> None: track_error = sv.on_command( - name="错误堆栈查看", cmd="/track", + docs="报错堆栈查看\n用法:/track 追踪ID", permission=SUPERUSER ) @@ -246,8 +248,8 @@ async def _(bot: Bot, event: MessageEvent, state: T_State) -> None: get_log = sv.on_command( - name="获取控制台信息", cmd="/getlog", + docs="获取控制台信息\n用法:/getlog 等级:info,warning,debug 行数:比如-20即最近20行", permission=SUPERUSER ) @@ -282,8 +284,8 @@ async def _get_log(bot: Bot, event: MessageEvent) -> None: shutdown = sv.on_command( - name="紧急停机", - cmd="/shutdown", + cmd="/st", + docs="紧急停机", permission=SUPERUSER ) @@ -300,3 +302,101 @@ async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: exit(0) else: await shutdown.finish("再考虑下先吧 ;w;") + + +__doc__ = """ +懒得和你废话,block了 +权限组:维护者 +用法: + /b user,group 0,1 +补充: + user:QQ号 + group:QQ群号 + 0,1:对应布尔值False, True + 范围为全局 +""" + +block = sv.on_command( + cmd="/b", + docs="懒得和你废话,block了\n用法:/b u,g 0,1", + permission=SUPERUSER +) + +async def _block(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + _type = msg[0] + arg = int(msg[1]) + is_enabled = bool(int(msg[2])) + b_type = "" + + status = "封禁" if is_enabled else "解封" + + if _type == "g": + sv.BlockSystem.control_list(is_enabled=is_enabled, group=arg) + b_type = "Group" + elif _type == "u": + sv.BlockSystem.control_list(is_enabled, user=arg) + b_type = "User" + else: + await block.finish("请检查输入...") + + await block.finish(f"已成功将[{b_type}@{arg}]{status}") + + +__doc__ = """ +功能开关控制 +权限组:维护者,群管理 +用法: + 对于维护者: + /s 目标指令 u+int,g+int,global 0,1 + 对于群管理: + /s 目标指令 0,1 +补充: + user:QQ号 + group:QQ群号 + global:全局 + 0,1:对应布尔值False, True +示例: + 对于维护者: + /s /status u123456789 0 + 对于群管理: + /s /status 0 +""" + +service_control = sv.on_command( + cmd="/s", + docs=__doc__, + permission=SUPERUSER +) + +@service_control.handle() +async def _service_control(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + user = event.user_id + cmd = msg[0] + _type = msg[1] + is_enabled = bool(msg[2]) + + status = "封禁" if is_enabled else "解封" + + if user in Config.BotSelfConfig.superusers: + if _type == "global": + sv.control_service(cmd, True, is_enabled) + else: + if "u" in _type: + qq = _type.replace('u', '') + sv.control_service(cmd, False, is_enabled, user=int(qq)) + elif "g" in _type: + group = _type.replace('g', '') + sv.control_service(cmd, False, is_enabled, group=int(group)) + else: + await service_control.finish("请检查输入~!") + else: + if isinstance(event, GroupMessageEvent): + group = event.group_id + sv.control_service(cmd, False, bool(_type), group=group) + else: + await service_control.finish("此功能仅在群聊中触发") + + await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]") diff --git a/ATRI/plugins/anime-search/__init__.py b/ATRI/plugins/anime-search/__init__.py index 07cc8dc..5759f19 100644 --- a/ATRI/plugins/anime-search/__init__.py +++ b/ATRI/plugins/anime-search/__init__.py @@ -18,8 +18,8 @@ URL = "https://trace.moe/api/search?url=" anime_search = sv.on_command( - name="以图搜番", cmd="/anime", + docs="以图搜番", rule=is_block() & is_in_dormant() ) @@ -31,7 +31,7 @@ async def _anime_search(bot: Bot, if msg: state["msg"] = msg -@anime_search.got("msg", prompt="请发送咱一张图片~!") +@anime_search.got("msg", prompt="请告诉咱目标图片~!") async def _(bot: Bot, event: MessageEvent, state: T_State) -> None: diff --git a/ATRI/plugins/anti-rubbish.py b/ATRI/plugins/anti-rubbish.py deleted file mode 100644 index de8e1e1..0000000 --- a/ATRI/plugins/anti-rubbish.py +++ /dev/null @@ -1,166 +0,0 @@ -import json -from pathlib import Path -from datetime import datetime -from typing import Optional - -from nonebot.plugin import on_command, on_message -from nonebot.adapters.cqhttp import Bot, GroupMessageEvent - -from ATRI.log import logger -from ATRI.rule import is_block -from ATRI.config import nonebot_config -from ATRI.utils.file import write_file -from ATRI.utils.apscheduler import scheduler -from ATRI.exceptions import WriteError - - -RUBBISH_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'anti-rubbish' -now_time = datetime.now().strftime('%Y%m%d-%H%M%S') - - -# 365 x 24 不间断监听啥b发屎人 -# 日您🐎的,自己在厕所吃完自助餐还不够,还要分享给群友 -# 👴就搁这公示公示这些啥b -# 每日0点如有发屎记录则全群通报 -anti_rubbish = on_message() - -@anti_rubbish.handle() -async def _anti_rubbish(bot: Bot, event: GroupMessageEvent) -> None: - msg = str(event.message).strip() - user = int(event.user_id) - group = event.group_id - key_word: dict = NoobRubbish.get_keyword() - noob_data: dict = NoobRubbish.read_noobs(group) - noob_data.setdefault(user, {}) - - for key in key_word.keys(): - if key in msg: - noob_data[user].setdefault(key, 1) - noob_data[user][key] += 1 - await write_file(NoobRubbish._get_noob(group), json.dumps(noob_data)) - logger.info( - f"GET 吃屎人 {user}[@群{group}] 第{noob_data[user][key]}次: {msg}") - - -rubbish = on_command("/rubbish", rule=is_block()) - -async def _rubbish(bot: Bot, event: GroupMessageEvent) -> None: - cmd = str(event.message).split(" ") - user = int(event.user_id) - group = event.group_id - - if cmd[0] == "list": - noob_data: dict = NoobRubbish.read_noobs(group) - noob_list = "" - for key in noob_data.keys(): - noob_list += f"{key}\n" - - if not noob_list: - await rubbish.finish("此群很干净呢~!") - else: - msg = ( - f"截至{now_time}\n" - f"吃过厕所自助餐的有:\n" - ) + noob_list - await rubbish.finish(msg) - - elif cmd[0] == "read": - try: - user = cmd[1] - except: - await rubbish.finish("格式/rubbish read qq") - - noob_data: dict = NoobRubbish.read_noob(group, int(user)) - if not noob_data: - await rubbish.finish("该群友很干净!") - else: - noob_keys = "" - for key in noob_data.keys(): - noob_keys += f"{key}-{noob_data[key]}次\n" - msg = ( - f"截至{now_time}\n" - f"此群友吃的屎的种类,以及次数:\n" - ) + noob_keys - await rubbish.finish(msg) - - elif cmd[0] == "update": - if user not in nonebot_config["superusers"]: - await rubbish.finish("没权限呢...") - - key_word = cmd[1] - data = NoobRubbish.get_keyword() - data[key_word] = now_time - await NoobRubbish.store_keyword(data) - await rubbish.finish(f"勉強しました!\n[{key_word}]") - - elif cmd[0] == "del": - if user not in nonebot_config["superusers"]: - await rubbish.finish("没权限呢...") - - key_word = cmd[1] - data = NoobRubbish.get_keyword() - del data[key_word] - await NoobRubbish.store_keyword(data) - await rubbish.finish(f"清除~![{key_word}]") - - else: - await rubbish.finish("请检查格式~!详细请查阅文档") - - -# @scheduler.scheduled_job( -# "cron", -# hour=0, -# misfire_grace_time=120 -# ) -# async def _(): -# group = GroupMessageEvent.group_id -# noob_data = NoobRubbish.read_noobs(group) - - -class NoobRubbish: - @staticmethod - def _get_noob(group: Optional[int] = None) -> Path: - file_name = f"{now_time}.noob.json" - GROUP_DIR = RUBBISH_DIR / f"{group}" - path = GROUP_DIR / file_name - - if not GROUP_DIR.exists(): - GROUP_DIR.mkdir() - return path - - @classmethod - def read_noobs(cls, group: int) -> dict: - try: - data = json.loads(cls._get_noob(group).read_bytes()) - except: - data = {} - return data - - @classmethod - def read_noob(cls, group: int, user: Optional[int]) -> dict: - try: - data = json.loads(cls._get_noob(group).read_bytes()) - except: - data = {} - data.setdefault(user, {}) - return data - - @staticmethod - def get_keyword() -> dict: - file_name = "keyword.json" - path = RUBBISH_DIR / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - return data - - @staticmethod - async def store_keyword(data: dict) -> None: - file_name = "keyword.json" - path = RUBBISH_DIR / file_name - try: - await write_file(path, json.dumps(data)) - except WriteError: - raise WriteError("Writing file failed!") diff --git a/ATRI/plugins/call-owner.py b/ATRI/plugins/call-owner.py index af8e83d..2e12a1e 100644 --- a/ATRI/plugins/call-owner.py +++ b/ATRI/plugins/call-owner.py @@ -7,7 +7,7 @@ from nonebot.adapters.cqhttp import ( from ATRI.service import Service as sv from ATRI.rule import is_block -from ATRI.config import nonebot_config +from ATRI.config import Config from ATRI.utils.apscheduler import scheduler from ATRI.utils.list import count_list @@ -16,8 +16,8 @@ repo_list = [] repo = sv.on_command( - name="给维护者留言", cmd="来杯红茶", + docs="给维护者留言", rule=is_block() ) @@ -38,7 +38,7 @@ async def _repo_(bot: Bot, event: MessageEvent, state: T_State) -> None: repo_list.append(user) - for sup in nonebot_config["superusers"]: + for sup in Config.BotSelfConfig.superusers: await bot.send_private_msg( user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}" @@ -58,8 +58,8 @@ async def _() -> None: reset_repo = sv.on_command( - name="重置给维护者留言次数", cmd="重置红茶", + docs="重置给维护者的留言次数", permission=SUPERUSER ) diff --git a/ATRI/plugins/code-runner.py b/ATRI/plugins/code-runner.py index 731abfc..6da97b0 100644 --- a/ATRI/plugins/code-runner.py +++ b/ATRI/plugins/code-runner.py @@ -41,8 +41,8 @@ SUPPORTED_LANGUAGES = { code_runner = sv.on_command( - name="运行代码", cmd="/code", + docs="在线运行代码", rule=is_block() & is_in_dormant() ) diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py index 03da205..6db80cc 100644 --- a/ATRI/plugins/curse/__init__.py +++ b/ATRI/plugins/curse/__init__.py @@ -18,8 +18,8 @@ sick_list = [] __plugin_name__ = 'curse' curse = sv.on_command( - name="口臭", cmd="口臭一下", + docs="口臭", aliases={"口臭", "骂我"}, rule=is_block() & is_in_dormant() & is_in_service(__plugin_name__) diff --git a/ATRI/plugins/drifting-bottle/__init__.py b/ATRI/plugins/drifting-bottle/__init__.py index 5bd514c..4d3182e 100644 --- a/ATRI/plugins/drifting-bottle/__init__.py +++ b/ATRI/plugins/drifting-bottle/__init__.py @@ -1,2 +1,5 @@ from ATRI.service import Service as sv -from ATRI.rule import is_block, is_in_service
\ No newline at end of file +from ATRI.rule import is_block, is_in_service + + +# 等待撰写
\ No newline at end of file diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 020a413..80a1c97 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -23,9 +23,10 @@ from nonebot.adapters.cqhttp import ( import ATRI from ATRI.log import logger from ATRI.exceptions import WriteError -from ATRI.config import nonebot_config +from ATRI.config import Config from ATRI.rule import is_block from ATRI.service import Service as sv +from ATRI.utils.cqcode import coolq_code_check PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services' @@ -59,7 +60,7 @@ async def shutdown() -> None: @driver.on_bot_connect async def connect(bot) -> None: - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( int(superuser), "WebSocket 成功连接,数据开始传输。" @@ -68,7 +69,7 @@ async def connect(bot) -> None: @driver.on_bot_disconnect async def disconnect(bot) -> None: - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: try: await sv.NetworkPost.send_private_msg( int(superuser), @@ -82,7 +83,7 @@ ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' os.makedirs(ESSENTIAL_DIR, exist_ok=True) # 处理:好友请求 -request_friend_event = sv.on_request("Friends request", rule=is_block()) +request_friend_event = sv.on_request(rule=is_block()) @request_friend_event.handle() async def _request_friend_event(bot, event: FriendRequestEvent) -> None: @@ -108,7 +109,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None: except WriteError: raise WriteError("Writing file failed!") - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: msg = ( "主人,收到一条好友请求:\n" f"请求人:{event.get_user_id()}\n" @@ -122,7 +123,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None: # 处理:邀请入群,如身为管理,还附有入群请求 -request_group_event = sv.on_request("Group request",rule=is_block()) +request_group_event = sv.on_request(rule=is_block()) @request_group_event.handle() async def _request_group_event(bot, event: GroupRequestEvent) -> None: @@ -150,7 +151,7 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: except WriteError: raise WriteError("Writing file failed!") - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: msg = ( "主人,收到一条入群请求:\n" f"请求人:{event.get_user_id()}\n" @@ -164,39 +165,39 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: # 处理群成员变动 -group_member_event = sv.on_notice("Group member change") +group_member_event = sv.on_notice() @group_member_event.handle() -async def _group_member_event(bot: Bot, event) -> None: - if isinstance(event, GroupIncreaseNoticeEvent): +async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: + msg = ( + "好欸!事新人!\n" + f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!" + ) + await group_member_event.finish(msg) + +@group_member_event.handle() +async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: + if event.is_tome(): msg = ( - "好欸!事新人!\n" - f"在下 {choice(list(nonebot_config['nickname']))} 哒!w!" + "呜呜呜,主人" + f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." ) - await group_member_event.finish(msg) - - elif isinstance(event, GroupDecreaseNoticeEvent): - if event.is_tome(): - msg = ( - "呜呜呜,主人" - f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." + for superuser in Config.BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg ) - for superuser in nonebot_config["superusers"]: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) - else: - await group_member_event.finish(f"{event.user_id} 离开了我们...") + else: + await group_member_event.finish(f"{event.user_id} 离开了我们...") # 处理群管理事件 -group_admin_event = sv.on_notice("Group admin change") +group_admin_event = sv.on_notice() @group_admin_event.handle() async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: if event.is_tome(): - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" @@ -204,7 +205,7 @@ async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: # 处理群禁言事件 -group_ban_event = sv.on_notice("Group ban change") +group_ban_event = sv.on_notice() @group_ban_event.handle() async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: @@ -215,7 +216,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n" f"时长...是 {event.duration} 秒" ) - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=int(superuser), message=msg @@ -225,7 +226,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: "好欸!主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!" ) - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=int(superuser), message=msg @@ -233,7 +234,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: # 处理群红包运气王事件 -lucky_read_bag_event = sv.on_notice("Group read bag winner") +lucky_read_bag_event = sv.on_notice() @lucky_read_bag_event.handle() async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: @@ -245,7 +246,7 @@ async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: # 处理群文件上传事件 -group_file_upload_event = sv.on_notice("Group file change") +group_file_upload_event = sv.on_notice() @group_file_upload_event.handle() async def _group_file_upload_event(bot, @@ -254,51 +255,56 @@ async def _group_file_upload_event(bot, # 处理撤回事件 -recall_event = sv.on_notice("Group member recall") +recall_event = sv.on_notice() @recall_event.handle() -async def _recall_event(bot: Bot, event) -> None: - if isinstance(event, GroupRecallNoticeEvent): - repo = await bot.call_api( - "get_msg", - message_id=event.message_id - ) - repo = str(repo["message"]) - if "CQ" in repo: - repo = repo.replace("CQ", "QC") +async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: + group = event.group_id + repo = await bot.call_api( + "get_msg", + message_id=event.message_id + ) + repo = str(repo["message"]) + check = await coolq_code_check(repo, group=group) + if not check: + repo = repo.replace("CQ", "QC") - msg = ( - "主人,咱拿到了一条撤回信息!\n" - f"{event.user_id}@[群:{event.group_id}]\n" - "撤回了\n" - f"{repo}" + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{event.user_id}@[群:{event.group_id}]\n" + "撤回了\n" + f"{repo}" + ) + + await bot.send(event, "咱看到惹~!") + for superuser in Config.BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg ) - await bot.send(event, "咱看到惹~!") - for superuser in nonebot_config["superusers"]: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) +@recall_event.handle() +async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: + user = event.user_id + repo = await bot.call_api( + "get_msg", + message_id=event.message_id + ) + repo = str(repo["message"]) + check = await coolq_code_check(repo, user) + if not check: + repo = repo.replace("CQ", "QC") - elif isinstance(event, FriendRecallNoticeEvent): - repo = await bot.call_api( - "get_msg", - message_id=event.message_id - ) - repo = str(repo["message"]) - if "CQ" in repo: - repo = repo.replace("CQ", "QC") + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{event.user_id}@[私聊]" + "撤回了\n" + f"{repo}" + ) - msg = ( - "主人,咱拿到了一条撤回信息!\n" - f"{event.user_id}@[私聊]" - "撤回了\n" - f"{repo}" + await bot.send(event, "咱看到惹~!") + for superuser in Config.BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg ) - - for superuser in nonebot_config["superusers"]: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py index 472c77c..f334bcf 100644 --- a/ATRI/plugins/funny.py +++ b/ATRI/plugins/funny.py @@ -1,9 +1,12 @@ +import asyncio from pathlib import Path from random import choice, randint -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.message import Message from ATRI.service import Service as sv +from ATRI.utils.limit import is_too_exciting from ATRI.rule import ( is_block, is_in_dormant, @@ -11,13 +14,18 @@ from ATRI.rule import ( ) -__plugin_name__ = "laugh" +__doc__ = """ +看不懂的笑话 +权限组:所有人 +用法: + 来句笑话 +""" get_laugh = sv.on_command( - name="看不懂的笑话", cmd="来句笑话", + docs=__doc__, rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) + & is_in_service('来句笑话') ) @get_laugh.handle() @@ -34,12 +42,10 @@ async def _get_laugh(bot: Bot, event: MessageEvent) -> None: await get_laugh.finish(result.replace("%name", user_name)) -__plugin_name__ = "wty" - me_to_you = sv.on_message( - rule=is_block() & is_in_dormant() & is_in_service(__plugin_name__) + rule=is_block() & is_in_dormant() & is_in_service('你又行了') ) -sv.manual_reg_service("你又彳亍了") +sv.manual_reg_service('你又行了') @me_to_you.handle() async def _me_to_you(bot: Bot, event: MessageEvent) -> None: @@ -47,3 +53,92 @@ async def _me_to_you(bot: Bot, event: MessageEvent) -> None: msg = str(event.message) if "我" in msg and "CQ" not in msg: await me_to_you.finish(msg.replace("我", "你")) + + +__doc__ = """ +随机选择一位群友成为我的老婆! +权限组:所有人 +用法: + 抽老婆 +""" + +roll_wife = sv.on_command( + cmd="抽老婆", + docs=__doc__, + rule=is_block() & is_in_dormant() & is_in_service('抽老婆') +) + +@roll_wife.handle() +async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: + user = event.user_id + group = event.group_id + user_name = await bot.get_group_member_info(group_id=group, + user_id=user) + user_name = user_name['nickname'] + run = await is_too_exciting(user, group, 5, True) + if not run: + return + + luck_list = await bot.get_group_member_list(group_id=group) + luck_user = choice(luck_list) + luck_qq = luck_user['user_id'] + luck_user = luck_user['nickname'] + msg = ( + "5秒后咱将随机抽取一位群友成为\n" + f"{user_name} 的老婆!究竟是谁呢~?" + ) + await bot.send(event, msg) + await asyncio.sleep(5) + msg = ( + f"> {luck_user}({luck_qq})\n" + f"恭喜成为 {user_name} 的老婆~⭐" + ) + await bot.send(event, msg) + + +__doc__ = """ +伪造转发 +权限组:所有人 +用法: + /fm qq-name-msg... +补充: + qq: QQ号 + name: 消息中的ID + msg: 对应信息 +示例: + /fm 123456789*生草人*草 114514*仙贝*臭死了 +""" + +fake_msg = sv.on_command( + cmd="/fm", + docs=__doc__, + rule=is_block() & is_in_service('/fm') & is_in_dormant() +) + +@fake_msg.handle() +async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None: + msg = str(event.message).split(' ') + user = event.user_id + group = event.group_id + node = [] + check = await is_too_exciting(user, group, 2, True) + + if check: + for i in msg: + args = i.split('*') + print(args) + qq = args[0] + name = args[1].replace('[', '[') + name = name.replace(']', ']') + repo = args[2].replace('[', '[') + repo = repo.replace(']', ']') + dic = { + "type": "node", + "data": { + "name": name, + "uin": qq, + "content": repo + } + } + node.append(dic) + await bot.send_group_forward_msg(group_id=group, messages=node) diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py index 167ca2f..ce118cd 100644 --- a/ATRI/plugins/github.py +++ b/ATRI/plugins/github.py @@ -12,7 +12,6 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" github_issues = sv.on_message(rule=is_block() & is_in_dormant()) -sv.manual_reg_service("GitHubIssue速览") @github_issues.handle() async def _github_issues(bot: Bot, event: MessageEvent) -> None: diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py new file mode 100644 index 0000000..8744262 --- /dev/null +++ b/ATRI/plugins/help.py @@ -0,0 +1,67 @@ +import os +import json + +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.service import SERVICE_DIR +from ATRI.service import Service as sv +from ATRI.rule import is_block + + +SERVICE_DIR = SERVICE_DIR / 'services' + + +__doc__ = """ +查询命令用法 +权限组:所有人 +用法: + /h + /h list + /h info (cmd) +""" + +help = sv.on_command( + cmd="/h", + docs=__doc__, + aliases={'/help', '.help'}, + rule=is_block() +) + +async def _help(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + if not msg: + msg = ( + "呀?找不到路了?\n" + "/h list 查看可用命令列表\n" + "/h info (cmd) 查看命令具体帮助\n" + "项目地址:github.com/Kyomotoi/ATRI\n" + "咱只能帮你这么多了qwq" + ) + await help.finish(msg) + elif msg[0] == "list": + files = [] + for _, _, i in os.walk(SERVICE_DIR): + for a in i: + files.append(a.replace('.json', '')) + cmds = " ".join(map(str, files)) + msg = "咱能做很多事!比如:\n" + cmds + msg0 = msg + "\n具体用法呢,/(cmd) 就好!" + await help.finish(msg0) + elif msg[0] == "info": + cmd = msg[1] + data = {} + path = SERVICE_DIR / f"{cmd.replace('/', '')}.json" + try: + data = json.loads(path.read_bytes()) + except: + await help.finish('未找到相关命令...') + + msg = ( + f"{cmd} INFO:\n" + f"Enabled: {data['enabled']}\n" + f"{data['docs']}" + ) + await help.finish(msg) + else: + await help.finish('请检查输入...') diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py index 3754125..e7cfe61 100644 --- a/ATRI/plugins/hitokoto.py +++ b/ATRI/plugins/hitokoto.py @@ -19,14 +19,23 @@ HITOKOTO_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'hitokoto' sick_list = [] -__plugin_name__ = 'hitokoto' +__doc__ = """ +抑郁一下 +权限组:所有人 +用法: + @一言 + 抑郁一下 + 网抑云 +补充: + @:at Bot +""" hitokoto = sv.on_command( - name="Hitokoto", cmd="一言", + docs=__doc__, aliases={"抑郁一下", "网抑云"}, rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) & to_bot() + & is_in_service('一言') & to_bot() ) @hitokoto.handle() diff --git a/ATRI/plugins/key-repo/__init__.py b/ATRI/plugins/key-repo/__init__.py index 4b2241c..dc606b0 100644 --- a/ATRI/plugins/key-repo/__init__.py +++ b/ATRI/plugins/key-repo/__init__.py @@ -1,171 +1,185 @@ -import os -import json -import time -from random import choice -from pathlib import Path +import string +from datetime import datetime +from random import choice, sample -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from ATRI.config import nonebot_config +from ATRI.config import Config from ATRI.service import Service as sv +from ATRI.utils.request import get_bytes from ATRI.rule import is_block, is_in_dormant, is_in_service, to_bot - -# 此功能未完善 +from .data_source import ( + add_history, + add_key_temp, + load_key_data, + add_key, + load_key_history, + load_key_temp_data, + del_key_temp +) -KEYREPO_DIV = Path('.') / 'ATRI' / 'data' / 'database' / 'KeyRepo' -os.makedirs(KEYREPO_DIV, exist_ok=True) +# 此功能暂未完善:未添加关键词删除 +# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +# !屎山注意!屎山注意!屎山注意!屎山注意! +# !请自备降压药!请自备降压药! +# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -__plugin_name__ = "KeyRepo" keyrepo = sv.on_message(rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) + & is_in_service('keyrepo') & to_bot()) @keyrepo.handle() async def _keyrepo(bot: Bot, event: MessageEvent) -> None: msg = str(event.get_message()) - - file_name = "data.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - with open(path, 'w') as r: - r.write(json.dumps({})) - data = {} + data = load_key_data() for key in data.keys(): if key in msg: await keyrepo.finish(choice(data[key])) +__doc__ = """ +关键词申请/审核 +权限组:所有人 +用法: + /train add (key) (repo) + 对于维护者: + /train list + /train info (key) + /train r (code) (0,1) +补充: + key: 关键词 + repo: 回复 + 0,1: 对应布尔值False/True + code: 唯一识别码 +示例: + /train add hso 好涩哦 +""" + train = sv.on_command( - name="调教", cmd="/train", + docs=__doc__, rule=is_block() ) [email protected]("key", prompt="哦哦哦要开始学习了!请告诉咱知识点") -async def _train(bot: Bot, event: MessageEvent, state: T_State) -> None: - if "[CQ" in state["key"]: - await train.reject("仅支持纯文本呢...") - [email protected]("repo", prompt="咱该如何回答呢?") -async def _trainR(bot: Bot, event: MessageEvent, state: T_State) -> None: - if "[CQ" in state["repo"]: - await train.reject("仅支持纯文本呢...") - - if state["key"] == "-d": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - key = state["repo"] - if key not in data: - await train.finish("未发现该待审核的知识点呢...") - else: +async def _train(bot: Bot, event: GroupMessageEvent) -> None: + user = event.user_id + group = event.group_id + + msg = str(event.message).split(' ') + _type = msg[0] + code = "".join(sample(string.ascii_letters + string.digits, 10)) + + if _type == "add": + key = msg[1] + args = msg[2] + if user in Config.BotSelfConfig.superusers: + add_key(key, args) msg = ( - f"Key: {key}\n" - f"Repo: {data[key]['repo']}\n" - "已经从咱的审核列表移除!" + "好欸学到了新的知识!\n" + f"关键词:{key}\n" + f"回复:{args}" ) - del data[key] - with open(path, 'w') as r: - r.write(json.dumps(data)) - await train.finish(msg) - elif state["key"] == "-i": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - if state["repo"] not in data: - await train.finish("未发现该知识点呢") - key = data[state["repo"]] - - msg = ( - f"用户: {key['user']}\n" - f"知识点: {state['repo']}" - f"回复: {key['repo']}" - f"时间: {key['time']}" - "/train -r 知识点 y/n" - ) - await train.finish(msg) - elif state["key"] == "-ls": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - keys = ",".join(data.keys()) - msg = f"目前等待审核的有如下:\n{keys}" - await train.finish(msg) - elif state["key"] == "-r": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - - key = state["key"] - repo = state["repo"] - user = event.get_user_id() - if user not in nonebot_config["superusers"]: - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - if key in data: - msg = "欸欸欸,该词还在等待咱的审核,请先等先来的审核完再提交吧..." + data = { + "user": user, + "group": group, + "time": str(datetime.now()), + "pass": True, + "key": key, + "repo": args, + "feature": code + } + add_history(data) await train.finish(msg) else: - data[key] = { + data = { "user": user, - "repo": repo, - "time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + "group": group, + "time": str(datetime.now()), + "pass": False, + "key": key, + "repo": args, + "feature": code } - with open(path, 'r') as r: - r.write(json.dumps(data, indent=4)) - + add_key_temp(data) msg = ( - "欸欸欸这不错欸!不过,还是先等待咱审核审核," - "如想撤销本次学习,请发送 /train -d 知识点" + "感谢你的提交w,所提交的关键词将由维护者进行审核\n" + f"识别码:{code},你可以使用/train info 识别码\n" + "以查询是否通过" ) await train.finish(msg) - - else: - file_name = "data.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - if key in data: - repo_list: list = data[key] - repo_list.append(repo) - data[key] = repo_list - msg = f"哦哦哦,{key}原来还有这样的回复,学到了~!" - await bot.send(event, msg) + elif _type == "list": + data = load_key_temp_data() + node = [] + for i in data: + dic = { + "type": "node", + "data": { + "name": "idk", + "uin": i['user'], + "content": f"Key: {i['key']}\nRepo: {i['repo']}\nTime: {i['time']}" + } + } + node.append(dic) + if not node: + node = [{ + "type": "node", + "data": { + "name": "null", + "uin": str(user), + "content": "这里什么也没有呢..." + } + }] + await bot.send_group_forward_msg(group_id=group, messages=node) + elif _type == "info": + key = msg[1] + data = load_key_history() + for i in data: + if i['key'] == key: + msg = ( + f"{key} 审核信息:\n" + f"是否通过:{i['pass']}" + f"结果: K: {i['key']} | R: {i['repo']}\n" + f"来自:{i['user']}@[群:{i['group']}]\n" + f"申请时间:{i['time']}" + ) + await train.finish(msg) + else: + await train.finish('未找到相关信息...') + elif _type == "r": + key = msg[1] + args = int(msg[2]) + data = load_key_temp_data() + if user in Config.BotSelfConfig.superusers: + if args not in [0, 1]: + await train.finish('请检查输入...') + else: + for i in data: + if bool(args): + if i['key'] == key: + msg = ( + "好欸学到了新的知识!\n" + f"关键词:{i['key']}\n" + f"回复:{i['repo']}" + ) + add_key(i['key'], i['repo']) + add_history(i) + await train.finish(msg) + else: + await train.finish('未找到相关信息...') + else: + add_history(i, False) + if del_key_temp(i): + await train.finish('已标记为不通过') + else: + await train.finish('未找到相关信息') else: - data[key] = [repo] - msg = "好欸,咱学到了新的知识点!" - await bot.send(event, msg) - - with open(path, 'w') as r: - r.write(json.dumps(data)) + await train.finish('不行哦~你的权限使得你没法这样做!') + else: + await train.finish('请检查输入...') diff --git a/ATRI/plugins/key-repo/data_source.py b/ATRI/plugins/key-repo/data_source.py new file mode 100644 index 0000000..3dd331d --- /dev/null +++ b/ATRI/plugins/key-repo/data_source.py @@ -0,0 +1,116 @@ +import os +import json +from pathlib import Path +from typing import Optional + + +KEYREPO_DIV = Path('.') / 'ATRI' / 'data' / 'database' / 'KeyRepo' +os.makedirs(KEYREPO_DIV, exist_ok=True) + + +def load_key_data() -> dict: + file_name = "data.json" + path = KEYREPO_DIV / file_name + try: + data = json.loads(path.read_bytes()) + except: + with open(path, 'w') as r: + r.write(json.dumps({})) + data = {} + return data + + +def load_key_temp_data() -> list: + file_name = "data.temp.json" + path = KEYREPO_DIV / file_name + try: + data = json.loads(path.read_bytes()) + except: + with open(path, 'w') as r: + r.write(json.dumps([])) + data = [] + return data + + +def load_key_history() -> list: + file_name = "data.history.json" + path = KEYREPO_DIV / file_name + try: + data = json.loads(path.read_bytes()) + except: + with open(path, 'w') as r: + r.write(json.dumps([])) + data = [] + return data + + +def save_key_data(d: dict) -> None: + file_name = "data.json" + path = KEYREPO_DIV / file_name + with open(path, 'w') as r: + r.write(json.dumps(d)) + + +def save_key_temp_data(d: list) -> None: + file_name = "data.temp.json" + path = KEYREPO_DIV / file_name + with open(path, 'w') as r: + r.write(json.dumps(d)) + + +def save_key_history_data(d: list) -> None: + file_name = "data.history.json" + path = KEYREPO_DIV / file_name + with open(path, 'w') as r: + r.write(json.dumps(d)) + + +def add_key(key: str, repo: str) -> str: + data = load_key_data() + data_1 = data.get(key, []) + if repo in data_1: + return "该回复已存在~!" + data_1.append(repo) + data[key] = data_1 + save_key_data(data) + return "成功,又学到新知识了~!" + + +def add_key_temp(d: dict) -> None: + data: list = load_key_temp_data() + data.append(d) + save_key_temp_data(data) + add_history(d, False) + + +def add_history(d: dict, p: bool = True) -> None: + d['pass'] = p + data: list = load_key_history() + data.append(d) + save_key_history_data(data) + + +def del_key(key: str, repo: str) -> str: + data = load_key_data() + if repo == 'isALL': + del data[key] + msg = f"成功删除关键词[{key}]下所有回复..." + else: + data_1 = data.get(key, []) + try: + data_1.remove(key) + except KeyError: + raise KeyError('Find repo error.') + data[key] = data_1 + msg = f"成功删除关键词[{key}]下回复:{repo}" + save_key_data(data) + return msg + + +def del_key_temp(d: dict) -> bool: + data = load_key_temp_data() + if d in data: + data.remove(d) + return True + else: + return False diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py new file mode 100644 index 0000000..10e74fb --- /dev/null +++ b/ATRI/plugins/nsfw.py @@ -0,0 +1,103 @@ +import re +import json + +from nonebot.adapters.cqhttp import Bot, GroupMessageEvent +from nonebot.typing import T_State + +from ATRI.log import logger as log +from ATRI.config import Config +from ATRI.service import Service as sv +from ATRI.exceptions import RequestTimeOut +from ATRI.rule import is_block, is_in_dormant, is_in_service +from ATRI.utils.request import get_bytes +from ATRI.utils.cqcode import coolq_code_check + + +nsfw_url = ( + f"http://{Config.NsfwCheck.host}:" + f"{Config.NsfwCheck.port}/?url=" +) + + +nsfw_checking = sv.on_message() + +@nsfw_checking.handle() +async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: + if Config.NsfwCheck.enabled: + msg = str(event.message) + user = event.user_id + group = event.group_id + check = await coolq_code_check(msg, user, group) + if check: + url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0] + try: + data = json.loads(await get_bytes(url)) + except: + log.warning('检测涩图失败,请查阅文档以获取帮助') + return + if round(data['score'], 4) > 0.6: + score = "{:.2%}".format(round(data['score'], 4)) + log.debug(f'截获涩图,得分:{score}') + await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!') + for sup in Config.BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}") + else: + pass + + +__doc__ = """ +检测你图片的涩值 +权限组:所有人 +用法: + /nsfw (pic) +补充: + pic: 图片 +示例: + /nsfw 然后Bot会向你索取图片 +""" + +nsfw_reading = sv.on_command( + cmd="/nsfw", + docs=__doc__, + rule=is_block() & is_in_service('/nsfw') & is_in_dormant() +) + +@nsfw_reading.handle() +async def _nsfw_r(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + user = event.user_id + group = event.group_id + msg = str(event.message).strip() + check = await coolq_code_check(msg, user, group) + if check and msg: + state['pic'] = msg + +@nsfw_reading.got('pic', prompt='请提供一张图片') +async def _nsfw_reading(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + msg = state['pic'] + pic = re.findall(r"url=(.*?)]", msg) + if not pic: + await nsfw_reading.reject('请发送图片而不是其它东西!!') + + url = nsfw_url + pic[0] + try: + data = json.loads(await get_bytes(url)) + except RequestTimeOut: + raise RequestTimeOut('Time out!') + + score = round(data['score'], 4) + result = "{:.2%}".format(round(data['score'], 4)) + if score > 0.9: + level = "hso! 我要发给主人看!" + for sup in Config.BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=f"{state['pic']}\n涩值: {result}") + elif 0.9 > score >= 0.6: + level = "嗯,可冲" + else: + level = "?能不能换张55完全冲不起来" + + repo = f"涩值:{result}\n{level}" + await nsfw_reading.finish(repo)
\ No newline at end of file diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index 7c8179d..8404da6 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -14,7 +14,7 @@ from ATRI.rule import ( from .data_source import dec -waiting_list = [] +temp_list = [] bilibili_rich = sv.on_message( @@ -24,17 +24,10 @@ sv.manual_reg_service("监听b站小程序") @bilibili_rich.handle() async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: - global waiting_list + global temp_list msg = str(event.raw_message).replace("\\", "") - user = event.user_id bv = False - if count_list(waiting_list, user) == 5: - waiting_list = del_list_aim(waiting_list, user) - return - - waiting_list.append(user) - if "qqdocurl" not in msg: if "av" in msg: av = re.findall(r"(av\d+)", msg)[0].replace('av', '') @@ -60,6 +53,13 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: else: return + if count_list(temp_list, av) == 4: + await bot.send(event, "你是怕别人看不到么发这么多次?") + temp_list = del_list_aim(temp_list, av) + return + + temp_list.append(av) + try: URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}" except: diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py index c3b033c..25e4e0b 100644 --- a/ATRI/plugins/status.py +++ b/ATRI/plugins/status.py @@ -6,12 +6,12 @@ from ATRI.service import Service as sv from ATRI.rule import is_block from ATRI.exceptions import GetStatusError from ATRI.utils.apscheduler import scheduler -from ATRI.config import nonebot_config +from ATRI.config import Config ping = sv.on_command( - name="测试机器人", cmd="/ping", + docs="测试机器人", rule=is_block() ) @@ -21,8 +21,8 @@ async def _ping(bot: Bot, event: MessageEvent) -> None: status = sv.on_command( - name="检查机器人状态", cmd="/status", + docs="检查机器人状态", rule=is_block() ) @@ -98,7 +98,7 @@ async def _(): f"* netRECV: {inteRECV}MB\n" ) + msg - for sup in nonebot_config["superusers"]: + for sup in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=sup, message=msg0 diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py index 10b3317..b23a01f 100644 --- a/ATRI/plugins/utils/__init__.py +++ b/ATRI/plugins/utils/__init__.py @@ -7,16 +7,25 @@ from ATRI.rule import ( is_in_dormant, is_in_service ) -from .data_source import roll_dice +from .data_source import roll_dice, Encrypt -__plugin_name__ = "roll" +__doc__ = """ +roll一下 +权限组:所有人 +用法: + /roll (int)d(int)+... +补充: + int: 阿拉伯数字 +示例: + /roll 1d10+10d9+4d5+2d3 +""" roll = sv.on_command( - name="roll一下", cmd="/roll", + docs=__doc__, rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) + & is_in_service('/roll') ) @roll.handle() @@ -34,3 +43,36 @@ async def _(bot: Bot, event: MessageEvent, state: dict) -> None: await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10") await roll.finish(roll_dice(resu)) + + +__doc__ = """ +加密你的信息! +权限组:所有人 +用法: + /enc e,d msg +补充: + e,d:对应 编码/解码 + msg: 目标内容 +示例: + /enc e アトリは高性能ですから! +""" + +encrypt = sv.on_command( + cmd="/enc", + docs=__doc__, + rule=is_block() & is_in_service('/enc') & is_in_dormant() +) + +async def _encrypt(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + _type = msg[0] + s = msg[1] + e = Encrypt() + + if _type == "e": + await encrypt.finish(e.encode(s)) + elif _type == "d": + await encrypt.finish(e.decode(s)) + else: + await encrypt.finish('请检查输入~!') diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py index f41a1d1..71da581 100644 --- a/ATRI/plugins/utils/data_source.py +++ b/ATRI/plugins/utils/data_source.py @@ -1,5 +1,7 @@ import re import random +from math import floor +from typing import Union def roll_dice(par: str) -> str: @@ -32,3 +34,140 @@ def roll_dice(par: str) -> str: result = f"{par}=({proc})={result}" return result + + +class Encrypt(): + cr = 'ĀāĂ㥹ÀÁÂÃÄÅ' + cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ' + cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr' + cb = 'ĨĩĪīĬĭĮįİı' + + sr = len(cr) + sc = len(cc) + sn = len(cn) + sb = len(cb) + src = sr * sc + snb = sn * sb + scnb = sc * snb + + def _div(self, a: int, b: int) -> int: + return floor(a / b) + + def _encodeByte(self, i) -> Union[str, None]: + if i > 0xFF: + raise ValueError('ERROR! rc/nb overflow') + + if i > 0x7F: + i = i & 0x7F + return self.cn[self._div(i, self.sb) + int(self.cb[i % self.sb])] + + return self.cr[self._div(i, self.sc) + int(self.cc[i % self.sc])] + + def _encodeShort(self, i) -> str: + if i > 0xFFFF: + raise ValueError('ERROR! rcnb overflow') + + reverse = False + if i > 0x7FFF: + reverse = True + i = i & 0x7FFF + + char = [ + self._div(i, self.scnb), + self._div(i % self.scnb, self.snb), + self._div(i % self.snb, self.sb), i % self.sb + ] + char = [ + self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], + self.cb[char[3]] + ] + + if reverse: + return char[2] + char[3] + char[0] + char[1] + + return ''.join(char) + + def _decodeByte(self, c) -> int: + nb = False + idx = [self.cr.index(c[0]), self.cc.index(c[1])] + if idx[0] < 0 or idx[1] < 0: + idx = [self.cn.index(c[0]), self.cb.index(c[1])] + nb = True + raise ValueError('ERROR! rc/nb overflow') + + result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1] + if result > 0x7F: + raise ValueError('ERROR! rc/nb overflow') + + return result | 0x80 if nb else 0 + + def _decodeShort(self, c) -> int: + reverse = c[0] not in self.cr + if not reverse: + idx = [ + self.cr.index(c[0]), + self.cc.index(c[1]), + self.cn.index(c[2]), + self.cb.index(c[3]) + ] + else: + idx = [ + self.cr.index(c[2]), + self.cc.index(c[3]), + self.cn.index(c[0]), + self.cb.index(c[1]) + ] + + if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0: + raise ValueError('ERROR! not rcnb') + + result = idx[0] * self.scnb + idx[1] * self.snb + idx[ + 2] * self.sb + idx[3] + if result > 0x7FFF: + raise ValueError('ERROR! rcnb overflow') + + result |= 0x8000 if reverse else 0 + return result + + def _encodeBytes(self, b) -> str: + result = [] + for i in range(0, (len(b) >> 1)): + result.append(self._encodeShort((b[i * 2] << 8 | b[i * 2 + 1]))) + + if len(b) & 1 == 1: + result.append(self._encodeByte(b[-1])) + + return ''.join(result) + + def encode(self, s: str, encoding: str = 'utf-8'): + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + return self._encodeBytes(s.encode(encoding)) + + def _decodeBytes(self, s: str): + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + if len(s) & 1: + raise ValueError('ERROR length') + + result = [] + for i in range(0, (len(s) >> 2)): + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8])) + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF + ])) + + if (len(s) & 2) == 2: + result.append(bytes([self._decodeByte(s[-2:])])) + + return b''.join(result) + + def decode(self, s: str, encoding: str = 'utf-8') -> str: + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + try: + return self._decodeBytes(s).decode(encoding) + except UnicodeDecodeError: + raise ValueError('Decoding failed') |