diff options
author | Kyomotoi <[email protected]> | 2022-07-22 19:44:04 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2022-07-22 19:44:04 +0800 |
commit | 3f4194c94a51449ac45e3ad47b6e34857ea400df (patch) | |
tree | 0e20620b9f8c48a717faba6062accd4a831ab1ea | |
parent | d592237df00fb5516cd281940a4dc3a9620fb556 (diff) | |
download | ATRI-3f4194c94a51449ac45e3ad47b6e34857ea400df.tar.gz ATRI-3f4194c94a51449ac45e3ad47b6e34857ea400df.tar.bz2 ATRI-3f4194c94a51449ac45e3ad47b6e34857ea400df.zip |
✨🎨 完善插件, 优化代码
-rw-r--r-- | ATRI/plugins/anti_effort/__init__.py | 240 | ||||
-rw-r--r-- | ATRI/plugins/anti_effort/data_source.py | 216 | ||||
-rw-r--r-- | ATRI/plugins/anti_effort/models.py | 5 |
3 files changed, 371 insertions, 90 deletions
diff --git a/ATRI/plugins/anti_effort/__init__.py b/ATRI/plugins/anti_effort/__init__.py index 48f6ec3..c08dd3b 100644 --- a/ATRI/plugins/anti_effort/__init__.py +++ b/ATRI/plugins/anti_effort/__init__.py @@ -1,12 +1,25 @@ -from tabulate import tabulate -from datetime import datetime +import os +import re +import json +from random import choice -from nonebot.matcher import Matcher -from nonebot.params import CommandArg, ArgPlainText -from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent +from nonebot import get_bot +from nonebot.params import ArgPlainText +from nonebot.permission import SUPERUSER +from nonebot.adapters.onebot.v11 import ( + Message, + MessageEvent, + MessageSegment, + GroupMessageEvent, +) +from nonebot.adapters.onebot.v11.helpers import Cooldown -from .data_source import AntiEffort +from ATRI.utils.apscheduler import scheduler +from .data_source import AntiEffort, PLUGIN_DIR + + +_lmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "呜呜...别急"]) _GET_URL_MSG = """请键入wakatime share embed URL: 获取方法: @@ -22,32 +35,81 @@ add_user = AntiEffort().on_command("!我也要卷", "加入卷王统计榜") add_user_cmd = AntiEffort().cmd_as_group("join", "加入卷王统计榜") -@add_user.got("waka_url", _GET_URL_MSG) -@add_user_cmd.got("waka_url", _GET_URL_MSG) +@add_user.got( + "waka_url", + _GET_URL_MSG, +) +@add_user_cmd.got( + "waka_url", + _GET_URL_MSG, +) @add_user.got("rank_nickname", "如何在排行榜中称呼你捏") @add_user_cmd.got("rank_nickname", "如何在排行榜中称呼你捏") +@add_user.got("to_global", "你希望加入公共排行榜吗?(y/n)", [Cooldown(60, prompt=_lmt_notice)]) +@add_user_cmd.got("to_global", "你希望加入公共排行榜吗?(y/n)", [Cooldown(60, prompt=_lmt_notice)]) async def _deal_add_user( event: GroupMessageEvent, - _url: str = ArgPlainText("waka_url"), + url: str = ArgPlainText("waka_url"), user_nickname: str = ArgPlainText("rank_nickname"), + to_global: str = ArgPlainText("to_global"), ): group_id = event.group_id user_id = event.user_id + aititude = ["y", "Y", "是", "希望", "同意"] + if to_global in aititude: + await AntiEffort().add_user(int(), user_id, user_nickname, url) - result = AntiEffort().add_user(group_id, user_id, user_nickname, _url) + result = await AntiEffort().add_user(group_id, user_id, user_nickname, url) await add_user.finish(result) +join_global_rank = AntiEffort().on_command("!参加公共卷王", "加入公共卷王榜") +join_global_rank_cmd = AntiEffort().cmd_as_group("join.global", "加入公共卷王榜") + + +@join_global_rank.handle() +@join_global_rank_cmd.handle() +async def _join_global_rank(event: GroupMessageEvent): + group_id = event.group_id + user_id = event.user_id + + raw_data = AntiEffort().get_data(group_id) + if raw_data: + data = raw_data["data"] + for i in data: + if i["user_id"] == user_id: + user_nickname = i["user_nickname"] + url = i["waka_url"] + await AntiEffort().add_user(int(), user_id, user_nickname, url) + await join_global_rank.finish("完成~!") + + +@join_global_rank.got("waka_url", _GET_URL_MSG) +@join_global_rank_cmd.got("waka_url", _GET_URL_MSG) +@join_global_rank.got("rank_nickname", "如何在排行榜中称呼你捏") +@join_global_rank_cmd.got("rank_nickname", "如何在排行榜中称呼你捏") +async def _( + event: GroupMessageEvent, + url: str = ArgPlainText("waka_url"), + user_nickname: str = ArgPlainText("rank_nickname"), +): + user_id = event.user_id + + result = await AntiEffort().add_user(int(), user_id, user_nickname, url) + await join_global_rank.finish(result) + + user_leave = AntiEffort().on_command("!我不卷了", "退出卷王统计榜") user_leave_cmd = AntiEffort().cmd_as_group("leave", "退出卷王统计榜") -@user_leave.handle() -@user_leave_cmd.handle() +@user_leave.handle([Cooldown(60, prompt=_lmt_notice)]) +@user_leave_cmd.handle([Cooldown(60, prompt=_lmt_notice)]) async def _user_leave(event: GroupMessageEvent): group_id = event.group_id user_id = event.user_id + AntiEffort().del_user(int(), user_id) result = AntiEffort().del_user(group_id, user_id) await user_leave.finish(result) @@ -56,65 +118,133 @@ check_rank_today = AntiEffort().on_command("今日卷王", "查看今日卷王� check_rank_today_cmd = AntiEffort().cmd_as_group("rank.today", "查看今日卷王榜") -@check_rank_today.handle() -@check_rank_today_cmd.handle() +@check_rank_today.handle([Cooldown(15, prompt=_lmt_notice)]) +@check_rank_today_cmd.handle([Cooldown(15, prompt=_lmt_notice)]) async def _check_rank_today(event: GroupMessageEvent): await check_rank_today.send("别急!正在统计!") group_id = event.group_id - u = await AntiEffort().update_data() - if u == 114514: - await check_rank_today.finish("贵群还没有人加入卷王统计榜!") - - data = AntiEffort().get_data(group_id) - if not data: + user_id = event.user_id + raw_data = AntiEffort().get_data(group_id) + if not raw_data: await check_rank_today.finish("贵群还没有人加入卷王统计榜!") - data = sorted(data, key=lambda x: x["recent_count"], reverse=True) - table = [ - [ - f"{i + 1}", - f"{x['user_nickname']}", - f"{round(x['recent_count'], 2)}", - ] - for i, x in enumerate(data) - ] - table.insert(0, ["Rank", "Member", "Today"]) - result = tabulate(table, tablefmt="plain") - now_time = datetime.now().strftime("%Y/%m/%d") - repo = f"《今日卷王》 单位: 小时\nRank Date: {now_time}\n{result}" - await check_rank_today.finish(repo) + result = AntiEffort().gen_rank(raw_data, user_id, "today") + await check_rank_today.finish(result) check_rank_recent_week = AntiEffort().on_command("周卷王", "查看近一周卷王榜") check_rank_recent_week_cmd = AntiEffort().cmd_as_group("rank.week", "查看近一周卷王榜") -@check_rank_recent_week.handle() -@check_rank_recent_week_cmd.handle() +@check_rank_recent_week.handle([Cooldown(15, prompt=_lmt_notice)]) +@check_rank_recent_week_cmd.handle([Cooldown(15, prompt=_lmt_notice)]) async def _check_rank_recent_week(event: GroupMessageEvent): await check_rank_recent_week.send("别急!正在统计!") group_id = event.group_id - u = await AntiEffort().update_data() - if u == 114514: + user_id = event.user_id + raw_data = AntiEffort().get_data(group_id) + if not raw_data: await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!") - data = AntiEffort().get_data(group_id) - if not data: - await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!") + result = AntiEffort().gen_rank(raw_data, user_id, "recent_week") + await check_rank_recent_week.finish(result) + + +check_rank_global_today = AntiEffort().on_command("公共卷王", "查看公共卷王榜") +check_rank_global_today_cmd = AntiEffort().cmd_as_group("rank.global", "查看公共卷王榜") + + +@check_rank_global_today.handle([Cooldown(15, prompt=_lmt_notice)]) +@check_rank_global_today_cmd.handle([Cooldown(15, prompt=_lmt_notice)]) +async def _check_rank_global_today(event: MessageEvent): + await check_rank_global_today.send("别急!正在统计!") + + user_id = event.user_id + raw_data = AntiEffort().get_data(int()) + if not raw_data: + await check_rank_global_today.finish("还没有人加入公共卷王统计榜!") + + result = AntiEffort().gen_rank(raw_data, user_id, "global_today") + await check_rank_global_today.finish(result) + + +check_rank_global_recent_week = AntiEffort().on_command("公共周卷王", "查看公共卷王榜") +check_rank_global_recent_week_cmd = AntiEffort().cmd_as_group( + "rank.global.week", "查看公共卷王榜" +) + + +@check_rank_global_recent_week.handle([Cooldown(15, prompt=_lmt_notice)]) +@check_rank_global_recent_week_cmd.handle([Cooldown(15, prompt=_lmt_notice)]) +async def _check_rank_global_recent_week(event: MessageEvent): + await check_rank_global_recent_week.send("别急!正在统计!") + + user_id = event.user_id + raw_data = AntiEffort().get_data(int()) + if not raw_data: + await check_rank_global_recent_week.finish("还没有人加入公共卷王统计榜!") + + result = AntiEffort().gen_rank(raw_data, user_id, "global_recent_week") + await check_rank_global_recent_week.finish(result) + + +update_data = AntiEffort().cmd_as_group("update", "更新卷王统计榜数据", permission=SUPERUSER) + + +@update_data.handle() +async def _update_data(event: MessageEvent): + await AntiEffort().update_data() + await update_data.finish("更新完成~!") + + [email protected]_job("interval", name="卷王数据更新", minutes=15, misfire_grace_time=15) # type: ignore +async def _(): + await AntiEffort().update_data() + + [email protected]_job("cron", name="卷王数据存储", hour=0, misfire_grace_time=30) # type: ignore +async def _(): + await AntiEffort().update_data() + AntiEffort().store_user_data_recent() + + [email protected]_job("cron", name="对昨日卷王进行颁奖", hour=8, misfire_grace_time=30) # type: ignore +async def _(): + files = os.listdir(PLUGIN_DIR) + if not files: + return + + eb_g = list() + for f in files: + raw_data = f.split(".") + if raw_data[-1] != "json": + continue + + patt = r"([0-9].*?)-ld" + match = re.findall(patt, raw_data[0]) + if not match: + continue + + eb_g.append(match[0]) + + if not eb_g: + return + + bot = get_bot() + for g in eb_g: + file_path = PLUGIN_DIR / f"{g}-ld.json" + raw_data = json.loads(file_path.read_bytes()) + data = raw_data["data"] + data = sorted(data, key=lambda x: x["recent_count"], reverse=True) + winner = data[0] + winner_id = int(winner["user_id"]) + winner_nickname = winner["user_nickname"] + coding_time = float(winner["recent_count"]) + + img = await AntiEffort().gen_img(winner_id, winner_nickname, coding_time) + result = MessageSegment.image(img) - data = sorted(data, key=lambda x: x["last_7_days_count"], reverse=True) - table = [ - [ - f"{i + 1}", - f"{x['user_nickname']}", - f"{round(x['last_7_days_count'], 2)}", - ] - for i, x in enumerate(data) - ] - table.insert(0, ["Rank", "Member", "Last 7 Days"]) - result = tabulate(table, tablefmt="plain") - now_time = datetime.now().strftime("%Y/%m/%d") - repo = f"《近一周卷王》 单位: 小时\nRank Date: {now_time}\n{result}" - await check_rank_recent_week.finish(repo) + await bot.send_group_msg(group_id=g, message="昨日卷王已经产生!") + await bot.send_group_msg(group_id=g, message=Message(result)) diff --git a/ATRI/plugins/anti_effort/data_source.py b/ATRI/plugins/anti_effort/data_source.py index 0a0ef02..ba9cdf5 100644 --- a/ATRI/plugins/anti_effort/data_source.py +++ b/ATRI/plugins/anti_effort/data_source.py @@ -1,16 +1,23 @@ -import json import re import os +import json from pathlib import Path +from datetime import datetime +from tabulate import tabulate -from ATRI.service import Service +from ATRI import driver +from ATRI.service import Service, ServiceTools from ATRI.rule import is_in_service from ATRI.utils import request from ATRI.log import logger as log -from .models import AntiEffortUserModel +from .image_dealer import image_dealer +from .models import AntiEffortModel, AntiEffortUserModel +TENCENT_AVATER_URL = "https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640" +SOURCE_URL = "https://fastly.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/" + PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort" PLUGIN_DIR.mkdir(parents=True, exist_ok=True) @@ -21,10 +28,42 @@ class AntiEffort(Service): self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae" ) - def add_user( + def get_enabled_group(self) -> list: + groups = list() + + files = os.listdir(PLUGIN_DIR) + if not files: + return groups + + for f in files: + raw_data = f.split(".") + if raw_data[-1] != "json": + continue + + if "-ld" in raw_data[0]: + patt = r"([0-9].*?)-ld" + match = re.findall(patt, raw_data[0]) + if match: + group_id = int(match[0]) + groups.append(group_id) + else: + group_id = int(raw_data[0]) + groups.append(group_id) + + return groups + + def get_data(self, group_id: int) -> dict: + file_path = PLUGIN_DIR / f"{group_id}.json" + if not file_path.exists(): + return dict() + return json.loads(file_path.read_text()) + + async def add_user( self, group_id: int, user_id: int, user_nickname: str, waka_url: str ) -> str: - patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json" + patt = ( + r"https:\/\/wakatime.com\/share\/@([a-zA-Z0-9].*?)\/([a-zA-Z0-9].*?).json" + ) match = re.findall(patt, waka_url) if not match: return "哥,你提供的链接有问题啊" @@ -33,74 +72,100 @@ class AntiEffort(Service): file_path = PLUGIN_DIR / f"{group_id}.json" if not file_path.exists(): + now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S") + d = AntiEffortModel(update_time=now_time, data=list()).dict() with open(file_path, "w") as f: - f.write(json.dumps(list())) + f.write(json.dumps(d)) - data: list = json.loads(file_path.read_text()) + raw_data = json.loads(file_path.read_text()) + data: list = raw_data["data"] for i in data: if i["user_id"] == user_id: return "你已经在卷王统计榜力!" + try: + resp = await request.get(waka_url) + user_w_data = resp.json() # type: ignore + + d = user_w_data["data"] + last_7_days_count = float() + recent_count = float(d[-1]["grand_total"]["decimal"]) + for u in d: + last_7_days_count += float(u["grand_total"]["decimal"]) + except Exception: + log.error(f"获取卷王 {w_user_id} 数据失败!") + last_7_days_count = int() + recent_count = int() + data.append( AntiEffortUserModel( user_id=user_id, user_nickname=user_nickname, w_user_id=w_user_id, waka_url=waka_url, - last_7_days_count=int(), - recent_count=int(), + last_7_days_count=last_7_days_count, + recent_count=recent_count, ).dict() ) + raw_data["data"] = data with open(file_path, "w") as f: - f.write(json.dumps(data)) + f.write(json.dumps(raw_data)) return "成功加入卷王统计榜!" def del_user(self, group_id: int, user_id: int) -> str: - data = self.get_data(group_id) - file_path = PLUGIN_DIR / f"{group_id}.json" - if file_path.exists(): + raw_data = self.get_data(group_id) + if raw_data: + file_path = PLUGIN_DIR / f"{group_id}.json" + data = raw_data["data"] for i in data: if i["user_id"] == user_id: data.remove(i) - with open(PLUGIN_DIR / f"{group_id}.json", "w") as f: - f.write(json.dumps(data)) + raw_data["data"] = data + with open(file_path, "w") as f: + f.write(json.dumps(raw_data)) return "成功退出卷王统计榜!" return "你未加入卷王统计榜捏" - def get_data(self, group_id: int) -> list: - file_path = PLUGIN_DIR / f"{group_id}.json" - if not file_path.exists(): - return list() - return json.loads(file_path.read_text()) - def update_user(self, group_id: int, user_id: int, update_map: dict): - data = self.get_data(group_id) - if data: + raw_data = self.get_data(group_id) + if raw_data: + now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S") file_path = PLUGIN_DIR / f"{group_id}.json" + data = raw_data["data"] for i in data: if i["user_id"] == user_id: for k, v in update_map.items(): i[k] = v + raw_data["update_time"] = now_time + raw_data["data"] = data with open(file_path, "w") as f: - f.write(json.dumps(data)) + f.write(json.dumps(raw_data)) - async def update_data(self): - files = os.listdir(PLUGIN_DIR) - if not files: - return 114514 + def store_user_data_recent(self): + groups = self.get_enabled_group() + if not groups: + return - groups = list() - for f in files: - group_id = int(f.split(".")[0]) - groups.append(group_id) + for g in groups: + data = self.get_data(g) + if not data: + return + + file_path = PLUGIN_DIR / f"{g}-ld.json" + with open(file_path, "w") as f: + f.write(json.dumps(data)) + + async def update_data(self): + groups = self.get_enabled_group() for i in groups: - data = self.get_data(i) - if not data: + raw_data = self.get_data(i) + if not raw_data: continue + data = raw_data["data"] for j in data: try: resp = await request.get(j["waka_url"]) @@ -109,7 +174,10 @@ class AntiEffort(Service): log.error(f"获取卷王 {j['user_id']} 数据失败!") continue - d = user_w_data["data"] + d = user_w_data.get("data", dict()) + if not d: + continue + user_w_last_7_days_count = float() user_w_recent_count = float(d[-1]["grand_total"]["decimal"]) for u in d: @@ -120,3 +188,81 @@ class AntiEffort(Service): "recent_count": user_w_recent_count, } self.update_user(i, j["user_id"], u_data) + + async def gen_img( + self, user_id: int, user_nickname: str, coding_time: float + ) -> str: + + try: + resp = await request.get(TENCENT_AVATER_URL.format(user_id=user_id)) + except Exception: + log.error("插件 anti_effort 获取用户头像失败!") + return str() + + avatar_byt = resp.read() # type: ignore + result = image_dealer( + avatar_byt, user_nickname, str(round(coding_time, 2)) + " hrs" + ) + return f"file:///{result}" + + def gen_rank(self, raw_data: dict, user_id: int, typ: str) -> str: + table_type = "Today" + sort_type = "recent_count" + if typ == "today": + rank_type = "今日" + elif typ == "recent_week": + rank_type = "近一周" + table_type = "Last 7 Days" + sort_type = "last_7_days_count" + elif typ == "global_today": + rank_type = "今日公共" + else: + rank_type = "近一周公共" + table_type = "Last 7 Days" + sort_type = "last_7_days_count" + + data = raw_data["data"] + data = sorted(data, key=lambda x: x[sort_type], reverse=True) + + user_rank = 0 + for i, user in enumerate(data): + if user["user_id"] == user_id: + user_rank = i + 1 + break + + table = [ + [ + f"{i + 1}", + f"{x['user_nickname']}", + f"{round(x['recent_count'], 2)} hrs", + ] + for i, x in enumerate(data) + ][:10] + table.insert(0, ["Rank", "Member", table_type]) + result = tabulate(table, tablefmt="plain") + update_time = raw_data["update_time"] + rank = f"\n你位于第 {user_rank} 名" if user_rank else str() + repo = f"《{rank_type}十佳卷王》\nUpdate Time: {update_time}\n{result}{rank}" + return repo + + +async def init_source(): + files = ["xb-bg-0.png", "hwxw.ttf"] + + try: + for i in files: + file_path = PLUGIN_DIR / i + if not file_path.is_file(): + log.warning("插件 anti_effort 缺少所需资源,装载中") + url = SOURCE_URL + i + data = await request.get(url) + with open(file_path, "wb") as f: + f.write(data.read()) # type: ignore + except Exception: + ServiceTools.service_controller("谁是卷王", False) + log.error(f"插件 anti_effort 装载资源失败. 已自动禁用") + + log.success("插件 anti_effort 装载资源完成") + + +driver().on_startup(init_source) diff --git a/ATRI/plugins/anti_effort/models.py b/ATRI/plugins/anti_effort/models.py index aca8fa4..9a62314 100644 --- a/ATRI/plugins/anti_effort/models.py +++ b/ATRI/plugins/anti_effort/models.py @@ -1,6 +1,11 @@ from pydantic import BaseModel +class AntiEffortModel(BaseModel): + update_time: str + data: list + + class AntiEffortUserModel(BaseModel): user_id: int user_nickname: str |