blob: c49570bf9d50f3905bef920bb1c20d5d4f00c673 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
import time
import psutil
from datetime import datetime
from nonebot.adapters.cqhttp import Bot, MessageEvent
from ATRI.log import logger as log
from ATRI.service import Service as sv
from ATRI.rule import is_in_service
from ATRI.exceptions import GetStatusError
from ATRI.utils.apscheduler import scheduler
from ATRI.config import BotSelfConfig
__doc__ = """
测试机器人状态
权限组:所有人
用法:
/ping
"""
ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping"))
@ping.handle()
async def _ping(bot: Bot, event: MessageEvent) -> None:
await ping.finish("I'm fine.")
__doc__ = """
检查机器人性能
权限组:所有人
用法:
/status
"""
status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status"))
@status.handle()
async def _status(bot: Bot, event: MessageEvent) -> None:
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
now = time.time()
boot = psutil.boot_time()
up_time = str(
datetime.utcfromtimestamp(now).replace(microsecond=0)
- datetime.utcfromtimestamp(boot).replace(microsecond=0)
)
except GetStatusError:
raise GetStatusError("Failed to get status.")
msg = "アトリは、高性能ですから!"
if cpu > 90: # type: ignore
msg = "咱感觉有些头晕..."
if mem > 90:
msg = "咱感觉有点头晕并且有点累..."
elif mem > 90:
msg = "咱感觉有点累..."
elif disk > 90:
msg = "咱感觉身体要被塞满了..."
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
f"* MEM: {mem}%\n"
f"* DISK: {disk}%\n"
f"* netSENT: {inteSENT}MB\n"
f"* netRECV: {inteRECV}MB\n"
f"* Runtime: {up_time}\n"
) + msg
await status.finish(msg0)
@scheduler.scheduled_job("interval", minutes=5, misfire_grace_time=10)
async def _():
log.info("开始自检")
try:
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
except GetStatusError:
raise GetStatusError("Failed to get status.")
msg = str()
if cpu > 90: # type: ignore
msg = "咱感觉有些头晕..."
if mem > 90:
msg = "咱感觉有点头晕并且有点累..."
elif mem > 90:
msg = "咱感觉有点累..."
elif disk > 90:
msg = "咱感觉身体要被塞满了..."
else:
log.info("运作正常")
return
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
f"* MEM: {mem}%\n"
f"* DISK: {disk}%\n"
f"* netSENT: {inteSENT}MB\n"
f"* netRECV: {inteRECV}MB\n"
) + msg
for sup in BotSelfConfig.superusers:
await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0)
|