summaryrefslogtreecommitdiff
path: root/ATRI/plugins/anti_effort/data_source.py
blob: 0a0ef02e44f7dd3a43274f69c963c40c552b15ae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import re
import os
from pathlib import Path

from ATRI.service import Service
from ATRI.rule import is_in_service
from ATRI.utils import request
from ATRI.log import logger as log

from .models import AntiEffortUserModel


PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort"
PLUGIN_DIR.mkdir(parents=True, exist_ok=True)


class AntiEffort(Service):
    def __init__(self):
        Service.__init__(
            self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae"
        )

    def add_user(
        self, group_id: int, user_id: int, user_nickname: str, waka_url: str
    ) -> str:
        patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json"
        match = re.findall(patt, waka_url)
        if not match:
            return "哥,你提供的链接有问题啊"

        w_user_id = match[0][1]

        file_path = PLUGIN_DIR / f"{group_id}.json"
        if not file_path.exists():
            with open(file_path, "w") as f:
                f.write(json.dumps(list()))

        data: list = json.loads(file_path.read_text())
        for i in data:
            if i["user_id"] == user_id:
                return "你已经在卷王统计榜力!"

        data.append(
            AntiEffortUserModel(
                user_id=user_id,
                user_nickname=user_nickname,
                w_user_id=w_user_id,
                waka_url=waka_url,
                last_7_days_count=int(),
                recent_count=int(),
            ).dict()
        )
        with open(file_path, "w") as f:
            f.write(json.dumps(data))

        return "成功加入卷王统计榜!"

    def del_user(self, group_id: int, user_id: int) -> str:
        data = self.get_data(group_id)
        file_path = PLUGIN_DIR / f"{group_id}.json"
        if file_path.exists():
            for i in data:
                if i["user_id"] == user_id:
                    data.remove(i)
                    with open(PLUGIN_DIR / f"{group_id}.json", "w") as f:
                        f.write(json.dumps(data))
                    return "成功退出卷王统计榜!"

        return "你未加入卷王统计榜捏"

    def get_data(self, group_id: int) -> list:
        file_path = PLUGIN_DIR / f"{group_id}.json"
        if not file_path.exists():
            return list()
        return json.loads(file_path.read_text())

    def update_user(self, group_id: int, user_id: int, update_map: dict):
        data = self.get_data(group_id)
        if data:
            file_path = PLUGIN_DIR / f"{group_id}.json"
            for i in data:
                if i["user_id"] == user_id:
                    for k, v in update_map.items():
                        i[k] = v
                    with open(file_path, "w") as f:
                        f.write(json.dumps(data))

    async def update_data(self):
        files = os.listdir(PLUGIN_DIR)
        if not files:
            return 114514

        groups = list()
        for f in files:
            group_id = int(f.split(".")[0])
            groups.append(group_id)

        for i in groups:
            data = self.get_data(i)
            if not data:
                continue

            for j in data:
                try:
                    resp = await request.get(j["waka_url"])
                    user_w_data = resp.json()  # type: ignore
                except Exception:
                    log.error(f"获取卷王 {j['user_id']} 数据失败!")
                    continue

                d = user_w_data["data"]
                user_w_last_7_days_count = float()
                user_w_recent_count = float(d[-1]["grand_total"]["decimal"])
                for u in d:
                    user_w_last_7_days_count += float(u["grand_total"]["decimal"])

                u_data = {
                    "last_7_days_count": user_w_last_7_days_count,
                    "recent_count": user_w_recent_count,
                }
                self.update_user(i, j["user_id"], u_data)