1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
import time
import pytz
import functools
from threading import RLock
from collections import defaultdict, deque
from datetime import datetime, timedelta
class LimitBucket:
"""
限制某功能运行中某段在一定速率下
"""
def __init__(self, capacity, fill_rate, is_lock: bool = False) -> None:
"""
:param capacity: 容量总数
:param fill_rate: 重新装填速率(单位:秒)
"""
self._capacity = float(capacity)
self._tokens = float(capacity)
self._fill_rate = float(fill_rate)
self._last_time = time()
self._is_lock = is_lock
self._lock = RLock()
def _get_cur_tokens(self):
if self._tokens < self._capacity:
now = time()
delta = self._fill_rate * (now - self._last_time)
self._tokens = min(self._capacity, self._tokens + delta)
self._last_time = now
return self._tokens
def get_cur_tokens(self):
if self._is_lock:
with self._lock:
return self._get_cur_tokens()
else:
return self._get_cur_tokens()
def _consume(self, tokens) -> bool:
if tokens <= self.get_cur_tokens():
self._tokens -= tokens
return True
return False
def consume(self, tokens):
if self._is_lock:
with self._lock:
return self._consume(tokens)
else:
return self._consume(tokens)
class RateLimiting:
"""
限制该功能全体速率
"""
def __init__(self, max_calls, period=1.0):
if period <= 0:
raise ValueError("Rate limiting period should be > 0")
if max_calls <= 0:
raise ValueError("Rate limiting number of calls should be > 0")
self.calls = deque()
self.period = period
self.max_calls = max_calls
def __call__(self, func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
with self:
return func(*args, **kwargs)
return wrapped
def __enter__(self):
if len(self.calls) >= self.max_calls:
time.sleep(self.period - self._timespan)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.calls.append(time.time())
while self._timespan >= self.period:
self.calls.popleft()
@property
def _timespan(self):
return self.calls[-1] - self.calls[0]
class FreqLimiter:
"""
Copy from: https://github.com/Ice-Cirno/HoshinoBot/blob/master/hoshino/util/__init__.py
"""
def __init__(self, default_cd_seconds):
self.next_time = defaultdict(float)
self.default_cd = default_cd_seconds
def check(self, key) -> bool:
return bool(time.time() >= self.next_time[key])
def start_cd(self, key, cd_time=0):
self.next_time[key] = time.time() + (
cd_time if cd_time > 0 else self.default_cd
)
def left_time(self, key) -> float:
return self.next_time[key] - time.time()
class DailyLimiter:
"""
Copy from: https://github.com/Ice-Cirno/HoshinoBot/blob/master/hoshino/util/__init__.py
"""
tz = pytz.timezone("Asia/Shanghai")
def __init__(self, max_num):
self.today = -1
self.count = defaultdict(int)
self.max = max_num
def check(self, key) -> bool:
now = datetime.now(self.tz)
day = (now - timedelta(hours=6)).day
if day != self.today:
self.today = day
self.count.clear()
return bool(self.count[key] < self.max)
def get_num(self, key):
return self.count[key]
def increase(self, key, num=1):
self.count[key] += num
def reset(self, key):
self.count[key] = 0
|