import json from pathlib import Path from typing import Union from pip import main as pipmain import nonebot from ATRI.log import log from ATRI.utils import request from ATRI.service import Service, ServiceTools from .models import NonebotPluginInfo _NONEBOT_STORE_URL = ( "https://jsd.imki.moe/gh/nonebot/nonebot2/website/static/plugins.json" ) _plugin_list = dict() class NonebotPluginManager: _plugin_name = str() _conf_path = Path(".") / "nonebot_plugins.json" def get_list(self) -> list: if not self._conf_path.is_file(): with open(self._conf_path, "w", encoding="utf-8") as w: w.write(json.dumps(list())) with open(".env.prod", "w", encoding="utf-8") as w: w.write("# 请在此填写来自 Nonebot 商店的插件设置, 填写后需重启以生效") return json.loads(self._conf_path.read_bytes()) def revise_list(self, is_del: bool): data = self.get_list() if is_del: if self._plugin_name in data: data.remove(self._plugin_name) else: data.append(self._plugin_name) data = list(set(data)) with open(self._conf_path, "w", encoding="utf-8") as w: w.write(json.dumps(data)) def assign_plugin(self, plugin_name: str) -> "NonebotPluginManager": self._plugin_name = plugin_name return self async def get_store_list(self): global _plugin_list if not _plugin_list: try: data = await request.get(_NONEBOT_STORE_URL) _plugin_list = {plugin["module_name"]: plugin for plugin in data.json()} log.success("刷新 Nonebot 商店成功") except Exception: log.warning("刷新 Nonebot 商店失败") def get_plugin_info(self) -> Union[NonebotPluginInfo, None]: if plugin_data := _plugin_list.get(self._plugin_name): return NonebotPluginInfo.parse_obj(plugin_data) else: return None def plugin_is_exist(self, is_conf: bool = False) -> bool: if not is_conf: return bool(self.get_plugin_info()) else: return True if self._plugin_name in self.get_list() else False def add_plugin(self) -> str: if not self.plugin_is_exist(): return "未找到插件" try: pipmain(["install", self._plugin_name]) except Exception: return "插件下载失败" nonebot.load_plugin(self._plugin_name) self.revise_list(False) plugin_info = self.get_plugin_info() desc = plugin_info.desc + "\n" + plugin_info.homepage # type: ignore Service(self._plugin_name).document(desc).is_nonebot_plugin() return "完成~!" def remove_plugin(self) -> str: if not self.plugin_is_exist(): return "未找到插件" try: pipmain(["uninstall", "-y", self._plugin_name]) except Exception: return "插件包卸载失败, 请重启后再尝试" self.revise_list(True) try: ServiceTools(self._plugin_name).del_service() except Exception: return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json" return "完成~! 将在下次重启生效" @classmethod def upgrade_plugin(cls) -> list: if not (plugin_list := cls.get_list(cls)): return list() succ_list = list() for plugin in plugin_list: try: pipmain(["install", "--upgrade", plugin]) succ_list.append(plugin) log.success(f"Nonebot 插件 {plugin} 更新成功") except Exception: log.warning(f"Nonebot 插件 {plugin} 更新失败") return succ_list @classmethod def load_plugin(cls): plugin_list = cls.get_list(cls) for plugin in plugin_list: nonebot.load_plugin(plugin)