summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ATRI/exceptions.py40
-rw-r--r--ATRI/plugins/essential/models.py1
-rw-r--r--ATRI/plugins/funny/__init__.py11
-rw-r--r--ATRI/plugins/funny/data_source.py53
-rw-r--r--ATRI/plugins/rss/rss_mikanan/__init__.py1
-rw-r--r--ATRI/plugins/setu/__init__.py27
-rw-r--r--ATRI/plugins/setu/data_source.py110
-rw-r--r--ATRI/plugins/setu/models.py6
-rw-r--r--ATRI/plugins/setu/nsfw_checker.py4
-rw-r--r--ATRI/plugins/status.py127
-rw-r--r--ATRI/plugins/status/__init__.py45
-rw-r--r--ATRI/plugins/status/data_source.py88
-rw-r--r--ATRI/utils/__init__.py32
13 files changed, 255 insertions, 290 deletions
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py
index 22f643f..457acc8 100644
--- a/ATRI/exceptions.py
+++ b/ATRI/exceptions.py
@@ -7,13 +7,12 @@ from pydantic.main import BaseModel
from nonebot.matcher import Matcher
from nonebot.adapters.onebot.v11 import ActionFailed
-from nonebot.adapters.onebot.v11 import Bot, PrivateMessageEvent, GroupMessageEvent
+from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent
from nonebot.message import run_postprocessor
-from ATRI import conf
-
from .log import log
-from .utils import gen_random_str
+from .message import MessageBuilder
+from .utils import Limiter, gen_random_str
ERROR_DIR = Path(".") / "data" / "errors"
@@ -103,6 +102,9 @@ class RssError(BaseBotException):
prompt = "RSS订阅错误"
+limiter = Limiter(3, 600)
+
+
@run_postprocessor
async def _(bot: Bot, event, matcher: Matcher, exception: Optional[Exception]):
if not exception:
@@ -117,21 +119,23 @@ async def _(bot: Bot, event, matcher: Matcher, exception: Optional[Exception]):
prompt = "请参考协议端输出"
track_id = _save_error(prompt, format_exc())
except Exception as err:
- prompt = "Unknown ERROR->" + err.__class__.__name__
+ prompt = "UnkErr " + err.__class__.__name__
track_id = _save_error(prompt, format_exc())
- if isinstance(event, PrivateMessageEvent):
- _id = "用户" + event.get_user_id()
- elif isinstance(event, GroupMessageEvent):
- _id = "群" + str(event.group_id)
- else:
- _id = "unknown"
-
log.error(f"Error Track ID: {track_id}")
- msg = f"呜——出错了...追踪: {track_id}\n来自: {_id}"
- for superusers in conf.BotConfig.superusers:
- try:
- await bot.send_private_msg(user_id=superusers, message=msg)
- except BaseBotException:
- return
+ msg = (
+ MessageBuilder("呜——出错了...请反馈维护者")
+ .text(f"信息: {prompt}")
+ .text(f"追踪ID: {track_id}")
+ )
+ if isinstance(event, GroupMessageEvent):
+ group_id = str(event.group_id)
+ if not limiter.check(group_id):
+ msg = MessageBuilder("该群报错提示已达限制, 将冷却10min").text("如需反馈请: 来杯红茶")
+ limiter.increase(group_id)
+
+ try:
+ await matcher.finish(msg)
+ except Exception:
+ return
diff --git a/ATRI/plugins/essential/models.py b/ATRI/plugins/essential/models.py
index 4aa5693..dd2a801 100644
--- a/ATRI/plugins/essential/models.py
+++ b/ATRI/plugins/essential/models.py
@@ -1,4 +1,3 @@
-from typing import List, Optional
from pydantic import BaseModel
diff --git a/ATRI/plugins/funny/__init__.py b/ATRI/plugins/funny/__init__.py
index 094e569..8b060cc 100644
--- a/ATRI/plugins/funny/__init__.py
+++ b/ATRI/plugins/funny/__init__.py
@@ -63,14 +63,3 @@ async def _deal_fake(
await bot.send_group_forward_msg(group_id=group_id, messages=node)
except Exception:
await fake_msg.finish("构造失败惹...可能是被制裁了(")
-
-
-eat_what = plugin.on_regex(r"大?[今明后]天(.*?)吃[什啥]么?", "我来决定你吃什么!")
-
-
-@eat_what.handle([Cooldown(15, prompt="慢慢吃,不要贪心哦!")])
-async def _eat_what(event: MessageEvent):
- msg = str(event.get_message())
- user_name = event.sender.nickname or "裙友"
- eat = await Funny().eat_what(user_name, msg)
- await eat_what.finish(Message(eat))
diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py
index d3f88cc..9437034 100644
--- a/ATRI/plugins/funny/data_source.py
+++ b/ATRI/plugins/funny/data_source.py
@@ -58,56 +58,3 @@ class Funny:
dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}}
node.append(dic)
return node
-
- @staticmethod
- async def eat_what(name: str, msg: str) -> str:
- EAT_URL = "https://wtf.hiigara.net/api/run/"
- params = {"event": "ManualRun"}
- pattern_0 = r"大?[今明后]天(.*?)吃[什啥]么?"
- pattern_1 = r"[今|明|后|大后]天"
- arg = re.findall(pattern_0, msg)[0]
- day = re.findall(pattern_1, msg)[0]
-
- if arg == "中午":
- a = f"LdS4K6/{randint(0, 1145141919810)}"
- url = EAT_URL + a
- try:
- data = await request.post(url, params=params)
- data = data.json()
- except Exception:
- raise RequestError("Request failed!")
-
- text = Translate(data["text"]).to_simple().replace("今天", day)
- get_a = re.search(r"非常(.*?)的", text).group(0) # type: ignore
- result = text.replace(get_a, "")
-
- elif arg == "晚上":
- a = f"KaTMS/{randint(0, 1145141919810)}"
- url = EAT_URL + a
- try:
- data = await request.post(url, params=params)
- data = data.json()
- except Exception:
- raise RequestError("Request failed!")
-
- result = Translate(data["text"]).to_simple().replace("今天", day)
-
- else:
- rd = randint(1, 10)
- if rd == 5:
- result = ["吃我吧 ❤", "(脸红)请...请享用咱吧......", "都可以哦~不能挑食呢~"]
- return choice(result)
- else:
- a = f"JJr1hJ/{randint(0, 1145141919810)}"
- url = EAT_URL + a
- try:
- data = await request.post(url, params=params)
- data = data.json()
- except Exception:
- raise RequestError("Request failed!")
-
- text = Translate(data["text"]).to_simple().replace("今天", day)
- get_a = re.match(r"(.*?)的智商", text).group(0) # type: ignore
- result = text.replace(get_a, f"{name}的智商")
-
- return result
diff --git a/ATRI/plugins/rss/rss_mikanan/__init__.py b/ATRI/plugins/rss/rss_mikanan/__init__.py
index ec8dd60..2c577ee 100644
--- a/ATRI/plugins/rss/rss_mikanan/__init__.py
+++ b/ATRI/plugins/rss/rss_mikanan/__init__.py
@@ -17,7 +17,6 @@ from ATRI.log import log
from ATRI.service import Service
from ATRI.permission import ADMIN
from ATRI.message import MessageBuilder
-from ATRI.plugins.rss.rss_rsshub.data_source import RssHubSubscriptor
from ATRI.utils import timestamp2datetime
from ATRI.utils.apscheduler import scheduler
from ATRI.database import RssMikananiSubcription
diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py
index 46675fc..edb6532 100644
--- a/ATRI/plugins/setu/__init__.py
+++ b/ATRI/plugins/setu/__init__.py
@@ -29,13 +29,14 @@ async def _():
async def _random_setu(bot: Bot, event: MessageEvent):
loop = asyncio.get_running_loop()
- repo, se = await Setu.random_setu()
- await bot.send(event, repo)
+ setu, setu_data = await Setu.new()
+ setu_info = f"Title: {setu_data.title}\nPid: {setu_data.pid}"
+ await bot.send(event, setu_info)
try:
- msg_1 = await bot.send(event, Message(se))
+ msg_1 = await bot.send(event, setu)
except Exception:
- await random_setu.finish("hso(发不出")
+ await random_setu.finish("hso (发不出")
msg_id = msg_1["message_id"]
loop.call_later(60, lambda: loop.create_task(bot.delete_msg(message_id=msg_id)))
@@ -53,23 +54,21 @@ async def _(think: str = ArgPlainText("r_rush_after_think")):
tag_setu = plugin.on_regex(r"来[张点丶份](.*?)的[涩色🐍]图", "根据提供的tag查找涩图,冷却2分钟")
-@tag_setu.handle([Cooldown(120, prompt="慢...慢一..点❤")])
+@tag_setu.handle([Cooldown(120, prompt="")])
async def _tag_setu(bot: Bot, event: MessageEvent):
loop = asyncio.get_running_loop()
msg = str(event.get_message()).strip()
pattern = r"来[张点丶份](.*?)的[涩色🐍]图"
tag = re.findall(pattern, msg)[0]
- repo, se = await Setu.tag_setu(tag)
- if not plugin:
- await tag_setu.finish(repo)
-
- await bot.send(event, repo)
+ setu, setu_data = await Setu.new(tag)
+ setu_info = f"Title: {setu_data.title}\nPid: {setu_data.pid}"
+ await bot.send(event, setu_info)
try:
- msg_1 = await bot.send(event, Message(se))
+ msg_1 = await bot.send(event, setu)
except Exception:
- await random_setu.finish("hso(发不出")
+ await random_setu.finish("hso (发不出")
msg_id = msg_1["message_id"]
loop.call_later(60, lambda: loop.create_task(bot.delete_msg(message_id=msg_id)))
@@ -99,7 +98,7 @@ async def _setu_catcher(bot: Bot, event: MessageEvent):
hso = list()
for i in args:
try:
- data = await Setu.detecter(i, _catcher_max_file_size)
+ data = await Setu(i).detecter(_catcher_max_file_size)
except Exception:
return
if data > 0.7:
@@ -138,7 +137,7 @@ async def _deal_check(bot: Bot, event: MessageEvent):
if not args:
await nsfw_checker.reject("请发送图片而不是其他东西!!")
- hso = await Setu.detecter(args[0], _catcher_max_file_size)
+ hso = await Setu(args[0]).detecter(_catcher_max_file_size)
if not hso:
await nsfw_checker.finish("图太小了!不测!")
diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py
index 40cddd1..8a1a26d 100644
--- a/ATRI/plugins/setu/data_source.py
+++ b/ATRI/plugins/setu/data_source.py
@@ -1,76 +1,72 @@
-import asyncio
-from nonebot.adapters.onebot.v11 import Bot, MessageSegment
+from typing import Tuple
+from nonebot.adapters.onebot.v11 import MessageSegment
from ATRI import conf
from ATRI.utils import request
+from ATRI.exceptions import RequestError
+
+from .models import SetuInfo
from .nsfw_checker import detect_image, init_model
-LOLICON_URL = "https://api.lolicon.app/setu/v2"
-DEFAULT_SETU = (
- "https://i.pixiv.cat/img-original/img/2021/02/28/22/44/49/88124144_p0.jpg"
-)
+__LOLICON_URL = "https://api.lolicon.app/setu/v2"
class Setu:
- @staticmethod
- def _use_proxy(url: str) -> str:
- if conf.Setu.reverse_proxy:
- return url.replace("i.pixiv.cat", conf.Setu.reverse_proxy_domain)
- else:
- return url
+ def __init__(self, url: str):
+ self.url = url
@classmethod
- async def random_setu(cls) -> tuple:
- """
- 随机涩图.
- """
- res = await request.get(LOLICON_URL)
- data: dict = res.json()
- temp_data: dict = data.get("data", list())
- if not temp_data:
- return "涩批爬", None
+ async def new(cls, tag: str = str()) -> Tuple[MessageSegment, SetuInfo]:
+ """new 一个涩图
- data: dict = temp_data[0]
- title = data.get("title", "木陰のねこ")
- p_id = data.get("pid", 88124144)
- url: str = data["urls"].get("original", "ignore")
+ Args:
+ tag (str, optional): 附加 tag, 默认无
- setu = MessageSegment.image(cls._use_proxy(url), timeout=114514)
- repo = f"Title: {title}\nPid: {p_id}"
- return repo, setu
+ Raises:
+ RequestError: 涩图请求失败
- @classmethod
- async def tag_setu(cls, tag: str) -> tuple:
- """
- 指定tag涩图.
+ Returns:
+ Tuple[MessageSegment, dict]: 涩图本体, 涩图信息
"""
- url = LOLICON_URL + f"?tag={tag}"
- res = await request.get(url)
- data: dict = res.json()
-
- temp_data: dict = data.get("data", list())
- if not temp_data:
- return f"没有 {tag} 的涩图呢...", None
-
- data = temp_data[0]
- title = data.get("title", "木陰のねこ")
- p_id = data.get("pid", 88124144)
- url = data["urls"].get(
- "original",
- cls._use_proxy(DEFAULT_SETU),
- )
- setu = MessageSegment.image(url, timeout=114514)
- repo = f"Title: {title}\nPid: {p_id}"
- return repo, setu
-
- @staticmethod
- async def detecter(url: str, file_size: int) -> float:
- """
- 涩值检测.
+ url = __LOLICON_URL
+ if tag:
+ url = __LOLICON_URL + f"?tag={tag}"
+ try:
+ req = await request.get(url)
+ except Exception:
+ raise RequestError("setu: 请求失败")
+
+ data = req.json()
+ cache_data = data.get("data")
+ if not cache_data:
+ raise RequestError("今天不可以涩")
+
+ data = cache_data[0]
+ title = data["title"]
+ pid = data["pid"]
+ setu = data["urls"].get("original", "ignore")
+
+ if conf.Setu.reverse_proxy:
+ setu = MessageSegment.image(
+ file=setu.replace("i.pixiv.cat", conf.Setu.reverse_proxy_domain),
+ timeout=114514,
+ )
+
+ setu_data = SetuInfo(title=title, pid=pid)
+
+ return setu, setu_data
+
+ async def detecter(self, max_size: int) -> float:
+ """图片涩值检测
+
+ Args:
+ max_size (int): 检测文件大小限制
+
+ Returns:
+ float: 百分比涩值
"""
- data = await detect_image(url, file_size)
- return data
+ return await detect_image(self.url, max_size)
from ATRI import driver
diff --git a/ATRI/plugins/setu/models.py b/ATRI/plugins/setu/models.py
new file mode 100644
index 0000000..7144f27
--- /dev/null
+++ b/ATRI/plugins/setu/models.py
@@ -0,0 +1,6 @@
+from pydantic import BaseModel
+
+
+class SetuInfo(BaseModel):
+ title: str
+ pid: str
diff --git a/ATRI/plugins/setu/nsfw_checker.py b/ATRI/plugins/setu/nsfw_checker.py
index c0bd2ba..53546f6 100644
--- a/ATRI/plugins/setu/nsfw_checker.py
+++ b/ATRI/plugins/setu/nsfw_checker.py
@@ -40,14 +40,14 @@ def prepare_image(img):
return image
-async def detect_image(url: str, file_size: int) -> float:
+async def detect_image(url: str, max_size: int) -> float:
try:
req = await request.get(url)
except Exception:
raise RequestError("Get info from download image failed!")
img_byte = getsizeof(req.read()) // 1024
- if img_byte < file_size:
+ if img_byte < max_size:
return 0
try:
diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py
new file mode 100644
index 0000000..e7ed842
--- /dev/null
+++ b/ATRI/plugins/status.py
@@ -0,0 +1,127 @@
+import os
+import time
+import psutil
+from typing import Tuple
+from datetime import datetime
+
+from nonebot import get_bot
+from nonebot.adapters.onebot.v11 import unescape
+
+from ATRI.log import log
+from ATRI.service import Service
+from ATRI.message import MessageBuilder
+from ATRI.exceptions import GetStatusError
+from ATRI.utils.apscheduler import scheduler
+
+
+plugin = Service("状态").document("检查自身状态")
+
+
+ping = plugin.on_command("/ping", "检测bot是否存活")
+
+
+async def _():
+ await ping.finish("I'm fine.")
+
+
+status = plugin.on_command("/status", "检查bot运行资源占用")
+
+
+async def _():
+ msg, _ = get_status()
+ print(msg)
+ await status.finish(msg)
+
+
[email protected]_job("interval", name="状态检查", minutes=15, misfire_grace_time=15)
+async def _():
+ log.info("检查资源消耗中...")
+ msg, stat = get_status()
+ if not stat:
+ log.warning("资源消耗异常")
+
+ try:
+ bot = get_bot()
+ except Exception:
+ bot = None
+
+ if bot: await plugin.send_to_master(msg)
+ else:
+ log.info("资源消耗正常")
+
+
+_STATUS_MSG = (
+ MessageBuilder("[Status Overview]")
+ .text("[CPU: {b_cpu}% of {p_cpu}%]")
+ .text("[Memory: {b_mem} of {p_mem}%]")
+ .text("[Disk usage: {p_disk}%]")
+ .text("")
+ .text("[Net sent: {inteSENT}MB]")
+ .text("[Net recv: {inteRECV}MB]")
+ .text("")
+ .text("[Run Duration]")
+ .text("[Bot: {bot_time}]")
+ .text("[Platform: {boot_time}]")
+ .text("{msg}")
+ .done()
+)
+
+
+def get_status() -> Tuple[str, bool]:
+ try:
+ cpu = psutil.cpu_percent(interval=1)
+ mem = psutil.virtual_memory().percent
+ disk = psutil.disk_usage("/").percent
+ inte_send = psutil.net_io_counters().bytes_sent / 1000000
+ inte_recv = psutil.net_io_counters().bytes_recv / 1000000
+
+ process = psutil.Process(os.getpid())
+ b_cpu = process.cpu_percent(interval=1)
+ b_mem = process.memory_percent(memtype="rss")
+
+ now = time.time()
+ boot = psutil.boot_time()
+ b = process.create_time()
+ boot_time = str(
+ datetime.utcfromtimestamp(now).replace(microsecond=0)
+ - datetime.utcfromtimestamp(boot).replace(microsecond=0)
+ )
+ bot_time = str(
+ datetime.utcfromtimestamp(now).replace(microsecond=0)
+ - datetime.utcfromtimestamp(b).replace(microsecond=0)
+ )
+ except Exception:
+ raise GetStatusError("Failed to get status.")
+
+ msg = "アトリは、高性能ですから!"
+ if cpu > 90:
+ msg = "咱感觉有些头晕..."
+ is_ok = False
+ if mem > 90:
+ msg = "咱感觉有点头晕并且有点累..."
+ is_ok = False
+ elif mem > 90:
+ msg = "咱感觉有点累..."
+ is_ok = False
+ elif disk > 90:
+ msg = "咱感觉身体要被塞满了..."
+ is_ok = False
+ else:
+ is_ok = True
+
+ msg0 = _STATUS_MSG.format(
+ p_cpu=cpu,
+ p_mem=mem,
+ p_disk=disk,
+ b_cpu=b_cpu,
+ b_mem="%.1f%%" % b_mem,
+ inteSENT=inte_send,
+ inteRECV=inte_recv,
+ bot_time=bot_time,
+ boot_time=boot_time,
+ msg=msg,
+ )
+
+ return unescape(msg0), is_ok \ No newline at end of file
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py
deleted file mode 100644
index 08e099c..0000000
--- a/ATRI/plugins/status/__init__.py
+++ /dev/null
@@ -1,45 +0,0 @@
-from nonebot import get_bot
-
-from ATRI import conf
-from ATRI.log import log
-from ATRI.service import Service
-from ATRI.utils.apscheduler import scheduler
-
-from .data_source import Status
-
-
-plugin = Service("状态").document("检查自身状态")
-
-
-ping = plugin.on_command("/ping", "检测bot简单信息处理速度")
-
-
-async def _():
- await ping.finish(Status.ping())
-
-
-status = plugin.on_command("/status", "查看运行资源占用")
-
-
-async def _():
- msg, _ = Status.get_status()
- await status.finish(msg)
-
-
-info_msg = "アトリは高性能ですから!"
-
-
[email protected]_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) # type: ignore
-async def _():
- log.info("开始检查资源消耗...")
- msg, stat = Status.get_status()
- if not stat:
- log.warning(msg)
-
- bot = get_bot()
- for super in conf.BotConfig.superusers:
- await bot.send_private_msg(user_id=super, message=msg)
-
- log.info("资源消耗正常")
diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py
deleted file mode 100644
index 10093c4..0000000
--- a/ATRI/plugins/status/data_source.py
+++ /dev/null
@@ -1,88 +0,0 @@
-import os
-import time
-import psutil
-from datetime import datetime
-
-from ATRI.message import MessageBuilder
-from ATRI.exceptions import GetStatusError
-
-
-_STATUS_MSG = (
- MessageBuilder("[Status Overview]")
- .text("[CPU: {b_cpu}% of {p_cpu}%]")
- .text("[Memory: {b_mem} of {p_mem}%]")
- .text("[Disk usage: {p_disk}%]")
- .text("")
- .text("[Net sent: {inteSENT}MB]")
- .text("[Net recv: {inteRECV}MB]")
- .text("")
- .text("[Run Duration]")
- .text("[Bot: {bot_time}]")
- .text("[Platform: {boot_time}]")
- .text("{msg}")
- .done()
-)
-
-
-class Status:
- @staticmethod
- def ping() -> str:
- return "I'm fine."
-
- @staticmethod
- def get_status() -> tuple:
- try:
- cpu = psutil.cpu_percent(interval=1)
- mem = psutil.virtual_memory().percent
- disk = psutil.disk_usage("/").percent
- inte_send = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
- inte_recv = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
-
- process = psutil.Process(os.getpid())
- b_cpu = process.cpu_percent(interval=1)
- b_mem = process.memory_percent(memtype="rss")
-
- now = time.time()
- boot = psutil.boot_time()
- b = process.create_time()
- boot_time = str(
- datetime.utcfromtimestamp(now).replace(microsecond=0)
- - datetime.utcfromtimestamp(boot).replace(microsecond=0)
- )
- bot_time = str(
- datetime.utcfromtimestamp(now).replace(microsecond=0)
- - datetime.utcfromtimestamp(b).replace(microsecond=0)
- )
- except Exception:
- raise GetStatusError("Failed to get status.")
-
- msg = "アトリは、高性能ですから!"
- if cpu > 90: # type: ignore
- msg = "咱感觉有些头晕..."
- is_ok = False
- if mem > 90:
- msg = "咱感觉有点头晕并且有点累..."
- is_ok = False
- elif mem > 90:
- msg = "咱感觉有点累..."
- is_ok = False
- elif disk > 90:
- msg = "咱感觉身体要被塞满了..."
- is_ok = False
- else:
- is_ok = True
-
- msg0 = _STATUS_MSG.format(
- p_cpu=cpu,
- p_mem=mem,
- p_disk=disk,
- b_cpu=b_cpu,
- b_mem="%.1f%%" % b_mem,
- inteSENT=inte_send,
- inteRECV=inte_recv,
- bot_time=bot_time,
- boot_time=boot_time,
- msg=msg,
- )
-
- return msg0, is_ok
diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py
index aeeb9db..a1a7f3b 100644
--- a/ATRI/utils/__init__.py
+++ b/ATRI/utils/__init__.py
@@ -5,11 +5,13 @@ import pytz
import yaml
import time
import string
+import asyncio
import aiofiles
from pathlib import Path
from random import sample
from datetime import datetime
from PIL import Image, ImageFile
+from collections import defaultdict
from aiofiles.threadpool.text import AsyncTextIOWrapper
@@ -220,3 +222,33 @@ class Translate:
output_str_list.append(self.text[i])
return "".join(output_str_list)
+
+
+class Limiter:
+ def __init__(self, max_count: int, down_time: float):
+ """冷却设置
+
+ Args:
+ max_count (int): 最大次数
+ down_time (float): 到达次数后的冷却时间
+ """
+ self.max_count = max_count
+ self.down_time = down_time
+ self.count = defaultdict(int)
+
+ def check(self, key: str) -> bool:
+ if self.count[key] >= self.max_count:
+ loop = asyncio.get_running_loop()
+ loop.call_later(self.down_time, self.reset)
+ return False
+
+ return True
+
+ def increase(self, key: str, times: int = 1) -> None:
+ self.count[key] += times
+
+ def reset(self, key: str) -> None:
+ self.count[key] = 0
+
+ def get_times(self, key: str) -> int:
+ return self.count[key]