summaryrefslogtreecommitdiff
path: root/ATRI/plugins/console/data_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/console/data_source.py')
-rw-r--r--ATRI/plugins/console/data_source.py96
1 files changed, 56 insertions, 40 deletions
diff --git a/ATRI/plugins/console/data_source.py b/ATRI/plugins/console/data_source.py
index b4a28cb..ab6b51b 100644
--- a/ATRI/plugins/console/data_source.py
+++ b/ATRI/plugins/console/data_source.py
@@ -2,59 +2,75 @@ import json
import socket
import string
import zipfile
-from random import sample
+import hashlib
from pathlib import Path
+from random import sample
+from typing import Optional
+from datetime import datetime, timedelta
-from ATRI.utils import request
-from ATRI.exceptions import WriteFileError
+from ATRI.utils import request, FileDealer
from ATRI.log import log
+from .models import AuthData
+
CONSOLE_DIR = Path(".") / "data" / "plugins" / "console"
CONSOLE_DIR.mkdir(parents=True, exist_ok=True)
-class Console:
- @staticmethod
- async def get_host_ip(is_pub: bool):
- if is_pub:
- data = await request.get("https://ifconfig.me/ip")
- return data.text
-
- s = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- ip = s.getsockname()[0]
- return ip
- finally:
- if s:
- s.close()
-
- @staticmethod
- def get_random_str(k: int) -> str:
- return "".join(sample(string.ascii_letters + string.digits, k))
-
- @staticmethod
- def get_auth_info() -> dict:
- df = CONSOLE_DIR / "data.json"
- if not df.is_file():
- try:
- with open(df, "w", encoding="utf-8") as w:
- w.write(json.dumps(dict()))
- except Exception:
- raise WriteFileError("Writing file: " + str(df) + " failed!")
-
- base_data: dict = json.loads(df.read_bytes())
- data = base_data.get("data", None)
- if not data:
- return {"data": None}
+class AuthDealer:
+ AUTH_FILE_PATH = CONSOLE_DIR / "data.json"
+
+ def __init__(self):
+ self.token = str().join(sample(string.ascii_letters + string.digits, 20))
+
+ def get_token(self):
+ return self.token
+
+ def get_md5(self):
+ hl = hashlib.md5()
+ hl.update(self.token.encode(encoding="utf-8"))
+ return hl.hexdigest()
+
+ async def store(self) -> AuthData:
+ dead_time = (datetime.now() + timedelta(minutes=15)).timestamp()
+ data = AuthData(token=self.token, md5=self.get_md5(), dead_time=int(dead_time))
+ await FileDealer(self.AUTH_FILE_PATH).write(json.dumps(data.dict()))
return data
+ @classmethod
+ def get(cls) -> Optional[AuthData]:
+ return (
+ AuthData.parse_file(cls.AUTH_FILE_PATH)
+ if cls.AUTH_FILE_PATH.is_file()
+ and json.loads(cls.AUTH_FILE_PATH.read_bytes())
+ else None
+ )
+
+ @classmethod
+ async def clear(cls):
+ await FileDealer(cls.AUTH_FILE_PATH).write(json.dumps(dict()))
+
+
+async def get_host_ip(is_pub: bool):
+ if is_pub:
+ data = await request.get("https://ifconfig.me/ip")
+ return data.text
+
+ s = None
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.connect(("8.8.8.8", 80))
+ ip = s.getsockname()[0]
+ return ip
+ finally:
+ if s:
+ s.close()
+
FRONTEND_DIR = CONSOLE_DIR / "frontend"
FRONTEND_DIR.mkdir(parents=True, exist_ok=True)
-CONSOLE_RESOURCE_URL = (
+__CONSOLE_RESOURCE_URL = (
"https://jsd.imki.moe/gh/kyomotoi/Project-ATRI-Console@main/archive/dist.zip"
)
@@ -63,7 +79,7 @@ async def init_resource():
log.info("控制台初始化中...")
try:
- resp = await request.get(CONSOLE_RESOURCE_URL)
+ resp = await request.get(__CONSOLE_RESOURCE_URL)
except Exception:
log.error("控制台资源装载失败, 将无法访问管理界面")
return