diff options
Diffstat (limited to 'ATRI/plugins/manage/plugin.py')
-rw-r--r-- | ATRI/plugins/manage/plugin.py | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/ATRI/plugins/manage/plugin.py b/ATRI/plugins/manage/plugin.py new file mode 100644 index 0000000..3fe2db7 --- /dev/null +++ b/ATRI/plugins/manage/plugin.py @@ -0,0 +1,128 @@ +import json +from pathlib import Path +from typing import Union +from pip import main as pipmain + +import nonebot + +from ATRI.log import log +from ATRI.utils import request +from ATRI.service import Service, ServiceTools + +from .models import NonebotPluginInfo + + +_NONEBOT_STORE_URL = ( + "https://jsd.imki.moe/gh/nonebot/nonebot2/website/static/plugins.json" +) + +_plugin_list = dict() + + +class NonebotPluginManager: + _plugin_name = str() + _conf_path = Path(".") / "nonebot_plugins.json" + + def get_list(self) -> list: + if not self._conf_path.is_file(): + with open(self._conf_path, "w", encoding="utf-8") as w: + w.write(json.dumps(list())) + + with open(".env.prod", "w", encoding="utf-8") as w: + w.write("# 请在此填写来自 Nonebot 商店的插件设置, 填写后需重启以生效") + + return json.loads(self._conf_path.read_bytes()) + + def revise_list(self, is_del: bool): + data = self.get_list() + if is_del: + if self._plugin_name in data: + data.remove(self._plugin_name) + else: + data.append(self._plugin_name) + data = list(set(data)) + + with open(self._conf_path, "w", encoding="utf-8") as w: + w.write(json.dumps(data)) + + def assign_plugin(self, plugin_name: str) -> "NonebotPluginManager": + self._plugin_name = plugin_name + return self + + async def get_store_list(self): + global _plugin_list + + if not _plugin_list: + try: + data = await request.get(_NONEBOT_STORE_URL) + _plugin_list = {plugin["module_name"]: plugin for plugin in data.json()} + log.success("刷新 Nonebot 商店成功") + except Exception: + log.warning("刷新 Nonebot 商店失败") + + def get_plugin_info(self) -> Union[NonebotPluginInfo, None]: + if plugin_data := _plugin_list.get(self._plugin_name): + return NonebotPluginInfo.parse_obj(plugin_data) + else: + return None + + def plugin_is_exist(self, is_conf: bool = False) -> bool: + if not is_conf: + return bool(self.get_plugin_info()) + else: + return True if self._plugin_name in self.get_list() else False + + def add_plugin(self) -> str: + if not self.plugin_is_exist(): + return "未找到插件" + + try: + pipmain(["install", self._plugin_name]) + except Exception: + return "插件下载失败" + + nonebot.load_plugin(self._plugin_name) + self.revise_list(False) + plugin_info = self.get_plugin_info() + desc = plugin_info.desc + "\n" + plugin_info.homepage # type: ignore + Service(self._plugin_name).document(desc).is_nonebot_plugin() + + return "完成~!" + + def remove_plugin(self) -> str: + if not self.plugin_is_exist(): + return "未找到插件" + + try: + pipmain(["uninstall", "-y", self._plugin_name]) + except Exception: + return "插件包卸载失败, 请重启后再尝试" + + self.revise_list(True) + try: + ServiceTools(self._plugin_name).del_service() + except Exception: + return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json" + return "完成~! 将在下次重启生效" + + @classmethod + def upgrade_plugin(cls) -> list: + if not (plugin_list := cls.get_list(cls)): + return list() + + succ_list = list() + for plugin in plugin_list: + try: + pipmain(["install", "--upgrade", plugin]) + succ_list.append(plugin) + log.success(f"Nonebot 插件 {plugin} 更新成功") + except Exception: + log.warning(f"Nonebot 插件 {plugin} 更新失败") + + return succ_list + + @classmethod + def load_plugin(cls): + plugin_list = cls.get_list(cls) + for plugin in plugin_list: + nonebot.load_plugin(plugin) |