1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
|
import os
import re
import json
from random import choice
from nonebot import get_bot
from nonebot.params import ArgPlainText
from nonebot.adapters.onebot.v11 import (
Message,
MessageEvent,
MessageSegment,
GroupMessageEvent,
)
from nonebot.adapters.onebot.v11.helpers import Cooldown
from ATRI.service import Service
from ATRI.message import MessageBuilder
from ATRI.permission import MASTER
from ATRI.utils.apscheduler import scheduler
from .data_source import AntiEffort, PLUGIN_DIR
# Service object for this plugin; its on_command / cmd_as_group methods create
# the matchers used in the rest of this module. "/ae" is passed as the main command.
plugin = Service("谁是卷王").document("谁是卷王!").main_cmd("/ae")
# Single shared data-source helper used by the handlers and scheduled jobs.
ae = AntiEffort()
# Text passed as the Cooldown prompt.
# NOTE: choice() runs once at import time, so the same sentence is reused for
# every rate-limited command until the module is reloaded.
_lmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "呜呜...别急"])
# Prompt shown when asking a user for their WakaTime share-embed URL; the
# chained .text() calls append the step-by-step hints.
_GET_URL_MSG = (
MessageBuilder("请键入wakatime share embed URL:")
.text("- 前往 wakatime.com/share/embed")
.text("- Format 选择 JSON")
.text("- Chart Type 选择 Coding Activity")
.text("- Date Range 选择 Last 7 Days")
.text("- 所需url为下一栏 HTML 中的 url")
)
add_user = plugin.on_command("!我也要卷", "加入卷王统计榜")


@add_user.got(
    "waka_url",
    _GET_URL_MSG,
)
@add_user.got("rank_nickname", "如何在排行榜中称呼你捏")
@add_user.got("to_global", "你希望加入公共排行榜吗?(y/n)", [Cooldown(60, prompt=_lmt_notice)])
async def _deal_add_user(
    event: GroupMessageEvent,
    url: str = ArgPlainText("waka_url"),
    user_nickname: str = ArgPlainText("rank_nickname"),
    to_global: str = ArgPlainText("to_global"),
):
    """Register the sender on the current group's board, and optionally on the public one.

    The three ``got`` steps collect the WakaTime URL, the display nickname and
    the answer to the "join the public board?" question. Only the result of
    the group registration is sent back to the user.
    """
    sender_id = event.user_id

    # Only these exact answers count as consent; anything else is treated as "no".
    if to_global in ("y", "Y", "是", "希望", "同意"):
        # Board id 0 (what ``int()`` evaluates to) is used for the public board.
        await ae.add_user(0, sender_id, user_nickname, url)

    outcome = await ae.add_user(event.group_id, sender_id, user_nickname, url)
    await add_user.finish(outcome)
join_global_rank = plugin.on_command("!参加公共卷", "加入公共卷王榜")


@join_global_rank.handle()
async def _join_global_rank(event: GroupMessageEvent):
    """Reuse the sender's existing group registration to join the public board.

    If the current group has stored data and it contains an entry for the
    sender, that entry's nickname and URL are registered under board id 0 and
    the session finishes. Otherwise this handler returns without replying.
    """
    sender_id = event.user_id

    group_board = ae.get_data(event.group_id)
    if not group_board:
        return

    # First entry belonging to the sender, if any.
    own_entry = next(
        (entry for entry in group_board["data"] if entry["user_id"] == sender_id),
        None,
    )
    if own_entry is None:
        return

    nickname = own_entry["user_nickname"]
    waka_url = own_entry["waka_url"]
    # Board id 0 (what ``int()`` evaluates to) is used for the public board.
    await ae.add_user(0, sender_id, nickname, waka_url)
    await join_global_rank.finish("完成~!")
@join_global_rank.got("waka_url", _GET_URL_MSG)
@join_global_rank.got(
    "rank_nickname", "如何在排行榜中称呼你捏", [Cooldown(60, prompt=_lmt_notice)]
)
async def _(
    event: GroupMessageEvent,
    url: str = ArgPlainText("waka_url"),
    user_nickname: str = ArgPlainText("rank_nickname"),
):
    """Ask for a WakaTime URL and nickname, then register the sender on the public board.

    The text returned by ``ae.add_user`` is sent back as the final reply.
    """
    # Board id 0 (what ``int()`` evaluates to) is used for the public board.
    outcome = await ae.add_user(0, event.user_id, user_nickname, url)
    await join_global_rank.finish(outcome)
user_leave = plugin.on_command("!我不卷了", "退出卷王统计榜")


@user_leave.handle([Cooldown(60, prompt=_lmt_notice)])
async def _user_leave(event: GroupMessageEvent):
    """Remove the sender from the public board and from the current group's board.

    Only the result of the group-board removal is sent back to the user.
    """
    sender_id = event.user_id

    # Board id 0 (what ``int()`` evaluates to) is used for the public board;
    # the result of this removal is intentionally not reported.
    ae.del_user(0, sender_id)

    outcome = ae.del_user(event.group_id, sender_id)
    await user_leave.finish(outcome)
check_rank_today = plugin.on_command("今日卷王", "查看今日卷王榜", aliases={"日卷王"})


@check_rank_today.handle([Cooldown(15, prompt=_lmt_notice)])
async def _check_rank_today(event: GroupMessageEvent):
    """Reply with the current group's ranking generated in "today" mode.

    A short "please wait" message is sent first. If the group has no stored
    data, the session finishes with a notice instead of a ranking.
    """
    await check_rank_today.send("别急!正在统计!")

    group_board = ae.get_data(event.group_id)
    if not group_board:
        await check_rank_today.finish("贵群还没有人加入卷王统计榜!")

    ranking = ae.gen_rank(group_board, event.user_id, "today")
    await check_rank_today.finish(ranking)
