summaryrefslogtreecommitdiff
path: root/ATRI/plugins/console/driver/data_source.py
blob: 12fb938ef3d1e15bfc365543c254eb16cb9ff02f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import os
import json
import psutil
from pathlib import Path
from datetime import datetime

from ATRI.exceptions import GetStatusError

from ATRI.utils import FileDealer
from ATRI.service import ServiceTools, SERVICES_DIR

from . import models


def get_process_info() -> dict:
    """Collect runtime statistics for the host and for the bot process.

    Samples host CPU, memory, disk ("/") and network counters through
    ``psutil``, then CPU/memory usage of the current process, and derives
    both the host uptime and the process uptime. A status message is picked
    from the host load figures.

    Both ``cpu_percent(interval=1)`` calls block for one second each, so this
    function takes roughly two seconds to return.

    Returns:
        The ``models.RuntimeInfo`` payload converted with ``.dict()``. All
        numeric readings are passed to the models as strings.

    Raises:
        GetStatusError: If any measurement fails; the original exception is
            attached as the cause.
    """
    # Local import: only ``datetime`` is imported at file level.
    from datetime import timedelta

    try:
        platform_cpu = psutil.cpu_percent(interval=1)
        platform_mem = psutil.virtual_memory().percent
        platform_disk = psutil.disk_usage("/").percent

        # Take a single snapshot so that sent/recv describe the same instant.
        net_io = psutil.net_io_counters()
        platform_net_sent = str(net_io.bytes_sent / 1000000)
        platform_net_recv = str(net_io.bytes_recv / 1000000)

        process = psutil.Process(os.getpid())
        bot_cpu = str(process.cpu_percent(interval=1))
        bot_mem = str(process.memory_percent(memtype="rss"))

        # Uptimes are differences of epoch timestamps truncated to whole
        # seconds; plain timedelta arithmetic avoids the deprecated
        # ``datetime.utcfromtimestamp``.
        now_time = int(datetime.now().timestamp())
        boot_time = str(timedelta(seconds=now_time - int(psutil.boot_time())))
        bot_run_time = str(
            timedelta(seconds=now_time - int(process.create_time()))
        )
    except Exception as err:
        raise GetStatusError("获取实例运行信息失败") from err

    # CPU pressure takes priority, then memory, then disk; the disk message
    # is only used when neither CPU nor memory exceeds 90 percent.
    stat_msg = "アトリは、高性能ですから!"
    if platform_cpu > 90:
        stat_msg = "咱感觉有些头晕..."
        if platform_mem > 90:
            stat_msg = "咱感觉有点头晕并且有点累..."
    elif platform_mem > 90:
        stat_msg = "咱感觉有点累..."
    elif platform_disk > 90:
        stat_msg = "咱感觉身体要被塞满了..."

    return models.RuntimeInfo(
        status_message=stat_msg,
        platform_info=models.PlatformRuntimeInfo(
            cpu_percent=str(platform_cpu),
            mem_percent=str(platform_mem),
            disk_percent=str(platform_disk),
            net_sent=platform_net_sent,
            net_recv=platform_net_recv,
            boot_time=boot_time,
        ),
        bot_info=models.BotRuntimeInfo(
            cpu_percent=bot_cpu, mem_percent=bot_mem, run_time=bot_run_time
        ),
    ).dict()


def get_service_list() -> dict:
    """Load every service description stored in ``SERVICES_DIR``.

    Each directory entry except ``.DS_Store`` is parsed with
    ``models.ServiceInfo.parse_file``.

    Returns:
        A mapping from each parsed entry's ``service`` attribute to its
        ``.dict()`` representation.
    """
    services = {}

    for entry in os.listdir(SERVICES_DIR):
        # macOS drops this metadata file into directories; it is not a service.
        if entry != ".DS_Store":
            info = models.ServiceInfo.parse_file(SERVICES_DIR / entry)
            services[info.service] = info.dict()

    return services


