summaryrefslogtreecommitdiff
path: root/ATRI/plugins/anti_effort/data_source.py
blob: f9a943f6b430a9f78136f3e8ab23935fc8d51f7e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import re
import os
from pathlib import Path

from ATRI.service import Service
from ATRI.rule import is_in_service
from ATRI.utils import request
from ATRI.log import logger as log

from .models import AntiEffortUserModel


PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort"
PLUGIN_DIR.mkdir(parents=True, exist_ok=True)


class AntiEffort(Service):
    def __init__(self):
        Service.__init__(
            self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae"
        )

    def add_user(self, group_id: int, user_id: int, waka_url: str) -> str:
        patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json"
        match = re.findall(patt, waka_url)
        if not match:
            return "哥,你提供的链接有问题啊"

        w_user_name = match[0][0]
        w_user_id = match[0][1]

        file_path = PLUGIN_DIR / f"{group_id}.json"
        if not file_path.exists():
            with open(file_path, "w") as f:
                f.write(json.dumps(list()))

        data: list = json.loads(file_path.read_text())
        data.append(
            AntiEffortUserModel(
                user_id=user_id,
                w_user_name=w_user_name,
                w_user_id=w_user_id,
                waka_url=waka_url,
                last_7_days_count=int(),
                recent_count=int(),
            ).dict()
        )
        with open(file_path, "w") as f:
            f.write(json.dumps(data))

        return "成功加入卷王统计榜!"

    def del_user(self, group_id: int, user_id: int) -> str:
        data = self.get_data(group_id)
        file_path = PLUGIN_DIR / f"{group_id}.json"
        if file_path.exists():
            for i in data:
                if i["user_id"] == user_id:
                    data.remove(i)
                    with open(PLUGIN_DIR / f"{group_id}.json", "w") as f:
                        f.write(json.dumps(data))
                    return "成功退出卷王统计榜!"

        return "贵群还没有人加入卷王统计榜!"

    def get_data(self, group_id: int) -> list:
        file_path = PLUGIN_DIR / f"{group_id}.json"
        if not file_path.exists():
            return list()
        return json.loads(file_path.read_text())

    def update_user(self, group_id: int, user_id: int, update_map: dict):
        data = self.get_data(group_id)
        if data:
            file_path = PLUGIN_DIR / f"{group_id}.json"
            for i in data:
                if i["user_id"] == user_id:
                    for k, v in update_map.items():
                        i[k] = v
                    with open(file_path, "w") as f:
                        f.write(json.dumps(data))

    async def update_data(self):
        files = os.listdir(PLUGIN_DIR)
        if not files:
            return 114514

        groups = list()
        for f in files:
            group_id = int(f.split(".")[0])
            groups.append(group_id)

        for i in groups:
            data = self.get_data(i)
            if not data:
                continue

            for j in data:
                try:
                    resp = await request.get(j["waka_url"])
                except Exception:
                    log.error(f"获取卷王 {j['user_id']} 数据失败!")
                    continue

                user_w_data = resp.json()  # type: ignore
                d = user_w_data["data"]
                user_w_last_7_days_count = float()
                user_w_recent_count = float(d[-1]["grand_total"]["decimal"])
                for u in d:
                    user_w_last_7_days_count += float(u["grand_total"]["decimal"])

                u_data = {
                    "last_7_days_count": user_w_last_7_days_count,
                    "recent_count": user_w_recent_count,
                }
                self.update_user(i, j["user_id"], u_data)