check_rank_recent_week = plugin.on_command("周卷王", "查看近一周卷王榜")


@check_rank_recent_week.handle([Cooldown(15, prompt=_lmt_notice)])
async def _check_rank_recent_week(event: GroupMessageEvent):
    """Reply with the current group's ranking generated in "recent_week" mode.

    A short "please wait" message is sent first. If the group has no stored
    data, the session finishes with a notice instead of a ranking.
    """
    await check_rank_recent_week.send("别急!正在统计!")

    group_board = ae.get_data(event.group_id)
    if not group_board:
        await check_rank_recent_week.finish("贵群还没有人加入卷王统计榜!")

    ranking = ae.gen_rank(group_board, event.user_id, "recent_week")
    await check_rank_recent_week.finish(ranking)
check_rank_global_today = plugin.on_command("公共卷王", "查看今日公共卷王榜")


@check_rank_global_today.handle([Cooldown(15, prompt=_lmt_notice)])
async def _check_rank_global_today(event: MessageEvent):
    """Reply with the public board's ranking generated in "global_today" mode.

    A short "please wait" message is sent first. If the public board has no
    stored data, the session finishes with a notice instead of a ranking.
    """
    await check_rank_global_today.send("别急!正在统计!")

    # Board id 0 (what ``int()`` evaluates to) is used for the public board.
    public_board = ae.get_data(0)
    if not public_board:
        await check_rank_global_today.finish("还没有人加入公共卷王统计榜!")

    ranking = ae.gen_rank(public_board, event.user_id, "global_today")
    await check_rank_global_today.finish(ranking)
check_rank_global_recent_week = plugin.on_command("公共周卷王", "查看近一周公共卷王榜")


@check_rank_global_recent_week.handle([Cooldown(15, prompt=_lmt_notice)])
async def _check_rank_global_recent_week(event: MessageEvent):
    """Reply with the public board's ranking generated in "global_recent_week" mode.

    A short "please wait" message is sent first. If the public board has no
    stored data, the session finishes with a notice instead of a ranking.
    """
    await check_rank_global_recent_week.send("别急!正在统计!")

    # Board id 0 (what ``int()`` evaluates to) is used for the public board.
    public_board = ae.get_data(0)
    if not public_board:
        await check_rank_global_recent_week.finish("还没有人加入公共卷王统计榜!")

    ranking = ae.gen_rank(public_board, event.user_id, "global_recent_week")
    await check_rank_global_recent_week.finish(ranking)
update_data = plugin.cmd_as_group("update", "更新卷王统计榜数据", permission=MASTER)


@update_data.handle()
async def _update_data(event: MessageEvent):
    """Trigger a manual data refresh (matcher is restricted to MASTER) and confirm it."""
    # ``event`` is not read; it is kept in the signature as-is.
    await ae.update_data()
    await update_data.finish("更新完成~!")
@scheduler.scheduled_job(
    "interval",
    name="卷王数据更新",
    minutes=15,
    misfire_grace_time=15,
)  # type: ignore
async def _():
    """Interval job (every 15 minutes): refresh the data through ``ae.update_data``."""
    await ae.update_data()
@scheduler.scheduled_job(
    "cron",
    name="卷王数据存储",
    hour=0,
    misfire_grace_time=30,
)  # type: ignore
async def _():
    """Cron job (hour=0): refresh the data, then call ``ae.store_user_data_recent``."""
    # Refresh first so the stored snapshot is taken from up-to-date data.
    await ae.update_data()
    ae.store_user_data_recent()
@scheduler.scheduled_job("cron", name="对昨日卷王进行颁奖", hour=8, misfire_grace_time=30)  # type: ignore
async def _():
    """Cron job (hour=8): announce the top entry of every stored ``<group_id>-ld.json`` file.

    For each matching file in ``PLUGIN_DIR`` the entry with the largest
    ``recent_count`` is picked, an image is generated through ``ae.gen_img``
    and two messages (a text notice and the image) are sent to that group.

    Only files whose whole name is ``<digits>-ld.json`` are considered. Names
    that merely contain such a fragment are skipped, so a stray file can no
    longer crash the job or cause a second award for the same group.
    """
    targets = list()
    for filename in os.listdir(PLUGIN_DIR):
        matched = re.fullmatch(r"([0-9]+)-ld\.json", filename)
        if not matched:
            continue
        group_id = int(matched.group(1))
        # Id 0 is what the command handlers pass (as ``int()``) for the public
        # board, so it does not correspond to a group that can be messaged.
        if not group_id:
            continue
        targets.append((group_id, filename))

    if not targets:
        return

    bot = get_bot()
    for group_id, filename in targets:
        # Read the file that was actually listed rather than rebuilding its name.
        raw_data = json.loads((PLUGIN_DIR / filename).read_bytes())
        data = raw_data["data"]
        if not data:
            continue

        # max() returns the first maximal entry, the same one that a stable
        # descending sort would have placed at index 0.
        winner = max(data, key=lambda x: x["recent_count"])
        winner_id = int(winner["user_id"])
        winner_nickname = winner["user_nickname"]
        coding_time = float(winner["recent_count"])

        img = await ae.gen_img(winner_id, winner_nickname, coding_time)
        result = MessageSegment.image(img)
        try:
            await bot.send_group_msg(group_id=group_id, message="昨日卷王已经产生!")
            await bot.send_group_msg(group_id=group_id, message=Message(result))
        except Exception:
            # Best effort: a failed delivery to one group must not stop the
            # announcements for the remaining groups.
            continue
|