summaryrefslogtreecommitdiff
path: root/ATRI/plugins/essential.py
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-11-04 10:20:01 +0800
committerKyomotoi <[email protected]>2022-11-04 10:20:01 +0800
commitb2507f8d4dd051490e49bf9a0d235ab930b63ddd (patch)
treef1c00581aa3cf7ba5382d21415adbe6ab42db1f8 /ATRI/plugins/essential.py
parent24342060a99979e6c51390cfe8d86b177136d5c5 (diff)
downloadATRI-b2507f8d4dd051490e49bf9a0d235ab930b63ddd.tar.gz
ATRI-b2507f8d4dd051490e49bf9a0d235ab930b63ddd.tar.bz2
ATRI-b2507f8d4dd051490e49bf9a0d235ab930b63ddd.zip
♻️ 重构插件: 基础部件
Diffstat (limited to 'ATRI/plugins/essential.py')
-rw-r--r--ATRI/plugins/essential.py362
1 files changed, 0 insertions, 362 deletions
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
deleted file mode 100644
index 08c2ec1..0000000
--- a/ATRI/plugins/essential.py
+++ /dev/null
@@ -1,362 +0,0 @@
-import json
-import shutil
-import asyncio
-from datetime import datetime
-from pydantic.main import BaseModel
-from random import choice, randint
-from pathlib import Path
-from typing import Union
-
-import nonebot
-from nonebot.message import run_preprocessor
-from nonebot.exception import IgnoredException
-from nonebot.adapters.onebot.v11 import (
- Bot,
- MessageEvent,
- GroupMessageEvent,
- FriendRequestEvent,
- GroupRequestEvent,
- GroupIncreaseNoticeEvent,
- GroupDecreaseNoticeEvent,
- GroupAdminNoticeEvent,
- GroupBanNoticeEvent,
- GroupRecallNoticeEvent,
- FriendRecallNoticeEvent,
- MessageSegment,
- Message,
-)
-
-from ATRI import conf
-from ATRI.service import Service
-from ATRI.log import log
-from ATRI.utils import MessageChecker
-from ATRI.utils.apscheduler import scheduler
-from ATRI.permission import MASTER
-
-
-bots = nonebot.get_bots()
-
-
-ESSENTIAL_DIR = Path(".") / "data" / "plugins" / "essential"
-MANEGE_DIR = Path(".") / "data" / "plugins" / "manege"
-TEMP_PATH = Path(".") / "data" / "temp"
-ESSENTIAL_DIR.mkdir(parents=True, exist_ok=True)
-MANEGE_DIR.mkdir(parents=True, exist_ok=True)
-TEMP_PATH.mkdir(parents=True, exist_ok=True)
-
-
-@run_preprocessor
-async def _check_block(event: MessageEvent):
- user_file = "block_user.json"
- path = MANEGE_DIR / user_file
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(dict()))
-
- try:
- data = json.loads(path.read_bytes())
- except Exception:
- data = dict()
-
- user_id = event.get_user_id()
- if user_id in data:
- raise IgnoredException(f"Block user: {user_id}")
-
- if isinstance(event, GroupMessageEvent):
- group_file = "block_group.json"
- path = MANEGE_DIR / group_file
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(dict()))
-
- try:
- data = json.loads(path.read_bytes())
- except Exception:
- data = dict()
-
- group_id = str(event.group_id)
- if group_id in data:
- raise IgnoredException(f"Block group: {user_id}")
-
-
-class FriendRequestInfo(BaseModel):
- user_id: str
- comment: str
- time: str
- is_approve: bool
-
-
-class GroupRequestInfo(BaseModel):
- user_id: str
- comment: str
- time: str
- is_approve: bool
-
-
-ess = Service("基础部件").document("对bot基础/必须请求进行处理")
-
-
-friend_add_event = ess.on_request("好友添加", "好友添加检测")
-
-
-@friend_add_event.handle()
-async def _friend_add(bot: Bot, event: FriendRequestEvent):
- """
- 存储文件结构:
- {
- "Apply code": {
- "user_id": "User ID",
- "comment": "Comment content"
- "time": "Time",
- "is_approve": bool # Default: False
- }
- }
- """
- file_name = "friend_add.json"
- path = ESSENTIAL_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- data = dict()
-
- apply_code = event.flag
- apply_comment = event.comment
- user_id = event.get_user_id()
- now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- data = json.loads(path.read_bytes())
- data[apply_code] = FriendRequestInfo(
- user_id=user_id, comment=apply_comment, time=now_time, is_approve=False
- ).dict()
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(data, indent=4))
-
- repo = (
- "咱收到一条好友请求...\n"
- f"请求人:{user_id}\n"
- f"申请信息:{apply_comment}\n"
- f"申请码:{apply_code}\n"
- "Tip: 获取好友申请"
- )
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=superuser, message=repo)
-
-
-group_invite_event = ess.on_request("邀请入群", "被邀请入群检测")
-
-
-@group_invite_event.handle()
-async def _group_invite(bot: Bot, event: GroupRequestEvent):
- """
- 存储文件结构:
- {
- "Apply code": {
- "user_id": "User ID",
- "comment": "Comment content"
- "time": "Time",
- "is_approve": bool # Default: False
- }
- }
- """
- file_name = "group_invite.json"
- path = ESSENTIAL_DIR / file_name
- if not path.is_file():
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps({}))
- data = dict()
-
- apply_code = event.flag
- apply_comment = event.comment
- user_id = event.get_user_id()
- now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- data = json.loads(path.read_bytes())
- data[apply_code] = GroupRequestInfo(
- user_id=user_id, comment=apply_comment, time=now_time, is_approve=False
- ).dict()
- with open(path, "w", encoding="utf-8") as w:
- w.write(json.dumps(data, indent=4))
-
- repo = (
- "咱收到一条群聊请求...\n"
- f"请求人:{user_id}\n"
- f"申请信息:{apply_comment}\n"
- f"申请码:{apply_code}\n"
- "Tip: 获取邀请列表"
- )
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=superuser, message=repo)
-
-
-group_member_event = ess.on_notice("群成员变动", "群成员变动检测")
-
-
-@group_member_event.handle()
-async def _group_member_join(bot: Bot, event: GroupIncreaseNoticeEvent):
- await asyncio.sleep(randint(1, 6))
- msg = "好欸!事新人!\n" f"在下 {choice(list(conf.BotConfig.nickname))} 哒!w!"
- await group_member_event.finish(msg)
-
-
-@group_member_event.handle()
-async def _group_member_left(bot: Bot, event: GroupDecreaseNoticeEvent):
- await asyncio.sleep(randint(1, 6))
- await group_member_event.finish("呜——有人跑了...")
-
-
-group_admin_event = ess.on_notice("群管理变动", "群管理变动检测")
-
-
-@group_admin_event.handle()
-async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent):
- if not event.is_tome():
- return
-
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(
- user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!"
- )
-
-
-group_ban_event = ess.on_notice("群禁言变动", "群禁言变动检测")
-
-
-@group_ban_event.handle()
-async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent):
- if not event.is_tome():
- return
-
- if event.duration:
- msg = (
- "那个..。,主人\n"
- f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n"
- f"时长...是 {event.duration} 秒"
- )
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=int(superuser), message=msg)
- else:
- msg = "好欸!主人\n" f"咱在群 {event.group_id} 的口球被 {event.operator_id} 解除了!"
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=int(superuser), message=msg)
-
-
-_acc_recall = True
-
-
-recall_event = ess.on_notice("撤回事件", "撤回事件检测")
-
-
-@recall_event.handle()
-async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
- if event.is_tome():
- return
-
- if not _acc_recall:
- return
-
- try:
- repo = await bot.get_msg(message_id=event.message_id)
- except Exception:
- return
-
- log.debug(f"Recall raw msg:\n{repo}")
- user = event.user_id
- group = event.group_id
- repo = repo["message"]
-
- try:
- m = recall_msg_dealer(repo)
- except Exception:
- check = MessageChecker(repo).check_cq_code
- if not check:
- m = repo
- else:
- return
- msg = f"主人,咱拿到了一条撤回信息!\n{user}@[群:{group}]\n撤回了\n{m}"
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=int(superuser), message=Message(msg))
-
-
-@recall_event.handle()
-async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent):
- if event.is_tome():
- return
-
- if not _acc_recall:
- return
-
- try:
- repo = await bot.get_msg(message_id=event.message_id)
- except Exception:
- return
-
- log.debug(f"Recall raw msg:\n{repo}")
- user = event.user_id
- repo = repo["message"]
-
- try:
- m = recall_msg_dealer(repo)
- except Exception:
- check = MessageChecker(repo).check_cq_code
- if not check:
- m = repo
- else:
- return
-
- msg = f"主人,咱拿到了一条撤回信息!\n{user}@[私聊]撤回了\n{m}"
- for superuser in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=int(superuser), message=Message(msg))
-
-
-rej_recall = ess.on_command("拒绝撤回", "拒绝撤回信息", permission=MASTER)
-
-
-@rej_recall.handle()
-async def _():
- global _acc_recall
- _acc_recall = False
- await rej_recall.finish("已拒绝撤回信息...")
-
-
-acc_recall = ess.on_command("接受撤回", "接受撤回信息", permission=MASTER)
-
-
-@acc_recall.handle()
-async def _():
- global _acc_recall
- _acc_recall = True
- await acc_recall.finish("现在可以接受撤回信息啦!")
-
-
[email protected]_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5) # type: ignore
-async def _clear_cache():
- try:
- shutil.rmtree(TEMP_PATH)
- TEMP_PATH.mkdir(parents=True, exist_ok=True)
- except Exception:
- log.warning("清除缓存失败,请手动清除:data/temp")
-
-
-def recall_msg_dealer(msg: Union[str, dict]) -> str:
- if isinstance(msg, str):
- return msg
- temp_m = list()
- for i in msg:
- _type = i.get("type", "idk")
- _data = i.get("data", "idk")
- if _type == "text":
- temp_m.append(_data["text"])
- elif _type == "image":
- url = _data["url"]
- check = MessageChecker(url).check_image_url
- if check:
- temp_m.append(MessageSegment.image(url))
- else:
- temp_m.append(f"[该图片可能包含非法内容,源url:{url}]")
- elif _type == "face":
- temp_m.append(MessageSegment.face(_data["id"]))
- else:
- temp_m.append(f"[未知类型信息:{_data}]")
-
- repo = str().join(map(str, temp_m))
- return repo