diff options
Diffstat (limited to 'ATRI/plugins/essential.py')
-rw-r--r-- | ATRI/plugins/essential.py | 415 |
1 files changed, 193 insertions, 222 deletions
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 6a14f1a..5369c9b 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -1,16 +1,16 @@ import os -import time import json -import shutil -from pathlib import Path -from random import choice +import asyncio from datetime import datetime +from pydantic.main import BaseModel +from random import choice, randint +from pathlib import Path +import nonebot from nonebot.typing import T_State from nonebot.matcher import Matcher from nonebot.message import run_preprocessor from nonebot.exception import IgnoredException -from nonebot.adapters.cqhttp.message import Message from nonebot.adapters.cqhttp import ( Bot, MessageEvent, @@ -21,246 +21,224 @@ from nonebot.adapters.cqhttp import ( GroupDecreaseNoticeEvent, GroupAdminNoticeEvent, GroupBanNoticeEvent, - LuckyKingNotifyEvent, - GroupUploadNoticeEvent, GroupRecallNoticeEvent, FriendRecallNoticeEvent, ) import ATRI -from ATRI.log import logger -from ATRI.exceptions import WriteError +from ATRI.service import Service +from ATRI.log import logger as log +from ATRI.rule import is_in_service from ATRI.config import BotSelfConfig -from ATRI.service import Service as sv -from ATRI.utils.cqcode import coolq_code_check +from ATRI.utils import CoolqCodeChecker -PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services" +driver = ATRI.driver() +bots = nonebot.get_bots() + ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" -os.makedirs(PLUGIN_INFO_DIR, exist_ok=True) +MANEGE_DIR = Path(".") / "ATRI" / "data" / "database" / "manege" os.makedirs(ESSENTIAL_DIR, exist_ok=True) - - -driver = ATRI.driver() +os.makedirs(MANEGE_DIR, exist_ok=True) @driver.on_startup -async def startup() -> None: - logger.info("アトリは、高性能ですから!") +async def startup(): + log.info("アトリは、高性能ですから!") @driver.on_shutdown -async def shutdown() -> None: - logger.info("Thanks for using.") - logger.debug("bot已停止运行,正在清理插件信息...") - try: - shutil.rmtree(PLUGIN_INFO_DIR) - logger.debug("成功!") - except Exception: - repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除") - time.sleep(10) - raise Exception(repo) - - [email protected]_bot_connect -async def connect(bot) -> None: - for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。") - - [email protected]_bot_disconnect -async def disconnect(bot) -> None: - for superuser in BotSelfConfig.superusers: - try: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...") - except: - logger.error("WebSocket 已断开,等待重连") +async def shutdown(): + log.info("Thanks for using.") @run_preprocessor # type: ignore async def _check_block( matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State ) -> None: - user = str(event.user_id) - try: - msg = str(event.message) - except: - msg = "" - if not sv.BlockSystem.auth_user(user): - raise IgnoredException(f"Block user: {user}") - - if not sv.Dormant.is_dormant(): - if "/dormant" not in msg: - raise IgnoredException("Bot has been dormant.") + user_file = "block_user.json" + path = MANEGE_DIR / user_file + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + + user_id = event.get_user_id() + if user_id in data: + raise IgnoredException(f"Block user: {user_id}") if isinstance(event, GroupMessageEvent): - group = str(event.group_id) - if not sv.BlockSystem.auth_group(group): - raise IgnoredException(f"Block group: {group}") - + group_file = "block_group.json" + path = MANEGE_DIR / group_file + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + + group_id = str(event.group_id) + if group_id in data: + raise IgnoredException(f"Block group: {user_id}") -@run_preprocessor # type: ignore -async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None: - if isinstance(event, GroupMessageEvent): - if event.sub_type == "normal": - now_time = datetime.now().strftime("%Y-%m-%d") - GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}" - os.makedirs(GROUP_DIR, exist_ok=True) - path = GROUP_DIR / f"{now_time}.chat.json" - now_time = datetime.now().strftime("%Y%m%d-%H%M%S") - - try: - data = json.loads(path.read_bytes()) - except: - data = dict() - data[str(event.message_id)] = { - "date": now_time, - "time": str(time.time()), - "post_type": str(event.post_type), - "sub_type": str(event.sub_type), - "user_id": str(event.user_id), - "group_id": str(event.group_id), - "message_type": str(event.message_type), - "message": str(event.message), - "raw_message": event.raw_message, - "font": str(event.font), - "sender": { - "user_id": str(event.sender.user_id), - "nickname": event.sender.nickname, - "sex": event.sender.sex, - "age": str(event.sender.age), - "card": event.sender.card, - "area": event.sender.area, - "level": event.sender.level, - "role": event.sender.role, - "title": event.sender.title, - }, - "to_me": str(event.to_me), - } - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - logger.debug(f"写入消息成功,id: {event.message_id}") - except WriteError: - logger.error("消息记录失败,可能是缺少文件的原因!") - else: - pass - else: - pass - else: - pass +class FriendRequestInfo(BaseModel): + user_id: str + comment: str + time: str + is_approve: bool -# 处理:好友请求 -request_friend_event = sv.on_request() +class GroupRequestInfo(BaseModel): + user_id: str + comment: str + time: str + is_approve: bool -@request_friend_event.handle() -async def _request_friend_event(bot, event: FriendRequestEvent) -> None: - file_name = "request_friend.json" - path = ESSENTIAL_DIR / file_name - path.parent.mkdir(exist_ok=True, parents=True) - try: - data = json.loads(path.read_bytes()) - except: - data = dict() - data[event.flag] = {"user_id": event.user_id, "comment": event.comment} - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - except WriteError: - raise WriteError("Writing file failed!") +__doc__ = """ +对bot基础/必须请求进行处理 +""" - for superuser in BotSelfConfig.superusers: - msg = ( - "主人,收到一条好友请求:\n" - f"请求人:{event.get_user_id()}\n" - f"申请信息:{event.comment}\n" - f"申请码:{event.flag}" - ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) +class Essential(Service): + + def __init__(self): + Service.__init__(self, "基础部件", __doc__) -# 处理:邀请入群,如身为管理,还附有入群请求 -request_group_event = sv.on_request() +friend_add_event = Essential().on_request("好友添加") -@request_group_event.handle() -async def _request_group_event(bot, event: GroupRequestEvent) -> None: - file_name = "request_group.json" +@friend_add_event.handle() +async def _friend_add(bot: Bot, event: FriendRequestEvent): + """ + 存储文件结构: + { + "Apply code": { + "user_id": "User ID", + "comment": "Comment content" + "time": "Time", + "is_approve": bool # Default: False + } + } + """ + file_name = "friend_add.json" path = ESSENTIAL_DIR / file_name - path.parent.mkdir(exist_ok=True, parents=True) - - try: - data = json.loads(path.read_bytes()) - except: + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) data = dict() - data[event.flag] = { - "user_id": event.user_id, - "group_id": event.group_id, - "sub_type": event.sub_type, - "comment": event.comment, + + apply_code = event.flag + apply_comment = event.comment + user_id = event.get_user_id() + now_time = datetime.now() + + data = json.loads(path.read_bytes()) + data[apply_code] = FriendRequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + is_approve=False + ) + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + + repo = ( + "咱收到一条好友请求...\n" + f"请求人:{user_id}\n" + f"申请信息:{apply_comment}\n" + f"申请码:{apply_code}\n" + "Tip:好友申请 帮助" + ) + for superuser in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=superuser, message=repo) + + +group_invite_event = Essential().on_request("邀请入群") + +@group_invite_event.handle() +async def _group_invite(bot: Bot, event: GroupRequestEvent): + """ + 存储文件结构: + { + "Apply code": { + "user_id": "User ID", + "comment": "Comment content" + "time": "Time", + "is_approve": bool # Default: False + } } - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - except WriteError: - raise WriteError("Writing file failed!") - + """ + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + apply_code = event.flag + apply_comment = event.comment + user_id = event.get_user_id() + now_time = datetime.now() + + data = json.loads(path.read_bytes()) + data[apply_code] = GroupRequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + is_approve=False + ) + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + + repo = ( + "咱收到一条群聊邀请请求...\n" + f"请求人:{user_id}\n" + f"申请信息:{apply_comment}\n" + f"申请码:{apply_code}\n" + "Tip:群聊邀请 帮助" + ) for superuser in BotSelfConfig.superusers: - msg = ( - "主人,收到一条入群请求:\n" - f"请求人:{event.get_user_id()}\n" - f"申请信息:{event.comment}\n" - f"申请码:{event.flag}" - ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - + await bot.send_private_msg(user_id=superuser, message=repo) -# 处理群成员变动 -group_member_event = sv.on_notice() +group_member_event = Essential().on_notice("群成员变动") @group_member_event.handle() -async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: - msg = "好欸!事新人!\n" f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" +async def _group_member_join(bot: Bot, event: GroupIncreaseNoticeEvent): + await asyncio.sleep(randint(1, 6)) + msg = ( + "好欸!事新人!\n" + f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" + ) await group_member_event.finish(msg) - @group_member_event.handle() -async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: - if event.is_tome(): - if event.user_id != event.self_id: - return - msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." - for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - else: - await group_member_event.finish("阿!有人离开了我们...") +async def _group_member_left(bot: Bot, event: GroupDecreaseNoticeEvent): + await asyncio.sleep(randint(1, 6)) + await group_member_event.finish("呜——有人跑了...") -# 处理群管理事件 -group_admin_event = sv.on_notice() - +group_admin_event = Essential().on_notice("群管理变动") @group_admin_event.handle() -async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: +async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent): if not event.is_tome(): return for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( + await bot.send_private_msg( user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" ) -# 处理群禁言事件 -group_ban_event = sv.on_notice() - +group_ban_event = Essential().on_notice("群禁言变动") @group_ban_event.handle() -async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: +async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent): if not event.is_tome(): return @@ -271,70 +249,63 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: f"时长...是 {event.duration} 秒" ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) else: msg = "好欸!主人\n" f"咱在群 {event.group_id} 的口球被 {event.operator_id} 解除了!" for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) -# 处理群红包运气王事件 -lucky_read_bag_event = sv.on_notice() - - -@lucky_read_bag_event.handle() -async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: - msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!" - await lucky_read_bag_event.finish(Message(msg)) - - -# 处理群文件上传事件 -group_file_upload_event = sv.on_notice() - - -@group_file_upload_event.handle() -async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None: - await group_file_upload_event.finish("让我康康传了啥好东西") - - -# 处理撤回事件 -recall_event = sv.on_notice() - +recall_event = Essential().on_notice("撤回事件") @recall_event.handle() -async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: +async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent): + if event.is_tome(): + return + try: repo = await bot.get_msg(message_id=event.message_id) - except: + except BaseException: return + user = event.user_id group = event.group_id repo = str(repo["message"]) - check = await coolq_code_check(repo, group=group) + check = CoolqCodeChecker(repo).check if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}" - + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{user}@[群:{group}]\n" + "撤回了\n" + f"{repo}" + ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) @recall_event.handle() -async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: +async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent): + if event.is_tome(): + return + try: repo = await bot.get_msg(message_id=event.message_id) - except: + except BaseException: return user = event.user_id repo = str(repo["message"]) - check = await coolq_code_check(repo, user) + check = CoolqCodeChecker(repo).check if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}" - - await bot.send(event, "咱看到惹~!") + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{user}@[私聊]" + "撤回了\n" + f"{repo}" + ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) |