From fef46d6fb71df12e35068cad5d83da5e070629aa Mon Sep 17 00:00:00 2001 From: Kyomotoi <0w0@imki.moe> Date: Fri, 2 Dec 2022 08:54:23 +0800 Subject: =?UTF-8?q?=F0=9F=8E=A8=20=E6=96=B0=E5=A2=9E=E5=86=B7=E5=8D=B4?= =?UTF-8?q?=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ATRI/utils/__init__.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'ATRI/utils') diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py index aeeb9db..a1a7f3b 100644 --- a/ATRI/utils/__init__.py +++ b/ATRI/utils/__init__.py @@ -5,11 +5,13 @@ import pytz import yaml import time import string +import asyncio import aiofiles from pathlib import Path from random import sample from datetime import datetime from PIL import Image, ImageFile +from collections import defaultdict from aiofiles.threadpool.text import AsyncTextIOWrapper @@ -220,3 +222,33 @@ class Translate: output_str_list.append(self.text[i]) return "".join(output_str_list) + + +class Limiter: + def __init__(self, max_count: int, down_time: float): + """冷却设置 + + Args: + max_count (int): 最大次数 + down_time (float): 到达次数后的冷却时间 + """ + self.max_count = max_count + self.down_time = down_time + self.count = defaultdict(int) + + def check(self, key: str) -> bool: + if self.count[key] >= self.max_count: + loop = asyncio.get_running_loop() + loop.call_later(self.down_time, self.reset) + return False + + return True + + def increase(self, key: str, times: int = 1) -> None: + self.count[key] += times + + def reset(self, key: str) -> None: + self.count[key] = 0 + + def get_times(self, key: str) -> int: + return self.count[key] -- cgit v1.2.3 From b5584e0c8536c818a8073b3b856a9ac8c11adfab Mon Sep 17 00:00:00 2001 From: Kyomotoi <0w0@imki.moe> Date: Fri, 2 Dec 2022 21:53:03 +0800 Subject: =?UTF-8?q?=F0=9F=8E=A8=20=E9=87=8D=E6=9E=84=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=B7=A5=E5=85=B7=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ATRI/utils/__init__.py | 67 ++++++++++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 29 deletions(-) (limited to 'ATRI/utils') diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py index a1a7f3b..9ce3735 100644 --- a/ATRI/utils/__init__.py +++ b/ATRI/utils/__init__.py @@ -2,8 +2,6 @@ import os import re import json import pytz -import yaml -import time import string import asyncio import aiofiles @@ -15,42 +13,53 @@ from collections import defaultdict from aiofiles.threadpool.text import AsyncTextIOWrapper -def timestamp2datetimestr(timestamp: int) -> str: - format = "%Y-%m-%d %H:%M:%S" - tt = time.localtime(timestamp) - dt = time.strftime(format, tt) - return dt +def gen_random_str(k: int) -> str: + return str().join(sample(string.ascii_letters + string.digits, k)) -def timestamp2datetime(value: float) -> datetime: - tz = pytz.timezone("Asia/Shanghai") - return datetime.fromtimestamp(value, tz=tz) +class TimeDealer: + def __init__(self, timestamp: float): + """对时间进行处理 + Args: + timestamp (int): _description_ + """ + self.timestamp = timestamp -def now_time() -> float: - """获取当前时间的整数.""" - now_ = datetime.now() - hour = now_.hour - minute = now_.minute - now = hour + minute / 60 - return now + def to_str( + self, tz=pytz.timezone("Asia/Shanghai"), format: str = "%Y-%m-%d %H:%M:%S" + ) -> str: + """将时间戳转换为格式化形式 + Args: + tz: 时区. 默认: `pytz.timezone("Asia/Shanghai")`. + format: 时间格式. 默认: `"%Y-%m-%d %H:%M:%S"`. -def load_yml(file: Path, encoding="utf-8") -> dict: - """打开 yaml 格式的文件.""" - with open(file, "r", encoding=encoding) as f: - data = yaml.safe_load(f) - return data + Returns: + str: 格式化后的时间戳 + """ + return datetime.fromtimestamp(self.timestamp, tz).strftime(format) + def to_datetime(self, tz=pytz.timezone("Asia/Shanghai")) -> datetime: + """将时间戳转化成 datetime 类型 -def safe_string(value): - if isinstance(value, bytes): - return value.decode() - return str(value) + Args: + tz: 时区. 默认: `pytz.timezone("Asia/Shanghai")`. + Returns: + datetime: 转换后的 datetime 类型 + """ + return datetime.fromtimestamp(self.timestamp, tz) -def gen_random_str(k: int) -> str: - return str().join(sample(string.ascii_letters + string.digits, k)) + def int_now(self) -> float: + """将时间戳转换为一天中整数的时间. + e.g. 9:30 > 9.50 + + Returns: + float: 转换后的整数时间 + """ + time = datetime.fromtimestamp(self.timestamp) + return time.hour + time.minute / 60 class ListDealer: @@ -241,7 +250,7 @@ class Limiter: loop = asyncio.get_running_loop() loop.call_later(self.down_time, self.reset) return False - + return True def increase(self, key: str, times: int = 1) -> None: -- cgit v1.2.3