1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
import json
import socket
import string
import zipfile
from random import sample
from pathlib import Path
from nonebot.permission import SUPERUSER
from ATRI.service import Service
from ATRI.utils import request
from ATRI.rule import is_in_service
from ATRI.exceptions import WriteFileError
from ATRI.log import logger as log
# Directory holding the console plugin's files, relative to the current
# working directory. It is created (with any missing parents) at import time.
CONSOLE_DIR = Path(".").joinpath("data", "plugins", "console")
CONSOLE_DIR.mkdir(exist_ok=True, parents=True)
class Console(Service):
    """Console service plus static helpers for host IP, random strings and auth data.

    The constructor forwards a fixed set of arguments to ``Service.__init__``;
    the remaining members are static helpers that do not touch instance state.
    """

    def __init__(self):
        # Arguments are passed through unchanged; their meaning is defined by
        # Service.__init__, which is not visible in this file.
        Service.__init__(
            self,
            "控制台",
            "前端管理页面",
            True,
            is_in_service("控制台"),
            main_cmd="/con",
            permission=SUPERUSER,
        )

    @staticmethod
    async def get_host_ip(is_pub: bool):
        """Return an IP address for this host.

        Args:
            is_pub: When true, fetch ``https://ifconfig.me/ip`` and return the
                response text. Otherwise return the local address the OS
                selects for a UDP socket connected toward ``8.8.8.8:80``.

        Returns:
            The address as reported by the remote service or by
            ``socket.getsockname()``.
        """
        if is_pub:
            data = await request.get("https://ifconfig.me/ip")
            return data.text

        # The context manager closes the socket on every exit path, replacing
        # the manual try/finally bookkeeping.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]

    @staticmethod
    def get_random_str(k: int) -> str:
        """Return a random string of ``k`` ASCII letters and digits.

        Characters are drawn independently with ``secrets.choice``, so the
        result may contain repeats and ``k`` is not limited by the size of the
        alphabet (62 characters).

        Args:
            k: Length of the string to generate.

        Raises:
            ValueError: If ``k`` is negative.
        """
        # Local import keeps this block self-contained; `secrets` is stdlib.
        import secrets

        if k < 0:
            raise ValueError("k must be non-negative")

        alphabet = string.ascii_letters + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(k))

    @staticmethod
    def get_auth_info() -> dict:
        """Load the ``"data"`` entry from ``data.json`` in ``CONSOLE_DIR``.

        If the file does not exist it is first created containing an empty
        JSON object.

        Returns:
            The value stored under the ``"data"`` key, or ``{"data": None}``
            when that key is missing or its value is falsy.

        Raises:
            WriteFileError: If the file is missing and cannot be created.
        """
        df = CONSOLE_DIR / "data.json"
        if not df.is_file():
            try:
                with open(df, "w", encoding="utf-8") as w:
                    w.write(json.dumps({}))
            except Exception as err:
                # Chain the original error so the root cause stays visible.
                raise WriteFileError(f"Writing file: {df} failed!") from err

        base_data: dict = json.loads(df.read_bytes())
        data = base_data.get("data", None)
        if not data:
            return {"data": None}
        return data
# Directory the frontend archive is unpacked into; created at import time.
FRONTEND_DIR = CONSOLE_DIR.joinpath("frontend")
FRONTEND_DIR.mkdir(exist_ok=True, parents=True)

# Download location of the prebuilt console frontend archive.
CONSOLE_RESOURCE_URL = "https://jsd.imki.moe/gh/kyomotoi/Project-ATRI-Console@main/archive/dist.zip"
async def init_resource():
    """Download the console frontend archive and unpack it into ``FRONTEND_DIR``.

    The archive is fetched from ``CONSOLE_RESOURCE_URL``, saved as
    ``dist.zip`` inside ``CONSOLE_DIR`` and then extracted. Any failure
    (download error, filesystem error, or a payload that is not a valid zip
    archive) is logged and the function returns without raising.
    """
    log.info("控制台初始化中...")

    try:
        resp = await request.get(CONSOLE_RESOURCE_URL)
    except Exception:
        log.error("控制台资源装载失败, 将无法访问管理界面")
        return

    file_path = CONSOLE_DIR / "dist.zip"
    try:
        with open(file_path, "wb") as w:
            w.write(resp.read())

        with zipfile.ZipFile(file_path, "r") as zr:
            zr.extractall(FRONTEND_DIR)
    except (OSError, zipfile.BadZipFile) as err:
        # A non-zip payload (e.g. an error page) or a disk error is treated
        # like a failed download: log and return instead of raising.
        log.error(f"控制台资源装载失败, 将无法访问管理界面: {err!r}")
        return

    log.success("控制台初始化完成")
|