summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
authorLint Action <[email protected]>2021-07-31 08:17:46 +0000
committerLint Action <[email protected]>2021-07-31 08:17:46 +0000
commit36d26d1dc61c36b4601aaf75e148060c5bcb98a7 (patch)
tree3b466c4b0db4b9e160c5390a1d1c7ddb0322660f /ATRI
parent336eb9d9e98b0bee952c27a50820dbdb350bcc03 (diff)
downloadATRI-36d26d1dc61c36b4601aaf75e148060c5bcb98a7.tar.gz
ATRI-36d26d1dc61c36b4601aaf75e148060c5bcb98a7.tar.bz2
ATRI-36d26d1dc61c36b4601aaf75e148060c5bcb98a7.zip
:rotating_light: 自动进行代码格式化
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/exceptions.py4
-rw-r--r--ATRI/plugins/anime_search.py18
-rw-r--r--ATRI/plugins/chat/__init__.py37
-rw-r--r--ATRI/plugins/chat/data_source.py37
-rw-r--r--ATRI/plugins/code_runner/__init__.py7
-rw-r--r--ATRI/plugins/code_runner/data_source.py25
-rw-r--r--ATRI/plugins/console/__init__.py10
-rw-r--r--ATRI/plugins/console/data_source.py13
-rw-r--r--ATRI/plugins/console/drivers.py18
-rw-r--r--ATRI/plugins/console/view.py15
-rw-r--r--ATRI/plugins/curse.py13
-rw-r--r--ATRI/plugins/essential.py61
-rw-r--r--ATRI/plugins/funny/__init__.py16
-rw-r--r--ATRI/plugins/funny/data_source.py41
-rw-r--r--ATRI/plugins/help/__init__.py12
-rw-r--r--ATRI/plugins/help/data_source.py30
-rw-r--r--ATRI/plugins/manege/__init__.py108
-rw-r--r--ATRI/plugins/manege/data_source.py61
-rw-r--r--ATRI/plugins/repo.py17
-rw-r--r--ATRI/plugins/rich/__init__.py3
-rw-r--r--ATRI/plugins/rich/data_source.py18
-rw-r--r--ATRI/plugins/saucenao/__init__.py7
-rw-r--r--ATRI/plugins/saucenao/data_source.py25
-rw-r--r--ATRI/plugins/setu/__init__.py24
-rw-r--r--ATRI/plugins/setu/data_source.py29
-rw-r--r--ATRI/plugins/status/__init__.py2
-rw-r--r--ATRI/plugins/status/data_source.py11
-rw-r--r--ATRI/plugins/util/__init__.py20
-rw-r--r--ATRI/plugins/util/data_source.py12
-rw-r--r--ATRI/plugins/wife/__init__.py68
-rw-r--r--ATRI/plugins/wife/data_source.py25
-rw-r--r--ATRI/service.py143
32 files changed, 485 insertions, 445 deletions
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py
index b49d3a0..b8d4764 100644
--- a/ATRI/exceptions.py
+++ b/ATRI/exceptions.py
@@ -34,7 +34,7 @@ def _save_error(prompt: str, content: str) -> str:
track_id=track_id,
prompt=prompt,
time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
- content=content
+ content=content,
)
path = ERROR_DIR / f"{track_id}.json"
with open(path, "w", encoding="utf-8") as r:
@@ -110,7 +110,7 @@ async def _track_error(
logger.debug(f"A bug has been cumming!!! Track ID: {track_id}")
msg = f"呜——出错了...追踪: {track_id}"
-
+
for superusers in BotSelfConfig.superusers:
try:
await bot.send_private_msg(user_id=superusers, message=msg)
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py
index de64501..80d8a94 100644
--- a/ATRI/plugins/anime_search.py
+++ b/ATRI/plugins/anime_search.py
@@ -24,10 +24,9 @@ __doc__ = """
class Anime(Service):
-
def __init__(self):
Service.__init__(self, "以图搜番", __doc__, rule=is_in_service("以图搜番"))
-
+
@staticmethod
async def _request(url: str) -> dict:
aim = URL + url
@@ -37,12 +36,12 @@ class Anime(Service):
raise RequestError("Request failed!")
result = await res.json()
return result
-
+
@classmethod
async def search(cls, url: str) -> str:
data = await cls._request(url)
data = data["docs"]
-
+
d = dict()
for i in range(len(data)):
if data[i]["title_chinese"] in d.keys():
@@ -61,7 +60,7 @@ class Anime(Service):
f"第{n}集",
f"{int(m)}分{int(s)}秒处",
]
-
+
result = sorted(d.items(), key=lambda x: x[1], reverse=True)
t = 0
msg0 = str()
@@ -74,7 +73,7 @@ class Anime(Service):
f"Name: {i[0]}\n"
f"Time: {i[1][1]} {i[1][2]}"
)
-
+
if len(result) == 2:
return msg0
else:
@@ -90,6 +89,7 @@ class Anime(Service):
anime_search = Anime().on_command("以图搜番", "发送一张图以搜索可能的番剧")
+
@anime_search.args_parser # type: ignore
async def _get_anime(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -101,16 +101,18 @@ async def _get_anime(bot: Bot, event: MessageEvent, state: T_State):
else:
state["anime"] = msg
+
@anime_search.handle()
async def _ready_sear(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
if not _anime_flmt.check(user_id):
await anime_search.finish(_anime_flmt_notice)
-
+
msg = str(event.message).strip()
if msg:
state["anime"] = msg
+
@anime_search.got("anime", "图呢?")
async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
@@ -118,7 +120,7 @@ async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State):
img = re.findall(r"url=(.*?)]", msg)
if not img:
await anime_search.reject("请发送图片而不是其它东西!!")
-
+
a = await Anime().search(img[0])
result = f"> {MessageSegment.at(user_id)}\n" + a
_anime_flmt.start_cd(user_id)
diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py
index 79664cc..9f7f26c 100644
--- a/ATRI/plugins/chat/__init__.py
+++ b/ATRI/plugins/chat/__init__.py
@@ -15,20 +15,23 @@ _chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~
chat = Chat().on_message("闲聊(文爱")
+
@chat.handle()
async def _chat(bot: Bot, event: MessageEvent):
print(1)
user_id = event.get_user_id()
if not _chat_flmt.check(user_id):
await chat.finish(_chat_flmt_notice)
-
+
msg = str(event.message)
repo = await Chat().deal(msg, user_id)
_chat_flmt.start_cd(user_id)
await chat.finish(repo)
+
my_name_is = Chat().on_command("叫我", "更改闲聊(划掉 文爱)时的称呼", aliases={"我是"}, priority=1)
+
@my_name_is.args_parser # type: ignore
async def _get_name(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -40,26 +43,30 @@ async def _get_name(bot: Bot, event: MessageEvent, state: T_State):
else:
state["name"] = msg
+
@my_name_is.handle()
async def _name(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
if not _chat_flmt.check(user_id):
await my_name_is.finish(_chat_flmt_notice)
-
+
msg = str(event.message).strip()
if msg:
state["name"] = msg
+
@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0")
async def _deal_name(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
new_name = state["name"]
- repo = choice([
- f"好~w 那咱以后就称呼你为{new_name}!",
- f"噢噢噢!原来你叫{new_name}阿~",
- f"好欸!{new_name}ちゃん~~~",
- "很不错的称呼呢w"
- ])
+ repo = choice(
+ [
+ f"好~w 那咱以后就称呼你为{new_name}!",
+ f"噢噢噢!原来你叫{new_name}阿~",
+ f"好欸!{new_name}ちゃん~~~",
+ "很不错的称呼呢w",
+ ]
+ )
Chat().name_is(user_id, new_name)
_chat_flmt.start_cd(user_id)
await my_name_is.finish(repo)
@@ -67,6 +74,7 @@ async def _deal_name(bot: Bot, event: MessageEvent, state: T_State):
say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1)
+
@say.args_parser # type: ignore
async def _get_say(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -78,29 +86,26 @@ async def _get_say(bot: Bot, event: MessageEvent, state: T_State):
else:
state["say"] = msg
+
@say.handle()
async def _ready_say(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
if not _chat_flmt.check(user_id):
await say.finish(_chat_flmt_notice)
-
+
msg = str(event.message)
if msg:
state["say"] = msg
+
@say.got("say")
async def _deal_say(bot: Bot, event: MessageEvent, state: T_State):
msg = state["say"]
check = CoolqCodeChecker(msg).check
if not check:
- repo = choice([
- "不要...",
- "这个咱不想复读!",
- "不可以",
- "不好!"
- ])
+ repo = choice(["不要...", "这个咱不想复读!", "不可以", "不好!"])
await say.finish(repo)
-
+
user_id = event.get_user_id()
_chat_flmt.start_cd(user_id)
await say.finish(msg)
diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py
index 15c495d..4c3578d 100644
--- a/ATRI/plugins/chat/data_source.py
+++ b/ATRI/plugins/chat/data_source.py
@@ -21,16 +21,17 @@ KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json"
class Chat(Service):
-
def __init__(self):
- Service.__init__(self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5)
-
+ Service.__init__(
+ self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5
+ )
+
@staticmethod
async def _request(url: str) -> dict:
res = await request.get(url)
data = await res.json()
return data
-
+
@classmethod
async def _generate_data(cls) -> None:
file_name = "kimo.json"
@@ -44,18 +45,18 @@ class Chat(Service):
log.info("生成完成")
except WriteError:
raise WriteError("Writing kimo words failed!")
-
+
@classmethod
async def _load_data(cls) -> dict:
file_name = "kimo.json"
path = CHAT_PATH / file_name
if not path.is_file():
await cls._generate_data()
-
+
with open(path, "r", encoding="utf-8") as r:
data = json.loads(r.read())
return data
-
+
@classmethod
async def update_data(cls) -> None:
log.info("更新闲聊词库ing...")
@@ -63,17 +64,17 @@ class Chat(Service):
path = CHAT_PATH / file_name
if not path.is_file():
await cls._generate_data()
-
+
updata_data = await cls._request(KIMO_URL)
data = json.loads(path.read_bytes())
for i in updata_data:
if i not in data:
data[i] = updata_data[i]
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
log.info("闲聊词库更新完成")
-
+
@staticmethod
def name_is(user_id: str, new_name: str):
file_name = "users.json"
@@ -82,7 +83,7 @@ class Chat(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = {}
-
+
data = json.loads(path.read_bytes())
data[user_id] = new_name
try:
@@ -90,7 +91,7 @@ class Chat(Service):
w.write(json.dumps(data, indent=4))
except ReadFileError:
raise ReadFileError("Update user name failed!")
-
+
@staticmethod
def load_name(user_id: str) -> str:
file_name = "users.json"
@@ -99,21 +100,21 @@ class Chat(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
return "你"
-
+
data = json.loads(path.read_bytes())
try:
result = data[user_id]
except BaseException:
result = "你"
return result
-
+
@classmethod
async def deal(cls, msg: str, user_id: str) -> str:
keywords = posseg.lcut(msg)
shuffle(keywords)
-
+
data = await cls._load_data()
-
+
repo = str()
for i in keywords:
a = i.word
@@ -125,14 +126,14 @@ class Chat(Service):
pass
if a in data:
repo = data.get(a, str())
-
+
if not repo:
temp_data = list(data)
shuffle(temp_data)
for i in temp_data:
if i in msg:
repo = data.get(i, str())
-
+
a = choice(repo) if type(repo) is list else repo
user_name = cls.load_name(user_id)
repo = a.replace("你", user_name)
diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py
index dfe6162..3f5697b 100644
--- a/ATRI/plugins/code_runner/__init__.py
+++ b/ATRI/plugins/code_runner/__init__.py
@@ -13,15 +13,16 @@ _flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮助:/code help")
+
@code_runner.handle()
async def _code_runner(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _flmt.check(user_id):
await code_runner.finish(_flmt_notice)
-
+
msg = str(event.get_message())
args = msg.split("\n")
-
+
if not args:
content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!"
elif args[0] == "help":
@@ -30,6 +31,6 @@ async def _code_runner(bot: Bot, event: MessageEvent):
content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang()
else:
content = MessageSegment.at(user_id) + await CodeRunner().runner(msg)
-
+
_flmt.start_cd(user_id)
await code_runner.finish(Message(content))
diff --git a/ATRI/plugins/code_runner/data_source.py b/ATRI/plugins/code_runner/data_source.py
index bbe0d2e..4338697 100644
--- a/ATRI/plugins/code_runner/data_source.py
+++ b/ATRI/plugins/code_runner/data_source.py
@@ -40,10 +40,9 @@ __doc__ = """
class CodeRunner(Service):
-
def __init__(self):
Service.__init__(self, "在线跑代码", __doc__, rule=is_in_service("在线跑代码"))
-
+
@staticmethod
def help() -> str:
return (
@@ -53,23 +52,23 @@ class CodeRunner(Service):
"/code python\n"
"print('hello world')"
)
-
+
@staticmethod
def list_supp_lang() -> str:
msg0 = "咱现在支持的语言如下:\n"
msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys()))
return msg0
-
+
@staticmethod
async def runner(msg: str):
args = msg.split("\n")
if not args:
return "请检查键入内容..."
-
+
lang = args[0].replace("\r", "")
if lang not in SUPPORTED_LANGUAGES:
return "该语言暂不支持..."
-
+
del args[0]
code = "\n".join(map(str, args))
url = RUN_API_URL_FORMAT.format(lang)
@@ -84,14 +83,14 @@ class CodeRunner(Service):
}
],
"stdin": "",
- "command": ""
+ "command": "",
}
-
+
try:
res = await request.post(url, json=js)
except RequestError:
raise RequestError("Request failed!")
-
+
payload = await res.json()
sent = False
for k in ["stdout", "stderr", "error"]:
@@ -100,12 +99,12 @@ class CodeRunner(Service):
lines, remained_lines = lines[:10], lines[10:]
out = "\n".join(lines)
out, remained_out = out[: 60 * 10], out[60 * 10 :]
-
+
if remained_lines or remained_out:
out += f"\n(太多了太多了...)"
-
+
if out:
return f"\n{k}:\n{out}"
-
+
if not sent:
- return "\n运行完成,没任何输出呢..." \ No newline at end of file
+ return "\n运行完成,没任何输出呢..."
diff --git a/ATRI/plugins/console/__init__.py b/ATRI/plugins/console/__init__.py
index 60cc305..d7240c9 100644
--- a/ATRI/plugins/console/__init__.py
+++ b/ATRI/plugins/console/__init__.py
@@ -32,6 +32,7 @@ error_freq = 0
record_msg = Console().on_message(block=False)
+
@record_msg.handle()
async def _record_msg(bot: Bot, event: Event):
global msg_freq
@@ -44,7 +45,7 @@ async def _record_is_error(
exception: Optional[Exception],
bot: Bot,
event: Event,
- state: T_State
+ state: T_State,
):
global health_freq, error_freq
if matcher.type != "message":
@@ -60,11 +61,7 @@ async def _record_data():
now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
data = {
"time": now_time,
- "freq_data": {
- "msg": msg_freq,
- "health": health_freq,
- "error": error_freq
- }
+ "freq_data": {"msg": msg_freq, "health": health_freq, "error": error_freq},
}
Console().record_data(data)
msg_freq, health_freq, error_freq = 0, 0, 0
@@ -73,4 +70,5 @@ async def _record_data():
def init():
register_route()
+
init()
diff --git a/ATRI/plugins/console/data_source.py b/ATRI/plugins/console/data_source.py
index 8fe6c04..990b1c9 100644
--- a/ATRI/plugins/console/data_source.py
+++ b/ATRI/plugins/console/data_source.py
@@ -14,25 +14,24 @@ is_connect = False
class Console(Service):
-
def __init__(self):
Service.__init__(self, "控制台")
-
+
@staticmethod
def record_data(data: dict) -> None:
now_time = datetime.now().strftime("%Y-%m-%d")
file_name = f"{now_time}-runtime.json"
path = CONSOLE_DIR / file_name
if not path.is_file():
- with open(path ,"w", encoding="utf-8") as w:
+ with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(list()))
temp_data = list()
-
+
temp_data: list = json.loads(path.read_bytes())
temp_data.append(data)
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(temp_data, indent=4))
-
+
@staticmethod
def load_data() -> list:
now_time = datetime.now().strftime("%Y-%m-%d")
@@ -42,7 +41,7 @@ class Console(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(list()))
return list()
-
+
data: list = json.loads(path.read_bytes())
return data
@@ -50,7 +49,7 @@ class Console(Service):
def store_connect_stat(i: bool):
global is_connect
is_connect = i
-
+
@staticmethod
def is_connect() -> bool:
return is_connect
diff --git a/ATRI/plugins/console/drivers.py b/ATRI/plugins/console/drivers.py
index e691a7c..55157cd 100644
--- a/ATRI/plugins/console/drivers.py
+++ b/ATRI/plugins/console/drivers.py
@@ -12,7 +12,7 @@ origins = [
"https://localhost.tiangolo.com",
"http://localhost",
"http://localhost:8080",
- "http://localhost:20000"
+ "http://localhost:20000",
]
@@ -23,15 +23,15 @@ def register_route():
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
- allow_headers=["*"]
+ allow_headers=["*"],
)
-
- static_path = str((Path(".") / "ATRI" / "plugins" / "console" / "atri-manege" / "dist").absolute())
-
+
+ static_path = str(
+ (Path(".") / "ATRI" / "plugins" / "console" / "atri-manege" / "dist").absolute()
+ )
+
app.get("/bot/is_connect")(handle_is_connect)
app.get("/bot/status")(handle_status)
app.get("/bot/dashboard_info")(handle_dashboard_info)
-
- app.mount("/",
- StaticFiles(directory=static_path, html=True),
- name="bot")
+
+ app.mount("/", StaticFiles(directory=static_path, html=True), name="bot")
diff --git a/ATRI/plugins/console/view.py b/ATRI/plugins/console/view.py
index dde3533..9de39b4 100644
--- a/ATRI/plugins/console/view.py
+++ b/ATRI/plugins/console/view.py
@@ -8,22 +8,13 @@ driver = ATRI.driver()
async def handle_is_connect():
data = Console().is_connect()
- return {
- "status": 200,
- "is_connect": data
- }
+ return {"status": 200, "is_connect": data}
async def handle_status():
- return {
- "status": 200,
- "message": info_msg
- }
+ return {"status": 200, "message": info_msg}
async def handle_dashboard_info():
data = Console().load_data()
- return {
- "status": 200,
- "data": data
- }
+ return {"status": 200, "data": data}
diff --git a/ATRI/plugins/curse.py b/ATRI/plugins/curse.py
index b2dbc05..edd6249 100644
--- a/ATRI/plugins/curse.py
+++ b/ATRI/plugins/curse.py
@@ -20,10 +20,9 @@ __doc__ = """
class Curse(Service):
-
def __init__(self):
Service.__init__(self, "口臭", __doc__, rule=is_in_service("口臭"))
-
+
@staticmethod
async def now() -> str:
res = await request.get(URL)
@@ -31,14 +30,17 @@ class Curse(Service):
return result
-normal_curse = Curse().on_command("口臭一下", "主命令,骂你一下", aliases={"骂我", "口臭"}, rule=to_bot())
+normal_curse = Curse().on_command(
+ "口臭一下", "主命令,骂你一下", aliases={"骂我", "口臭"}, rule=to_bot()
+)
+
@normal_curse.handle()
async def _deal_n_curse(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _curse_flmt.check(user_id):
await normal_curse.finish(_curse_flmt_notice)
-
+
result = await Curse().now()
_curse_flmt.start_cd(user_id)
await normal_curse.finish(result)
@@ -46,12 +48,13 @@ async def _deal_n_curse(bot: Bot, event: MessageEvent):
super_curse = Curse().on_regex(r"[来求有](.*?)骂我吗?", "有求必应")
+
@super_curse.handle()
async def _deal_s_curse(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _curse_flmt.check(user_id):
await normal_curse.finish(_curse_flmt_notice)
-
+
result = await Curse().now()
_curse_flmt.start_cd(user_id)
await normal_curse.finish(result)
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index 2b367a5..9980518 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -62,9 +62,9 @@ async def _check_block(
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
data = json.loads(path.read_bytes())
-
+
user_id = event.get_user_id()
if user_id in data:
raise IgnoredException(f"Block user: {user_id}")
@@ -76,9 +76,9 @@ async def _check_block(
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
data = json.loads(path.read_bytes())
-
+
group_id = str(event.group_id)
if group_id in data:
raise IgnoredException(f"Block group: {user_id}")
@@ -104,13 +104,13 @@ __doc__ = """
class Essential(Service):
-
def __init__(self):
Service.__init__(self, "基础部件", __doc__)
friend_add_event = Essential().on_request("好友添加")
+
@friend_add_event.handle()
async def _friend_add(bot: Bot, event: FriendRequestEvent):
"""
@@ -130,22 +130,19 @@ async def _friend_add(bot: Bot, event: FriendRequestEvent):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
apply_code = event.flag
apply_comment = event.comment
user_id = event.get_user_id()
now_time = datetime.now()
-
+
data = json.loads(path.read_bytes())
data[apply_code] = FriendRequestInfo(
- user_id=user_id,
- comment=apply_comment,
- time=now_time,
- is_approve=False
+ user_id=user_id, comment=apply_comment, time=now_time, is_approve=False
)
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data.dict(), indent=4))
-
+
repo = (
"咱收到一条好友请求...\n"
f"请求人:{user_id}\n"
@@ -159,6 +156,7 @@ async def _friend_add(bot: Bot, event: FriendRequestEvent):
group_invite_event = Essential().on_request("邀请入群")
+
@group_invite_event.handle()
async def _group_invite(bot: Bot, event: GroupRequestEvent):
"""
@@ -178,22 +176,19 @@ async def _group_invite(bot: Bot, event: GroupRequestEvent):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
apply_code = event.flag
apply_comment = event.comment
user_id = event.get_user_id()
now_time = datetime.now()
-
+
data = json.loads(path.read_bytes())
data[apply_code] = GroupRequestInfo(
- user_id=user_id,
- comment=apply_comment,
- time=now_time,
- is_approve=False
+ user_id=user_id, comment=apply_comment, time=now_time, is_approve=False
)
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data.dict(), indent=4))
-
+
repo = (
"咱收到一条群聊邀请请求...\n"
f"请求人:{user_id}\n"
@@ -207,15 +202,14 @@ async def _group_invite(bot: Bot, event: GroupRequestEvent):
group_member_event = Essential().on_notice("群成员变动")
+
@group_member_event.handle()
async def _group_member_join(bot: Bot, event: GroupIncreaseNoticeEvent):
await asyncio.sleep(randint(1, 6))
- msg = (
- "好欸!事新人!\n"
- f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!"
- )
+ msg = "好欸!事新人!\n" f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!"
await group_member_event.finish(msg)
+
@group_member_event.handle()
async def _group_member_left(bot: Bot, event: GroupDecreaseNoticeEvent):
await asyncio.sleep(randint(1, 6))
@@ -224,6 +218,7 @@ async def _group_member_left(bot: Bot, event: GroupDecreaseNoticeEvent):
group_admin_event = Essential().on_notice("群管理变动")
+
@group_admin_event.handle()
async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent):
if not event.is_tome():
@@ -237,6 +232,7 @@ async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent):
group_ban_event = Essential().on_notice("群禁言变动")
+
@group_ban_event.handle()
async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent):
if not event.is_tome():
@@ -258,11 +254,12 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent):
recall_event = Essential().on_notice("撤回事件")
+
@recall_event.handle()
async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
if event.is_tome():
return
-
+
try:
repo = await bot.get_msg(message_id=event.message_id)
except BaseException:
@@ -275,12 +272,7 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
if not check:
repo = repo.replace("CQ", "QC")
- msg = (
- "主人,咱拿到了一条撤回信息!\n"
- f"{user}@[群:{group}]\n"
- "撤回了\n"
- f"{repo}"
- )
+ msg = "主人,咱拿到了一条撤回信息!\n" f"{user}@[群:{group}]\n" "撤回了\n" f"{repo}"
for superuser in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=int(superuser), message=msg)
@@ -289,7 +281,7 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent):
if event.is_tome():
return
-
+
try:
repo = await bot.get_msg(message_id=event.message_id)
except BaseException:
@@ -301,11 +293,6 @@ async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent):
if not check:
repo = repo.replace("CQ", "QC")
- msg = (
- "主人,咱拿到了一条撤回信息!\n"
- f"{user}@[私聊]"
- "撤回了\n"
- f"{repo}"
- )
+ msg = "主人,咱拿到了一条撤回信息!\n" f"{user}@[私聊]" "撤回了\n" f"{repo}"
for superuser in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=int(superuser), message=msg)
diff --git a/ATRI/plugins/funny/__init__.py b/ATRI/plugins/funny/__init__.py
index 4779dda..ea60375 100644
--- a/ATRI/plugins/funny/__init__.py
+++ b/ATRI/plugins/funny/__init__.py
@@ -10,6 +10,7 @@ from .data_source import Funny
get_laugh = Funny().on_command("来句笑话", "隐晦的笑话...")
+
@get_laugh.handle()
async def _get_laugh(bot: Bot, event: MessageEvent):
user_name = event.sender.nickname or "该裙友"
@@ -18,6 +19,7 @@ async def _get_laugh(bot: Bot, event: MessageEvent):
me_re_you = Funny().on_regex(r"我", "我也不懂咋解释", block=False)
+
@me_re_you.handle()
async def _me_re_you(bot: Bot, event: MessageEvent):
if randint(0, 15) == 5:
@@ -27,13 +29,16 @@ async def _me_re_you(bot: Bot, event: MessageEvent):
await me_re_you.finish(content)
-fake_msg = Funny().on_command("/fakemsg", "伪造假转发内容,格式:qq-name-content\n可构造多条,使用空格隔开,仅限群聊")
+fake_msg = Funny().on_command(
+ "/fakemsg", "伪造假转发内容,格式:qq-name-content\n可构造多条,使用空格隔开,仅限群聊"
+)
_fake_daliy_max = DailyLimiter(3)
_fake_max_notice = "不能继续下去了!明早再来"
_fake_flmt = FreqLimiter(60)
_fake_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
+
@fake_msg.args_parser # type: ignore
async def _perp_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -45,6 +50,7 @@ async def _perp_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
else:
state["content"] = msg
+
@fake_msg.handle()
async def _ready_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
user_id = event.get_user_id()
@@ -52,11 +58,12 @@ async def _ready_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
await fake_msg.finish(_fake_max_notice)
if not _fake_flmt.check(user_id):
await fake_msg.finish(_fake_flmt_notice)
-
+
msg = str(event.message).strip()
if msg:
state["content"] = msg
+
@fake_msg.got("content", "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开")
async def _deal_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
content = state["content"]
@@ -64,7 +71,7 @@ async def _deal_fake(bot: Bot, event: GroupMessageEvent, state: T_State):
user_id = event.get_user_id()
node = Funny().fake_msg(content)
await bot.send_group_forward_msg(group_id=group_id, messages=node)
-
+
_fake_flmt.start_cd(user_id)
_fake_daliy_max.increase(user_id)
@@ -73,12 +80,13 @@ eat_what = Funny().on_regex(r"大?[今明后]天(.*?)吃[什啥]么?", "我来�
_eat_flmt = FreqLimiter(15)
+
@eat_what.handle()
async def _eat_what(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _eat_flmt.check(user_id):
return
-
+
msg = str(event.get_message())
user_name = event.sender.nickname or "裙友"
eat = await Funny().eat_what(user_name, msg)
diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py
index f29b0be..89b1392 100644
--- a/ATRI/plugins/funny/data_source.py
+++ b/ATRI/plugins/funny/data_source.py
@@ -23,45 +23,46 @@ __doc__ = """
class Funny(Service):
-
def __init__(self):
Service.__init__(self, "乐", __doc__, rule=is_in_service("乐"))
-
+
@staticmethod
async def idk_laugh(name: str) -> str:
laugh_list = list()
-
+
file_name = "laugh.txt"
path = FUNNY_DIR / file_name
if not path.is_file():
logger.warning("未发现笑话相关数据,正在下载并保存...")
- url = "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/laugh.txt"
+ url = (
+ "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/laugh.txt"
+ )
res = await request.get(url)
context = await res.text() # type: ignore
with open(path, "w", encoding="utf-8") as w:
w.write(context)
logger.warning("完成")
-
+
with open(path, "r", encoding="utf-8") as r:
for line in r:
laugh_list.append(line.strip("\n"))
-
+
rd: str = choice(laugh_list)
result = rd.replace("%name", name)
return result
-
+
@staticmethod
def me_re_you(msg: str) -> tuple:
if "我" in msg and "[CQ" not in msg:
return msg.replace("我", "你"), True
else:
return msg, False
-
+
@staticmethod
def fake_msg(text: str) -> list:
arg = text.split(" ")
node = list()
-
+
for i in arg:
args = i.split("-")
qq = args[0]
@@ -70,7 +71,7 @@ class Funny(Service):
dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}}
node.append(dic)
return node
-
+
@staticmethod
async def eat_what(name: str, msg: str) -> str:
EAT_URL = "https://wtf.hiigara.net/api/run/"
@@ -79,7 +80,7 @@ class Funny(Service):
pattern_1 = r"(今|明|后|大后)天"
arg = re.findall(pattern_0, msg)[0]
day = re.match(pattern_1, msg).group(0) # type: ignore
-
+
if arg == "中午":
a = f"LdS4K6/{randint(0, 1145141919810)}"
url = EAT_URL + a
@@ -88,11 +89,11 @@ class Funny(Service):
data = await data.json()
except RequestError:
raise RequestError("Request failed!")
-
+
text = Translate(data["text"]).to_simple().replace("今天", day)
get_a = re.search(r"非常(.*?)的", text).group(0) # type: ignore
result = text.replace(get_a, "")
-
+
elif arg == "晚上":
a = f"KaTMS/{randint(0, 1145141919810)}"
url = EAT_URL + a
@@ -101,17 +102,13 @@ class Funny(Service):
data = await data.json()
except RequestError:
raise RequestError("Request failed!")
-
+
result = Translate(data["text"]).to_simple().replace("今天", day)
-
+
else:
rd = randint(1, 10)
if rd == 5:
- result = [
- "吃我吧 ❤",
- "(脸红)请...请享用咱吧......",
- "都可以哦~不能挑食呢~"
- ]
+ result = ["吃我吧 ❤", "(脸红)请...请享用咱吧......", "都可以哦~不能挑食呢~"]
return choice(result)
else:
a = f"JJr1hJ/{randint(0, 1145141919810)}"
@@ -121,9 +118,9 @@ class Funny(Service):
data = await data.json()
except RequestError:
raise RequestError("Request failed!")
-
+
text = Translate(data["text"]).to_simple().replace("今天", day)
get_a = re.match(r"(.*?)的智商", text).group(0) # type: ignore
result = text.replace(get_a, f"{name}的智商")
-
+
return result
diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py
index 357a337..339ffa2 100644
--- a/ATRI/plugins/help/__init__.py
+++ b/ATRI/plugins/help/__init__.py
@@ -5,7 +5,10 @@ from ATRI.rule import to_bot
from .data_source import Helper
-main_help = Helper().on_command("菜单", "获取食用bot的方法", rule=to_bot(), aliases={"/help", "menu"})
+main_help = Helper().on_command(
+ "菜单", "获取食用bot的方法", rule=to_bot(), aliases={"/help", "menu"}
+)
+
@main_help.handle()
async def _main_help(bot: Bot, event: MessageEvent):
@@ -15,6 +18,7 @@ async def _main_help(bot: Bot, event: MessageEvent):
about_me = Helper().on_command("关于", "获取关于bot的信息", rule=to_bot(), aliases={"about"})
+
@about_me.handle()
async def _about_me(bot: Bot, event: MessageEvent):
repo = Helper().about()
@@ -23,6 +27,7 @@ async def _about_me(bot: Bot, event: MessageEvent):
service_list = Helper().on_command("服务列表", "查看所有可用服务", rule=to_bot(), aliases={"功能列表"})
+
@service_list.handle()
async def _service_list(bot: Bot, event: MessageEvent):
repo = Helper().service_list()
@@ -31,6 +36,7 @@ async def _service_list(bot: Bot, event: MessageEvent):
service_info = Helper().on_command("帮助", "获取服务详细帮助", rule=to_bot())
+
@service_info.handle()
async def _ready_service_info(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).split(" ")
@@ -39,10 +45,10 @@ async def _ready_service_info(bot: Bot, event: MessageEvent, state: T_State):
cmd = msg[1]
except BaseException:
cmd = str()
-
+
if not cmd:
repo = Helper().service_info(service)
await service_info.finish(repo)
-
+
repo = Helper().cmd_info(service, cmd)
await service_info.finish(repo)
diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py
index f5104e5..638bee2 100644
--- a/ATRI/plugins/help/data_source.py
+++ b/ATRI/plugins/help/data_source.py
@@ -24,10 +24,9 @@ COMMAND_INFO_FORMAT = """
class Helper(Service):
-
def __init__(self):
Service.__init__(self, "帮助", "bot的食用指南~")
-
+
@staticmethod
def menu() -> str:
return (
@@ -37,7 +36,7 @@ class Helper(Service):
"帮助 [服务] -以查看对应服务帮助\n"
"Tip: 均需要at触发。菜单 以打开此页面"
)
-
+
@staticmethod
def about() -> str:
temp_list = list()
@@ -51,7 +50,7 @@ class Helper(Service):
"想进一步了解:\n"
"https://github.com/Kyomotoi/ATRI"
)
-
+
@staticmethod
def service_list() -> str:
files = os.listdir(SERVICES_DIR)
@@ -59,42 +58,42 @@ class Helper(Service):
for i in files:
service = i.replace(".json", "")
temp_list.append(service)
-
+
msg0 = "咱搭载了以下服务~\n"
services = " | ".join(map(str, temp_list))
msg0 = msg0 + services
repo = msg0 + "\n@ 帮助 [服务] -以查看对应服务帮助"
return repo
-
+
@staticmethod
def service_info(service: str) -> str:
try:
data = ServiceTools().load_service(service)
except ReadFileError:
return "请检查是否输入错误呢..."
-
+
service_name = data.get("service", "error")
service_docs = data.get("docs", "error")
service_enabled = data.get("enabled", True)
-
+
_service_cmd_list = list(data.get("cmd_list", {"error"}))
service_cmd_list = "\n".join(map(str, _service_cmd_list))
-
+
repo = SERVICE_INFO_FORMAT.format(
service=service_name,
docs=service_docs,
cmd_list=service_cmd_list,
- enabled=service_enabled
+ enabled=service_enabled,
)
return repo
-
+
@staticmethod
def cmd_info(service: str, cmd: str) -> str:
try:
data = ServiceTools().load_service(service)
except ReadFileError:
return "请检查是否输入错误..."
-
+
cmd_list: dict = data["cmd_list"]
cmd_info = cmd_list.get(cmd, dict())
if not cmd_info:
@@ -102,11 +101,8 @@ class Helper(Service):
cmd_type = cmd_info.get("type", "ignore")
docs = cmd_info.get("docs", "ignore")
aliases = cmd_info.get("aliases", "ignore")
-
+
repo = COMMAND_INFO_FORMAT.format(
- cmd=cmd,
- cmd_type=cmd_type,
- docs=docs,
- aliases=aliases
+ cmd=cmd, cmd_type=cmd_type, docs=docs, aliases=aliases
)
return repo
diff --git a/ATRI/plugins/manege/__init__.py b/ATRI/plugins/manege/__init__.py
index 5180362..7a1d45c 100644
--- a/ATRI/plugins/manege/__init__.py
+++ b/ATRI/plugins/manege/__init__.py
@@ -10,137 +10,152 @@ from .data_source import Manege
block_user = Manege().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER)
+
@block_user.handle()
async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["block_user"] = msg
+
@block_user.got("block_user", "哪位?GKD!")
async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State):
user_id = state["block_user"]
quit_list = ["算了", "罢了"]
if user_id in quit_list:
await block_user.finish("...看来有人逃过一劫呢")
-
+
is_ok = Manege().block_user(user_id)
if not is_ok:
await block_user.finish("kuso!封禁失败了...")
-
+
await block_user.finish(f"用户 {user_id} 危!")
unblock_user = Manege().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER)
+
@unblock_user.handle()
async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["unblock_user"] = msg
+
@unblock_user.got("unblock_user", "哪位?GKD!")
async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State):
user_id = state["unblock_user"]
quit_list = ["算了", "罢了"]
if user_id in quit_list:
await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了")
-
+
is_ok = Manege().unblock_user(user_id)
if not is_ok:
await unblock_user.finish("kuso!解封失败了...")
-
+
await unblock_user.finish(f"好欸!{user_id} 重获新生!")
block_group = Manege().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER)
+
@block_group.handle()
async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["block_group"] = msg
+
@block_group.got("block_group", "哪个群?GKD!")
async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State):
group_id = state["block_group"]
quit_list = ["算了", "罢了"]
if group_id in quit_list:
await block_group.finish("...看来有一群逃过一劫呢")
-
+
is_ok = Manege().block_group(group_id)
if not is_ok:
await block_group.finish("kuso!封禁失败了...")
-
+
await block_group.finish(f"群 {group_id} 危!")
unblock_group = Manege().on_command("解封群", "对目标群进行解封", permission=SUPERUSER)
+
@unblock_group.handle()
async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["unblock_group"] = msg
+
@unblock_group.got("unblock_group", "哪个群?GKD!")
async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State):
group_id = state["unblock_group"]
quit_list = ["算了", "罢了"]
if group_id in quit_list:
await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了")
-
+
is_ok = Manege().unblock_group(group_id)
if not is_ok:
await unblock_group.finish("kuso!解封失败了...")
-
+
await unblock_group.finish(f"好欸!群 {group_id} 重获新生!")
global_block_service = Manege().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER)
+
@global_block_service.handle()
async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["global_block_service"] = msg
+
@global_block_service.got("global_block_service", "阿...是哪个服务呢")
async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State):
block_service = state["global_block_service"]
quit_list = ["算了", "罢了"]
if block_service in quit_list:
await global_block_service.finish("好吧...")
-
+
is_ok = Manege().control_global_service(block_service, False)
if not is_ok:
await global_block_service.finish("kuso!禁用失败了...")
-
+
await global_block_service.finish(f"服务 {block_service} 已被禁用")
global_unblock_service = Manege().on_command("全局启用", "全局启用某服务", permission=SUPERUSER)
+
@global_unblock_service.handle()
async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["global_unblock_service"] = msg
+
@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢")
async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State):
unblock_service = state["global_unblock_service"]
quit_list = ["算了", "罢了"]
if unblock_service in quit_list:
await global_unblock_service.finish("好吧...")
-
+
is_ok = Manege().control_global_service(unblock_service, True)
if not is_ok:
await global_unblock_service.finish("kuso!启用服务失败了...")
-
+
await global_unblock_service.finish(f"服务 {unblock_service} 已启用")
-user_block_service = Manege().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER)
+user_block_service = Manege().on_regex(
+ r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER
+)
+
@user_block_service.handle()
async def _user_block_service(bot: Bot, event: MessageEvent):
@@ -149,15 +164,17 @@ async def _user_block_service(bot: Bot, event: MessageEvent):
reg = re.findall(pattern, msg)
aim_user = reg[0]
aim_service = reg[1]
-
+
is_ok = Manege().control_user_service(aim_service, aim_user, False)
if not is_ok:
await user_block_service.finish("禁用失败...请检查服务名是否正确")
await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}")
-
-user_unblock_service = Manege().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER)
+user_unblock_service = Manege().on_regex(
+ r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER
+)
+
@user_unblock_service.handle()
async def _user_unblock_service(bot: Bot, event: MessageEvent):
@@ -166,21 +183,27 @@ async def _user_unblock_service(bot: Bot, event: MessageEvent):
reg = re.findall(pattern, msg)
aim_user = reg[0]
aim_service = reg[1]
-
+
is_ok = Manege().control_user_service(aim_service, aim_user, True)
if not is_ok:
await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中")
await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}")
-group_block_service = Manege().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN)
+group_block_service = Manege().on_command(
+ "禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
@group_block_service.handle()
-async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _ready_group_block_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
msg = str(event.message).strip()
if msg:
state["group_block_service"] = msg
+
@group_block_service.got("group_block_service", "阿...是哪个服务呢")
async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State):
aim_service = state["group_block_service"]
@@ -188,29 +211,37 @@ async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T
quit_list = ["算了", "罢了"]
if aim_service in quit_list:
await group_block_service.finish("好吧...")
-
+
is_ok = Manege().control_group_service(aim_service, group_id, False)
if not is_ok:
await group_block_service.finish("禁用失败...请检查服务名是否输入正确")
await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}")
-group_unblock_service = Manege().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN)
+group_unblock_service = Manege().on_command(
+ "启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
@group_unblock_service.handle()
-async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _ready_group_unblock_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
msg = str(event.message).strip()
if msg:
state["group_unblock_service"] = msg
+
@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢")
-async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State):
+async def _deal_group_unblock_service(
+ bot: Bot, event: GroupMessageEvent, state: T_State
+):
aim_service = state["group_unblock_service"]
group_id = str(event.group_id)
quit_list = ["算了", "罢了"]
if aim_service in quit_list:
await group_unblock_service.finish("好吧...")
-
+
is_ok = Manege().control_group_service(aim_service, group_id, True)
if not is_ok:
await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中")
@@ -219,6 +250,7 @@ async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state:
get_friend_add_list = Manege().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER)
+
@get_friend_add_list.handle()
async def _get_friend_add_list(bot: Bot, event: MessageEvent):
data = Manege().load_friend_apply_list()
@@ -233,23 +265,25 @@ async def _get_friend_add_list(bot: Bot, event: MessageEvent):
msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list))
msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定"
await get_friend_add_list.finish(msg1)
-
+
approve_friend_add = Manege().on_command("同意好友", "同意好友申请", permission=SUPERUSER)
+
@approve_friend_add.handle()
async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["approve_friend_add"]
+
@approve_friend_add.got("approve_friend_add", "申请码GKD!")
async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["approve_friend_add"]
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await approve_friend_add.finish("好吧...")
-
+
try:
await bot.set_friend_add_request(flag=apply_code, approve=True)
except BaseException:
@@ -262,19 +296,21 @@ async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State
refuse_friend_add = Manege().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER)
+
@refuse_friend_add.handle()
async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["refuse_friend_add"]
+
@refuse_friend_add.got("refuse_friend_add", "申请码GKD!")
async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["refuse_friend_add"]
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await refuse_friend_add.finish("好吧...")
-
+
try:
await bot.set_friend_add_request(flag=apply_code, approve=False)
except BaseException:
@@ -287,6 +323,7 @@ async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State)
get_group_invite_list = Manege().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER)
+
@get_group_invite_list.handle()
async def _get_group_invite_list(bot: Bot, event: MessageEvent):
data = Manege().load_invite_apply_list()
@@ -305,21 +342,25 @@ async def _get_group_invite_list(bot: Bot, event: MessageEvent):
approve_group_invite = Manege().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER)
+
@approve_group_invite.handle()
async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["approve_group_invite"]
+
@approve_group_invite.got("approve_group_invite", "申请码GKD!")
async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["approve_group_invite"]
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await approve_group_invite.finish("好吧...")
-
+
try:
- await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True)
+ await bot.set_group_add_request(
+ flag=apply_code, sub_type="invite", approve=True
+ )
except BaseException:
await approve_group_invite.finish("同意失败...尝试下手动?")
data = Manege().load_invite_apply_list()
@@ -330,21 +371,25 @@ async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_Sta
refuse_group_invite = Manege().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER)
+
@refuse_group_invite.handle()
async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["refuse_group_invite"]
+
@refuse_group_invite.got("refuse_group_invite", "申请码GKD!")
async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State):
apply_code = state["refuse_group_invite"]
quit_list = ["算了", "罢了"]
if apply_code in quit_list:
await refuse_group_invite.finish("好吧...")
-
+
try:
- await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False)
+ await bot.set_group_add_request(
+ flag=apply_code, sub_type="invite", approve=False
+ )
except BaseException:
await refuse_group_invite.finish("拒绝失败...尝试下手动?")
data = Manege().load_invite_apply_list()
@@ -355,6 +400,7 @@ async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_Stat
track_error = Manege().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"})
+
@track_error.handle()
async def _track_error(bot: Bot, event: MessageEvent):
track_id = str(event.message).strip()
diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manege/data_source.py
index 1eb2048..e936ef5 100644
--- a/ATRI/plugins/manege/data_source.py
+++ b/ATRI/plugins/manege/data_source.py
@@ -29,7 +29,6 @@ __doc__ = """
class Manege(Service):
-
def __init__(self):
Service.__init__(self, "管理", __doc__, True)
@@ -49,11 +48,10 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
data = json.loads(path.read_bytes())
return data
-
-
+
@staticmethod
def _save_block_user_list(data: dict) -> None:
file_name = "block_user.json"
@@ -62,10 +60,10 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
-
+
@staticmethod
def _load_block_group_list() -> dict:
"""
@@ -82,10 +80,10 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def _save_block_group_list(data: dict) -> None:
file_name = "block_group.json"
@@ -94,55 +92,51 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
data = dict()
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
-
+
@classmethod
def block_user(cls, user_id: str) -> bool:
data = cls._load_block_user_list()
now_time = datetime.now()
- data[user_id] = {
- "time": now_time
- }
+ data[user_id] = {"time": now_time}
try:
cls._save_block_user_list(data)
return True
except BaseException:
return False
-
+
@classmethod
def unblock_user(cls, user_id: str) -> bool:
data: dict = cls._load_block_user_list()
if user_id not in data:
return False
-
+
try:
data.pop(user_id)
cls._save_block_user_list(data)
return True
except BaseException:
return False
-
+
@classmethod
def block_group(cls, group_id: str) -> bool:
data = cls._load_block_group_list()
now_time = datetime.now()
- data[group_id] = {
- "time": now_time
- }
+ data[group_id] = {"time": now_time}
try:
cls._save_block_group_list(data)
return True
except BaseException:
return False
-
+
@classmethod
def unblock_group(cls, group_id: str) -> bool:
data: dict = cls._load_block_group_list()
if group_id not in data:
return False
-
+
try:
data.pop(group_id)
cls._save_block_group_list(data)
@@ -162,7 +156,7 @@ class Manege(Service):
data["enabled"] = is_enabled
ServiceTools().save_service(data, service)
return True
-
+
@staticmethod
def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool:
"""
@@ -173,7 +167,7 @@ class Manege(Service):
except BaseException:
return False
temp_list: list = data.get("disable_user", list())
-
+
if is_enabled:
try:
temp_list.remove(user_id)
@@ -184,7 +178,7 @@ class Manege(Service):
data["disable_user"] = temp_list
ServiceTools().save_service(data, service)
return True
-
+
@staticmethod
def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool:
"""
@@ -196,7 +190,7 @@ class Manege(Service):
except BaseException:
return False
temp_list: list = data.get("disable_group", list())
-
+
if is_enabled:
try:
temp_list.remove(group_id)
@@ -219,7 +213,7 @@ class Manege(Service):
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def save_friend_apply_list(data: dict) -> None:
file_name = "friend_add.json"
@@ -230,7 +224,7 @@ class Manege(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
-
+
@staticmethod
def load_invite_apply_list() -> dict:
file_name = "group_invite.json"
@@ -242,7 +236,7 @@ class Manege(Service):
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def save_invite_apply_list(data: dict) -> None:
file_name = "group_invite.json"
@@ -260,22 +254,19 @@ class Manege(Service):
data = load_error(track_id)
except ReadFileError:
return "请检查ID是否正确..."
-
+
prompt = data.get("prompt", "ignore")
time = data.get("time", "ignore")
content = data.get("content", "ignore")
-
+
msg0 = TRACK_BACK_FORMAT.format(
- track_id=track_id,
- prompt=prompt,
- time=time,
- content=content
+ track_id=track_id, prompt=prompt, time=time, content=content
)
f_data = FormData()
f_data.add_field("poster", "ATRI running log")
f_data.add_field("syntax", "text")
f_data.add_field("expiration", "day")
f_data.add_field("content", msg0)
-
+
repo = f"详细请移步此处~\n{await UbuntuPaste(f_data).paste()}"
return repo
diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py
index 7939fc6..9c52610 100644
--- a/ATRI/plugins/repo.py
+++ b/ATRI/plugins/repo.py
@@ -21,13 +21,13 @@ REPO_FORMAT = """
class Repo(Service):
-
def __init__(self):
Service.__init__(self, "反馈", "向维护者发送消息")
-
+
repo = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"})
+
@repo.args_parser # type: ignore
async def _get_repo(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -39,6 +39,7 @@ async def _get_repo(bot: Bot, event: MessageEvent, state: T_State):
else:
state["repo"] = msg
+
@repo.handle()
async def _ready_repo(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
@@ -46,26 +47,24 @@ async def _ready_repo(bot: Bot, event: MessageEvent, state: T_State):
await repo.finish(_repo_flmt_notice)
if not _repo_dlmt.check(user_id):
await repo.finish(_repo_dlmt_notice)
-
+
msg = str(event.message).strip()
if msg:
state["repo"] = msg
+
@repo.got("repo", "需要反馈的内容呢?~")
async def _deal_repo(bot: Bot, event: MessageEvent, state: T_State):
msg = state["repo"]
user_id = event.get_user_id()
- repo_0 = REPO_FORMAT.format(
- user=user_id,
- msg=msg
- )
-
+ repo_0 = REPO_FORMAT.format(user=user_id, msg=msg)
+
for superuser in BotSelfConfig.superusers:
try:
await bot.send_private_msg(user_id=superuser, message=repo_0)
except BaseException:
await repo.finish("发送失败了呢...")
-
+
_repo_flmt.start_cd(user_id)
_repo_dlmt.increase(user_id)
await repo.finish("吾辈的心愿已由咱转告维护者!")
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index 87a2b02..4c5b624 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -9,12 +9,13 @@ _rich_flmt = FreqLimiter(2)
bili_rich = Rich().on_message("小程序爪巴", block=False)
+
@bili_rich.handle()
async def _fk_bili(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _rich_flmt.check(user_id):
return
-
+
msg = str(event.message)
try:
result, is_ok = await Rich().fk_bili(msg)
diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py
index a59c922..96f77cf 100644
--- a/ATRI/plugins/rich/data_source.py
+++ b/ATRI/plugins/rich/data_source.py
@@ -25,10 +25,9 @@ __doc__ = """
class Rich(Service):
-
def __init__(self):
Service.__init__(self, "小程序处理", __doc__, rule=is_in_service("小程序处理"))
-
+
@staticmethod
def _bv_dec(x) -> str:
r = 0
@@ -36,7 +35,7 @@ class Rich(Service):
r += tr[x[s[i]]] * 58 ** i
result = "av" + str((r - add) ^ xor)
return result
-
+
@staticmethod
def _bv_enc(x) -> str:
x = (x ^ xor) + add
@@ -44,7 +43,7 @@ class Rich(Service):
for i in range(6):
r[s[i]] = table[x // 58 ** i % 58]
return "".join(r)
-
+
@classmethod
async def fk_bili(cls, text: str) -> tuple:
"""
@@ -53,7 +52,7 @@ class Rich(Service):
"""
msg = text.replace("\\", "")
bv = False
-
+
if "qqdocurl" not in msg:
if "av" in msg:
av = re.findall(r"(av\d+)", msg)
@@ -71,7 +70,7 @@ class Rich(Service):
if not bv_url:
return "Get value (bv url) failed!", False
bv_url = bv_url[3]
-
+
try:
res = await request.get(bv_url)
except RequestError:
@@ -80,7 +79,7 @@ class Rich(Service):
if not bv:
return "Get value (bv) failed!", False
av = cls._bv_dec(bv[0])
-
+
if not bv:
if "av" in msg:
av = re.findall(r"(av\d+)", msg)
@@ -89,7 +88,7 @@ class Rich(Service):
av = av[0].replace("av", "")
else:
return "Not found av", False
-
+
url = URL + av
try:
res = await request.get(url)
@@ -97,11 +96,10 @@ class Rich(Service):
return "Request failed!", False
res_data = await res.json()
data = res_data["data"]
-
+
result = (
f"{data['bvid']} INFO:\n"
f"Title: {data['title']}\n"
f"Link: {data['short_link']}"
)
return result, True
- \ No newline at end of file
diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py
index 468b001..b55f18c 100644
--- a/ATRI/plugins/saucenao/__init__.py
+++ b/ATRI/plugins/saucenao/__init__.py
@@ -16,6 +16,7 @@ _search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇�
saucenao = SaouceNao().on_command("以图搜图", "透过一张图搜索可能的来源")
+
@saucenao.args_parser # type: ignore
async def _get_img(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -27,16 +28,18 @@ async def _get_img(bot: Bot, event: MessageEvent, state: T_State):
else:
state["img"] = msg
+
@saucenao.handle()
async def _ready_search(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
if not _search_flmt.check(user_id):
await saucenao.finish(_search_flmt_notice)
-
+
msg = str(event.message).strip()
if msg:
state["img"] = msg
+
@saucenao.got("img", "图呢?")
async def _deal_search(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
@@ -44,7 +47,7 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State):
img = findall(r"url=(.*?)]", msg)
if not img:
await saucenao.reject("请发送图片而不是其他东西!!")
-
+
a = SaouceNao(SauceNAO.key)
result = f"> {MessageSegment.at(user_id)}" + await a.search(img[0])
_search_flmt.start_cd(user_id)
diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py
index d948f91..6e72c98 100644
--- a/ATRI/plugins/saucenao/data_source.py
+++ b/ATRI/plugins/saucenao/data_source.py
@@ -16,8 +16,15 @@ __doc__ = """
class SaouceNao(Service):
-
- def __init__(self, api_key: str = None, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5):
+ def __init__(
+ self,
+ api_key: str = None,
+ output_type=2,
+ testmode=1,
+ dbmaski=32768,
+ db=5,
+ numres=5,
+ ):
Service.__init__(self, "以图搜图", __doc__, rule=is_in_service("以图搜图"))
params = dict()
@@ -28,31 +35,31 @@ class SaouceNao(Service):
params["db"] = db
params["numres"] = numres
self.params = params
-
+
async def _request(self, url: str):
self.params["url"] = url
-
+
try:
res = await request.post(URL, params=self.params)
except RequestError:
raise RequestError("Request failed!")
data = await res.json()
return data
-
+
async def search(self, url: str) -> str:
data = await self._request(url)
res = data["results"]
-
+
result = list()
for i in range(len(res)):
data = res[i]
-
+
_result = dict()
_result["similarity"] = data["header"]["similarity"]
_result["index_name"] = data["header"]["index_name"]
_result["url"] = choice(data["data"].get("ext_urls", ["None"]))
result.append(_result)
-
+
msg0 = str()
for i in result:
msg0 += (
@@ -61,7 +68,7 @@ class SaouceNao(Service):
f"Name: {i['index_name']}\n"
f"URL: {i['url'].replace('https://', '')}"
)
-
+
if len(res) <= 3:
return msg0
else:
diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py
index 7eb2c61..d67d75b 100644
--- a/ATRI/plugins/setu/__init__.py
+++ b/ATRI/plugins/setu/__init__.py
@@ -12,7 +12,10 @@ _setu_flmt = FreqLimiter(120)
_setu_dlmt = DailyLimiter(5)
-random_setu = Setu().on_command("来张涩图", "来张随机涩图,冷却2分钟,每天限5张", aliases={"涩图来", "来点涩图", "来份涩图"})
+random_setu = Setu().on_command(
+ "来张涩图", "来张随机涩图,冷却2分钟,每天限5张", aliases={"涩图来", "来点涩图", "来份涩图"}
+)
+
@random_setu.handle()
async def _random_setu(bot: Bot, event: MessageEvent):
@@ -21,12 +24,9 @@ async def _random_setu(bot: Bot, event: MessageEvent):
await random_setu.finish()
if not _setu_dlmt.check(user_id):
await random_setu.finish()
-
+
setu, title, p_id = await Setu().random_setu()
- repo = (
- f"Title: {title}\n"
- f"Pid: {p_id}"
- )
+ repo = f"Title: {title}\n" f"Pid: {p_id}"
await bot.send(event, repo)
msg_1 = await bot.send(event, Message(setu))
event_id = msg_1["message_id"]
@@ -38,6 +38,7 @@ async def _random_setu(bot: Bot, event: MessageEvent):
tag_setu = Setu().on_regex(r"来[张点丶份](.*?)的[涩色🐍]图", "根据提供的tag查找涩图")
+
@tag_setu.handle()
async def _tag_setu(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
@@ -45,18 +46,15 @@ async def _tag_setu(bot: Bot, event: MessageEvent):
await random_setu.finish()
if not _setu_dlmt.check(user_id):
await random_setu.finish()
-
+
msg = str(event.message).strip()
pattern = r"来[张点丶份](.*?)的[涩色🐍]图"
tag = re.findall(pattern, msg)[0]
setu, title, p_id, is_ok = await Setu().tag_setu(tag)
if not is_ok:
await tag_setu.finish(f"没有 {tag} 的涩图呢...")
- repo_0 = (
- f"Title: {title}\n"
- f"Pid: {p_id}"
- )
-
+ repo_0 = f"Title: {title}\n" f"Pid: {p_id}"
+
await bot.send(event, repo_0)
msg_1 = await bot.send(event, Message(setu))
event_id = msg_1["message_id"]
@@ -77,6 +75,6 @@ async def _scheduler_setu(bot):
message_id = msg_0["message_id"]
await asyncio.sleep(60)
await bot.delete_msg(message_id=message_id)
-
+
except BaseException:
pass
diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py
index 22fa69f..47f76b6 100644
--- a/ATRI/plugins/setu/data_source.py
+++ b/ATRI/plugins/setu/data_source.py
@@ -1,4 +1,5 @@
import base64
+
# from pathlib import Path
from random import choice
from nonebot.adapters.cqhttp import MessageSegment
@@ -16,10 +17,9 @@ SCHEDULER_FORMAT = """
class Setu(Service):
-
def __init__(self):
Service.__init__(self, "涩图", "hso!", rule=is_in_service("涩图"))
-
+
@staticmethod
async def random_setu() -> tuple:
"""
@@ -28,14 +28,14 @@ class Setu(Service):
res = await request.get(LOLICON_URL)
data: dict = await res.json()
temp_data: dict = data.get("data", list())[0]
-
- title = temp_data.get("title", "木陰のねこ")
+
+ title = temp_data.get("title", "木陰のねこ")
p_id = temp_data.get("pid", 88124144)
url = temp_data["urls"].get("original", "ignore")
-
+
setu = MessageSegment.image(url, timeout=114514)
return setu, title, p_id
-
+
@staticmethod
async def tag_setu(tag: str) -> tuple:
"""
@@ -44,18 +44,18 @@ class Setu(Service):
url = LOLICON_URL + f"?tag={tag}"
res = await request.get(url)
data: dict = await res.json()
-
+
temp_data: dict = data.get("data", list())[0]
if not temp_data:
is_ok = False
is_ok = True
-
- title = temp_data.get("title", "木陰のねこ")
+
+ title = temp_data.get("title", "木陰のねこ")
p_id = temp_data.get("pid", 88124144)
url = temp_data["urls"].get("original", "ignore")
setu = MessageSegment.image(url, timeout=114514)
return setu, title, p_id, is_ok
-
+
@staticmethod
async def scheduler() -> str:
"""
@@ -67,13 +67,10 @@ class Setu(Service):
res = await request.get(LOLICON_URL)
data: dict = await res.json()
temp_data: dict = data.get("data", list())[0]
-
+
tag = choice(temp_data.get("tags", ["女孩子"]))
-
+
url = temp_data["urls"].get("original", "ignore")
setu = MessageSegment.image(url, timeout=114514)
- repo = SCHEDULER_FORMAT.format(
- tag=tag,
- setu=setu
- )
+ repo = SCHEDULER_FORMAT.format(tag=tag, setu=setu)
return repo
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py
index 0567bb4..2746953 100644
--- a/ATRI/plugins/status/__init__.py
+++ b/ATRI/plugins/status/__init__.py
@@ -6,6 +6,7 @@ from .data_source import IsSurvive
ping = IsSurvive().on_command("/ping", "检测bot简单信息处理速度")
+
@ping.handle()
async def _ping(bot: Bot, event: MessageEvent):
await ping.finish(IsSurvive.ping())
@@ -13,6 +14,7 @@ async def _ping(bot: Bot, event: MessageEvent):
status = IsSurvive().on_command("/status", "查看运行资源占用")
+
@status.handle()
async def _status(bot: Bot, event: MessageEvent):
msg, _ = IsSurvive.get_status()
diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py
index c353313..189f568 100644
--- a/ATRI/plugins/status/data_source.py
+++ b/ATRI/plugins/status/data_source.py
@@ -14,14 +14,13 @@ __doc__ = """
class IsSurvive(Service):
-
def __init__(self):
Service.__init__(self, "状态", __doc__, rule=is_in_service("状态"))
@staticmethod
def ping() -> str:
return "I'm fine."
-
+
@staticmethod
def get_status():
log.info("开始检查资源消耗...")
@@ -31,7 +30,7 @@ class IsSurvive(Service):
disk = psutil.disk_usage("/").percent
inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore
inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore
-
+
now = time.time()
boot = psutil.boot_time()
up_time = str(
@@ -40,7 +39,7 @@ class IsSurvive(Service):
)
except GetStatusError:
raise GetStatusError("Failed to get status.")
-
+
msg = "アトリは、高性能ですから!"
if cpu > 90: # type: ignore
msg = "咱感觉有些头晕..."
@@ -57,7 +56,7 @@ class IsSurvive(Service):
else:
log.info("资源占用正常")
is_ok = True
-
+
msg0 = (
"Self status:\n"
f"* CPU: {cpu}%\n"
@@ -67,5 +66,5 @@ class IsSurvive(Service):
f"* netRECV: {inteRECV}MB\n"
f"* Runtime: {up_time}\n"
) + msg
-
+
return msg0, is_ok
diff --git a/ATRI/plugins/util/__init__.py b/ATRI/plugins/util/__init__.py
index 1e31bff..541ca1b 100644
--- a/ATRI/plugins/util/__init__.py
+++ b/ATRI/plugins/util/__init__.py
@@ -10,6 +10,7 @@ from .data_source import Encrypt, Utils, Yinglish
roll = Utils().on_command("/roll", "骰子~用法:1d10 或 2d10+2d10+more")
+
@roll.args_parser # type: ignore
async def _get_roll(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -21,26 +22,29 @@ async def _get_roll(bot: Bot, event: MessageEvent, state: T_State):
else:
state["roll"] = msg
+
@roll.handle()
async def _ready_roll(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["roll"] = msg
+
@roll.got("roll", "参数呢?!格式:1d10 或 2d10+2d10+more")
async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State):
text = state["roll"]
match = re.match(r"^([\dd+\s]+?)$", text)
-
+
if not match:
await roll.finish("阿——!参数不对!格式:1d10 或 2d10+2d10+more")
-
+
msg = Utils().roll_dice(text)
await roll.finish(msg)
encrypt_en = Utils().on_command("加密", "我们之前的秘密❤")
+
@encrypt_en.args_parser # type: ignore
async def _get_encr_en_text(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -52,12 +56,14 @@ async def _get_encr_en_text(bot: Bot, event: MessageEvent, state: T_State):
else:
state["encr_en_text"] = msg
+
@encrypt_en.handle()
async def _ready_en(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["encr_en_text"] = msg
+
@encrypt_en.got("encr_en_text", "内容呢?!")
async def _deal_en(bot: Bot, event: MessageEvent, state: T_State):
text = state["encr_en_text"]
@@ -71,6 +77,7 @@ async def _deal_en(bot: Bot, event: MessageEvent, state: T_State):
encrypt_de = Utils().on_command("解密", "解开我们的秘密❤")
+
@encrypt_de.args_parser # type: ignore
async def _get_encr_de_text(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -82,12 +89,14 @@ async def _get_encr_de_text(bot: Bot, event: MessageEvent, state: T_State):
else:
state["encr_de_text"] = msg
+
@encrypt_de.handle()
async def _ready_de(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
if msg:
state["encr_de_text"] = msg
+
@encrypt_de.got("encr_de_text", "内容呢?!")
async def _deal_de(bot: Bot, event: MessageEvent, state: T_State):
text = state["encr_de_text"]
@@ -101,6 +110,7 @@ sepi = Utils().on_command("涩批一下", "将正常的句子涩一涩~")
_sepi_flmt = FreqLimiter(3)
_sepi_flmt_notice = ["涩批爬", "✌🥵✌"]
+
@sepi.args_parser # type: ignore
async def _get_sepi(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
@@ -112,23 +122,25 @@ async def _get_sepi(bot: Bot, event: MessageEvent, state: T_State):
else:
state["sepi_text"] = msg
+
@sepi.handle()
async def _ready_sepi(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
if not _sepi_flmt.check(user_id):
await sepi.finish(choice(_sepi_flmt_notice))
-
+
msg = str(event.message).strip()
if msg:
state["sepi_text"] = msg
+
@sepi.got("sepi_text", "内容呢?!")
async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
msg = state["sepi_text"]
if len(msg) < 4:
await sepi.finish("这么短?涩不起来!")
-
+
result = Yinglish.deal(msg, random())
_sepi_flmt.start_cd(user_id)
await sepi.finish(result)
diff --git a/ATRI/plugins/util/data_source.py b/ATRI/plugins/util/data_source.py
index a6afdf3..7b36ebe 100644
--- a/ATRI/plugins/util/data_source.py
+++ b/ATRI/plugins/util/data_source.py
@@ -14,10 +14,9 @@ __doc__ = """
class Utils(Service):
-
def __init__(self):
Service.__init__(self, "小工具", __doc__, rule=is_in_service("小工具"))
-
+
@staticmethod
def roll_dice(par: str) -> str:
result = 0
@@ -49,13 +48,14 @@ class Utils(Service):
result = f"{par}=({proc})={result}"
return result
-
+
+
class Encrypt(Utils):
"""
某nb改的(逃
总之就是非常nb
"""
-
+
cr = "ĀāĂ㥹ÀÁÂÃÄÅ"
cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ"
cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr"
@@ -186,9 +186,9 @@ class Encrypt(Utils):
return self._decodeBytes(s).decode(encoding)
except UnicodeDecodeError:
raise ValueError("Decoding failed")
-
+
+
class Yinglish(Utils):
-
@staticmethod
def _to_ying(x, y, ying) -> str:
if random() > ying:
diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py
index a113b8a..7773057 100644
--- a/ATRI/plugins/wife/__init__.py
+++ b/ATRI/plugins/wife/__init__.py
@@ -14,14 +14,17 @@ from .data_source import Wife
_tietie_flmt = FreqLimiter(600)
-tietie_superuser = Wife().on_message("只与维护者贴贴w", rule=Rule(), permission=SUPERUSER, block=False)
+tietie_superuser = Wife().on_message(
+ "只与维护者贴贴w", rule=Rule(), permission=SUPERUSER, block=False
+)
+
@tietie_superuser.handle()
async def _tietie_superuser(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _tietie_flmt.check(user_id):
await tietie_superuser.finish()
-
+
result = Wife().to_superuser(user_id)
_tietie_flmt.start_cd(user_id)
await tietie_superuser.finish(Message(result))
@@ -29,6 +32,7 @@ async def _tietie_superuser(bot: Bot, event: MessageEvent):
_wife_flmt = FreqLimiter(10)
+
class MarryInfo(BaseModel):
name: str
sex: str
@@ -37,46 +41,42 @@ class MarryInfo(BaseModel):
get_wife = Wife().on_command("抽老婆", "随机选择一位幸运裙友成为老婆!")
+
@get_wife.handle()
async def _get_wife(bot: Bot, event: GroupMessageEvent):
user_id = event.get_user_id()
if not _wife_flmt.check(user_id):
await get_wife.finish()
-
+
group_id = event.group_id
- req_user_info: dict = await bot.get_group_member_info(group_id=group_id, user_id=int(user_id))
+ req_user_info: dict = await bot.get_group_member_info(
+ group_id=group_id, user_id=int(user_id)
+ )
req_user_card = req_user_info["card"]
req_user_sex = req_user_info["sex"]
is_nick = "老公" if req_user_sex == "male" else "老婆"
-
- repo_0 = (
- "现在咱将随机抽取一位幸运裙友\n"
- f"成为{req_user_card}的{is_nick}!"
- )
+
+ repo_0 = "现在咱将随机抽取一位幸运裙友\n" f"成为{req_user_card}的{is_nick}!"
await bot.send(event, repo_0)
await asyncio.sleep(10)
-
+
prep_list = await bot.get_group_member_list(group_id=group_id)
prep_list = [prep.get("user_id", 114514) for prep in prep_list]
-
+
lucky_user = choice(prep_list)
- lucky_user_info: dict = await bot.get_group_member_info(group_id=group_id, user_id=lucky_user)
+ lucky_user_info: dict = await bot.get_group_member_info(
+ group_id=group_id, user_id=lucky_user
+ )
lucky_user_card = lucky_user_info["card"]
lucky_user_sex = lucky_user_info["sex"]
-
+
data = Wife().load_marry_list()
- data[lucky_user] = MarryInfo(
- name=req_user_card,
- sex=is_nick,
- wife=user_id
- ).dict()
+ data[lucky_user] = MarryInfo(name=req_user_card, sex=is_nick, wife=user_id).dict()
data[user_id] = MarryInfo(
- name=lucky_user_card,
- sex=lucky_user_sex,
- wife=lucky_user
+ name=lucky_user_card, sex=lucky_user_sex, wife=lucky_user
).dict()
Wife().save_marry_list(data)
-
+
repo_1 = f"好欸!{lucky_user_card}成为了{req_user_card}的{is_nick}"
_wife_flmt.start_cd(user_id)
await get_wife.finish(repo_1)
@@ -85,18 +85,21 @@ async def _get_wife(bot: Bot, event: GroupMessageEvent):
_call_wife_flmt = FreqLimiter(60)
-call_wife = Wife().on_command("老婆", "呼唤老婆/老公!", aliases={"老公", "老婆!", "老公!"}, permission=USER("114514"))
+call_wife = Wife().on_command(
+ "老婆", "呼唤老婆/老公!", aliases={"老公", "老婆!", "老公!"}, permission=USER("114514")
+)
+
@call_wife.handle()
async def _call_wife(bot: Bot, event: MessageEvent):
user_id = event.get_user_id()
if not _wife_flmt.check(user_id):
await call_wife.finish()
-
+
data = Wife().load_marry_list()
if user_id not in data:
return
-
+
wife = data[user_id].get("wife", "ignore")
sex = data[user_id].get("sex", "male")
is_nick = "老公" if sex == "male" else "老婆"
@@ -107,30 +110,33 @@ async def _call_wife(bot: Bot, event: MessageEvent):
discard_wife = Wife().on_command("我要离婚", "离婚!")
+
@discard_wife.handle()
async def _discard_wife(bot: Bot, event: GroupMessageEvent):
user_id = event.get_user_id()
if not _wife_flmt.check(user_id):
await discard_wife.finish()
-
+
await bot.send(event, "真的吗...(y/是)")
msg = str(event.message).strip()
rd_list = ["y", "Y", "是", "确认", "对"]
if msg not in rd_list:
await discard_wife.finish("")
-
+
group_id = event.group_id
- group_info: dict = await bot.get_group_member_info(group_id=group_id, user_id=int(user_id))
+ group_info: dict = await bot.get_group_member_info(
+ group_id=group_id, user_id=int(user_id)
+ )
user_card = group_info.get("card", "老婆")
-
+
data = Wife().load_marry_list()
if user_id not in data:
await discard_wife.finish("你还没对象呐...")
-
+
discard_user_info = data[user_id]
discard_user_card = discard_user_info["name"]
discard_user_id = discard_user_info["wife"]
-
+
data.pop(user_id)
data.pop(discard_user_id)
Wife().save_marry_list(data)
diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py
index 38e8be4..670f862 100644
--- a/ATRI/plugins/wife/data_source.py
+++ b/ATRI/plugins/wife/data_source.py
@@ -18,10 +18,9 @@ __doc__ = """
class Wife(Service):
-
def __init__(self):
Service.__init__(self, "老婆", __doc__, rule=is_in_service("老婆"))
-
+
def to_superuser(self, user_id: str):
"""
全自动贴贴机,限制只有超级管理员才能贴贴
@@ -30,10 +29,18 @@ class Wife(Service):
[
"mua!",
"贴贴!",
- MessageSegment.image(file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife0.jpg"),
- MessageSegment.image(file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife1.jpg"),
- MessageSegment.image(file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife2.jpg"),
- MessageSegment.image(file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife3.jpg")
+ MessageSegment.image(
+ file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife0.jpg"
+ ),
+ MessageSegment.image(
+ file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife1.jpg"
+ ),
+ MessageSegment.image(
+ file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife2.jpg"
+ ),
+ MessageSegment.image(
+ file="https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/wife3.jpg"
+ ),
]
)
result = MessageSegment.at(user_id) + content
@@ -49,10 +56,10 @@ class Wife(Service):
if not path.is_file():
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
-
+
data = json.loads(path.read_bytes())
return data
-
+
@staticmethod
def save_marry_list(data: dict) -> None:
"""
@@ -63,6 +70,6 @@ class Wife(Service):
if not path.is_file():
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
diff --git a/ATRI/service.py b/ATRI/service.py
index cc3243b..0501fca 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -58,17 +58,19 @@ class Service:
"disable_group": []
}
"""
-
- def __init__(self,
- service: str,
- docs: str = None,
- only_admin: bool = False,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- handlers: Optional[List[T_Handler]] = None,
- temp: bool = False,
- priority: int = 1,
- state: Optional[T_State] = None):
+
+ def __init__(
+ self,
+ service: str,
+ docs: str = None,
+ only_admin: bool = False,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ handlers: Optional[List[T_Handler]] = None,
+ temp: bool = False,
+ priority: int = 1,
+ state: Optional[T_State] = None,
+ ):
self.service = service
self.docs = docs
self.only_admin = only_admin
@@ -84,7 +86,7 @@ class Service:
service = self.service
if not docs:
docs = self.docs or str()
-
+
path = SERVICES_DIR / f"{service}.json"
data = ServiceInfo(
service=service,
@@ -93,33 +95,33 @@ class Service:
enabled=True,
only_admin=self.only_admin,
disable_user=list(),
- disable_group=list()
+ disable_group=list(),
)
try:
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data.dict(), indent=4))
except WriteError:
raise WriteError("Write service info failed!")
-
+
def save_service(self, service_data: dict, service: str = None) -> None:
if not service:
service = self.service
-
+
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
self._generate_service_config()
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(service_data, indent=4))
-
+
def load_service(self, service: str = None) -> dict:
if not service:
service = self.service
-
+
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
self._generate_service_config()
-
+
try:
data = json.loads(path.read_bytes())
except ReadFileError:
@@ -128,29 +130,31 @@ class Service:
self._generate_service_config()
data = json.loads(path.read_bytes())
return data
-
+
def _save_cmds(self, cmds: dict) -> None:
data = self.load_service(self.service)
temp_data: dict = data["cmd_list"]
temp_data.update(cmds)
self.save_service(data)
-
+
def _load_cmds(self) -> dict:
path = SERVICES_DIR / f"{self.service}.json"
if not path.is_file():
self._generate_service_config()
-
+
data = json.loads(path.read_bytes())
return data["cmd_list"]
- def on_message(self,
- docs: str = None,
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- permission: Optional[Permission] = None,
- handlers: Optional[List[T_Handler]] = None,
- block: bool = True,
- priority: int = None,
- state: Optional[T_State] = None) -> Type[Matcher]:
+ def on_message(
+ self,
+ docs: str = None,
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ permission: Optional[Permission] = None,
+ handlers: Optional[List[T_Handler]] = None,
+ block: bool = True,
+ priority: int = None,
+ state: Optional[T_State] = None,
+ ) -> Type[Matcher]:
if not rule:
rule = self.rule
if not permission:
@@ -161,7 +165,7 @@ class Service:
priority = self.priority
if not state:
state = self.state
-
+
if docs:
a = 0
cmd_list = self._load_cmds()
@@ -171,14 +175,10 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(
- type=_type,
- docs=docs,
- aliases=list()
- ).dict()
+
+ cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
self._save_cmds(cmd_list)
-
+
matcher = Matcher.new(
"message",
Rule() & rule,
@@ -200,14 +200,10 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(
- type=_type,
- docs=docs,
- aliases=list()
- ).dict()
+
+ cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
self._save_cmds(cmd_list)
-
+
matcher = Matcher.new(
"notice",
Rule() & self.rule,
@@ -229,14 +225,10 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] =CommandInfo(
- type=_type,
- docs=docs,
- aliases=list()
- ).dict()
+
+ cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
self._save_cmds(cmd_list)
-
+
matcher = Matcher.new(
"request",
Rule() & self.rule,
@@ -263,14 +255,10 @@ class Service:
rule = self.rule
if not aliases:
aliases = set()
-
- cmd_list[cmd] = CommandInfo(
- type=_type,
- docs=docs,
- aliases=list(aliases)
- ).dict()
+
+ cmd_list[cmd] = CommandInfo(type=_type, docs=docs, aliases=list(aliases)).dict()
self._save_cmds(cmd_list)
-
+
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
@@ -284,7 +272,9 @@ class Service:
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
- return self.on_message(rule=command(*commands) & rule, handlers=handlers, **kwargs)
+ return self.on_message(
+ rule=command(*commands) & rule, handlers=handlers, **kwargs
+ )
def on_keyword(
self,
@@ -295,7 +285,7 @@ class Service:
) -> Type[Matcher]:
if not rule:
rule = self.rule
-
+
a = 0
cmd_list = self._load_cmds()
while True:
@@ -304,14 +294,10 @@ class Service:
break
else:
a += 1
-
- cmd_list[_type] = CommandInfo(
- type=_type,
- docs=docs,
- aliases=list()
- ).dict()
+
+ cmd_list[_type] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
self._save_cmds(cmd_list)
-
+
return self.on_message(rule=keyword(*keywords) & rule, **kwargs)
def on_regex(
@@ -325,20 +311,15 @@ class Service:
_type = "regex"
if not rule:
rule = self.rule
-
+
cmd_list = self._load_cmds()
- cmd_list[pattern] = CommandInfo(
- type=_type,
- docs=docs,
- aliases=list()
- ).dict()
+ cmd_list[pattern] = CommandInfo(type=_type, docs=docs, aliases=list()).dict()
self._save_cmds(cmd_list)
-
+
return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)
class ServiceTools(object):
-
@staticmethod
def save_service(service_data: dict, service: str) -> None:
path = SERVICES_DIR / f"{service}.json"
@@ -348,10 +329,10 @@ class ServiceTools(object):
"Please delete all file in data/service/services.\n"
"Next reboot bot."
)
-
+
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(service_data, indent=4))
-
+
@staticmethod
def load_service(service: str) -> dict:
path = SERVICES_DIR / f"{service}.json"
@@ -361,7 +342,7 @@ class ServiceTools(object):
"Please delete all file in data/service/services.\n"
"Next reboot bot."
)
-
+
with open(path, "r", encoding="utf-8") as r:
data = json.loads(r.read())
return data
@@ -369,11 +350,11 @@ class ServiceTools(object):
@classmethod
def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool:
data = cls.load_service(service)
-
+
auth_global = data.get("enabled", True)
auth_user = data.get("disable_user", list())
auth_group = data.get("disable_group", list())
-
+
if user_id:
if user_id in auth_user:
return False
@@ -383,7 +364,7 @@ class ServiceTools(object):
return False
else:
return True
-
+
if not auth_global:
return False
else: