diff options
author | Kyomotoi <[email protected]> | 2023-01-28 22:38:48 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2023-01-28 22:38:48 +0800 |
commit | bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c (patch) | |
tree | 0bf4e76243dbc918672a682da059e445557fb047 /ATRI | |
parent | 5ed9a6e902fdcec43f20d94d96bef201960f3e36 (diff) | |
download | ATRI-bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c.tar.gz ATRI-bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c.tar.bz2 ATRI-bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c.zip |
✨ 新增 Nonebot 商店插件安装
Diffstat (limited to 'ATRI')
-rw-r--r-- | ATRI/plugins/manage/__init__.py | 86 | ||||
-rw-r--r-- | ATRI/plugins/manage/listener.py | 76 | ||||
-rw-r--r-- | ATRI/plugins/manage/models.py | 12 | ||||
-rw-r--r-- | ATRI/plugins/manage/plugin.py | 128 | ||||
-rw-r--r-- | ATRI/service.py | 13 |
5 files changed, 315 insertions, 0 deletions
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py index f79c53c..90d1a79 100644 --- a/ATRI/plugins/manage/__init__.py +++ b/ATRI/plugins/manage/__init__.py @@ -6,9 +6,11 @@ from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessage from ATRI.rule import to_bot from ATRI.service import Service +from ATRI.message import MessageBuilder from ATRI.permission import MASTER, ADMIN from .data_source import Manage +from .plugin import NonebotPluginManager plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER) @@ -441,3 +443,87 @@ async def _recall_msg(bot: Bot, event: MessageEvent): await recall_msg.finish("无法获取必要信息...没法撤回惹...") await bot.delete_msg(message_id=recall_id) + + +add_nonebot_plugin = plugin.on_command("添加插件", "添加来自 nonebot 商店的插件") + + +@add_nonebot_plugin.got("plguin_name", "插件名呢?") +async def _(plugin_name: str = ArgPlainText("plguin_name")): + nbm = NonebotPluginManager().assign_plugin(plugin_name) + + if nbm.plugin_is_exist(True): + await add_nonebot_plugin.finish("该插件已存在") + + if not (plugin_info := nbm.get_plugin_info()): + await add_nonebot_plugin.finish("未找到该插件") + + msg = ( + MessageBuilder(f"[{plugin_name}]") + .text(f"名称: {plugin_info.name}") + .text(f"说明: {plugin_info.desc}") + .text(f"作者: {plugin_info.author}") + .text(f"插件主页: {plugin_info.homepage}") + .text(f"{str() if plugin_info.is_official else '[!] 非'}官方插件") + ) + await add_nonebot_plugin.send(msg) + + +@add_nonebot_plugin.got("att", "是否安装(y/n)") +async def _( + att: str = ArgPlainText("att"), plugin_name: str = ArgPlainText("plguin_name") +): + if att not in ["y", "Y", "是"]: + await add_nonebot_plugin.finish("反悔了呢") + + nbm = NonebotPluginManager().assign_plugin(plugin_name) + result = nbm.add_plugin() + await add_nonebot_plugin.finish(result) + + +remove_nonebot_plugin = plugin.on_command( + "移除插件", "移除来自 nonebot 商店的插件", aliases={"删除插件", "卸载插件"} +) + + +@remove_nonebot_plugin.got("plugin_name", "要移除的插件名呢?") +@remove_nonebot_plugin.got("att", "确定吗(y/n)") +async def _( + att: str = ArgPlainText("att"), plugin_name: str = ArgPlainText("plugin_name") +): + if att not in ["y", "Y", "是"]: + await remove_nonebot_plugin.finish("反悔了呢") + + nbm = NonebotPluginManager().assign_plugin(plugin_name) + result = nbm.remove_plugin() + await remove_nonebot_plugin.finish(result) + + +upgrade_nonebot_plugin = plugin.on_command("更新插件", "更新来自 Nonebot 商店的插件", aliases={"升级插件"}) + + +@upgrade_nonebot_plugin.handle() +async def _(event: MessageEvent): + result = NonebotPluginManager.upgrade_plugin() + if not result: + await upgrade_nonebot_plugin.finish("当前没有插件可更新...") + + msg = "更新完成~! 成功更新:" + "\n".join(map(str, result)) + await upgrade_nonebot_plugin.finish(msg) + + +from ATRI import driver +from ATRI.utils.apscheduler import scheduler + +from .listener import init_listener + +driver().on_startup(init_listener) +driver().on_startup(NonebotPluginManager().get_store_list) +driver().on_startup(NonebotPluginManager.load_plugin) +scheduler.scheduled_job( + "interval", + name="Nonebot 商店刷新", + hours=1, + max_instances=3, + misfire_grace_time=60, +)(NonebotPluginManager().get_store_list) diff --git a/ATRI/plugins/manage/listener.py b/ATRI/plugins/manage/listener.py new file mode 100644 index 0000000..fb66a9c --- /dev/null +++ b/ATRI/plugins/manage/listener.py @@ -0,0 +1,76 @@ +import json + +from nonebot.matcher import Matcher +from nonebot.message import run_preprocessor +from nonebot.exception import IgnoredException +from nonebot.adapters.onebot.v11 import ( + MessageEvent, + GroupMessageEvent, + PrivateMessageEvent, +) + +from ATRI.service import ServiceTools + +from .data_source import MANAGE_DIR +from .plugin import * + + +@run_preprocessor +async def _(matcher: Matcher, event: MessageEvent): + plugin_name = str(matcher.plugin_name) + + if not "nonebot_" in plugin_name: + return + + if not "gocqhttp" in plugin_name: + serv = ServiceTools(plugin_name) + try: + serv.load_service() + except Exception: + raise IgnoredException(f"{plugin_name} limited") + + if not serv.auth_service(): + raise IgnoredException(f"{plugin_name} limited") + + if isinstance(event, PrivateMessageEvent): + user_id = event.get_user_id() + result = serv.auth_service(user_id) + elif isinstance(event, GroupMessageEvent): + user_id = event.get_user_id() + group_id = str(event.group_id) + result = serv.auth_service(user_id, group_id) + else: + result = True + + if not result: + raise IgnoredException(f"{plugin_name} limited") + + +@run_preprocessor +async def _(event: MessageEvent): + blockuser_file_path = MANAGE_DIR / "block_user.json" + if not blockuser_file_path.is_file(): + with open(blockuser_file_path, "w", encoding="utf-8") as w: + w.write(json.dumps(dict())) + + data = json.loads(blockuser_file_path.read_bytes()) + + user_id = event.get_user_id() + if user_id in data: + raise IgnoredException(f"Blocked user: {user_id}") + + if isinstance(event, GroupMessageEvent): + blockgroup_file_path = MANAGE_DIR / "block_group.json" + if not blockgroup_file_path.is_file(): + with open(blockgroup_file_path, "w", encoding="utf-8") as w: + w.write(json.dumps(dict())) + + data = json.loads(blockgroup_file_path.read_bytes()) + + group_id = str(event.group_id) + if group_id in data: + raise IgnoredException(f"Blocked group: {group_id}") + + +def init_listener(): + """初始化监听器""" diff --git a/ATRI/plugins/manage/models.py b/ATRI/plugins/manage/models.py new file mode 100644 index 0000000..90f8059 --- /dev/null +++ b/ATRI/plugins/manage/models.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel + + +class NonebotPluginInfo(BaseModel): + module_name: str + project_link: str + name: str + desc: str + author: str + homepage: str + tags: list + is_official: bool diff --git a/ATRI/plugins/manage/plugin.py b/ATRI/plugins/manage/plugin.py new file mode 100644 index 0000000..3fe2db7 --- /dev/null +++ b/ATRI/plugins/manage/plugin.py @@ -0,0 +1,128 @@ +import json +from pathlib import Path +from typing import Union +from pip import main as pipmain + +import nonebot + +from ATRI.log import log +from ATRI.utils import request +from ATRI.service import Service, ServiceTools + +from .models import NonebotPluginInfo + + +_NONEBOT_STORE_URL = ( + "https://jsd.imki.moe/gh/nonebot/nonebot2/website/static/plugins.json" +) + +_plugin_list = dict() + + +class NonebotPluginManager: + _plugin_name = str() + _conf_path = Path(".") / "nonebot_plugins.json" + + def get_list(self) -> list: + if not self._conf_path.is_file(): + with open(self._conf_path, "w", encoding="utf-8") as w: + w.write(json.dumps(list())) + + with open(".env.prod", "w", encoding="utf-8") as w: + w.write("# 请在此填写来自 Nonebot 商店的插件设置, 填写后需重启以生效") + + return json.loads(self._conf_path.read_bytes()) + + def revise_list(self, is_del: bool): + data = self.get_list() + if is_del: + if self._plugin_name in data: + data.remove(self._plugin_name) + else: + data.append(self._plugin_name) + data = list(set(data)) + + with open(self._conf_path, "w", encoding="utf-8") as w: + w.write(json.dumps(data)) + + def assign_plugin(self, plugin_name: str) -> "NonebotPluginManager": + self._plugin_name = plugin_name + return self + + async def get_store_list(self): + global _plugin_list + + if not _plugin_list: + try: + data = await request.get(_NONEBOT_STORE_URL) + _plugin_list = {plugin["module_name"]: plugin for plugin in data.json()} + log.success("刷新 Nonebot 商店成功") + except Exception: + log.warning("刷新 Nonebot 商店失败") + + def get_plugin_info(self) -> Union[NonebotPluginInfo, None]: + if plugin_data := _plugin_list.get(self._plugin_name): + return NonebotPluginInfo.parse_obj(plugin_data) + else: + return None + + def plugin_is_exist(self, is_conf: bool = False) -> bool: + if not is_conf: + return bool(self.get_plugin_info()) + else: + return True if self._plugin_name in self.get_list() else False + + def add_plugin(self) -> str: + if not self.plugin_is_exist(): + return "未找到插件" + + try: + pipmain(["install", self._plugin_name]) + except Exception: + return "插件下载失败" + + nonebot.load_plugin(self._plugin_name) + self.revise_list(False) + plugin_info = self.get_plugin_info() + desc = plugin_info.desc + "\n" + plugin_info.homepage # type: ignore + Service(self._plugin_name).document(desc).is_nonebot_plugin() + + return "完成~!" + + def remove_plugin(self) -> str: + if not self.plugin_is_exist(): + return "未找到插件" + + try: + pipmain(["uninstall", "-y", self._plugin_name]) + except Exception: + return "插件包卸载失败, 请重启后再尝试" + + self.revise_list(True) + try: + ServiceTools(self._plugin_name).del_service() + except Exception: + return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json" + return "完成~! 将在下次重启生效" + + @classmethod + def upgrade_plugin(cls) -> list: + if not (plugin_list := cls.get_list(cls)): + return list() + + succ_list = list() + for plugin in plugin_list: + try: + pipmain(["install", "--upgrade", plugin]) + succ_list.append(plugin) + log.success(f"Nonebot 插件 {plugin} 更新成功") + except Exception: + log.warning(f"Nonebot 插件 {plugin} 更新失败") + + return succ_list + + @classmethod + def load_plugin(cls): + plugin_list = cls.get_list(cls) + for plugin in plugin_list: + nonebot.load_plugin(plugin) diff --git a/ATRI/service.py b/ATRI/service.py index 4915c0a..d6d7746 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -145,6 +145,15 @@ class Service: self._main_cmd = (cmd,) return self + + def is_nonebot_plugin(self) -> "Service": + cmd_list = self.__load_cmds() + name = "请参考对应插件文档" + cmd_list[name] = CommandInfo( + type="ignore", docs=str(), aliases=list() + ).dict() + self.__save_cmds(cmd_list) + return self def get_path(self) -> Path: return self._path @@ -395,6 +404,10 @@ class ServiceTools: ) return ServiceInfo.parse_file(path) + + def del_service(self): + path = SERVICES_DIR / f"{self.service}.json" + path.unlink() def auth_service(self, user_id: str = str(), group_id: str = str()) -> bool: data = self.load_service() |