summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2023-01-28 22:38:48 +0800
committerKyomotoi <[email protected]>2023-01-28 22:38:48 +0800
commitbb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c (patch)
tree0bf4e76243dbc918672a682da059e445557fb047 /ATRI
parent5ed9a6e902fdcec43f20d94d96bef201960f3e36 (diff)
downloadATRI-bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c.tar.gz
ATRI-bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c.tar.bz2
ATRI-bb52f7b70dd3b5def1a938fbbc3c0ee08ea3a77c.zip
✨ 新增 Nonebot 商店插件安装
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/plugins/manage/__init__.py86
-rw-r--r--ATRI/plugins/manage/listener.py76
-rw-r--r--ATRI/plugins/manage/models.py12
-rw-r--r--ATRI/plugins/manage/plugin.py128
-rw-r--r--ATRI/service.py13
5 files changed, 315 insertions, 0 deletions
diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py
index f79c53c..90d1a79 100644
--- a/ATRI/plugins/manage/__init__.py
+++ b/ATRI/plugins/manage/__init__.py
@@ -6,9 +6,11 @@ from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessage
from ATRI.rule import to_bot
from ATRI.service import Service
+from ATRI.message import MessageBuilder
from ATRI.permission import MASTER, ADMIN
from .data_source import Manage
+from .plugin import NonebotPluginManager
plugin = Service("管理").document("控制bot的各项服务").only_admin(True).permission(MASTER)
@@ -441,3 +443,87 @@ async def _recall_msg(bot: Bot, event: MessageEvent):
await recall_msg.finish("无法获取必要信息...没法撤回惹...")
await bot.delete_msg(message_id=recall_id)
+
+
+add_nonebot_plugin = plugin.on_command("添加插件", "添加来自 nonebot 商店的插件")
+
+
+@add_nonebot_plugin.got("plguin_name", "插件名呢?")
+async def _(plugin_name: str = ArgPlainText("plguin_name")):
+ nbm = NonebotPluginManager().assign_plugin(plugin_name)
+
+ if nbm.plugin_is_exist(True):
+ await add_nonebot_plugin.finish("该插件已存在")
+
+ if not (plugin_info := nbm.get_plugin_info()):
+ await add_nonebot_plugin.finish("未找到该插件")
+
+ msg = (
+ MessageBuilder(f"[{plugin_name}]")
+ .text(f"名称: {plugin_info.name}")
+ .text(f"说明: {plugin_info.desc}")
+ .text(f"作者: {plugin_info.author}")
+ .text(f"插件主页: {plugin_info.homepage}")
+ .text(f"{str() if plugin_info.is_official else '[!] 非'}官方插件")
+ )
+ await add_nonebot_plugin.send(msg)
+
+
+@add_nonebot_plugin.got("att", "是否安装(y/n)")
+async def _(
+ att: str = ArgPlainText("att"), plugin_name: str = ArgPlainText("plguin_name")
+):
+ if att not in ["y", "Y", "是"]:
+ await add_nonebot_plugin.finish("反悔了呢")
+
+ nbm = NonebotPluginManager().assign_plugin(plugin_name)
+ result = nbm.add_plugin()
+ await add_nonebot_plugin.finish(result)
+
+
+remove_nonebot_plugin = plugin.on_command(
+ "移除插件", "移除来自 nonebot 商店的插件", aliases={"删除插件", "卸载插件"}
+)
+
+
+@remove_nonebot_plugin.got("plugin_name", "要移除的插件名呢?")
+@remove_nonebot_plugin.got("att", "确定吗(y/n)")
+async def _(
+ att: str = ArgPlainText("att"), plugin_name: str = ArgPlainText("plugin_name")
+):
+ if att not in ["y", "Y", "是"]:
+ await remove_nonebot_plugin.finish("反悔了呢")
+
+ nbm = NonebotPluginManager().assign_plugin(plugin_name)
+ result = nbm.remove_plugin()
+ await remove_nonebot_plugin.finish(result)
+
+
+upgrade_nonebot_plugin = plugin.on_command("更新插件", "更新来自 Nonebot 商店的插件", aliases={"升级插件"})
+
+
+@upgrade_nonebot_plugin.handle()
+async def _(event: MessageEvent):
+ result = NonebotPluginManager.upgrade_plugin()
+ if not result:
+ await upgrade_nonebot_plugin.finish("当前没有插件可更新...")
+
+ msg = "更新完成~! 成功更新:" + "\n".join(map(str, result))
+ await upgrade_nonebot_plugin.finish(msg)
+
+
+from ATRI import driver
+from ATRI.utils.apscheduler import scheduler
+
+from .listener import init_listener
+
+driver().on_startup(init_listener)
+driver().on_startup(NonebotPluginManager().get_store_list)
+driver().on_startup(NonebotPluginManager.load_plugin)
+scheduler.scheduled_job(
+ "interval",
+ name="Nonebot 商店刷新",
+ hours=1,
+ max_instances=3,
+ misfire_grace_time=60,
+)(NonebotPluginManager().get_store_list)
diff --git a/ATRI/plugins/manage/listener.py b/ATRI/plugins/manage/listener.py
new file mode 100644
index 0000000..fb66a9c
--- /dev/null
+++ b/ATRI/plugins/manage/listener.py
@@ -0,0 +1,76 @@
+import json
+
+from nonebot.matcher import Matcher
+from nonebot.message import run_preprocessor
+from nonebot.exception import IgnoredException
+from nonebot.adapters.onebot.v11 import (
+ MessageEvent,
+ GroupMessageEvent,
+ PrivateMessageEvent,
+)
+
+from ATRI.service import ServiceTools
+
+from .data_source import MANAGE_DIR
+from .plugin import *
+
+
+@run_preprocessor
+async def _(matcher: Matcher, event: MessageEvent):
+ plugin_name = str(matcher.plugin_name)
+
+ if not "nonebot_" in plugin_name:
+ return
+
+ if not "gocqhttp" in plugin_name:
+ serv = ServiceTools(plugin_name)
+ try:
+ serv.load_service()
+ except Exception:
+ raise IgnoredException(f"{plugin_name} limited")
+
+ if not serv.auth_service():
+ raise IgnoredException(f"{plugin_name} limited")
+
+ if isinstance(event, PrivateMessageEvent):
+ user_id = event.get_user_id()
+ result = serv.auth_service(user_id)
+ elif isinstance(event, GroupMessageEvent):
+ user_id = event.get_user_id()
+ group_id = str(event.group_id)
+ result = serv.auth_service(user_id, group_id)
+ else:
+ result = True
+
+ if not result:
+ raise IgnoredException(f"{plugin_name} limited")
+
+
+@run_preprocessor
+async def _(event: MessageEvent):
+ blockuser_file_path = MANAGE_DIR / "block_user.json"
+ if not blockuser_file_path.is_file():
+ with open(blockuser_file_path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(dict()))
+
+ data = json.loads(blockuser_file_path.read_bytes())
+
+ user_id = event.get_user_id()
+ if user_id in data:
+ raise IgnoredException(f"Blocked user: {user_id}")
+
+ if isinstance(event, GroupMessageEvent):
+ blockgroup_file_path = MANAGE_DIR / "block_group.json"
+ if not blockgroup_file_path.is_file():
+ with open(blockgroup_file_path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(dict()))
+
+ data = json.loads(blockgroup_file_path.read_bytes())
+
+ group_id = str(event.group_id)
+ if group_id in data:
+ raise IgnoredException(f"Blocked group: {group_id}")
+
+
+def init_listener():
+ """初始化监听器"""
diff --git a/ATRI/plugins/manage/models.py b/ATRI/plugins/manage/models.py
new file mode 100644
index 0000000..90f8059
--- /dev/null
+++ b/ATRI/plugins/manage/models.py
@@ -0,0 +1,12 @@
+from pydantic import BaseModel
+
+
+class NonebotPluginInfo(BaseModel):
+ module_name: str
+ project_link: str
+ name: str
+ desc: str
+ author: str
+ homepage: str
+ tags: list
+ is_official: bool
diff --git a/ATRI/plugins/manage/plugin.py b/ATRI/plugins/manage/plugin.py
new file mode 100644
index 0000000..3fe2db7
--- /dev/null
+++ b/ATRI/plugins/manage/plugin.py
@@ -0,0 +1,128 @@
+import json
+from pathlib import Path
+from typing import Union
+from pip import main as pipmain
+
+import nonebot
+
+from ATRI.log import log
+from ATRI.utils import request
+from ATRI.service import Service, ServiceTools
+
+from .models import NonebotPluginInfo
+
+
+_NONEBOT_STORE_URL = (
+ "https://jsd.imki.moe/gh/nonebot/nonebot2/website/static/plugins.json"
+)
+
+_plugin_list = dict()
+
+
+class NonebotPluginManager:
+ _plugin_name = str()
+ _conf_path = Path(".") / "nonebot_plugins.json"
+
+ def get_list(self) -> list:
+ if not self._conf_path.is_file():
+ with open(self._conf_path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(list()))
+
+ with open(".env.prod", "w", encoding="utf-8") as w:
+ w.write("# 请在此填写来自 Nonebot 商店的插件设置, 填写后需重启以生效")
+
+ return json.loads(self._conf_path.read_bytes())
+
+ def revise_list(self, is_del: bool):
+ data = self.get_list()
+ if is_del:
+ if self._plugin_name in data:
+ data.remove(self._plugin_name)
+ else:
+ data.append(self._plugin_name)
+ data = list(set(data))
+
+ with open(self._conf_path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data))
+
+ def assign_plugin(self, plugin_name: str) -> "NonebotPluginManager":
+ self._plugin_name = plugin_name
+ return self
+
+ async def get_store_list(self):
+ global _plugin_list
+
+ if not _plugin_list:
+ try:
+ data = await request.get(_NONEBOT_STORE_URL)
+ _plugin_list = {plugin["module_name"]: plugin for plugin in data.json()}
+ log.success("刷新 Nonebot 商店成功")
+ except Exception:
+ log.warning("刷新 Nonebot 商店失败")
+
+ def get_plugin_info(self) -> Union[NonebotPluginInfo, None]:
+ if plugin_data := _plugin_list.get(self._plugin_name):
+ return NonebotPluginInfo.parse_obj(plugin_data)
+ else:
+ return None
+
+ def plugin_is_exist(self, is_conf: bool = False) -> bool:
+ if not is_conf:
+ return bool(self.get_plugin_info())
+ else:
+ return True if self._plugin_name in self.get_list() else False
+
+ def add_plugin(self) -> str:
+ if not self.plugin_is_exist():
+ return "未找到插件"
+
+ try:
+ pipmain(["install", self._plugin_name])
+ except Exception:
+ return "插件下载失败"
+
+ nonebot.load_plugin(self._plugin_name)
+ self.revise_list(False)
+ plugin_info = self.get_plugin_info()
+ desc = plugin_info.desc + "\n" + plugin_info.homepage # type: ignore
+ Service(self._plugin_name).document(desc).is_nonebot_plugin()
+
+ return "完成~!"
+
+ def remove_plugin(self) -> str:
+ if not self.plugin_is_exist():
+ return "未找到插件"
+
+ try:
+ pipmain(["uninstall", "-y", self._plugin_name])
+ except Exception:
+ return "插件包卸载失败, 请重启后再尝试"
+
+ self.revise_list(True)
+ try:
+ ServiceTools(self._plugin_name).del_service()
+ except Exception:
+ return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json"
+ return "完成~! 将在下次重启生效"
+
+ @classmethod
+ def upgrade_plugin(cls) -> list:
+ if not (plugin_list := cls.get_list(cls)):
+ return list()
+
+ succ_list = list()
+ for plugin in plugin_list:
+ try:
+ pipmain(["install", "--upgrade", plugin])
+ succ_list.append(plugin)
+ log.success(f"Nonebot 插件 {plugin} 更新成功")
+ except Exception:
+ log.warning(f"Nonebot 插件 {plugin} 更新失败")
+
+ return succ_list
+
+ @classmethod
+ def load_plugin(cls):
+ plugin_list = cls.get_list(cls)
+ for plugin in plugin_list:
+ nonebot.load_plugin(plugin)
diff --git a/ATRI/service.py b/ATRI/service.py
index 4915c0a..d6d7746 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -145,6 +145,15 @@ class Service:
self._main_cmd = (cmd,)
return self
+
+ def is_nonebot_plugin(self) -> "Service":
+ cmd_list = self.__load_cmds()
+ name = "请参考对应插件文档"
+ cmd_list[name] = CommandInfo(
+ type="ignore", docs=str(), aliases=list()
+ ).dict()
+ self.__save_cmds(cmd_list)
+ return self
def get_path(self) -> Path:
return self._path
@@ -395,6 +404,10 @@ class ServiceTools:
)
return ServiceInfo.parse_file(path)
+
+ def del_service(self):
+ path = SERVICES_DIR / f"{self.service}.json"
+ path.unlink()
def auth_service(self, user_id: str = str(), group_id: str = str()) -> bool:
data = self.load_service()