diff options
author | Kyomotoi <[email protected]> | 2022-07-19 11:54:58 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2022-07-19 11:54:58 +0800 |
commit | 871d6f0bc8693b1b1fb35760636a610931e57cf9 (patch) | |
tree | 492db9cefb60d30d2a7870eef5925e77cbefd07b | |
parent | 8b0111a6e23745f6c51e2037564a2c8972bf84e9 (diff) | |
download | ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.tar.gz ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.tar.bz2 ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.zip |
✨ 新增插件: 谁是卷王
-rw-r--r-- | ATRI/plugins/anti_effort/__init__.py | 121 | ||||
-rw-r--r-- | ATRI/plugins/anti_effort/data_source.py | 117 | ||||
-rw-r--r-- | ATRI/plugins/anti_effort/models.py | 10 |
3 files changed, 248 insertions, 0 deletions
diff --git a/ATRI/plugins/anti_effort/__init__.py b/ATRI/plugins/anti_effort/__init__.py new file mode 100644 index 0000000..febfdda --- /dev/null +++ b/ATRI/plugins/anti_effort/__init__.py @@ -0,0 +1,121 @@ +from tabulate import tabulate +from datetime import datetime + +from nonebot.matcher import Matcher +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent + +from .data_source import AntiEffort + + +_GET_URL_MSG = """请键入wakatime share embed URL: +获取方法: + - 前往 wakatime.com/share/embed + - Format 选择 JSON + - Chart Type 选择 Coding Activity + - Date Range 选择 Last 7 Days + - 所需url就在下一栏 HTML 中的 url +""".strip() + + +add_user = AntiEffort().on_command("!我也要卷", "加入卷王统计榜") +add_user_cmd = AntiEffort().cmd_as_group("join", "加入卷王统计榜") + + +@add_user.handle() +@add_user_cmd.handle() +async def _add_user(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("waka_url", args) + + +@add_user.got("waka_url", _GET_URL_MSG) +async def _deal_add_user( + event: GroupMessageEvent, _url: str = ArgPlainText("waka_url") +): + group_id = event.group_id + user_id = event.user_id + + result = AntiEffort().add_user(group_id, user_id, (_url)) + await add_user.finish(result) + + +user_leave = AntiEffort().cmd_as_group("leave", "退出卷王统计榜") + + +@user_leave.handle() +async def _user_leave(event: GroupMessageEvent): + group_id = event.group_id + user_id = event.user_id + + result = AntiEffort().del_user(group_id, user_id) + await user_leave.finish(result) + + +check_rank_today = AntiEffort().on_command("今日卷王", "查看今日卷王榜") +check_rank_today_cmd = AntiEffort().cmd_as_group("rank.today", "查看今日卷王榜") + + +@check_rank_today.handle() +@check_rank_today_cmd.handle() +async def _check_rank_today(event: GroupMessageEvent): + await check_rank_today.send("别急!正在统计!") + + group_id = event.group_id + u = await AntiEffort().update_data() + if u == 114514: + await check_rank_today.finish("贵群还没有人加入卷王统计榜!") + + data = AntiEffort().get_data(group_id) + if not data: + await check_rank_today.finish("贵群还没有人加入卷王统计榜!") + + data = sorted(data, key=lambda x: x["recent_count"], reverse=True) + table = [ + [ + f"{i + 1}", + f"{x['w_user_name']}", + f"{round(x['recent_count'], 2)}", + ] + for i, x in enumerate(data) + ] + table.insert(0, ["Rank", "Member", "Today"]) + result = tabulate(table, tablefmt="plain") + now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S") + repo = f"《今日卷王》 单位: 小时\nDate: {now_time}\n{result}" + await check_rank_today.finish(repo) + + +check_rank_recent_week = AntiEffort().on_command("周卷王", "查看近一周卷王榜") +check_rank_recent_week_cmd = AntiEffort().cmd_as_group("rank.week", "查看近一周卷王榜") + + +@check_rank_recent_week.handle() +@check_rank_recent_week_cmd.handle() +async def _check_rank_recent_week(event: GroupMessageEvent): + await check_rank_recent_week.send("别急!正在统计!") + + group_id = event.group_id + u = await AntiEffort().update_data() + if u == 114514: + await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!") + + data = AntiEffort().get_data(group_id) + if not data: + await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!") + + data = sorted(data, key=lambda x: x["last_7_days_count"], reverse=True) + table = [ + [ + f"{i + 1}", + f"{x['w_user_name']}", + f"{round(x['last_7_days_count'], 2)}", + ] + for i, x in enumerate(data) + ] + table.insert(0, ["Rank", "Member", "Last 7 Days"]) + result = tabulate(table, tablefmt="plain") + now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S") + repo = f"《近一周卷王》 单位: 小时\nDate: {now_time}\n{result}" + await check_rank_recent_week.finish(repo) diff --git a/ATRI/plugins/anti_effort/data_source.py b/ATRI/plugins/anti_effort/data_source.py new file mode 100644 index 0000000..f9a943f --- /dev/null +++ b/ATRI/plugins/anti_effort/data_source.py @@ -0,0 +1,117 @@ +import json +import re +import os +from pathlib import Path + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.utils import request +from ATRI.log import logger as log + +from .models import AntiEffortUserModel + + +PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort" +PLUGIN_DIR.mkdir(parents=True, exist_ok=True) + + +class AntiEffort(Service): + def __init__(self): + Service.__init__( + self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae" + ) + + def add_user(self, group_id: int, user_id: int, waka_url: str) -> str: + patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json" + match = re.findall(patt, waka_url) + if not match: + return "哥,你提供的链接有问题啊" + + w_user_name = match[0][0] + w_user_id = match[0][1] + + file_path = PLUGIN_DIR / f"{group_id}.json" + if not file_path.exists(): + with open(file_path, "w") as f: + f.write(json.dumps(list())) + + data: list = json.loads(file_path.read_text()) + data.append( + AntiEffortUserModel( + user_id=user_id, + w_user_name=w_user_name, + w_user_id=w_user_id, + waka_url=waka_url, + last_7_days_count=int(), + recent_count=int(), + ).dict() + ) + with open(file_path, "w") as f: + f.write(json.dumps(data)) + + return "成功加入卷王统计榜!" + + def del_user(self, group_id: int, user_id: int) -> str: + data = self.get_data(group_id) + file_path = PLUGIN_DIR / f"{group_id}.json" + if file_path.exists(): + for i in data: + if i["user_id"] == user_id: + data.remove(i) + with open(PLUGIN_DIR / f"{group_id}.json", "w") as f: + f.write(json.dumps(data)) + return "成功退出卷王统计榜!" + + return "贵群还没有人加入卷王统计榜!" + + def get_data(self, group_id: int) -> list: + file_path = PLUGIN_DIR / f"{group_id}.json" + if not file_path.exists(): + return list() + return json.loads(file_path.read_text()) + + def update_user(self, group_id: int, user_id: int, update_map: dict): + data = self.get_data(group_id) + if data: + file_path = PLUGIN_DIR / f"{group_id}.json" + for i in data: + if i["user_id"] == user_id: + for k, v in update_map.items(): + i[k] = v + with open(file_path, "w") as f: + f.write(json.dumps(data)) + + async def update_data(self): + files = os.listdir(PLUGIN_DIR) + if not files: + return 114514 + + groups = list() + for f in files: + group_id = int(f.split(".")[0]) + groups.append(group_id) + + for i in groups: + data = self.get_data(i) + if not data: + continue + + for j in data: + try: + resp = await request.get(j["waka_url"]) + except Exception: + log.error(f"获取卷王 {j['user_id']} 数据失败!") + continue + + user_w_data = resp.json() # type: ignore + d = user_w_data["data"] + user_w_last_7_days_count = float() + user_w_recent_count = float(d[-1]["grand_total"]["decimal"]) + for u in d: + user_w_last_7_days_count += float(u["grand_total"]["decimal"]) + + u_data = { + "last_7_days_count": user_w_last_7_days_count, + "recent_count": user_w_recent_count, + } + self.update_user(i, j["user_id"], u_data) diff --git a/ATRI/plugins/anti_effort/models.py b/ATRI/plugins/anti_effort/models.py new file mode 100644 index 0000000..928c4ec --- /dev/null +++ b/ATRI/plugins/anti_effort/models.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel + + +class AntiEffortUserModel(BaseModel): + user_id: int + w_user_name: str + w_user_id: str + waka_url: str + last_7_days_count: float + recent_count: float |