summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-07-22 19:44:04 +0800
committerKyomotoi <[email protected]>2022-07-22 19:44:04 +0800
commit3f4194c94a51449ac45e3ad47b6e34857ea400df (patch)
tree0e20620b9f8c48a717faba6062accd4a831ab1ea
parentd592237df00fb5516cd281940a4dc3a9620fb556 (diff)
downloadATRI-3f4194c94a51449ac45e3ad47b6e34857ea400df.tar.gz
ATRI-3f4194c94a51449ac45e3ad47b6e34857ea400df.tar.bz2
ATRI-3f4194c94a51449ac45e3ad47b6e34857ea400df.zip
✨🎨 完善插件, 优化代码
-rw-r--r--ATRI/plugins/anti_effort/__init__.py240
-rw-r--r--ATRI/plugins/anti_effort/data_source.py216
-rw-r--r--ATRI/plugins/anti_effort/models.py5
3 files changed, 371 insertions, 90 deletions
diff --git a/ATRI/plugins/anti_effort/__init__.py b/ATRI/plugins/anti_effort/__init__.py
index 48f6ec3..c08dd3b 100644
--- a/ATRI/plugins/anti_effort/__init__.py
+++ b/ATRI/plugins/anti_effort/__init__.py
@@ -1,12 +1,25 @@
-from tabulate import tabulate
-from datetime import datetime
+import os
+import re
+import json
+from random import choice
-from nonebot.matcher import Matcher
-from nonebot.params import CommandArg, ArgPlainText
-from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent
+from nonebot import get_bot
+from nonebot.params import ArgPlainText
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.onebot.v11 import (
+ Message,
+ MessageEvent,
+ MessageSegment,
+ GroupMessageEvent,
+)
+from nonebot.adapters.onebot.v11.helpers import Cooldown
-from .data_source import AntiEffort
+from ATRI.utils.apscheduler import scheduler
+from .data_source import AntiEffort, PLUGIN_DIR
+
+
+_lmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "呜呜...别急"])
_GET_URL_MSG = """请键入wakatime share embed URL:
获取方法:
@@ -22,32 +35,81 @@ add_user = AntiEffort().on_command("!我也要卷", "加入卷王统计榜")
add_user_cmd = AntiEffort().cmd_as_group("join", "加入卷王统计榜")
-@add_user.got("waka_url", _GET_URL_MSG)
-@add_user_cmd.got("waka_url", _GET_URL_MSG)
+@add_user.got(
+ "waka_url",
+ _GET_URL_MSG,
+)
+@add_user_cmd.got(
+ "waka_url",
+ _GET_URL_MSG,
+)
@add_user.got("rank_nickname", "如何在排行榜中称呼你捏")
@add_user_cmd.got("rank_nickname", "如何在排行榜中称呼你捏")
+@add_user.got("to_global", "你希望加入公共排行榜吗?(y/n)", [Cooldown(60, prompt=_lmt_notice)])
+@add_user_cmd.got("to_global", "你希望加入公共排行榜吗?(y/n)", [Cooldown(60, prompt=_lmt_notice)])
async def _deal_add_user(
event: GroupMessageEvent,
- _url: str = ArgPlainText("waka_url"),
+ url: str = ArgPlainText("waka_url"),
user_nickname: str = ArgPlainText("rank_nickname"),
+ to_global: str = ArgPlainText("to_global"),
):
group_id = event.group_id
user_id = event.user_id
+ aititude = ["y", "Y", "是", "希望", "同意"]
+ if to_global in aititude:
+ await AntiEffort().add_user(int(), user_id, user_nickname, url)
- result = AntiEffort().add_user(group_id, user_id, user_nickname, _url)
+ result = await AntiEffort().add_user(group_id, user_id, user_nickname, url)
await add_user.finish(result)
+join_global_rank = AntiEffort().on_command("!参加公共卷王", "加入公共卷王榜")
+join_global_rank_cmd = AntiEffort().cmd_as_group("join.global", "加入公共卷王榜")
+
+
+@join_global_rank.handle()
+@join_global_rank_cmd.handle()
+async def _join_global_rank(event: GroupMessageEvent):
+ group_id = event.group_id
+ user_id = event.user_id
+
+ raw_data = AntiEffort().get_data(group_id)
+ if raw_data:
+ data = raw_data["data"]
+ for i in data:
+ if i["user_id"] == user_id:
+ user_nickname = i["user_nickname"]
+ url = i["waka_url"]
+ await AntiEffort().add_user(int(), user_id, user_nickname, url)
+ await join_global_rank.finish("完成~!")
+
+
+@join_global_rank.got("waka_url", _GET_URL_MSG)
+@join_global_rank_cmd.got("waka_url", _GET_URL_MSG)
+@join_global_rank.got("rank_nickname", "如何在排行榜中称呼你捏")
+@join_global_rank_cmd.got("rank_nickname", "如何在排行榜中称呼你捏")
+async def _(
+ event: GroupMessageEvent,
+ url: str = ArgPlainText("waka_url"),
+ user_nickname: str = ArgPlainText("rank_nickname"),
+):
+ user_id = event.user_id
+
+ result = await AntiEffort().add_user(int(), user_id, user_nickname, url)
+ await join_global_rank.finish(result)
+
+
user_leave = AntiEffort().on_command("!我不卷了", "退出卷王统计榜")
user_leave_cmd = AntiEffort().cmd_as_group("leave", "退出卷王统计榜")
-@user_leave.handle()
-@user_leave_cmd.handle()
+@user_leave.handle([Cooldown(60, prompt=_lmt_notice)])
+@user_leave_cmd.handle([Cooldown(60, prompt=_lmt_notice)])
async def _user_leave(event: GroupMessageEvent):
group_id = event.group_id
user_id = event.user_id
+ AntiEffort().del_user(int(), user_id)
result = AntiEffort().del_user(group_id, user_id)
await user_leave.finish(result)
@@ -56,65 +118,133 @@ check_rank_today = AntiEffort().on_command("今日卷王", "查看今日卷王�
check_rank_today_cmd = AntiEffort().cmd_as_group("rank.today", "查看今日卷王榜")
-@check_rank_today.handle()
-@check_rank_today_cmd.handle()
+@check_rank_today.handle([Cooldown(15, prompt=_lmt_notice)])
+@check_rank_today_cmd.handle([Cooldown(15, prompt=_lmt_notice)])
async def _check_rank_today(event: GroupMessageEvent):
await check_rank_today.send("别急!正在统计!")
group_id = event.group_id
- u = await AntiEffort().update_data()
- if u == 114514:
- await check_rank_today.finish("贵群还没有人加入卷王统计榜!")
-
- data = AntiEffort().get_data(group_id)
- if not data:
+ user_id = event.user_id
+ raw_data = AntiEffort().get_data(group_id)
+ if not raw_data:
await check_rank_today.finish("贵群还没有人加入卷王统计榜!")
- data = sorted(data, key=lambda x: x["recent_count"], reverse=True)
- table = [
- [
- f"{i + 1}",
- f"{x['user_nickname']}",
- f"{round(x['recent_count'], 2)}",
- ]
- for i, x in enumerate(data)
- ]
- table.insert(0, ["Rank", "Member", "Today"])
- result = tabulate(table, tablefmt="plain")
- now_time = datetime.now().strftime("%Y/%m/%d")
- repo = f"《今日卷王》 单位: 小时\nRank Date: {now_time}\n{result}"
- await check_rank_today.finish(repo)
+ result = AntiEffort().gen_rank(raw_data, user_id, "today")
+ await check_rank_today.finish(result)
check_rank_recent_week = AntiEffort().on_command("周卷王", "查看近一周卷王榜")
check_rank_recent_week_cmd = AntiEffort().cmd_as_group("rank.week", "查看近一周卷王榜")
-@check_rank_recent_week.handle()
-@check_rank_recent_week_cmd.handle()
+@check_rank_recent_week.handle([Cooldown(15, prompt=_lmt_notice)])
+@check_rank_recent_week_cmd.handle([Cooldown(15, prompt=_lmt_notice)])
async def _check_rank_recent_week(event: GroupMessageEvent):
await check_rank_recent_week.send("别急!正在统计!")
group_id = event.group_id
- u = await AntiEffort().update_data()
- if u == 114514:
+ user_id = event.user_id
+ raw_data = AntiEffort().get_data(group_id)
+ if not raw_data:
await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!")
- data = AntiEffort().get_data(group_id)
- if not data:
- await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!")
+ result = AntiEffort().gen_rank(raw_data, user_id, "recent_week")
+ await check_rank_recent_week.finish(result)
+
+
+check_rank_global_today = AntiEffort().on_command("公共卷王", "查看公共卷王榜")
+check_rank_global_today_cmd = AntiEffort().cmd_as_group("rank.global", "查看公共卷王榜")
+
+
+@check_rank_global_today.handle([Cooldown(15, prompt=_lmt_notice)])
+@check_rank_global_today_cmd.handle([Cooldown(15, prompt=_lmt_notice)])
+async def _check_rank_global_today(event: MessageEvent):
+ await check_rank_global_today.send("别急!正在统计!")
+
+ user_id = event.user_id
+ raw_data = AntiEffort().get_data(int())
+ if not raw_data:
+ await check_rank_global_today.finish("还没有人加入公共卷王统计榜!")
+
+ result = AntiEffort().gen_rank(raw_data, user_id, "global_today")
+ await check_rank_global_today.finish(result)
+
+
+check_rank_global_recent_week = AntiEffort().on_command("公共周卷王", "查看公共卷王榜")
+check_rank_global_recent_week_cmd = AntiEffort().cmd_as_group(
+ "rank.global.week", "查看公共卷王榜"
+)
+
+
+@check_rank_global_recent_week.handle([Cooldown(15, prompt=_lmt_notice)])
+@check_rank_global_recent_week_cmd.handle([Cooldown(15, prompt=_lmt_notice)])
+async def _check_rank_global_recent_week(event: MessageEvent):
+ await check_rank_global_recent_week.send("别急!正在统计!")
+
+ user_id = event.user_id
+ raw_data = AntiEffort().get_data(int())
+ if not raw_data:
+ await check_rank_global_recent_week.finish("还没有人加入公共卷王统计榜!")
+
+ result = AntiEffort().gen_rank(raw_data, user_id, "global_recent_week")
+ await check_rank_global_recent_week.finish(result)
+
+
+update_data = AntiEffort().cmd_as_group("update", "更新卷王统计榜数据", permission=SUPERUSER)
+
+
+@update_data.handle()
+async def _update_data(event: MessageEvent):
+ await AntiEffort().update_data()
+ await update_data.finish("更新完成~!")
+
+
[email protected]_job("interval", name="卷王数据更新", minutes=15, misfire_grace_time=15) # type: ignore
+async def _():
+ await AntiEffort().update_data()
+
+
[email protected]_job("cron", name="卷王数据存储", hour=0, misfire_grace_time=30) # type: ignore
+async def _():
+ await AntiEffort().update_data()
+ AntiEffort().store_user_data_recent()
+
+
[email protected]_job("cron", name="对昨日卷王进行颁奖", hour=8, misfire_grace_time=30) # type: ignore
+async def _():
+ files = os.listdir(PLUGIN_DIR)
+ if not files:
+ return
+
+ eb_g = list()
+ for f in files:
+ raw_data = f.split(".")
+ if raw_data[-1] != "json":
+ continue
+
+ patt = r"([0-9].*?)-ld"
+ match = re.findall(patt, raw_data[0])
+ if not match:
+ continue
+
+ eb_g.append(match[0])
+
+ if not eb_g:
+ return
+
+ bot = get_bot()
+ for g in eb_g:
+ file_path = PLUGIN_DIR / f"{g}-ld.json"
+ raw_data = json.loads(file_path.read_bytes())
+ data = raw_data["data"]
+ data = sorted(data, key=lambda x: x["recent_count"], reverse=True)
+ winner = data[0]
+ winner_id = int(winner["user_id"])
+ winner_nickname = winner["user_nickname"]
+ coding_time = float(winner["recent_count"])
+
+ img = await AntiEffort().gen_img(winner_id, winner_nickname, coding_time)
+ result = MessageSegment.image(img)
- data = sorted(data, key=lambda x: x["last_7_days_count"], reverse=True)
- table = [
- [
- f"{i + 1}",
- f"{x['user_nickname']}",
- f"{round(x['last_7_days_count'], 2)}",
- ]
- for i, x in enumerate(data)
- ]
- table.insert(0, ["Rank", "Member", "Last 7 Days"])
- result = tabulate(table, tablefmt="plain")
- now_time = datetime.now().strftime("%Y/%m/%d")
- repo = f"《近一周卷王》 单位: 小时\nRank Date: {now_time}\n{result}"
- await check_rank_recent_week.finish(repo)
+ await bot.send_group_msg(group_id=g, message="昨日卷王已经产生!")
+ await bot.send_group_msg(group_id=g, message=Message(result))
diff --git a/ATRI/plugins/anti_effort/data_source.py b/ATRI/plugins/anti_effort/data_source.py
index 0a0ef02..ba9cdf5 100644
--- a/ATRI/plugins/anti_effort/data_source.py
+++ b/ATRI/plugins/anti_effort/data_source.py
@@ -1,16 +1,23 @@
-import json
import re
import os
+import json
from pathlib import Path
+from datetime import datetime
+from tabulate import tabulate
-from ATRI.service import Service
+from ATRI import driver
+from ATRI.service import Service, ServiceTools
from ATRI.rule import is_in_service
from ATRI.utils import request
from ATRI.log import logger as log
-from .models import AntiEffortUserModel
+from .image_dealer import image_dealer
+from .models import AntiEffortModel, AntiEffortUserModel
+TENCENT_AVATER_URL = "https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
+SOURCE_URL = "https://fastly.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/"
+
PLUGIN_DIR = Path(".") / "data" / "plugins" / "anti_effort"
PLUGIN_DIR.mkdir(parents=True, exist_ok=True)
@@ -21,10 +28,42 @@ class AntiEffort(Service):
self, "谁是卷王", "谁是卷王!", rule=is_in_service("谁是卷王"), main_cmd="/ae"
)
- def add_user(
+ def get_enabled_group(self) -> list:
+ groups = list()
+
+ files = os.listdir(PLUGIN_DIR)
+ if not files:
+ return groups
+
+ for f in files:
+ raw_data = f.split(".")
+ if raw_data[-1] != "json":
+ continue
+
+ if "-ld" in raw_data[0]:
+ patt = r"([0-9].*?)-ld"
+ match = re.findall(patt, raw_data[0])
+ if match:
+ group_id = int(match[0])
+ groups.append(group_id)
+ else:
+ group_id = int(raw_data[0])
+ groups.append(group_id)
+
+ return groups
+
+ def get_data(self, group_id: int) -> dict:
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ if not file_path.exists():
+ return dict()
+ return json.loads(file_path.read_text())
+
+ async def add_user(
self, group_id: int, user_id: int, user_nickname: str, waka_url: str
) -> str:
- patt = r"/@([a-zA-Z0-9].*?)/([a-zA-Z0-9].*?).json"
+ patt = (
+ r"https:\/\/wakatime.com\/share\/@([a-zA-Z0-9].*?)\/([a-zA-Z0-9].*?).json"
+ )
match = re.findall(patt, waka_url)
if not match:
return "哥,你提供的链接有问题啊"
@@ -33,74 +72,100 @@ class AntiEffort(Service):
file_path = PLUGIN_DIR / f"{group_id}.json"
if not file_path.exists():
+ now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S")
+ d = AntiEffortModel(update_time=now_time, data=list()).dict()
with open(file_path, "w") as f:
- f.write(json.dumps(list()))
+ f.write(json.dumps(d))
- data: list = json.loads(file_path.read_text())
+ raw_data = json.loads(file_path.read_text())
+ data: list = raw_data["data"]
for i in data:
if i["user_id"] == user_id:
return "你已经在卷王统计榜力!"
+ try:
+ resp = await request.get(waka_url)
+ user_w_data = resp.json() # type: ignore
+
+ d = user_w_data["data"]
+ last_7_days_count = float()
+ recent_count = float(d[-1]["grand_total"]["decimal"])
+ for u in d:
+ last_7_days_count += float(u["grand_total"]["decimal"])
+ except Exception:
+ log.error(f"获取卷王 {w_user_id} 数据失败!")
+ last_7_days_count = int()
+ recent_count = int()
+
data.append(
AntiEffortUserModel(
user_id=user_id,
user_nickname=user_nickname,
w_user_id=w_user_id,
waka_url=waka_url,
- last_7_days_count=int(),
- recent_count=int(),
+ last_7_days_count=last_7_days_count,
+ recent_count=recent_count,
).dict()
)
+ raw_data["data"] = data
with open(file_path, "w") as f:
- f.write(json.dumps(data))
+ f.write(json.dumps(raw_data))
return "成功加入卷王统计榜!"
def del_user(self, group_id: int, user_id: int) -> str:
- data = self.get_data(group_id)
- file_path = PLUGIN_DIR / f"{group_id}.json"
- if file_path.exists():
+ raw_data = self.get_data(group_id)
+ if raw_data:
+ file_path = PLUGIN_DIR / f"{group_id}.json"
+ data = raw_data["data"]
for i in data:
if i["user_id"] == user_id:
data.remove(i)
- with open(PLUGIN_DIR / f"{group_id}.json", "w") as f:
- f.write(json.dumps(data))
+ raw_data["data"] = data
+ with open(file_path, "w") as f:
+ f.write(json.dumps(raw_data))
return "成功退出卷王统计榜!"
return "你未加入卷王统计榜捏"
- def get_data(self, group_id: int) -> list:
- file_path = PLUGIN_DIR / f"{group_id}.json"
- if not file_path.exists():
- return list()
- return json.loads(file_path.read_text())
-
def update_user(self, group_id: int, user_id: int, update_map: dict):
- data = self.get_data(group_id)
- if data:
+ raw_data = self.get_data(group_id)
+ if raw_data:
+ now_time = datetime.now().strftime("%Y/%m/%d - %H:%M:%S")
file_path = PLUGIN_DIR / f"{group_id}.json"
+ data = raw_data["data"]
for i in data:
if i["user_id"] == user_id:
for k, v in update_map.items():
i[k] = v
+ raw_data["update_time"] = now_time
+ raw_data["data"] = data
with open(file_path, "w") as f:
- f.write(json.dumps(data))
+ f.write(json.dumps(raw_data))
- async def update_data(self):
- files = os.listdir(PLUGIN_DIR)
- if not files:
- return 114514
+ def store_user_data_recent(self):
+ groups = self.get_enabled_group()
+ if not groups:
+ return
- groups = list()
- for f in files:
- group_id = int(f.split(".")[0])
- groups.append(group_id)
+ for g in groups:
+ data = self.get_data(g)
+ if not data:
+ return
+
+ file_path = PLUGIN_DIR / f"{g}-ld.json"
+ with open(file_path, "w") as f:
+ f.write(json.dumps(data))
+
+ async def update_data(self):
+ groups = self.get_enabled_group()
for i in groups:
- data = self.get_data(i)
- if not data:
+ raw_data = self.get_data(i)
+ if not raw_data:
continue
+ data = raw_data["data"]
for j in data:
try:
resp = await request.get(j["waka_url"])
@@ -109,7 +174,10 @@ class AntiEffort(Service):
log.error(f"获取卷王 {j['user_id']} 数据失败!")
continue
- d = user_w_data["data"]
+ d = user_w_data.get("data", dict())
+ if not d:
+ continue
+
user_w_last_7_days_count = float()
user_w_recent_count = float(d[-1]["grand_total"]["decimal"])
for u in d:
@@ -120,3 +188,81 @@ class AntiEffort(Service):
"recent_count": user_w_recent_count,
}
self.update_user(i, j["user_id"], u_data)
+
+ async def gen_img(
+ self, user_id: int, user_nickname: str, coding_time: float
+ ) -> str:
+
+ try:
+ resp = await request.get(TENCENT_AVATER_URL.format(user_id=user_id))
+ except Exception:
+ log.error("插件 anti_effort 获取用户头像失败!")
+ return str()
+
+ avatar_byt = resp.read() # type: ignore
+ result = image_dealer(
+ avatar_byt, user_nickname, str(round(coding_time, 2)) + " hrs"
+ )
+ return f"file:///{result}"
+
+ def gen_rank(self, raw_data: dict, user_id: int, typ: str) -> str:
+ table_type = "Today"
+ sort_type = "recent_count"
+ if typ == "today":
+ rank_type = "今日"
+ elif typ == "recent_week":
+ rank_type = "近一周"
+ table_type = "Last 7 Days"
+ sort_type = "last_7_days_count"
+ elif typ == "global_today":
+ rank_type = "今日公共"
+ else:
+ rank_type = "近一周公共"
+ table_type = "Last 7 Days"
+ sort_type = "last_7_days_count"
+
+ data = raw_data["data"]
+ data = sorted(data, key=lambda x: x[sort_type], reverse=True)
+
+ user_rank = 0
+ for i, user in enumerate(data):
+ if user["user_id"] == user_id:
+ user_rank = i + 1
+ break
+
+ table = [
+ [
+ f"{i + 1}",
+ f"{x['user_nickname']}",
+ f"{round(x['recent_count'], 2)} hrs",
+ ]
+ for i, x in enumerate(data)
+ ][:10]
+ table.insert(0, ["Rank", "Member", table_type])
+ result = tabulate(table, tablefmt="plain")
+ update_time = raw_data["update_time"]
+ rank = f"\n你位于第 {user_rank} 名" if user_rank else str()
+ repo = f"《{rank_type}十佳卷王》\nUpdate Time: {update_time}\n{result}{rank}"
+ return repo
+
+
+async def init_source():
+ files = ["xb-bg-0.png", "hwxw.ttf"]
+
+ try:
+ for i in files:
+ file_path = PLUGIN_DIR / i
+ if not file_path.is_file():
+ log.warning("插件 anti_effort 缺少所需资源,装载中")
+ url = SOURCE_URL + i
+ data = await request.get(url)
+ with open(file_path, "wb") as f:
+ f.write(data.read()) # type: ignore
+ except Exception:
+ ServiceTools.service_controller("谁是卷王", False)
+ log.error(f"插件 anti_effort 装载资源失败. 已自动禁用")
+
+ log.success("插件 anti_effort 装载资源完成")
+
+
+driver().on_startup(init_source)
diff --git a/ATRI/plugins/anti_effort/models.py b/ATRI/plugins/anti_effort/models.py
index aca8fa4..9a62314 100644
--- a/ATRI/plugins/anti_effort/models.py
+++ b/ATRI/plugins/anti_effort/models.py
@@ -1,6 +1,11 @@
from pydantic import BaseModel
+class AntiEffortModel(BaseModel):
+ update_time: str
+ data: list
+
+
class AntiEffortUserModel(BaseModel):
user_id: int
user_nickname: str