def edit_service(
    service: str, global_enabled: int, enabled: bool, user_id: str, group_id: str
):
    """Update a service's global switch and its per-user / per-group disable lists.

    Args:
        service: Name handed to ``ServiceTools.load_service`` and
            ``ServiceTools.save_service``.
        global_enabled: The service is marked enabled only when this value is
            truthy and different from 2; 0 and 2 both mark it disabled.
        enabled: When true, the given ids are removed from the disable lists
            (re-enabling them); when false, they are added.
        user_id: User to update; an empty value leaves the user list untouched.
        group_id: Group to update; an empty value leaves the group list
            untouched.

    Returns:
        A dict with a ``"detail"`` message. When an id fails validation the
        matching error message is returned and nothing is saved.
    """
    data = ServiceTools.load_service(service)

    # NOTE(review): 2 looks like an "unspecified" sentinel, yet it disables the
    # service exactly like 0 does. Behaviour is kept as-is; confirm with the
    # caller whether 2 should leave ``data.enabled`` unchanged instead.
    data.enabled = bool(global_enabled) and global_enabled != 2

    # Each id is checked only when it was actually supplied, so that an empty
    # id is never looked up in, added to, or removed from a disable list.
    # All validation happens before any list is modified.
    if enabled:
        if user_id and user_id not in data.disable_user:
            return {"detail": "用户不存在于禁用名单"}
        if group_id and group_id not in data.disable_group:
            return {"detail": "群不存在于禁用名单"}

        if user_id:
            data.disable_user.remove(user_id)
        if group_id:
            data.disable_group.remove(group_id)
    else:
        if user_id and user_id in data.disable_user:
            return {"detail": "用户已存在于禁用名单"}
        if group_id and group_id in data.disable_group:
            return {"detail": "群已存在于禁用名单"}

        if user_id:
            data.disable_user.append(user_id)
        if group_id:
            data.disable_group.append(group_id)

    ServiceTools.save_service(data.dict(), service)

    return {"detail": "操作完成~"}


def get_block_list() -> models.BlockInfo:
    """Read the stored block lists for users and groups.

    Both JSON files are read from ``./data/plugins/manege`` relative to the
    current working directory, user file first.

    Returns:
        A ``models.BlockInfo`` built from the decoded contents of
        ``block_user.json`` and ``block_group.json``.
    """
    block_dir = Path(".") / "data" / "plugins" / "manege"

    blocked_users = json.loads((block_dir / "block_user.json").read_bytes())
    blocked_groups = json.loads((block_dir / "block_group.json").read_bytes())

    return models.BlockInfo(user=blocked_users, group=blocked_groups)


async def edit_block_list(enabled: bool, user_id: str, group_id: str):
    """Add ids to, or remove ids from, the stored block lists.

    Args:
        enabled: When true, the given ids are added with the current local
            time as their value; when false, they are removed.
        user_id: User to update; skipped when empty.
        group_id: Group to update; skipped when empty.

    Returns:
        A dict with a ``"detail"`` message. If an id is already present when
        adding, or absent when removing, the matching error message is
        returned and neither file is written.
    """
    blocked = get_block_list()
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # The user id is handled before the group id.
    if user_id:
        user_listed = user_id in blocked.user
        if enabled:
            if user_listed:
                return {"detail": "用户已存在于黑名单"}
            blocked.user[user_id] = stamp
        elif user_listed:
            del blocked.user[user_id]
        else:
            return {"detail": "用户不存在于黑名单"}

    if group_id:
        group_listed = group_id in blocked.group
        if enabled:
            if group_listed:
                return {"detail": "群已存在于黑名单"}
            blocked.group[group_id] = stamp
        elif group_listed:
            del blocked.group[group_id]
        else:
            return {"detail": "群不存在于黑名单"}

    # Persist both lists, user file first.
    block_dir = Path(".") / "data" / "plugins" / "manege"
    outputs = (
        ("block_user.json", blocked.user),
        ("block_group.json", blocked.group),
    )
    for filename, entries in outputs:
        await FileDealer(block_dir / filename).write(json.dumps(entries))

    return {"detail": "操作完成~"}