summaryrefslogtreecommitdiff
path: root/ATRI/plugins/console/data_source.py
blob: db0b6b104b7c92d25fd3e3a3dfe5c2d1a79a2bed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import json
import socket
import string
import zipfile
from random import sample
from pathlib import Path

from nonebot.permission import SUPERUSER

from ATRI.service import Service
from ATRI.utils import request
from ATRI.rule import is_in_service
from ATRI.exceptions import WriteFileError
from ATRI.log import logger as log


CONSOLE_DIR = Path(".") / "data" / "plugins" / "console"
CONSOLE_DIR.mkdir(parents=True, exist_ok=True)


class Console(Service):
    def __init__(self):
        Service.__init__(
            self,
            "控制台",
            "前端管理页面",
            True,
            is_in_service("控制台"),
            main_cmd="/con",
            permission=SUPERUSER,
        )

    @staticmethod
    async def get_host_ip(is_pub: bool):
        if is_pub:
            data = await request.get("https://ifconfig.me/ip")
            return data.text

        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 80))
            ip = s.getsockname()[0]
            return ip
        finally:
            if s:
                s.close()

    @staticmethod
    def get_random_str(k: int) -> str:
        return "".join(sample(string.ascii_letters + string.digits, k))

    @staticmethod
    def get_auth_info() -> dict:
        df = CONSOLE_DIR / "data.json"
        if not df.is_file():
            try:
                with open(df, "w", encoding="utf-8") as w:
                    w.write(json.dumps({}))
            except Exception:
                raise WriteFileError("Writing file: " + str(df) + " failed!")

        base_data: dict = json.loads(df.read_bytes())
        data = base_data.get("data", None)
        if not data:
            return {"data": None}
        return data


FRONTEND_DIR = CONSOLE_DIR / "frontend"
FRONTEND_DIR.mkdir(parents=True, exist_ok=True)
CONSOLE_RESOURCE_URL = (
    "https://jsd.imki.moe/gh/kyomotoi/Project-ATRI-Console@main/archive/dist.zip"
)


async def init_resource():
    log.info("控制台初始化中...")

    try:
        resp = await request.get(CONSOLE_RESOURCE_URL)
    except Exception:
        log.error("控制台资源装载失败, 将无法访问管理界面")
        return
    print(len(resp.read()))
    file_path = CONSOLE_DIR / "dist.zip"
    with open(file_path, "wb") as w:
        w.write(resp.read())

    with zipfile.ZipFile(file_path, "r") as zr:
        zr.extractall(FRONTEND_DIR)

    log.success("控制台初始化完成")