summaryrefslogtreecommitdiff
path: root/ATRI/plugins/console/data_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/console/data_source.py')
-rw-r--r--ATRI/plugins/console/data_source.py93
1 files changed, 0 insertions, 93 deletions
diff --git a/ATRI/plugins/console/data_source.py b/ATRI/plugins/console/data_source.py
deleted file mode 100644
index cab951d..0000000
--- a/ATRI/plugins/console/data_source.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import json
-import socket
-import string
-import hashlib
-from pathlib import Path
-from random import sample
-from typing import Optional
-from datetime import datetime, timedelta
-
-from ATRI.utils import request, FileDealer
-from ATRI.log import log
-
-from .models import AuthData
-
-
-CONSOLE_DIR = Path(".") / "data" / "plugins" / "console"
-CONSOLE_DIR.mkdir(parents=True, exist_ok=True)
-
-
-class AuthDealer:
- AUTH_FILE_PATH = CONSOLE_DIR / "data.json"
-
- def __init__(self):
- self.token = str().join(sample(string.ascii_letters + string.digits, 20))
-
- def get_token(self):
- return self.token
-
- def get_md5(self):
- hl = hashlib.md5()
- hl.update(self.token.encode(encoding="utf-8"))
- return hl.hexdigest()
-
- async def store(self) -> AuthData:
- dead_time = (datetime.now() + timedelta(minutes=15)).timestamp()
- data = AuthData(token=self.token, md5=self.get_md5(), dead_time=int(dead_time))
- await FileDealer(self.AUTH_FILE_PATH).write_json(data.dict())
- return data
-
- @classmethod
- def get(cls) -> Optional[AuthData]:
- return (
- AuthData.parse_file(cls.AUTH_FILE_PATH)
- if cls.AUTH_FILE_PATH.is_file()
- and json.loads(cls.AUTH_FILE_PATH.read_bytes())
- else None
- )
-
- @classmethod
- async def clear(cls):
- await FileDealer(cls.AUTH_FILE_PATH).write_json(dict())
-
-
-async def get_host_ip(is_pub: bool):
- if is_pub:
- data = await request.get("https://ifconfig.me/ip")
- return data.text
-
- s = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- ip = s.getsockname()[0]
- return ip
- finally:
- if s:
- s.close()
-
-
-# FRONTEND_DIR = CONSOLE_DIR / "frontend"
-# FRONTEND_DIR.mkdir(parents=True, exist_ok=True)
-# __CONSOLE_RESOURCE_URL = (
-# "https://guc.imki.moe/kyomotoi/Project-ATRI-Console/main/archive/dist.zip"
-# )
-
-
-# async def init_resource():
-# log.info("控制台初始化中...")
-
-# try:
-# resp = await request.get(__CONSOLE_RESOURCE_URL)
-# except Exception:
-# log.error("控制台资源装载失败, 将无法访问管理界面")
-# return
-
-# file_path = CONSOLE_DIR / "dist.zip"
-# with open(file_path, "wb") as w:
-# w.write(resp.read())
-
-# with zipfile.ZipFile(file_path, "r") as zr:
-# zr.extractall(FRONTEND_DIR)
-
-# log.success("控制台初始化完成")