summaryrefslogtreecommitdiff
path: root/ATRI/plugins/manage/plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/manage/plugin.py')
1 files changed, 128 insertions, 0 deletions
diff --git a/ATRI/plugins/manage/plugin.py b/ATRI/plugins/manage/plugin.py
new file mode 100644
index 0000000..3fe2db7
--- /dev/null
+++ b/ATRI/plugins/manage/plugin.py
@@ -0,0 +1,128 @@
+import json
+from pathlib import Path
+from typing import Union
+from pip import main as pipmain
+
+import nonebot
+
+from ATRI.log import log
+from ATRI.utils import request
+from ATRI.service import Service, ServiceTools
+
+from .models import NonebotPluginInfo
+
+
+_NONEBOT_STORE_URL = (
+ "https://jsd.imki.moe/gh/nonebot/nonebot2/website/static/plugins.json"
+)
+
+_plugin_list = dict()
+
+
+class NonebotPluginManager:
+ _plugin_name = str()
+ _conf_path = Path(".") / "nonebot_plugins.json"
+
+ def get_list(self) -> list:
+ if not self._conf_path.is_file():
+ with open(self._conf_path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(list()))
+
+ with open(".env.prod", "w", encoding="utf-8") as w:
+ w.write("# 请在此填写来自 Nonebot 商店的插件设置, 填写后需重启以生效")
+
+ return json.loads(self._conf_path.read_bytes())
+
+ def revise_list(self, is_del: bool):
+ data = self.get_list()
+ if is_del:
+ if self._plugin_name in data:
+ data.remove(self._plugin_name)
+ else:
+ data.append(self._plugin_name)
+ data = list(set(data))
+
+ with open(self._conf_path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data))
+
+ def assign_plugin(self, plugin_name: str) -> "NonebotPluginManager":
+ self._plugin_name = plugin_name
+ return self
+
+ async def get_store_list(self):
+ global _plugin_list
+
+ if not _plugin_list:
+ try:
+ data = await request.get(_NONEBOT_STORE_URL)
+ _plugin_list = {plugin["module_name"]: plugin for plugin in data.json()}
+ log.success("刷新 Nonebot 商店成功")
+ except Exception:
+ log.warning("刷新 Nonebot 商店失败")
+
+ def get_plugin_info(self) -> Union[NonebotPluginInfo, None]:
+ if plugin_data := _plugin_list.get(self._plugin_name):
+ return NonebotPluginInfo.parse_obj(plugin_data)
+ else:
+ return None
+
+ def plugin_is_exist(self, is_conf: bool = False) -> bool:
+ if not is_conf:
+ return bool(self.get_plugin_info())
+ else:
+ return True if self._plugin_name in self.get_list() else False
+
+ def add_plugin(self) -> str:
+ if not self.plugin_is_exist():
+ return "未找到插件"
+
+ try:
+ pipmain(["install", self._plugin_name])
+ except Exception:
+ return "插件下载失败"
+
+ nonebot.load_plugin(self._plugin_name)
+ self.revise_list(False)
+ plugin_info = self.get_plugin_info()
+ desc = plugin_info.desc + "\n" + plugin_info.homepage # type: ignore
+ Service(self._plugin_name).document(desc).is_nonebot_plugin()
+
+ return "完成~!"
+
+ def remove_plugin(self) -> str:
+ if not self.plugin_is_exist():
+ return "未找到插件"
+
+ try:
+ pipmain(["uninstall", "-y", self._plugin_name])
+ except Exception:
+ return "插件包卸载失败, 请重启后再尝试"
+
+ self.revise_list(True)
+ try:
+ ServiceTools(self._plugin_name).del_service()
+ except Exception:
+ return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json"
+ return "完成~! 将在下次重启生效"
+
+ @classmethod
+ def upgrade_plugin(cls) -> list:
+ if not (plugin_list := cls.get_list(cls)):
+ return list()
+
+ succ_list = list()
+ for plugin in plugin_list:
+ try:
+ pipmain(["install", "--upgrade", plugin])
+ succ_list.append(plugin)
+ log.success(f"Nonebot 插件 {plugin} 更新成功")
+ except Exception:
+ log.warning(f"Nonebot 插件 {plugin} 更新失败")
+
+ return succ_list
+
+ @classmethod
+ def load_plugin(cls):
+ plugin_list = cls.get_list(cls)
+ for plugin in plugin_list:
+ nonebot.load_plugin(plugin)