summaryrefslogtreecommitdiff
path: root/ATRI/plugins/anti_effort/data_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/anti_effort/data_source.py')
-rw-r--r--ATRI/plugins/anti_effort/data_source.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/ATRI/plugins/anti_effort/data_source.py b/ATRI/plugins/anti_effort/data_source.py
new file mode 100644
index 0000000..f9a943f
--- /dev/null
+++ b/ATRI/plugins/anti_effort/data_source.py
@@ -0,0 +1,117 @@
+import json
+import re
+import os
+from pathlib import Path
+
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.utils import request
+from ATRI.log import logger as log
+
+from .models import AntiEffortUserModel
+
+
+PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort"
+PLUGIN_DIR.mkdir(parents=True, exist_ok=True)
+
+
+class AntiEffort(Service):
+ def __init__(self):
+ Service.__init__(
+ self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae"
+ )
+
+ def add_user(self, group_id: int, user_id: int, waka_url: str) -> str:
+ patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json"
+ match = re.findall(patt, waka_url)
+ if not match:
+ return "哥,你提供的链接有问题啊"
+
+ w_user_name = match[0][0]
+ w_user_id = match[0][1]
+
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if not file_path.exists():
+ with open(file_path, "w") as f:
+ f.write(json.dumps(list()))
+
+ data: list = json.loads(file_path.read_text())
+ data.append(
+ AntiEffortUserModel(
+ user_id=user_id,
+ w_user_name=w_user_name,
+ w_user_id=w_user_id,
+ waka_url=waka_url,
+ last_7_days_count=int(),
+ recent_count=int(),
+ ).dict()
+ )
+ with open(file_path, "w") as f:
+ f.write(json.dumps(data))
+
+ return "成功加入卷王统计榜!"
+
+ def del_user(self, group_id: int, user_id: int) -> str:
+ data = self.get_data(group_id)
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if file_path.exists():
+ for i in data:
+ if i["user_id"] == user_id:
+ data.remove(i)
+ with open(PLUGIN_DIR / f"{group_id}.json", "w") as f:
+ f.write(json.dumps(data))
+ return "成功退出卷王统计榜!"
+
+ return "贵群还没有人加入卷王统计榜!"
+
+ def get_data(self, group_id: int) -> list:
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if not file_path.exists():
+ return list()
+ return json.loads(file_path.read_text())
+
+ def update_user(self, group_id: int, user_id: int, update_map: dict):
+ data = self.get_data(group_id)
+ if data:
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ for i in data:
+ if i["user_id"] == user_id:
+ for k, v in update_map.items():
+ i[k] = v
+ with open(file_path, "w") as f:
+ f.write(json.dumps(data))
+
+ async def update_data(self):
+ files = os.listdir(PLUGIN_DIR)
+ if not files:
+ return 114514
+
+ groups = list()
+ for f in files:
+ group_id = int(f.split(".")[0])
+ groups.append(group_id)
+
+ for i in groups:
+ data = self.get_data(i)
+ if not data:
+ continue
+
+ for j in data:
+ try:
+ resp = await request.get(j["waka_url"])
+ except Exception:
+ log.error(f"获取卷王 {j['user_id']} 数据失败!")
+ continue
+
+ user_w_data = resp.json() # type: ignore
+ d = user_w_data["data"]
+ user_w_last_7_days_count = float()
+ user_w_recent_count = float(d[-1]["grand_total"]["decimal"])
+ for u in d:
+ user_w_last_7_days_count += float(u["grand_total"]["decimal"])
+
+ u_data = {
+ "last_7_days_count": user_w_last_7_days_count,
+ "recent_count": user_w_recent_count,
+ }
+ self.update_user(i, j["user_id"], u_data)