summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-07-19 11:54:58 +0800
committerKyomotoi <[email protected]>2022-07-19 11:54:58 +0800
commit871d6f0bc8693b1b1fb35760636a610931e57cf9 (patch)
tree492db9cefb60d30d2a7870eef5925e77cbefd07b
parent8b0111a6e23745f6c51e2037564a2c8972bf84e9 (diff)
downloadATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.tar.gz
ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.tar.bz2
ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.zip
✨ 新增插件: 谁是卷王
-rw-r--r--ATRI/plugins/anti_effort/__init__.py121
-rw-r--r--ATRI/plugins/anti_effort/data_source.py117
-rw-r--r--ATRI/plugins/anti_effort/models.py10
3 files changed, 248 insertions, 0 deletions
diff --git a/ATRI/plugins/anti_effort/__init__.py b/ATRI/plugins/anti_effort/__init__.py
new file mode 100644
index 0000000..febfdda
--- /dev/null
+++ b/ATRI/plugins/anti_effort/__init__.py
@@ -0,0 +1,121 @@
+from tabulate import tabulate
+from datetime import datetime
+
+from nonebot.matcher import Matcher
+from nonebot.params import CommandArg, ArgPlainText
+from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent
+
+from .data_source import AntiEffort
+
+
+_GET_URL_MSG = """请键入wakatime share embed URL:
+获取方法:
+ - 前往 wakatime.com/share/embed
+ - Format 选择 JSON
+ - Chart Type 选择 Coding Activity
+ - Date Range 选择 Last 7 Days
+ - 所需url就在下一栏 HTML 中的 url
+""".strip()
+
+
+add_user = AntiEffort().on_command("!我也要卷", "加入卷王统计榜")
+add_user_cmd = AntiEffort().cmd_as_group("join", "加入卷王统计榜")
+
+
+@add_user.handle()
+@add_user_cmd.handle()
+async def _add_user(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("waka_url", args)
+
+
+@add_user.got("waka_url", _GET_URL_MSG)
+async def _deal_add_user(
+ event: GroupMessageEvent, _url: str = ArgPlainText("waka_url")
+):
+ group_id = event.group_id
+ user_id = event.user_id
+
+ result = AntiEffort().add_user(group_id, user_id, (_url))
+ await add_user.finish(result)
+
+
+user_leave = AntiEffort().cmd_as_group("leave", "退出卷王统计榜")
+
+
+@user_leave.handle()
+async def _user_leave(event: GroupMessageEvent):
+ group_id = event.group_id
+ user_id = event.user_id
+
+ result = AntiEffort().del_user(group_id, user_id)
+ await user_leave.finish(result)
+
+
+check_rank_today = AntiEffort().on_command("今日卷王", "查看今日卷王榜")
+check_rank_today_cmd = AntiEffort().cmd_as_group("rank.today", "查看今日卷王榜")
+
+
+@check_rank_today.handle()
+@check_rank_today_cmd.handle()
+async def _check_rank_today(event: GroupMessageEvent):
+ await check_rank_today.send("别急!正在统计!")
+
+ group_id = event.group_id
+ u = await AntiEffort().update_data()
+ if u == 114514:
+ await check_rank_today.finish("贵群还没有人加入卷王统计榜!")
+
+ data = AntiEffort().get_data(group_id)
+ if not data:
+ await check_rank_today.finish("贵群还没有人加入卷王统计榜!")
+
+ data = sorted(data, key=lambda x: x["recent_count"], reverse=True)
+ table = [
+ [
+ f"{i + 1}",
+ f"{x['w_user_name']}",
+ f"{round(x['recent_count'], 2)}",
+ ]
+ for i, x in enumerate(data)
+ ]
+ table.insert(0, ["Rank", "Member", "Today"])
+ result = tabulate(table, tablefmt="plain")
+ now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S")
+ repo = f"《今日卷王》 单位: 小时\nDate: {now_time}\n{result}"
+ await check_rank_today.finish(repo)
+
+
+check_rank_recent_week = AntiEffort().on_command("周卷王", "查看近一周卷王榜")
+check_rank_recent_week_cmd = AntiEffort().cmd_as_group("rank.week", "查看近一周卷王榜")
+
+
+@check_rank_recent_week.handle()
+@check_rank_recent_week_cmd.handle()
+async def _check_rank_recent_week(event: GroupMessageEvent):
+ await check_rank_recent_week.send("别急!正在统计!")
+
+ group_id = event.group_id
+ u = await AntiEffort().update_data()
+ if u == 114514:
+ await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!")
+
+ data = AntiEffort().get_data(group_id)
+ if not data:
+ await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!")
+
+ data = sorted(data, key=lambda x: x["last_7_days_count"], reverse=True)
+ table = [
+ [
+ f"{i + 1}",
+ f"{x['w_user_name']}",
+ f"{round(x['last_7_days_count'], 2)}",
+ ]
+ for i, x in enumerate(data)
+ ]
+ table.insert(0, ["Rank", "Member", "Last 7 Days"])
+ result = tabulate(table, tablefmt="plain")
+ now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S")
+ repo = f"《近一周卷王》 单位: 小时\nDate: {now_time}\n{result}"
+ await check_rank_recent_week.finish(repo)
diff --git a/ATRI/plugins/anti_effort/data_source.py b/ATRI/plugins/anti_effort/data_source.py
new file mode 100644
index 0000000..f9a943f
--- /dev/null
+++ b/ATRI/plugins/anti_effort/data_source.py
@@ -0,0 +1,117 @@
+import json
+import re
+import os
+from pathlib import Path
+
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.utils import request
+from ATRI.log import logger as log
+
+from .models import AntiEffortUserModel
+
+
+PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort"
+PLUGIN_DIR.mkdir(parents=True, exist_ok=True)
+
+
+class AntiEffort(Service):
+ def __init__(self):
+ Service.__init__(
+ self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae"
+ )
+
+ def add_user(self, group_id: int, user_id: int, waka_url: str) -> str:
+ patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json"
+ match = re.findall(patt, waka_url)
+ if not match:
+ return "哥,你提供的链接有问题啊"
+
+ w_user_name = match[0][0]
+ w_user_id = match[0][1]
+
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if not file_path.exists():
+ with open(file_path, "w") as f:
+ f.write(json.dumps(list()))
+
+ data: list = json.loads(file_path.read_text())
+ data.append(
+ AntiEffortUserModel(
+ user_id=user_id,
+ w_user_name=w_user_name,
+ w_user_id=w_user_id,
+ waka_url=waka_url,
+ last_7_days_count=int(),
+ recent_count=int(),
+ ).dict()
+ )
+ with open(file_path, "w") as f:
+ f.write(json.dumps(data))
+
+ return "成功加入卷王统计榜!"
+
+ def del_user(self, group_id: int, user_id: int) -> str:
+ data = self.get_data(group_id)
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if file_path.exists():
+ for i in data:
+ if i["user_id"] == user_id:
+ data.remove(i)
+ with open(PLUGIN_DIR / f"{group_id}.json", "w") as f:
+ f.write(json.dumps(data))
+ return "成功退出卷王统计榜!"
+
+ return "贵群还没有人加入卷王统计榜!"
+
+ def get_data(self, group_id: int) -> list:
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if not file_path.exists():
+ return list()
+ return json.loads(file_path.read_text())
+
+ def update_user(self, group_id: int, user_id: int, update_map: dict):
+ data = self.get_data(group_id)
+ if data:
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ for i in data:
+ if i["user_id"] == user_id:
+ for k, v in update_map.items():
+ i[k] = v
+ with open(file_path, "w") as f:
+ f.write(json.dumps(data))
+
+ async def update_data(self):
+ files = os.listdir(PLUGIN_DIR)
+ if not files:
+ return 114514
+
+ groups = list()
+ for f in files:
+ group_id = int(f.split(".")[0])
+ groups.append(group_id)
+
+ for i in groups:
+ data = self.get_data(i)
+ if not data:
+ continue
+
+ for j in data:
+ try:
+ resp = await request.get(j["waka_url"])
+ except Exception:
+ log.error(f"获取卷王 {j['user_id']} 数据失败!")
+ continue
+
+ user_w_data = resp.json() # type: ignore
+ d = user_w_data["data"]
+ user_w_last_7_days_count = float()
+ user_w_recent_count = float(d[-1]["grand_total"]["decimal"])
+ for u in d:
+ user_w_last_7_days_count += float(u["grand_total"]["decimal"])
+
+ u_data = {
+ "last_7_days_count": user_w_last_7_days_count,
+ "recent_count": user_w_recent_count,
+ }
+ self.update_user(i, j["user_id"], u_data)
diff --git a/ATRI/plugins/anti_effort/models.py b/ATRI/plugins/anti_effort/models.py
new file mode 100644
index 0000000..928c4ec
--- /dev/null
+++ b/ATRI/plugins/anti_effort/models.py
@@ -0,0 +1,10 @@
+from pydantic import BaseModel
+
+
+class AntiEffortUserModel(BaseModel):
+ user_id: int
+ w_user_name: str
+ w_user_id: str
+ waka_url: str
+ last_7_days_count: float
+ recent_count: float