blob: b4a28cb8f6cb69fc80fa04fb4f043a9a2053ddfb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
import json
import socket
import string
import zipfile
from random import sample
from pathlib import Path
from ATRI.utils import request
from ATRI.exceptions import WriteFileError
from ATRI.log import log
CONSOLE_DIR = Path(".") / "data" / "plugins" / "console"
CONSOLE_DIR.mkdir(parents=True, exist_ok=True)
class Console:
@staticmethod
async def get_host_ip(is_pub: bool):
if is_pub:
data = await request.get("https://ifconfig.me/ip")
return data.text
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
return ip
finally:
if s:
s.close()
@staticmethod
def get_random_str(k: int) -> str:
return "".join(sample(string.ascii_letters + string.digits, k))
@staticmethod
def get_auth_info() -> dict:
df = CONSOLE_DIR / "data.json"
if not df.is_file():
try:
with open(df, "w", encoding="utf-8") as w:
w.write(json.dumps(dict()))
except Exception:
raise WriteFileError("Writing file: " + str(df) + " failed!")
base_data: dict = json.loads(df.read_bytes())
data = base_data.get("data", None)
if not data:
return {"data": None}
return data
FRONTEND_DIR = CONSOLE_DIR / "frontend"
FRONTEND_DIR.mkdir(parents=True, exist_ok=True)
CONSOLE_RESOURCE_URL = (
"https://jsd.imki.moe/gh/kyomotoi/Project-ATRI-Console@main/archive/dist.zip"
)
async def init_resource():
log.info("控制台初始化中...")
try:
resp = await request.get(CONSOLE_RESOURCE_URL)
except Exception:
log.error("控制台资源装载失败, 将无法访问管理界面")
return
file_path = CONSOLE_DIR / "dist.zip"
with open(file_path, "wb") as w:
w.write(resp.read())
with zipfile.ZipFile(file_path, "r") as zr:
zr.extractall(FRONTEND_DIR)
log.success("控制台初始化完成")
|