summaryrefslogtreecommitdiff
path: root/ATRI/plugins/manage/plugin.py
blob: cfe4c7baf3922bb9ead79c9797649e60fc87420f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import json
from pathlib import Path
from typing import Union
from pip import main as pipmain
from asyncio import as_completed

import nonebot

from ATRI.log import log
from ATRI.utils import request
from ATRI.service import Service, ServiceTools

from .models import NonebotPluginInfo


_NONEBOT_STORE_URLS = [
    "https://registry.nonebot.dev/plugins.json",
    "https://jsd.imki.moe/gh/nonebot/registry@results/plugins.json",
    "https://cdn.staticaly.com/gh/nonebot/registry@results/plugins.json",
    "https://jsd.cdn.zzko.cn/gh/nonebot/registry@results/plugins.json",
    "https://ghproxy.com/https://raw.githubusercontent.com/nonebot/registry/results/plugins.json",
]

_plugin_list = dict()


class NonebotPluginManager:
    _plugin_name = str()
    _conf_path = Path(".") / "nonebot_plugins.json"

    def get_list(self) -> list:
        if not self._conf_path.is_file():
            with open(self._conf_path, "w", encoding="utf-8") as w:
                w.write(json.dumps(list()))

            with open(".env.prod", "w", encoding="utf-8") as w:
                w.write("# 请在此填写来自 Nonebot 商店的插件设置, 填写后需重启以生效")

        return json.loads(self._conf_path.read_bytes())

    def revise_list(self, is_del: bool) -> None:
        data = self.get_list()
        if is_del:
            if self._plugin_name in data:
                data.remove(self._plugin_name)
        else:
            data.append(self._plugin_name)
            data = list(set(data))

        with open(self._conf_path, "w", encoding="utf-8") as w:
            w.write(json.dumps(data))

    def assign_plugin(self, plugin_name: str) -> "NonebotPluginManager":
        self._plugin_name = plugin_name
        return self

    async def get_store_list(self) -> None:
        global _plugin_list

        if not _plugin_list:
            tasks = [request.get(url) for url in _NONEBOT_STORE_URLS]
            for future in as_completed(tasks, timeout=114514):
                try:
                    data = await future
                    _plugin_list = {
                        plugin["module_name"]: plugin for plugin in data.json()
                    }
                    log.success("刷新 Nonebot 商店成功")
                    return
                except Exception:
                    log.warning("刷新 Nonebot 商店失败, 尝试下一个链接")

    def get_plugin_info(self) -> Union[NonebotPluginInfo, None]:
        if plugin_data := _plugin_list.get(self._plugin_name):
            return NonebotPluginInfo.parse_obj(plugin_data)
        else:
            return None

    def plugin_is_exist(self, is_conf: bool = False) -> bool:
        if not is_conf:
            return bool(self.get_plugin_info())
        else:
            return True if self._plugin_name in self.get_list() else False

    def add_plugin(self) -> str:
        if not self.plugin_is_exist():
            return "未找到插件"

        try:
            pipmain(["install", self._plugin_name])
        except Exception:
            return "插件下载失败"

        nonebot.load_plugin(self._plugin_name)
        self.revise_list(False)
        plugin_info = self.get_plugin_info()
        desc = plugin_info.desc + "\n" + plugin_info.homepage  # type: ignore
        Service(self._plugin_name).document(desc).is_nonebot_plugin()

        return "完成~!"

    def remove_plugin(self) -> str:
        if not self.plugin_is_exist():
            return "未找到插件"

        try:
            pipmain(["uninstall", "-y", self._plugin_name])
        except Exception:
            return "插件包卸载失败, 请重启后再尝试"

        self.revise_list(True)
        try:
            ServiceTools(self._plugin_name).del_service()
        except Exception:
            return f"部分完成: 信息文件删除失败, 路径: data/services/{self._plugin_name}.json"
        return "完成~! 将在下次重启生效"

    def upgrade_plugin(self) -> list:
        if not (plugin_list := self.get_list()):
            return list()

        succ_list = list()
        for plugin in plugin_list:
            try:
                pipmain(["install", "--upgrade", plugin])
                succ_list.append(plugin)
                log.success(f"Nonebot 插件 {plugin} 更新成功")
            except Exception:
                log.warning(f"Nonebot 插件 {plugin} 更新失败")

        return succ_list

    def load_plugin(self) -> None:
        plugin_list = self.get_list()
        for plugin in plugin_list:
            nonebot.load_plugin(plugin)