diff options
| author | Kyomotoi <0w0@imki.moe> | 2022-07-19 11:54:58 +0800 | 
|---|---|---|
| committer | Kyomotoi <0w0@imki.moe> | 2022-07-19 11:54:58 +0800 | 
| commit | 871d6f0bc8693b1b1fb35760636a610931e57cf9 (patch) | |
| tree | 492db9cefb60d30d2a7870eef5925e77cbefd07b /ATRI | |
| parent | 8b0111a6e23745f6c51e2037564a2c8972bf84e9 (diff) | |
| download | ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.tar.gz ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.tar.bz2 ATRI-871d6f0bc8693b1b1fb35760636a610931e57cf9.zip | |
✨ 新增插件: 谁是卷王
Diffstat (limited to 'ATRI')
| -rw-r--r-- | ATRI/plugins/anti_effort/__init__.py | 121 | ||||
| -rw-r--r-- | ATRI/plugins/anti_effort/data_source.py | 117 | ||||
| -rw-r--r-- | ATRI/plugins/anti_effort/models.py | 10 | 
3 files changed, 248 insertions, 0 deletions
| diff --git a/ATRI/plugins/anti_effort/__init__.py b/ATRI/plugins/anti_effort/__init__.py new file mode 100644 index 0000000..febfdda --- /dev/null +++ b/ATRI/plugins/anti_effort/__init__.py @@ -0,0 +1,121 @@ +from tabulate import tabulate +from datetime import datetime + +from nonebot.matcher import Matcher +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent + +from .data_source import AntiEffort + + +_GET_URL_MSG = """请键入wakatime share embed URL: +获取方法: +  - 前往 wakatime.com/share/embed +  - Format 选择 JSON +  - Chart Type 选择 Coding Activity +  - Date Range 选择 Last 7 Days +  - 所需url就在下一栏 HTML 中的 url +""".strip() + + +add_user = AntiEffort().on_command("!我也要卷", "加入卷王统计榜") +add_user_cmd = AntiEffort().cmd_as_group("join", "加入卷王统计榜") + + +@add_user.handle() +@add_user_cmd.handle() +async def _add_user(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("waka_url", args) + + +@add_user.got("waka_url", _GET_URL_MSG) +async def _deal_add_user( +    event: GroupMessageEvent, _url: str = ArgPlainText("waka_url") +): +    group_id = event.group_id +    user_id = event.user_id + +    result = AntiEffort().add_user(group_id, user_id, (_url)) +    await add_user.finish(result) + + +user_leave = AntiEffort().cmd_as_group("leave", "退出卷王统计榜") + + +@user_leave.handle() +async def _user_leave(event: GroupMessageEvent): +    group_id = event.group_id +    user_id = event.user_id + +    result = AntiEffort().del_user(group_id, user_id) +    await user_leave.finish(result) + + +check_rank_today = AntiEffort().on_command("今日卷王", "查看今日卷王榜") +check_rank_today_cmd = AntiEffort().cmd_as_group("rank.today", "查看今日卷王榜") + + +@check_rank_today.handle() +@check_rank_today_cmd.handle() +async def _check_rank_today(event: GroupMessageEvent): +    await check_rank_today.send("别急!正在统计!") + +    group_id = event.group_id +    u = await AntiEffort().update_data() +    if u == 114514: +        await check_rank_today.finish("贵群还没有人加入卷王统计榜!") + +    data = AntiEffort().get_data(group_id) +    if not data: +        await check_rank_today.finish("贵群还没有人加入卷王统计榜!") + +    data = sorted(data, key=lambda x: x["recent_count"], reverse=True) +    table = [ +        [ +            f"{i + 1}", +            f"{x['w_user_name']}", +            f"{round(x['recent_count'], 2)}", +        ] +        for i, x in enumerate(data) +    ] +    table.insert(0, ["Rank", "Member", "Today"]) +    result = tabulate(table, tablefmt="plain") +    now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S") +    repo = f"《今日卷王》 单位: 小时\nDate: {now_time}\n{result}" +    await check_rank_today.finish(repo) + + +check_rank_recent_week = AntiEffort().on_command("周卷王", "查看近一周卷王榜") +check_rank_recent_week_cmd = AntiEffort().cmd_as_group("rank.week", "查看近一周卷王榜") + + +@check_rank_recent_week.handle() +@check_rank_recent_week_cmd.handle() +async def _check_rank_recent_week(event: GroupMessageEvent): +    await check_rank_recent_week.send("别急!正在统计!") + +    group_id = event.group_id +    u = await AntiEffort().update_data() +    if u == 114514: +        await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!") + +    data = AntiEffort().get_data(group_id) +    if not data: +        await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!") + +    data = sorted(data, key=lambda x: x["last_7_days_count"], reverse=True) +    table = [ +        [ +            f"{i + 1}", +            f"{x['w_user_name']}", +            f"{round(x['last_7_days_count'], 2)}", +        ] +        for i, x in enumerate(data) +    ] +    table.insert(0, ["Rank", "Member", "Last 7 Days"]) +    result = tabulate(table, tablefmt="plain") +    now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S") +    repo = f"《近一周卷王》 单位: 小时\nDate: {now_time}\n{result}" +    await check_rank_recent_week.finish(repo) diff --git a/ATRI/plugins/anti_effort/data_source.py b/ATRI/plugins/anti_effort/data_source.py new file mode 100644 index 0000000..f9a943f --- /dev/null +++ b/ATRI/plugins/anti_effort/data_source.py @@ -0,0 +1,117 @@ +import json +import re +import os +from pathlib import Path + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.utils import request +from ATRI.log import logger as log + +from .models import AntiEffortUserModel + + +PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort" +PLUGIN_DIR.mkdir(parents=True, exist_ok=True) + + +class AntiEffort(Service): +    def __init__(self): +        Service.__init__( +            self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae" +        ) + +    def add_user(self, group_id: int, user_id: int, waka_url: str) -> str: +        patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json" +        match = re.findall(patt, waka_url) +        if not match: +            return "哥,你提供的链接有问题啊" + +        w_user_name = match[0][0] +        w_user_id = match[0][1] + +        file_path = PLUGIN_DIR / f"{group_id}.json" +        if not file_path.exists(): +            with open(file_path, "w") as f: +                f.write(json.dumps(list())) + +        data: list = json.loads(file_path.read_text()) +        data.append( +            AntiEffortUserModel( +                user_id=user_id, +                w_user_name=w_user_name, +                w_user_id=w_user_id, +                waka_url=waka_url, +                last_7_days_count=int(), +                recent_count=int(), +            ).dict() +        ) +        with open(file_path, "w") as f: +            f.write(json.dumps(data)) + +        return "成功加入卷王统计榜!" + +    def del_user(self, group_id: int, user_id: int) -> str: +        data = self.get_data(group_id) +        file_path = PLUGIN_DIR / f"{group_id}.json" +        if file_path.exists(): +            for i in data: +                if i["user_id"] == user_id: +                    data.remove(i) +                    with open(PLUGIN_DIR / f"{group_id}.json", "w") as f: +                        f.write(json.dumps(data)) +                    return "成功退出卷王统计榜!" + +        return "贵群还没有人加入卷王统计榜!" + +    def get_data(self, group_id: int) -> list: +        file_path = PLUGIN_DIR / f"{group_id}.json" +        if not file_path.exists(): +            return list() +        return json.loads(file_path.read_text()) + +    def update_user(self, group_id: int, user_id: int, update_map: dict): +        data = self.get_data(group_id) +        if data: +            file_path = PLUGIN_DIR / f"{group_id}.json" +            for i in data: +                if i["user_id"] == user_id: +                    for k, v in update_map.items(): +                        i[k] = v +                    with open(file_path, "w") as f: +                        f.write(json.dumps(data)) + +    async def update_data(self): +        files = os.listdir(PLUGIN_DIR) +        if not files: +            return 114514 + +        groups = list() +        for f in files: +            group_id = int(f.split(".")[0]) +            groups.append(group_id) + +        for i in groups: +            data = self.get_data(i) +            if not data: +                continue + +            for j in data: +                try: +                    resp = await request.get(j["waka_url"]) +                except Exception: +                    log.error(f"获取卷王 {j['user_id']} 数据失败!") +                    continue + +                user_w_data = resp.json()  # type: ignore +                d = user_w_data["data"] +                user_w_last_7_days_count = float() +                user_w_recent_count = float(d[-1]["grand_total"]["decimal"]) +                for u in d: +                    user_w_last_7_days_count += float(u["grand_total"]["decimal"]) + +                u_data = { +                    "last_7_days_count": user_w_last_7_days_count, +                    "recent_count": user_w_recent_count, +                } +                self.update_user(i, j["user_id"], u_data) diff --git a/ATRI/plugins/anti_effort/models.py b/ATRI/plugins/anti_effort/models.py new file mode 100644 index 0000000..928c4ec --- /dev/null +++ b/ATRI/plugins/anti_effort/models.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel + + +class AntiEffortUserModel(BaseModel): +    user_id: int +    w_user_name: str +    w_user_id: str +    waka_url: str +    last_7_days_count: float +    recent_count: float | 
