summaryrefslogtreecommitdiff
path: root/ATRI/plugins/status/data_source.py
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-05-04 16:07:18 +0800
committerKyomotoi <[email protected]>2022-05-04 16:07:18 +0800
commit7042d94213532191a0c72d34d7c85193184c079f (patch)
tree0ec3c458f828889f92719e7010bf0a031a627de7 /ATRI/plugins/status/data_source.py
parent18302364230e5f7898b3c2da56284e89d86d1947 (diff)
downloadATRI-7042d94213532191a0c72d34d7c85193184c079f.tar.gz
ATRI-7042d94213532191a0c72d34d7c85193184c079f.tar.bz2
ATRI-7042d94213532191a0c72d34d7c85193184c079f.zip
✨ 为前端页面打基础, 更新命令及API
Diffstat (limited to 'ATRI/plugins/status/data_source.py')
-rw-r--r--ATRI/plugins/status/data_source.py106
1 files changed, 92 insertions, 14 deletions
diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py
index 3595c0c..1658468 100644
--- a/ATRI/plugins/status/data_source.py
+++ b/ATRI/plugins/status/data_source.py
@@ -1,29 +1,41 @@
+import os
import time
+import json
import psutil
+import socket
+import string
+from pathlib import Path
+from random import sample
from datetime import datetime
from ATRI.service import Service
from ATRI.log import logger as log
from ATRI.rule import is_in_service
-from ATRI.exceptions import GetStatusError
+from ATRI.utils import request
+from ATRI.exceptions import GetStatusError, WriteError
+from .models import PlatfromRuntimeInfo, BotRuntimeInfo
+STATUS_DIR = Path(".") / "data" / "database" / "status"
+STATUS_DIR.mkdir(exist_ok=True)
+
_status_msg = """
> Status Overview
-[CPU: {cpu}%]
-[Memory: {mem}%]
-[Disk usage: {disk}%]
+[CPU: {p_cpu}% of {b_cpu}]
+[Memory: {p_mem}% of {b_mem}]
+[Disk usage: {p_disk}%]
[Net sent: {inteSENT}MB]
[Net recv: {inteRECV}MB]
-[Runtime: {up_time}]
+[Bot runtime: {bot_time}]
+[Platform runtime: {boot_time}]
{msg}
""".strip()
-class IsSurvive(Service):
+class Status(Service):
def __init__(self):
Service.__init__(self, "状态", "检查自身状态", rule=is_in_service("状态"))
@@ -32,8 +44,9 @@ class IsSurvive(Service):
return "I'm fine."
@staticmethod
- def get_status():
- log.info("开始检查资源消耗...")
+ def get_status(is_for_fn: bool = False) -> tuple:
+ data_p = Path(".") / "data"
+
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
@@ -41,12 +54,21 @@ class IsSurvive(Service):
inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
+ process = psutil.Process(os.getpid())
+ b_cpu = process.cpu_percent(interval=1)
+ b_mem = process.memory_percent(memtype="rss")
+
now = time.time()
boot = psutil.boot_time()
- up_time = str(
+ b = process.create_time()
+ boot_time = str(
datetime.utcfromtimestamp(now).replace(microsecond=0)
- datetime.utcfromtimestamp(boot).replace(microsecond=0)
)
+ bot_time = str(
+ datetime.utcfromtimestamp(now).replace(microsecond=0)
+ - datetime.utcfromtimestamp(b).replace(microsecond=0)
+ )
except GetStatusError:
raise GetStatusError("Failed to get status.")
@@ -64,17 +86,73 @@ class IsSurvive(Service):
msg = "咱感觉身体要被塞满了..."
is_ok = False
else:
- log.info("资源占用正常")
is_ok = True
+ if is_for_fn:
+ return (
+ PlatfromRuntimeInfo(
+ stat_msg=msg,
+ cpu_percent=str(cpu),
+ mem_percent=mem,
+ disk_percent=str(disk),
+ inte_send=str(inteSENT),
+ inte_recv=str(inteRECV),
+ boot_time=boot_time,
+ ).dict(),
+ BotRuntimeInfo(
+ cpu_percent=str(b_cpu),
+ mem_percent=str(b_mem),
+ bot_run_time=bot_time,
+ ).dict(),
+ )
+
msg0 = _status_msg.format(
- cpu=cpu,
- mem=mem,
- disk=disk,
+ p_cpu=cpu,
+ p_mem=mem,
+ p_disk=disk,
+ b_cpu=f"{b_cpu}%",
+ b_mem="%.1f%%" % b_mem,
inteSENT=inteSENT,
inteRECV=inteRECV,
- up_time=up_time,
+ bot_time=bot_time,
+ boot_time=boot_time,
msg=msg,
)
return msg0, is_ok
+
+ @staticmethod
+ async def get_host_ip(is_pub: bool):
+ if is_pub:
+ data = await request.get("https://ifconfig.me/ip")
+ return data.text
+
+ s = None
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.connect(('8.8.8.8', 80))
+ ip = s.getsockname()[0]
+ return ip
+ finally:
+ if s:
+ s.close()
+
+ @staticmethod
+ def get_random_str(k: int) -> str:
+ return "".join(sample(string.ascii_letters + string.digits, k))
+
+ @staticmethod
+ def get_auth_info() -> dict:
+ df = STATUS_DIR / "data.json"
+ if not df.is_file():
+ try:
+ with open(df, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ except WriteError:
+ raise WriteError("Writing file: " + str(df) + " failed!")
+
+ base_data: dict = json.loads(df.read_bytes())
+ data = base_data.get("data", None)
+ if not data:
+ return {"data": None}
+ return data