diff options
author | Kyomotoi <[email protected]> | 2021-07-08 22:09:00 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2021-07-08 22:09:00 +0800 |
commit | be2747e4d4b820ca0f1f988d3b77a628da26fe7b (patch) | |
tree | e1a59dd79ecd973a7d704568dcdc018f1f1b651a | |
parent | a4e1b9d1581d756ef79ad063d1c0bd6b2fd13c1d (diff) | |
download | ATRI-be2747e4d4b820ca0f1f988d3b77a628da26fe7b.tar.gz ATRI-be2747e4d4b820ca0f1f988d3b77a628da26fe7b.tar.bz2 ATRI-be2747e4d4b820ca0f1f988d3b77a628da26fe7b.zip |
🔖♻️🐛🔧🔥📝 更新版本:YHN-001-A03
🔖 更新版本至:YHN-001-A03
✨ 新增插件:
- 涩图
- 闲聊(文爱
♻️ 重构:
- Service
- 所有插件
🐛 修复部分小bug
🔧 暂时移除部分设置
🔥 删除:
- 插件:nsfw、wife。日后加回
- 插件 essential 中部分内容
📝 更新README
67 files changed, 3294 insertions, 3350 deletions
diff --git a/ATRI/__init__.py b/ATRI/__init__.py index 838f804..7ea6457 100644 --- a/ATRI/__init__.py +++ b/ATRI/__init__.py @@ -7,7 +7,7 @@ from .config import RUNTIME_CONFIG from .log import logger -__version__ = "YHN-001-A02" +__version__ = "YHN-001-A03" def asgi(): diff --git a/ATRI/config.py b/ATRI/config.py index ca86e90..2f2c394 100644 --- a/ATRI/config.py +++ b/ATRI/config.py @@ -2,7 +2,7 @@ from pathlib import Path from datetime import timedelta from ipaddress import IPv4Address -from .utils.yaml import load_yml +from .utils import * CONFIG_PATH = Path(".") / "config.yml" @@ -53,12 +53,6 @@ class SauceNAO: key: str = config.get("key", "") -class Setu: - config: dict = config["Setu"] - - key: str = config.get("key", "") - - RUNTIME_CONFIG = { "host": BotSelfConfig.host, "port": BotSelfConfig.port, diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index 987a8d8..bbf6c51 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -6,6 +6,7 @@ from pathlib import Path from random import sample from typing import Optional from traceback import format_exc +from pydantic.main import BaseModel from nonebot.adapters.cqhttp import Bot, Event from nonebot.matcher import Matcher @@ -13,23 +14,31 @@ from nonebot.typing import T_State from nonebot.message import run_postprocessor from .log import logger +from .config import BotSelfConfig ERROR_DIR = Path(".") / "ATRI" / "data" / "errors" os.makedirs(ERROR_DIR, exist_ok=True) +class ErrorInfo(BaseModel): + track_id: str + prompt: str + time: str + content: str + + def _save_error(prompt: str, content: str) -> str: track_id = "".join(sample(string.ascii_letters + string.digits, 8)) - data = { - "track_id": track_id, - "prompt": prompt, - "time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), - "content": content, - } + data = ErrorInfo( + track_id=track_id, + prompt=prompt, + time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + content=content + ) path = ERROR_DIR / f"{track_id}.json" with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) + r.write(json.dumps(data.dict(), indent=4)) return track_id @@ -99,12 +108,11 @@ async def _track_error( prompt = "Unknown ERROR->" + Error.__class__.__name__ track_id = _save_error(prompt, format_exc()) - logger.debug(f"A bug has been cumming, trace ID: {track_id}") - msg = ( - "[WARNING] 这是一个错误... ;w;\n" - f"追踪ID: {track_id}\n" - f"触发原因: {prompt}\n" - "键入 来杯红茶 以联系维护者" - ) - - await bot.send(event, msg) + logger.debug(f"A bug has been cumming!!! Track ID: {track_id}") + msg = f"呜——出错了...追踪: {track_id}" + + for superusers in BotSelfConfig.superusers: + try: + await bot.send_private_msg(user_id=superusers, message=msg) + except BaseBotException: + return diff --git a/ATRI/log.py b/ATRI/log.py index 5c4e304..3766c4c 100644 --- a/ATRI/log.py +++ b/ATRI/log.py @@ -1,6 +1,7 @@ import sys from pathlib import Path from datetime import datetime +from typing import TYPE_CHECKING from nonebot.log import logger diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py index 6bb11e8..de64501 100644 --- a/ATRI/plugins/anime_search.py +++ b/ATRI/plugins/anime_search.py @@ -1,65 +1,48 @@ import re -import json from aiohttp import FormData +from random import choice -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp.message import Message, MessageSegment -from ATRI.service import Service as sv +from ATRI.service import Service from ATRI.rule import is_in_service +from ATRI.utils import request, UbuntuPaste, Translate +from ATRI.utils.limit import FreqLimiter from ATRI.exceptions import RequestError -from ATRI.utils.request import get_bytes -from ATRI.utils.translate import to_simple_string -from ATRI.utils.ub_paste import paste URL = "https://trace.moe/api/search?url=" +_anime_flmt = FreqLimiter(10) +_anime_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) __doc__ = """ -以图搜番 -权限组:所有人 -用法: - 以图搜番 (pic) +通过一张图片搜索你需要的番!据说里*也可以 """ -anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番")) - - -@anime_search.args_parser # type: ignore -async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了", "取消"] - if msg in quit_list: - await anime_search.finish("好吧...") - if not msg: - await anime_search.reject("图呢?") - else: - state["pic_anime"] = msg - - -@anime_search.handle() -async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["pic_anime"] = msg - - -@anime_search.got("pic_anime", prompt="图呢?") -async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["pic_anime"] - img = re.findall(r"url=(.*?)]", msg) - if not img: - await anime_search.reject("请发送图片而不是其它东西!!") - - try: - req = await get_bytes(URL + img[0]) - except RequestError: - raise RequestError("Request failed!") - data = json.loads(req)["docs"] - try: +class Anime(Service): + + def __init__(self): + Service.__init__(self, "以图搜番", __doc__, rule=is_in_service("以图搜番")) + + @staticmethod + async def _request(url: str) -> dict: + aim = URL + url + try: + res = await request.get(aim) + except RequestError: + raise RequestError("Request failed!") + result = await res.json() + return result + + @classmethod + async def search(cls, url: str) -> str: + data = await cls._request(url) + data = data["docs"] + d = dict() for i in range(len(data)): if data[i]["title_chinese"] in d.keys(): @@ -73,38 +56,70 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: else: n = data[i]["episode"] - d[to_simple_string(data[i]["title_chinese"])] = [ + d[Translate(data[i]["title_chinese"]).to_simple()] = [ data[i]["similarity"], f"第{n}集", f"{int(m)}分{int(s)}秒处", ] - except Exception as err: - raise Exception(f"Invalid data.\n{err}") - - result = sorted(d.items(), key=lambda x: x[1], reverse=True) - - t = 0 - - msg0 = f"> {event.sender.nickname}" - for i in result: - t += 1 - s = "%.2f%%" % (i[1][0] * 100) - msg0 = msg0 + ( - "\n——————————\n" - f"({t}) Similarity: {s}\n" - f"Name: {i[0]}\n" - f"Time: {i[1][1]} {i[1][2]}" - ) - - if len(result) == 2: - await anime_search.finish(Message(msg0)) + + result = sorted(d.items(), key=lambda x: x[1], reverse=True) + t = 0 + msg0 = str() + for i in result: + t += 1 + s = "%.2f%%" % (i[1][0] * 100) + msg0 = msg0 + ( + "\n——————————\n" + f"({t}) Similarity: {s}\n" + f"Name: {i[0]}\n" + f"Time: {i[1][1]} {i[1][2]}" + ) + + if len(result) == 2: + return msg0 + else: + data = FormData() + data.add_field("poster", "ATRI running log") + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", msg0) + + repo = f"详细请移步此处~\n{await UbuntuPaste(data).paste()}" + return repo + + +anime_search = Anime().on_command("以图搜番", "发送一张图以搜索可能的番剧") + +@anime_search.args_parser # type: ignore +async def _get_anime(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了", "不搜了", "取消"] + if msg in quit_list: + await anime_search.finish("好吧...") + if not msg: + await anime_search.reject("图呢?") else: - data = FormData() - data.add_field("poster", "ATRI running log") - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", msg0) - - repo = f"> {event.sender.nickname}\n" - repo = repo + f"详细请移步此处~\n{await paste(data)}" - await anime_search.finish(repo) + state["anime"] = msg + +@anime_search.handle() +async def _ready_sear(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _anime_flmt.check(user_id): + await anime_search.finish(_anime_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["anime"] = msg + +@anime_search.got("anime", "图呢?") +async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + msg = state["anime"] + img = re.findall(r"url=(.*?)]", msg) + if not img: + await anime_search.reject("请发送图片而不是其它东西!!") + + a = await Anime().search(img[0]) + result = f"> {MessageSegment.at(user_id)}\n" + a + _anime_flmt.start_cd(user_id) + await anime_search.finish(Message(result)) diff --git a/ATRI/plugins/call_owner.py b/ATRI/plugins/call_owner.py deleted file mode 100644 index bcbae73..0000000 --- a/ATRI/plugins/call_owner.py +++ /dev/null @@ -1,78 +0,0 @@ -from nonebot.permission import SUPERUSER -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.config import BotSelfConfig -from ATRI.utils.apscheduler import scheduler -from ATRI.utils.list import count_list - - -repo_list = [] -__doc__ = """ -给维护者留言 -权限组:所有人 -用法: - 来杯红茶 (msg) -""" - -repo = sv.on_command(cmd="来杯红茶", docs=__doc__) - - [email protected]_parser # type: ignore -async def _repo_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) - if msg == "算了": - await repo.finish("好吧") - - if not msg: - await repo.reject("话呢?") - else: - state["msg_repo"] = msg - - -async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["msg_repo"] = msg - - [email protected]("msg_repo", prompt="请告诉咱需要反馈的内容~!") -async def _repo_deal(bot: Bot, event: MessageEvent, state: T_State) -> None: - global repo_list - msg = state["msg_repo"] - user = event.user_id - - if count_list(repo_list, user) == 5: - await repo.finish("吾辈已经喝了五杯红茶啦!明天再来吧。") - - repo_list.append(user) - - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}") - - await repo.finish("吾辈的心愿已由咱转告给咱的维护者了~!") - - [email protected]_job("cron", hour=0, misfire_grace_time=60) -async def _() -> None: - global repo_list - repo_list.clear() - - -__doc__ = """ -重置给维护者的留言次数 -权限组:维护者 -用法: - /重置红茶 -""" - -reset_repo = sv.on_command(cmd="重置红茶", docs=__doc__, permission=SUPERUSER) - - -@reset_repo.handle() -async def _reset_repo(bot: Bot, event: MessageEvent) -> None: - global repo_list - repo_list.clear() - await reset_repo.finish("红茶重置完成~!") diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py new file mode 100644 index 0000000..4864be5 --- /dev/null +++ b/ATRI/plugins/chat/__init__.py @@ -0,0 +1,112 @@ +from random import choice + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils import CoolqCodeChecker +from ATRI.utils.limit import FreqLimiter +from ATRI.utils.apscheduler import scheduler +from .data_source import Chat + + +_chat_flmt = FreqLimiter(3) +_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) + + +chat = Chat().on_message("闲聊(文爱") + +async def _chat(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await chat.finish(_chat_flmt_notice) + + msg = str(event.message) + repo = await Chat().deal(msg, user_id) + _chat_flmt.start_cd(user_id) + await chat.finish(repo) + +my_name_is = Chat().on_command("叫我", "更改闲聊(划掉 文爱)时的称呼", aliases={"我是"}, priority=1) + +@my_name_is.args_parser # type: ignore +async def _get_name(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await my_name_is.finish("好吧...") + if not msg: + await my_name_is.reject("欧尼酱想让咱如何称呼你呢!0w0") + else: + state["name"] = msg + +@my_name_is.handle() +async def _name(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await my_name_is.finish(_chat_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["name"] = msg + +@my_name_is.got("name") +async def _deal_name(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + new_name = state["name"] + repo = choice([ + f"好~w 那咱以后就称呼你为{new_name}!", + f"噢噢噢!原来你叫{new_name}阿~", + f"好欸!{new_name}ちゃん~~~", + "很不错的称呼呢w" + ]) + Chat().name_is(user_id, new_name) + _chat_flmt.start_cd(user_id) + await my_name_is.finish(repo) + +say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) + [email protected]_parser # type: ignore +async def _get_say(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await say.finish("好吧...") + if not msg: + await say.reject("欧尼酱想让咱如何称呼你呢!0w0") + else: + state["say"] = msg + +async def _ready_say(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await say.finish(_chat_flmt_notice) + + msg = str(event.message) + if msg: + state["say"] = msg + [email protected]("say") +async def _deal_say(bot: Bot, event: MessageEvent, state: T_State): + msg = state["say"] + check = CoolqCodeChecker(msg).check + if not check: + repo = choice([ + "不要...", + "这个咱不想复读!", + "不可以", + "不好!" + ]) + await say.finish(repo) + + user_id = event.get_user_id() + _chat_flmt.start_cd(user_id) + await say.finish(msg) + + [email protected]_job("interval", hours=3, misfire_grace_time=60) +async def _check_kimo(): + try: + await Chat().update_data() + except BaseException: + pass diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py new file mode 100644 index 0000000..1b0eec9 --- /dev/null +++ b/ATRI/plugins/chat/data_source.py @@ -0,0 +1,139 @@ +import os +import json +from pathlib import Path +from jieba import posseg +from random import choice, shuffle + +from ATRI.service import Service +from ATRI.rule import to_bot, is_in_service +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.exceptions import ReadFileError, WriteError + + +__doc__ = """ +好像有点涩?(偏文爱,需at +""" + +CHAT_PATH = Path(".") / "ATRI" / "data" / "database" / "chat" +os.makedirs(CHAT_PATH, exist_ok=True) +KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" + + +class Chat(Service): + + def __init__(self): + Service.__init__(self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5) + + @staticmethod + async def _request(url: str) -> dict: + res = await request.get(url) + data = await res.json() + return data + + @classmethod + async def _generate_data(cls) -> None: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + log.warning("未检测到闲聊词库,生成中") + data = await cls._request(KIMO_URL) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("生成完成") + except WriteError: + raise WriteError("Writing kimo words failed!") + + @classmethod + async def _load_data(cls) -> dict: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data + + @classmethod + async def update_data(cls) -> None: + log.info("更新闲聊词库ing...") + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + updata_data = await cls._request(KIMO_URL) + data = json.loads(path.read_bytes()) + for i in updata_data: + if i not in data: + data[i] = updata_data[i] + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("闲聊词库更新完成") + + @staticmethod + def name_is(user_id: str, new_name: str): + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = {} + + data = json.loads(path.read_bytes()) + data[user_id] = new_name + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + except ReadFileError: + raise ReadFileError("Update user name failed!") + + @staticmethod + def load_name(user_id: str) -> str: + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return "你" + + data = json.loads(path.read_bytes()) + try: + result = data[user_id] + except BaseException: + result = "你" + return result + + @classmethod + async def deal(cls, msg: str, user_id: str) -> str: + keywords = posseg.lcut(msg) + shuffle(keywords) + + data = await cls._load_data() + + repo = str() + for i in keywords: + a = i.word + b = list(a) + try: + if b[0] == b[1]: + a = b[0] + except BaseException: + pass + if a in data: + repo = data.get(a, str()) + + if not repo: + temp_data = list(data) + shuffle(temp_data) + for i in temp_data: + if i in msg: + repo = data.get(i, str()) + + a = choice(repo) if type(repo) is list else repo + user_name = cls.load_name(user_id) + repo = a.replace("你", user_name) + return repo diff --git a/ATRI/plugins/code_runner.py b/ATRI/plugins/code_runner.py deleted file mode 100644 index 363815f..0000000 --- a/ATRI/plugins/code_runner.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -Idea from: https://github.com/cczu-osa/aki -""" -import json -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.rule import is_in_service -from ATRI.service import Service as sv -from ATRI.utils.request import post_bytes -from ATRI.exceptions import RequestError - - -RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest" -SUPPORTED_LANGUAGES = { - "assembly": {"ext": "asm"}, - "bash": {"ext": "sh"}, - "c": {"ext": "c"}, - "clojure": {"ext": "clj"}, - "coffeescript": {"ext": "coffe"}, - "cpp": {"ext": "cpp"}, - "csharp": {"ext": "cs"}, - "erlang": {"ext": "erl"}, - "fsharp": {"ext": "fs"}, - "go": {"ext": "go"}, - "groovy": {"ext": "groovy"}, - "haskell": {"ext": "hs"}, - "java": {"ext": "java", "name": "Main"}, - "javascript": {"ext": "js"}, - "julia": {"ext": "jl"}, - "kotlin": {"ext": "kt"}, - "lua": {"ext": "lua"}, - "perl": {"ext": "pl"}, - "php": {"ext": "php"}, - "python": {"ext": "py"}, - "ruby": {"ext": "rb"}, - "rust": {"ext": "rs"}, - "scala": {"ext": "scala"}, - "swift": {"ext": "swift"}, - "typescript": {"ext": "ts"}, -} - - -__doc__ = """ -在线运行代码 -权限组:所有人 -用法: - /code (lang) (code) -示例: - /code python - print('Hello world!') -""" - -code_runner = sv.on_command(cmd="/code", docs=__doc__, rule=is_in_service("code")) - - -@code_runner.handle() -async def _code_runner(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split("\n") - - if msg[0] == "list": - msg0 = "咱现在支持的语言如下:\n" - msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys())) - - await code_runner.finish(msg0) - elif not msg[0]: - await code_runner.finish("请键入/help以获取更多支持...") - - laug = msg[0].replace("\r", "") - if laug not in SUPPORTED_LANGUAGES: - await code_runner.finish("该语言暂不支持...") - - del msg[0] - code = "\n".join(map(str, msg)) - try: - req = await post_bytes( - RUN_API_URL_FORMAT.format(laug), - json={ - "files": [ - { - "name": ( - SUPPORTED_LANGUAGES[laug].get("name", "main") - + f".{SUPPORTED_LANGUAGES[laug]['ext']}" - ), - "content": code, - } - ], - "stdin": "", - "command": "", - }, - ) - except RequestError: - raise RequestError("Failed to request!") - - payload = json.loads(req) - sent = False - for k in ["stdout", "stderr", "error"]: - v = payload.get(k) - lines = v.splitlines() - lines, remained_lines = lines[:10], lines[10:] - out = "\n".join(lines) - out, remained_out = out[: 60 * 10], out[60 * 10 :] - - if remained_lines or remained_out: - out += f"\n(太多了太多了...)" - - if out: - await bot.send(event, f"{k}:\n\n{out}") - sent = True - - if not sent: - await code_runner.finish("Running success! Nothing print.") diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py new file mode 100644 index 0000000..dfe6162 --- /dev/null +++ b/ATRI/plugins/code_runner/__init__.py @@ -0,0 +1,35 @@ +from random import choice + +from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp.message import Message, MessageSegment + +from ATRI.utils.limit import FreqLimiter +from .data_source import CodeRunner + + +_flmt = FreqLimiter(5) +_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) + + +code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮助:/code help") + +@code_runner.handle() +async def _code_runner(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _flmt.check(user_id): + await code_runner.finish(_flmt_notice) + + msg = str(event.get_message()) + args = msg.split("\n") + + if not args: + content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!" + elif args[0] == "help": + content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().help() + elif args[0] == "list": + content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang() + else: + content = MessageSegment.at(user_id) + await CodeRunner().runner(msg) + + _flmt.start_cd(user_id) + await code_runner.finish(Message(content)) diff --git a/ATRI/plugins/code_runner/data_source.py b/ATRI/plugins/code_runner/data_source.py new file mode 100644 index 0000000..bbe0d2e --- /dev/null +++ b/ATRI/plugins/code_runner/data_source.py @@ -0,0 +1,111 @@ +from ATRI.rule import is_in_service +from ATRI.service import Service +from ATRI.utils import request +from ATRI.exceptions import RequestError + + +RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest" +SUPPORTED_LANGUAGES = { + "assembly": {"ext": "asm"}, + "bash": {"ext": "sh"}, + "c": {"ext": "c"}, + "clojure": {"ext": "clj"}, + "coffeescript": {"ext": "coffe"}, + "cpp": {"ext": "cpp"}, + "csharp": {"ext": "cs"}, + "erlang": {"ext": "erl"}, + "fsharp": {"ext": "fs"}, + "go": {"ext": "go"}, + "groovy": {"ext": "groovy"}, + "haskell": {"ext": "hs"}, + "java": {"ext": "java", "name": "Main"}, + "javascript": {"ext": "js"}, + "julia": {"ext": "jl"}, + "kotlin": {"ext": "kt"}, + "lua": {"ext": "lua"}, + "perl": {"ext": "pl"}, + "php": {"ext": "php"}, + "python": {"ext": "py"}, + "ruby": {"ext": "rb"}, + "rust": {"ext": "rs"}, + "scala": {"ext": "scala"}, + "swift": {"ext": "swift"}, + "typescript": {"ext": "ts"}, +} + + +__doc__ = """ +在线跑代码 +""" + + +class CodeRunner(Service): + + def __init__(self): + Service.__init__(self, "在线跑代码", __doc__, rule=is_in_service("在线跑代码")) + + @staticmethod + def help() -> str: + return ( + "/code {语言}\n" + "{代码}\n" + "For example:\n" + "/code python\n" + "print('hello world')" + ) + + @staticmethod + def list_supp_lang() -> str: + msg0 = "咱现在支持的语言如下:\n" + msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys())) + return msg0 + + @staticmethod + async def runner(msg: str): + args = msg.split("\n") + if not args: + return "请检查键入内容..." + + lang = args[0].replace("\r", "") + if lang not in SUPPORTED_LANGUAGES: + return "该语言暂不支持..." + + del args[0] + code = "\n".join(map(str, args)) + url = RUN_API_URL_FORMAT.format(lang) + js = { + "files": [ + { + "name": ( + SUPPORTED_LANGUAGES[lang].get("name", "main") + + f".{SUPPORTED_LANGUAGES[lang]['ext']}" + ), + "content": code, + } + ], + "stdin": "", + "command": "" + } + + try: + res = await request.post(url, json=js) + except RequestError: + raise RequestError("Request failed!") + + payload = await res.json() + sent = False + for k in ["stdout", "stderr", "error"]: + v = payload.get(k) + lines = v.splitlines() + lines, remained_lines = lines[:10], lines[10:] + out = "\n".join(lines) + out, remained_out = out[: 60 * 10], out[60 * 10 :] + + if remained_lines or remained_out: + out += f"\n(太多了太多了...)" + + if out: + return f"\n{k}:\n{out}" + + if not sent: + return "\n运行完成,没任何输出呢..."
\ No newline at end of file diff --git a/ATRI/plugins/curse.py b/ATRI/plugins/curse.py new file mode 100644 index 0000000..b2dbc05 --- /dev/null +++ b/ATRI/plugins/curse.py @@ -0,0 +1,57 @@ +from random import choice + +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.rule import is_in_service, to_bot +from ATRI.service import Service +from ATRI.utils import request +from ATRI.utils.limit import FreqLimiter + + +URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn" + +_curse_flmt = FreqLimiter(3) +_curse_flmt_notice = choice(["我看你是找🔨是吧", "给我适可而止阿!?", "扎布多得了😅", "z?是m吗?我凑那也太恐怖了", "?"]) + + +__doc__ = """ +口臭!你急了你急了! +""" + + +class Curse(Service): + + def __init__(self): + Service.__init__(self, "口臭", __doc__, rule=is_in_service("口臭")) + + @staticmethod + async def now() -> str: + res = await request.get(URL) + result = await res.text # type: ignore + return result + + +normal_curse = Curse().on_command("口臭一下", "主命令,骂你一下", aliases={"骂我", "口臭"}, rule=to_bot()) + +@normal_curse.handle() +async def _deal_n_curse(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _curse_flmt.check(user_id): + await normal_curse.finish(_curse_flmt_notice) + + result = await Curse().now() + _curse_flmt.start_cd(user_id) + await normal_curse.finish(result) + + +super_curse = Curse().on_regex(r"[来求有](.*?)骂我吗?", "有求必应") + +@super_curse.handle() +async def _deal_s_curse(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _curse_flmt.check(user_id): + await normal_curse.finish(_curse_flmt_notice) + + result = await Curse().now() + _curse_flmt.start_cd(user_id) + await normal_curse.finish(result) diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py deleted file mode 100644 index 702e5ac..0000000 --- a/ATRI/plugins/curse/__init__.py +++ /dev/null @@ -1,42 +0,0 @@ -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service, to_bot -from ATRI.utils.list import count_list, del_list_aim -from ATRI.utils.request import get_text -from ATRI.exceptions import RequestError - - -URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn" -sick_list = [] - - -__doc__ = """ -口臭一下 -权限组:所有人 -用法: - 口臭,口臭一下,骂我 -""" - -curse = sv.on_command( - cmd="口臭", docs=__doc__, aliases={"口臭一下,骂我"}, rule=is_in_service("口臭") & to_bot() -) - - -async def _curse(bot: Bot, event: MessageEvent) -> None: - global sick_list - user = event.get_user_id() - if count_list(sick_list, user) == 3: - sick_list.append(user) - repo = "不是??你这么想被咱骂的嘛??" "被咱骂就这么舒服的吗?!" "该......你该不会是.....M吧!" - await curse.finish(repo) - elif count_list(sick_list, user) == 6: - sick_list = del_list_aim(sick_list, user) - await curse.finish("给我适可而止阿!?") - else: - sick_list.append(user) - try: - await curse.finish(await get_text(URL)) - except RequestError: - raise RequestError("Time out!") diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 6a14f1a..5369c9b 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -1,16 +1,16 @@ import os -import time import json -import shutil -from pathlib import Path -from random import choice +import asyncio from datetime import datetime +from pydantic.main import BaseModel +from random import choice, randint +from pathlib import Path +import nonebot from nonebot.typing import T_State from nonebot.matcher import Matcher from nonebot.message import run_preprocessor from nonebot.exception import IgnoredException -from nonebot.adapters.cqhttp.message import Message from nonebot.adapters.cqhttp import ( Bot, MessageEvent, @@ -21,246 +21,224 @@ from nonebot.adapters.cqhttp import ( GroupDecreaseNoticeEvent, GroupAdminNoticeEvent, GroupBanNoticeEvent, - LuckyKingNotifyEvent, - GroupUploadNoticeEvent, GroupRecallNoticeEvent, FriendRecallNoticeEvent, ) import ATRI -from ATRI.log import logger -from ATRI.exceptions import WriteError +from ATRI.service import Service +from ATRI.log import logger as log +from ATRI.rule import is_in_service from ATRI.config import BotSelfConfig -from ATRI.service import Service as sv -from ATRI.utils.cqcode import coolq_code_check +from ATRI.utils import CoolqCodeChecker -PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services" +driver = ATRI.driver() +bots = nonebot.get_bots() + ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" -os.makedirs(PLUGIN_INFO_DIR, exist_ok=True) +MANEGE_DIR = Path(".") / "ATRI" / "data" / "database" / "manege" os.makedirs(ESSENTIAL_DIR, exist_ok=True) - - -driver = ATRI.driver() +os.makedirs(MANEGE_DIR, exist_ok=True) @driver.on_startup -async def startup() -> None: - logger.info("アトリは、高性能ですから!") +async def startup(): + log.info("アトリは、高性能ですから!") @driver.on_shutdown -async def shutdown() -> None: - logger.info("Thanks for using.") - logger.debug("bot已停止运行,正在清理插件信息...") - try: - shutil.rmtree(PLUGIN_INFO_DIR) - logger.debug("成功!") - except Exception: - repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除") - time.sleep(10) - raise Exception(repo) - - [email protected]_bot_connect -async def connect(bot) -> None: - for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。") - - [email protected]_bot_disconnect -async def disconnect(bot) -> None: - for superuser in BotSelfConfig.superusers: - try: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...") - except: - logger.error("WebSocket 已断开,等待重连") +async def shutdown(): + log.info("Thanks for using.") @run_preprocessor # type: ignore async def _check_block( matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State ) -> None: - user = str(event.user_id) - try: - msg = str(event.message) - except: - msg = "" - if not sv.BlockSystem.auth_user(user): - raise IgnoredException(f"Block user: {user}") - - if not sv.Dormant.is_dormant(): - if "/dormant" not in msg: - raise IgnoredException("Bot has been dormant.") + user_file = "block_user.json" + path = MANEGE_DIR / user_file + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + + user_id = event.get_user_id() + if user_id in data: + raise IgnoredException(f"Block user: {user_id}") if isinstance(event, GroupMessageEvent): - group = str(event.group_id) - if not sv.BlockSystem.auth_group(group): - raise IgnoredException(f"Block group: {group}") - + group_file = "block_group.json" + path = MANEGE_DIR / group_file + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + + group_id = str(event.group_id) + if group_id in data: + raise IgnoredException(f"Block group: {user_id}") -@run_preprocessor # type: ignore -async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None: - if isinstance(event, GroupMessageEvent): - if event.sub_type == "normal": - now_time = datetime.now().strftime("%Y-%m-%d") - GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}" - os.makedirs(GROUP_DIR, exist_ok=True) - path = GROUP_DIR / f"{now_time}.chat.json" - now_time = datetime.now().strftime("%Y%m%d-%H%M%S") - - try: - data = json.loads(path.read_bytes()) - except: - data = dict() - data[str(event.message_id)] = { - "date": now_time, - "time": str(time.time()), - "post_type": str(event.post_type), - "sub_type": str(event.sub_type), - "user_id": str(event.user_id), - "group_id": str(event.group_id), - "message_type": str(event.message_type), - "message": str(event.message), - "raw_message": event.raw_message, - "font": str(event.font), - "sender": { - "user_id": str(event.sender.user_id), - "nickname": event.sender.nickname, - "sex": event.sender.sex, - "age": str(event.sender.age), - "card": event.sender.card, - "area": event.sender.area, - "level": event.sender.level, - "role": event.sender.role, - "title": event.sender.title, - }, - "to_me": str(event.to_me), - } - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - logger.debug(f"写入消息成功,id: {event.message_id}") - except WriteError: - logger.error("消息记录失败,可能是缺少文件的原因!") - else: - pass - else: - pass - else: - pass +class FriendRequestInfo(BaseModel): + user_id: str + comment: str + time: str + is_approve: bool -# 处理:好友请求 -request_friend_event = sv.on_request() +class GroupRequestInfo(BaseModel): + user_id: str + comment: str + time: str + is_approve: bool -@request_friend_event.handle() -async def _request_friend_event(bot, event: FriendRequestEvent) -> None: - file_name = "request_friend.json" - path = ESSENTIAL_DIR / file_name - path.parent.mkdir(exist_ok=True, parents=True) - try: - data = json.loads(path.read_bytes()) - except: - data = dict() - data[event.flag] = {"user_id": event.user_id, "comment": event.comment} - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - except WriteError: - raise WriteError("Writing file failed!") +__doc__ = """ +对bot基础/必须请求进行处理 +""" - for superuser in BotSelfConfig.superusers: - msg = ( - "主人,收到一条好友请求:\n" - f"请求人:{event.get_user_id()}\n" - f"申请信息:{event.comment}\n" - f"申请码:{event.flag}" - ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) +class Essential(Service): + + def __init__(self): + Service.__init__(self, "基础部件", __doc__) -# 处理:邀请入群,如身为管理,还附有入群请求 -request_group_event = sv.on_request() +friend_add_event = Essential().on_request("好友添加") -@request_group_event.handle() -async def _request_group_event(bot, event: GroupRequestEvent) -> None: - file_name = "request_group.json" +@friend_add_event.handle() +async def _friend_add(bot: Bot, event: FriendRequestEvent): + """ + 存储文件结构: + { + "Apply code": { + "user_id": "User ID", + "comment": "Comment content" + "time": "Time", + "is_approve": bool # Default: False + } + } + """ + file_name = "friend_add.json" path = ESSENTIAL_DIR / file_name - path.parent.mkdir(exist_ok=True, parents=True) - - try: - data = json.loads(path.read_bytes()) - except: + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) data = dict() - data[event.flag] = { - "user_id": event.user_id, - "group_id": event.group_id, - "sub_type": event.sub_type, - "comment": event.comment, + + apply_code = event.flag + apply_comment = event.comment + user_id = event.get_user_id() + now_time = datetime.now() + + data = json.loads(path.read_bytes()) + data[apply_code] = FriendRequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + is_approve=False + ) + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + + repo = ( + "咱收到一条好友请求...\n" + f"请求人:{user_id}\n" + f"申请信息:{apply_comment}\n" + f"申请码:{apply_code}\n" + "Tip:好友申请 帮助" + ) + for superuser in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=superuser, message=repo) + + +group_invite_event = Essential().on_request("邀请入群") + +@group_invite_event.handle() +async def _group_invite(bot: Bot, event: GroupRequestEvent): + """ + 存储文件结构: + { + "Apply code": { + "user_id": "User ID", + "comment": "Comment content" + "time": "Time", + "is_approve": bool # Default: False + } } - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - except WriteError: - raise WriteError("Writing file failed!") - + """ + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + apply_code = event.flag + apply_comment = event.comment + user_id = event.get_user_id() + now_time = datetime.now() + + data = json.loads(path.read_bytes()) + data[apply_code] = GroupRequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + is_approve=False + ) + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + + repo = ( + "咱收到一条群聊邀请请求...\n" + f"请求人:{user_id}\n" + f"申请信息:{apply_comment}\n" + f"申请码:{apply_code}\n" + "Tip:群聊邀请 帮助" + ) for superuser in BotSelfConfig.superusers: - msg = ( - "主人,收到一条入群请求:\n" - f"请求人:{event.get_user_id()}\n" - f"申请信息:{event.comment}\n" - f"申请码:{event.flag}" - ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - + await bot.send_private_msg(user_id=superuser, message=repo) -# 处理群成员变动 -group_member_event = sv.on_notice() +group_member_event = Essential().on_notice("群成员变动") @group_member_event.handle() -async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: - msg = "好欸!事新人!\n" f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" +async def _group_member_join(bot: Bot, event: GroupIncreaseNoticeEvent): + await asyncio.sleep(randint(1, 6)) + msg = ( + "好欸!事新人!\n" + f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" + ) await group_member_event.finish(msg) - @group_member_event.handle() -async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: - if event.is_tome(): - if event.user_id != event.self_id: - return - msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." - for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - else: - await group_member_event.finish("阿!有人离开了我们...") +async def _group_member_left(bot: Bot, event: GroupDecreaseNoticeEvent): + await asyncio.sleep(randint(1, 6)) + await group_member_event.finish("呜——有人跑了...") -# 处理群管理事件 -group_admin_event = sv.on_notice() - +group_admin_event = Essential().on_notice("群管理变动") @group_admin_event.handle() -async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: +async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent): if not event.is_tome(): return for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( + await bot.send_private_msg( user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" ) -# 处理群禁言事件 -group_ban_event = sv.on_notice() - +group_ban_event = Essential().on_notice("群禁言变动") @group_ban_event.handle() -async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: +async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent): if not event.is_tome(): return @@ -271,70 +249,63 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: f"时长...是 {event.duration} 秒" ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) else: msg = "好欸!主人\n" f"咱在群 {event.group_id} 的口球被 {event.operator_id} 解除了!" for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) -# 处理群红包运气王事件 -lucky_read_bag_event = sv.on_notice() - - -@lucky_read_bag_event.handle() -async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: - msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!" - await lucky_read_bag_event.finish(Message(msg)) - - -# 处理群文件上传事件 -group_file_upload_event = sv.on_notice() - - -@group_file_upload_event.handle() -async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None: - await group_file_upload_event.finish("让我康康传了啥好东西") - - -# 处理撤回事件 -recall_event = sv.on_notice() - +recall_event = Essential().on_notice("撤回事件") @recall_event.handle() -async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: +async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent): + if event.is_tome(): + return + try: repo = await bot.get_msg(message_id=event.message_id) - except: + except BaseException: return + user = event.user_id group = event.group_id repo = str(repo["message"]) - check = await coolq_code_check(repo, group=group) + check = CoolqCodeChecker(repo).check if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}" - + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{user}@[群:{group}]\n" + "撤回了\n" + f"{repo}" + ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) @recall_event.handle() -async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: +async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent): + if event.is_tome(): + return + try: repo = await bot.get_msg(message_id=event.message_id) - except: + except BaseException: return user = event.user_id repo = str(repo["message"]) - check = await coolq_code_check(repo, user) + check = CoolqCodeChecker(repo).check if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}" - - await bot.send(event, "咱看到惹~!") + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{user}@[私聊]" + "撤回了\n" + f"{repo}" + ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py deleted file mode 100644 index a2092e3..0000000 --- a/ATRI/plugins/funny.py +++ /dev/null @@ -1,148 +0,0 @@ -import json -import re -import asyncio -from pathlib import Path -from random import choice, randint - -from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.limit import is_too_exciting -from ATRI.rule import is_in_service -from ATRI.utils.request import post_bytes -from ATRI.utils.translate import to_simple_string -from ATRI.exceptions import RequestError - - -__doc__ = """ -看不懂的笑话 -权限组:所有人 -用法: - 来句笑话 -""" - -get_laugh = sv.on_command(cmd="来句笑话", docs=__doc__, rule=is_in_service("来句笑话")) - - -@get_laugh.handle() -async def _get_laugh(bot: Bot, event: MessageEvent) -> None: - user_name = event.sender.nickname - laugh_list = [] - - FILE = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt" - with open(FILE, "r", encoding="utf-8") as r: - for line in r: - laugh_list.append(line.strip("\n")) - - result = choice(laugh_list) - await get_laugh.finish(result.replace("%name", user_name)) - - -me_to_you = sv.on_message(priority=5) - - -@me_to_you.handle() -async def _me_to_you(bot: Bot, event: MessageEvent) -> None: - if randint(0, 15) == 5: - msg = str(event.message) - if "我" in msg and "CQ" not in msg: - await me_to_you.finish(msg.replace("我", "你")) - - -__doc__ = """ -伪造转发 -权限组:所有人 -用法: - /fakemsg qq*name*msg... -补充: - qq: QQ号 - name: 消息中的ID - msg: 对应信息 -示例: - /fakemsg 123456789*生草人*草 114514*仙贝*臭死了 -""" - -fake_msg = sv.on_command(cmd="/fakemsg", docs=__doc__, rule=is_in_service("fakemsg")) - - -@fake_msg.handle() -async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None: - msg = str(event.message).split(" ") - user = event.user_id - group = event.group_id - node = list() - check = is_too_exciting(user, 1, seconds=600) - - if check: - for i in msg: - args = i.split("*") - qq = args[0] - name = args[1].replace("[", "[") - name = name.replace("]", "]") - repo = args[2].replace("[", "[") - repo = repo.replace("]", "]") - dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}} - node.append(dic) - await bot.send_group_forward_msg(group_id=group, messages=node) - - -EAT_URL = "https://wtf.hiigara.net/api/run/{}" - -eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service("今天吃什么")) - - -@eat_wat.handle() -async def _eat(bot: Bot, event: MessageEvent) -> None: - msg = str(event.raw_message).strip() - msg = re.search(r"大?[今|明|后]天(.*?)吃什么", msg).group() - user = event.user_id - user_n = event.sender.nickname - arg = re.findall(r"大?[今|明|后]天(.*?)吃什么", msg)[0] - nd = re.match(r"大?[今|明|后]天", msg)[0] - - if arg == "中午": - a = f"LdS4K6/{randint(0, 999999)}" - url = EAT_URL.format(a) - params = {"event": "ManualRun"} - try: - data = json.loads(await post_bytes(url, params)) - except RequestError: - raise RequestError("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", nd) - get_a = re.search(r"非常(.*?)的", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, "") - - elif arg == "晚上": - a = f"KaTMS/{randint(0, 999999)}" - url = EAT_URL.format(a) - params = {"event": "ManualRun"} - try: - data = json.loads(await post_bytes(url, params)) - except RequestError: - raise RequestError("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", "") - result = f"> {MessageSegment.at(user)}\n" + text - - else: - rd = randint(1, 10) - if rd == 5: - result = "吃我吧 ❤" - else: - a = f"JJr1hJ/{randint(0, 999999)}" - url = EAT_URL.format(a) - params = {"event": "ManualRun"} - try: - data = json.loads(await post_bytes(url, params)) - except RequestError: - raise RequestError("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", nd) - get_a = re.match(r"(.*?)的智商", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace( - get_a, f"{user_n}的智商" - ) - - await eat_wat.finish(Message(result)) diff --git a/ATRI/plugins/funny/__init__.py b/ATRI/plugins/funny/__init__.py new file mode 100644 index 0000000..de98381 --- /dev/null +++ b/ATRI/plugins/funny/__init__.py @@ -0,0 +1,86 @@ +from random import choice, randint + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.message import Message + +from ATRI.utils.limit import FreqLimiter, DailyLimiter +from.data_source import Funny + + +get_laugh = Funny().on_command("来句笑话", "隐晦的笑话...") + +@get_laugh.handle() +async def _get_laugh(bot: Bot, event: MessageEvent): + user_name = event.sender.nickname or "该裙友" + await get_laugh.finish(Funny().idk_laugh(user_name)) + + +me_re_you = Funny().on_regex(r"我", "我也不懂咋解释", block=False) + +@me_re_you.handle() +async def _me_re_you(bot: Bot, event: MessageEvent): + if randint(0, 15) == 5: + msg = str(event.get_message()) + content, is_ok = Funny().me_re_you(msg) + if is_ok: + await me_re_you.finish(content) + + +fake_msg = Funny().on_command("/fakemsg", "伪造假转发内容,格式:qq-name-content\n可构造多条,使用空格隔开,仅限群聊") + +_fake_daliy_max = DailyLimiter(3) +_fake_max_notice = "不能继续下去了!明早再来" +_fake_flmt = FreqLimiter(60) +_fake_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) + +@fake_msg.args_parser # type: ignore +async def _perp_fake(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await fake_msg.finish("好吧...") + if not msg: + await fake_msg.reject("内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开") + else: + state["content"] = msg + +@fake_msg.handle() +async def _ready_fake(bot: Bot, event: GroupMessageEvent, state: T_State): + user_id = event.get_user_id() + if not _fake_daliy_max.check(user_id): + await fake_msg.finish(_fake_max_notice) + if not _fake_flmt.check(user_id): + await fake_msg.finish(_fake_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["content"] = msg + +@fake_msg.got("content", "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开") +async def _deal_fake(bot: Bot, event: GroupMessageEvent, state: T_State): + content = state["content"] + group_id = event.group_id + user_id = event.get_user_id() + node = Funny().fake_msg(content) + await bot.send_group_forward_msg(group_id=group_id, messages=node) + + _fake_flmt.start_cd(user_id) + _fake_daliy_max.increase(user_id) + + +eat_what = Funny().on_regex(r"大?[今明后]天(.*?)吃[什啥]么?", "我来决定你吃什么!") + +_eat_flmt = FreqLimiter(15) + +@eat_what.handle() +async def _eat_what(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _eat_flmt.check(user_id): + return + + msg = str(event.get_message()) + user_name = event.sender.nickname or "裙友" + eat = await Funny().eat_what(user_name, msg) + _eat_flmt.start_cd(user_id) + await eat_what.finish(Message(eat)) diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py new file mode 100644 index 0000000..a27cdb5 --- /dev/null +++ b/ATRI/plugins/funny/data_source.py @@ -0,0 +1,113 @@ +import re + +from pathlib import Path +from random import choice, randint +from nonebot.adapters.cqhttp.utils import unescape +from nonebot.adapters.cqhttp.message import MessageSegment + +from ATRI.service import Service +from ATRI.exceptions import RequestError +from ATRI.utils import request, Translate +from ATRI.rule import is_in_service + + +__doc__ = """ +乐1乐,莫当真 +""" + + +class Funny(Service): + + def __init__(self): + Service.__init__(self, "乐", __doc__, rule=is_in_service("乐")) + + @staticmethod + def idk_laugh(name: str) -> str: + laugh_list = list() + + file_path = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt" + with open(file_path, encoding="utf-8") as r: + for line in r: + laugh_list.append(line.strip("\n")) + + rd: str = choice(laugh_list) + result = rd.replace("%name", name) + return result + + @staticmethod + def me_re_you(msg: str) -> tuple: + if "我" in msg and "[CQ" not in msg: + return msg.replace("我", "你"), True + else: + return msg, False + + @staticmethod + def fake_msg(text: str) -> list: + arg = text.split(" ") + node = list() + + for i in arg: + args = i.split("-") + qq = args[0] + name = unescape(args[1]) + repo = unescape(args[2]) + dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}} + node.append(dic) + return node + + @staticmethod + async def eat_what(name: str, msg: str) -> str: + EAT_URL = "https://wtf.hiigara.net/api/run/" + params = {"event": "ManualRun"} + pattern_0 = r"大?[今明后]天(.*?)吃[什啥]么?" + pattern_1 = r"(今|明|后|大后)天" + arg = re.findall(pattern_0, msg)[0] + day = re.match(pattern_1, msg).group(0) # type: ignore + + if arg == "中午": + a = f"LdS4K6/{randint(0, 1145141919810)}" + url = EAT_URL + a + try: + data = await request.post(url, params=params) + data = await data.json() + except RequestError: + raise RequestError("Request failed!") + + text = Translate(data["text"]).to_simple().replace("今天", day) + get_a = re.search(r"非常(.*?)的", text).group(0) # type: ignore + result = text.replace(get_a, "") + + elif arg == "晚上": + a = f"KaTMS/{randint(0, 1145141919810)}" + url = EAT_URL + a + try: + data = await request.post(url, params=params) + data = await data.json() + except RequestError: + raise RequestError("Request failed!") + + result = Translate(data["text"]).to_simple().replace("今天", day) + + else: + rd = randint(1, 10) + if rd == 5: + result = [ + "吃我吧 ❤", + "(脸红)请...请享用咱吧......", + "都可以哦~不能挑食呢~" + ] + return choice(result) + else: + a = f"JJr1hJ/{randint(0, 1145141919810)}" + url = EAT_URL + a + try: + data = await request.post(url, params=params) + data = await data.json() + except RequestError: + raise RequestError("Request failed!") + + text = Translate(data["text"]).to_simple().replace("今天", day) + get_a = re.match(r"(.*?)的智商", text).group(0) # type: ignore + result = text.replace(get_a, f"{name}的智商") + + return result
\ No newline at end of file diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py deleted file mode 100644 index c8c8f2b..0000000 --- a/ATRI/plugins/github.py +++ /dev/null @@ -1,43 +0,0 @@ -import re -import json -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.exceptions import RequestError - - -URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" - - -github_issues = sv.on_message() - - -@github_issues.handle() -async def _github_issues(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message) - patt = r"https://github.com/(.*)/(.*)/issues/(.*)" - need_info = re.findall(patt, msg) - if not need_info: - return - - for i in need_info: - need_info = list(i) - owner = need_info[0] - repo = need_info[1] - issue_number = need_info[2] - url = URL.format(owner=owner, repo=repo, issue_number=issue_number) - - try: - data = await get_bytes(url) - except RequestError: - return - - data = json.loads(data) - msg0 = ( - f"{repo}: #{issue_number} {data['state']}\n" - f"comments: {data['comments']}\n" - f"update: {data['updated_at']}\n" - f"{data['body']}" - ) - await github_issues.finish(msg0) diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py deleted file mode 100644 index d2754b1..0000000 --- a/ATRI/plugins/help.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -import json - -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import SERVICE_DIR -from ATRI.service import Service as sv - - -SERVICE_DIR = SERVICE_DIR / "services" - - -__doc__ = """ -查询命令用法 -权限组:所有人 -用法: - /help - /help list - /help info (cmd) -""" - -help = sv.on_command(cmd="/help", docs=__doc__) - - -async def _help(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - if msg[0] == "": - msg = ( - "呀?找不到路了?\n" - "/help list 查看可用命令列表\n" - "/help info (cmd) 查看命令具体帮助\n" - "项目地址:github.com/Kyomotoi/ATRI\n" - "咱只能帮你这么多了qwq" - ) - await help.finish(msg) - elif msg[0] == "list": - files = [] - for _, _, i in os.walk(SERVICE_DIR): - for a in i: - f = SERVICE_DIR / a - files.append(json.loads(f.read_bytes())["command"]) - cmds = " | ".join(map(str, files)) - msg = "咱能做很多事!比如:\n" + cmds - msg0 = msg + "\n没反应可能是没权限...或者为探测类型...不属于可直接触发命令..." - await help.finish(msg0) - elif msg[0] == "info": - cmd = msg[1] - data = {} - path = SERVICE_DIR / f"{cmd.replace('/', '')}.json" - try: - data = json.loads(path.read_bytes()) - except: - await help.finish("未找到相关命令...") - - msg = f"{cmd} INFO:\n" f"Enabled: {data['enabled']}\n" f"{data['docs']}" - await help.finish(msg) - else: - await help.finish("请检查输入...") diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py new file mode 100644 index 0000000..1d9af19 --- /dev/null +++ b/ATRI/plugins/help/__init__.py @@ -0,0 +1,48 @@ +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.rule import to_bot +from .data_source import Helper + + +main_help = Helper().on_command("菜单", "获取食用bot的方法", rule=to_bot(), aliases={"/help", "menu"}) + +@main_help.handle() +async def _main_help(bot: Bot, event: MessageEvent): + repo = Helper().menu() + await main_help.finish(repo) + + +about_me = Helper().on_command("关于", "获取关于bot的信息", rule=to_bot(), aliases={"about"}) + +@about_me.handle() +async def _about_me(bot: Bot, event: MessageEvent): + repo = Helper().about() + await about_me.finish(repo) + + +service_list = Helper().on_command("服务列表", "查看所有可用服务", rule=to_bot(), aliases={"功能列表"}) + +@service_list.handle() +async def _service_list(bot: Bot, event: MessageEvent): + repo = Helper().service_list() + await service_list.finish(repo) + + +service_info = Helper().on_command("帮助", "获取服务详细帮助") + +@service_info.handle() +async def _ready_service_info(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).split(" ") + service = msg[0] + try: + cmd = msg[1] + except BaseException: + cmd = str() + + if not cmd: + repo = Helper().service_info(service) + await service_info.finish(repo) + + repo = Helper().cmd_info(service, cmd) + await service_info.finish(repo) diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py new file mode 100644 index 0000000..930f4bb --- /dev/null +++ b/ATRI/plugins/help/data_source.py @@ -0,0 +1,112 @@ +import os + +from ATRI import __version__ +from ATRI.service import Service, SERVICES_DIR, ServiceTools +from ATRI.config import BotSelfConfig +from ATRI.exceptions import ReadFileError + + +SERVICE_INFO_FORMAT = """ +服务名:{service} +说明:{docs} +可用命令:\n{cmd_list} +是否全局启用:{enabled} +Tip: 帮助 [服务] [命令] 以查看对应命令详细信息 +""".strip() + + +COMMAND_INFO_FORMAT = """ +命令:{cmd} +类型:{cmd_type} +说明:{docs} +更多触发方式:{aliases} +""".strip() + + +class Helper(Service): + + def __init__(self): + Service.__init__(self, "帮助", "bot的食用指南~") + + @staticmethod + def menu() -> str: + return ( + "哦呀?~需要帮助?\n" + "关于 -查看bot基本信息\n" + "服务列表 -以查看所有可用服务\n" + "帮助 [服务] -以查看对应服务帮助\n" + "Tip: 均需要at触发。菜单 打开此页面" + ) + + @staticmethod + def about() -> str: + temp_list = list() + for i in BotSelfConfig.nickname: + temp_list.append(i) + nickname = "、".join(map(str, temp_list)) + return ( + "唔...是来认识咱的么\n" + f"可以称呼咱:{nickname}\n" + f"咱的型号是:{__version__}\n" + "想进一步了解:\n" + "https://github.com/Kyomotoi/ATRI" + ) + + @staticmethod + def service_list() -> str: + files = os.listdir(SERVICES_DIR) + temp_list = list() + for i in files: + service = i.replace(".json", "") + temp_list.append(service) + + msg0 = "咱搭载了以下服务~\n" + services = " | ".join(map(str, temp_list)) + msg0 = msg0 + services + repo = msg0 + "\n帮助 [服务] -以查看对应服务帮助" + return repo + + @staticmethod + def service_info(service: str) -> str: + try: + data = ServiceTools().load_service(service) + except ReadFileError: + return "请检查是否输入错误呢..." + + service_name = data.get("service", "error") + service_docs = data.get("docs", "error") + service_enabled = data.get("enabled", True) + + _service_cmd_list = list(data.get("cmd_list", {"error"})) + service_cmd_list = "\n".join(map(str, _service_cmd_list)) + + repo = SERVICE_INFO_FORMAT.format( + service=service_name, + docs=service_docs, + cmd_list=service_cmd_list, + enabled=service_enabled + ) + return repo + + @staticmethod + def cmd_info(service: str, cmd: str) -> str: + try: + data = ServiceTools().load_service(service) + except ReadFileError: + return "请检查是否输入错误..." + + cmd_list: dict = data["cmd_list"] + cmd_info = cmd_list.get(cmd, dict()) + if not cmd_info: + return "请检查命令是否输入错误..." + cmd_type = cmd_info.get("type", "ignore") + docs = cmd_info.get("docs", "ignore") + aliases = cmd_info.get("aliases", "ignore") + + repo = COMMAND_INFO_FORMAT.format( + cmd=cmd, + cmd_type=cmd_type, + docs=docs, + aliases=aliases + ) + return repo diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py deleted file mode 100644 index 19c96f6..0000000 --- a/ATRI/plugins/hitokoto.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -from random import choice, randint -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.rule import is_in_service, to_bot -from ATRI.service import Service as sv -from ATRI.exceptions import RequestError -from ATRI.utils.list import count_list, del_list_aim -from ATRI.utils.request import get_bytes - -URL = [ - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/a.json", - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/b.json", - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json", -] -sick_list = [] - - -__doc__ = """ -抑郁一下 -权限组:所有人 -用法: - @ 一言 -""" - -hitokoto = sv.on_command( - cmd="一言", aliases={"抑郁一下", "网抑云"}, docs=__doc__, rule=is_in_service("一言") & to_bot() -) - - -async def _hitokoto(bot: Bot, event: MessageEvent) -> None: - global sick_list - msg = str(event.message) - user = event.get_user_id() - - if count_list(sick_list, user) == 3: - sick_list.append(user) - await hitokoto.finish("额......需要咱安慰一下嘛~?") - elif count_list(sick_list, user) == 6: - sick_list = del_list_aim(sick_list, user) - msg = "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" "我...我会守在你身边的!...嗯..一定" - await hitokoto.finish(msg) - else: - sick_list.append(user) - url = choice(URL) - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Request failed!") - await hitokoto.finish(data[randint(1, len(data) - 1)]["hitokoto"]) diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py deleted file mode 100644 index b316020..0000000 --- a/ATRI/plugins/manage/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -import nonebot -from pathlib import Path - - -_sub_plugins = set() - -_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve())) diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py deleted file mode 100644 index b439ba1..0000000 --- a/ATRI/plugins/manage/modules/block.py +++ /dev/null @@ -1,153 +0,0 @@ -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv - - -__doc__ = """ -封禁用户 -权限组:维护者 -用法: - 封禁用户 QQ号 -""" - -block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER) - - -@block_user.args_parser # type: ignore -async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await block_user.finish("好吧...") - if not msg: - await block_user.reject("是谁呢?!GKD!") - else: - state["noob"] = msg - - -@block_user.handle() -async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["noob"] = msg - - -@block_user.got("noob", prompt="是谁呢?!GKD!") -async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob = state["noob"] - sv.BlockSystem.control_list(True, user=noob) - msg = f"用户[{noob}]已被封禁(;′⌒`)" - await block_user.finish(msg) - - -__doc__ = """ -解封用户 -权限组:维护者 -用法: - 解封用户 QQ号 -""" - -unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER) - - -@unblock_user.args_parser # type: ignore -async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await unblock_user.finish("好吧...") - if not msg: - await unblock_user.reject("要原谅谁呢...") - else: - state["forgive"] = msg - - -@unblock_user.handle() -async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["forgive"] = msg - - -@unblock_user.got("forgive", prompt="要原谅谁呢...") -async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive = state["forgive"] - sv.BlockSystem.control_list(False, user=forgive) - msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ" - await unblock_user.finish(msg) - - -__doc__ = """ -封禁群 -权限组:维护者 -用法: - 封禁群 群号 -""" - -block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER) - - -@block_group.args_parser # type: ignore -async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await block_user.finish("好吧...") - if not msg: - await block_user.reject("是哪个群?!GKD!") - else: - state["noob_g"] = msg - - -@block_group.handle() -async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["noob_g"] = msg - - -@block_group.got("noob_g", prompt="是哪个群?!GKD!") -async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob_g = state["noob_g"] - sv.BlockSystem.control_list(True, group=noob_g) - msg = f"群[{noob_g}]已被封禁(;′⌒`)" - await block_user.finish(msg) - - -__doc__ = """ -解封群 -权限组:维护者 -用法: - 解封 群号 -""" - -unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER) - - -@unblock_group.args_parser # type: ignore -async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await block_user.finish("好吧...") - if not msg: - await block_user.reject("要原谅哪个群呢...") - else: - state["forgive_g"] = msg - - -@unblock_group.handle() -async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["forgive_g"] = msg - - -@unblock_group.got("forgive_g", prompt="要原谅哪个群呢...") -async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive_g = state["forgive_g"] - sv.BlockSystem.control_list(False, group=forgive_g) - msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ" - await unblock_user.finish(msg) diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py deleted file mode 100644 index 7f7816d..0000000 --- a/ATRI/plugins/manage/modules/broadcast.py +++ /dev/null @@ -1,66 +0,0 @@ -import asyncio -from random import randint - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv - - -__doc__ = """ -群发内容 -权限组:维护者 -用法: - 广播 内容 -""" - -broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER) - - [email protected]_parser # type: ignore -async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await broadcast.finish("好吧...") - if not msg: - await broadcast.reject("想群发啥呢0w0") - else: - state["msg"] = msg - - -async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["msg"] = msg - - [email protected]("msg", prompt="请告诉咱需要群发的内容~!") -async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["msg"] - group_list = await bot.get_group_list() - succ_list = [] - err_list = [] - - for group in group_list: - await asyncio.sleep(randint(0, 2)) - try: - await bot.send_group_msg(group_id=group["group_id"], message=msg) - except BaseException: - err_list.append(group["group_id"]) - - msg0 = "" - for i in err_list: - msg0 += f" {i}\n" - - repo_msg = ( - f"推送消息:\n{msg}\n" - "————————\n" - f"总共:{len(group_list)}\n" - f"成功推送:{len(succ_list)}\n" - f"失败[{len(err_list)}]个:\n" - ) + msg0 - - await broadcast.finish(repo_msg) diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py deleted file mode 100644 index fa21c5b..0000000 --- a/ATRI/plugins/manage/modules/debug.py +++ /dev/null @@ -1,135 +0,0 @@ -from aiohttp import FormData - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.log import LOGGER_DIR, NOW_TIME -from ATRI.utils.file import open_file -from ATRI.utils.ub_paste import paste -from ATRI.exceptions import load_error - - -level_list = ["info", "warning", "error", "debug"] - - -__doc__ = """ -获取控制台信息 -权限组:维护者 -用法: - 获取log 等级 行数 -示例: - 获取log info -20(最新20行) -""" - -get_console = sv.on_command( - cmd="获取log", - aliases={"获取LOG", "获取控制台", "获取控制台信息"}, - docs=__doc__, - permission=SUPERUSER, -) - - -@get_console.handle() -async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).split(" ") - if msg: - state["level"] = msg[0] - try: - state["line"] = msg[1] - except Exception: - pass - - -@get_console.got("level", prompt="需要获取的等级是?") -async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None: - quit_list = ["算了", "罢了", "不了"] - if state["level"] in quit_list: - await get_console.finish("好吧...") - - -@get_console.got("line", prompt="需要获取的行数是?") -async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None: - level = state["level"] - line = state["line"] - repo = str() - - path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log" - logs = await open_file(path, "readlines") - - try: - content = logs[int(line) :] # type: ignore - repo = "\n".join(content).replace("[36mATRI[0m", "ATRI") - except IndexError: - await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore - - data = FormData() - data.add_field("poster", "ATRI running log") - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", repo) - - msg0 = f"> {event.sender.nickname}\n" - msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" - await track_error.finish(msg0) - - -__doc__ = """ -追踪错误 -权限组:维护者 -用法: - track 追踪ID -""" - -track_error = sv.on_command( - cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER -) - - -@track_error.args_parser # type: ignore -async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await track_error.finish("好吧...") - if not msg: - await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") - else: - state["track"] = msg - - -@track_error.handle() -async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["track"] = msg - - -@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") -async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: - track_id = state["track"] - data = dict() - - try: - data = load_error(track_id) - except BaseException: - await track_error.finish("未发现对应ID呢...(⇀‸↼‶)") - - msg = ( - f"ID: [{track_id}]\n" - f"Time: [{data['time']}]\n" - f"Prompt: [{data['prompt']}]\n" - "——————\n" - f"{data['content']}" - ) - - data = FormData() - data.add_field("poster", track_id) - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", msg) - - msg0 = f"> {event.sender.nickname}\n" - msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" - await track_error.finish(msg0) diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py deleted file mode 100644 index cd72d47..0000000 --- a/ATRI/plugins/manage/modules/dormant.py +++ /dev/null @@ -1,43 +0,0 @@ -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.rule import to_bot - - -__doc__ = """ -休眠,不处理任何信息 -权限组:维护者 -用法: - @ 休眠 -""" - -dormant_enabled = sv.on_command( - cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER -) - - -@dormant_enabled.handle() -async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None: - sv.Dormant.control_dormant(True) - msg = "已进入休眠状态...期间咱不会回应任何人的消息哦..." - await dormant_enabled.finish(msg) - - -__doc__ = """ -苏醒,开始处理信息 -权限组:维护者 -用法: - @ 苏醒 -""" - -dormant_disabled = sv.on_command( - cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER -) - - -@dormant_disabled.handle() -async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None: - sv.Dormant.control_dormant(False) - msg = "唔...早上好...——哇哈哈" - await dormant_disabled.finish(msg) diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py deleted file mode 100644 index 26100b1..0000000 --- a/ATRI/plugins/manage/modules/request.py +++ /dev/null @@ -1,114 +0,0 @@ -import re -import json -from pathlib import Path - -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.exceptions import ReadFileError, FormatError - - -ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" - - -__doc__ = """ -查看好友/群申请列表 -权限组:维护者 -用法: - 查看申请列表 -""" - -req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER) - - -@req_list.handle() -async def _req_list(bot: Bot, event: MessageEvent) -> None: - path_f = ESSENTIAL_DIR / "request_friend.json" - path_g = ESSENTIAL_DIR / "request_group.json" - data_f, data_g = dict() - try: - data_f = json.loads(path_f.read_bytes()) - except ReadFileError: - msg_f = "[读取文件失败]" - try: - data_g = json.loads(path_g.read_bytes()) - except ReadFileError: - msg_g = "[读取文件失败]" - - msg_f = str() - for i in data_f.keys(): - msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n" - - msg_g = str() - for i in data_g.keys(): - msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n" - - msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}" - await req_list.finish(msg) - - -req_deal = sv.on_regex(r"(同意|拒绝)(好友|群)申请", permission=SUPERUSER) - - -@req_deal.handle() -async def _req_deal(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - arg = re.findall(r"(同意|拒绝)(好友|群)申请", msg[0])[0] - app = arg[0] - _type = arg[1] - - if not msg[1]: - await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)") - - reqid = msg[1] - if app == "同意": - if _type == "好友": - try: - await bot.set_friend_add_request(flag=reqid, approve=True) - await req_deal.finish("完成~!已同意申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - elif _type == "群": - path = ESSENTIAL_DIR / "request_group.json" - data_g = dict() - try: - data_g = json.loads(path.read_bytes()) - except FileExistsError: - await req_deal.finish("读取群数据失败,可能并没有请求...") - - try: - await bot.set_group_add_request( - flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True - ) - await req_deal.finish("完成~!已同意申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - else: - await req_deal.finish("请检查输入的值是否正确——!") - elif app == "拒绝": - if _type == "好友": - try: - await bot.set_friend_add_request(flag=reqid, approve=False) - await req_deal.finish("完成~!已拒绝申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - elif _type == "群": - path = ESSENTIAL_DIR / "request_group.json" - data_g = dict() - try: - data_g = json.loads(path.read_bytes()) - except FileExistsError: - await req_deal.finish("读取群数据失败,可能并没有请求...") - - try: - await bot.set_group_add_request( - flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False - ) - await req_deal.finish("完成~!已拒绝申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - else: - await req_deal.finish("请检查输入的值是否正确——!") - else: - await req_deal.finish("请检查输入的值是否正确——!") diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py deleted file mode 100644 index a3624df..0000000 --- a/ATRI/plugins/manage/modules/service.py +++ /dev/null @@ -1,179 +0,0 @@ -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp.permission import GROUP_ADMIN, GROUP_OWNER -from nonebot.adapters.cqhttp import ( - Bot, - MessageEvent, - GroupMessageEvent, - PrivateMessageEvent, -) - -from ATRI.service import Service as sv - - -__doc__ = """ -启用功能,针对单群 -权限组:维护者,群管理 -用法: - 启用 目标命令 -示例: - 启用 以图搜图 -""" - -cur_service_ena = sv.on_command( - cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER -) - - -@cur_service_ena.args_parser # type: ignore -async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await cur_service_ena.finish("好吧...") - if not msg: - await cur_service_ena.reject("请告诉咱目标命令!") - else: - state["service_e"] = msg - - -@cur_service_ena.handle() -async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_e"] = msg - - -@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!") -async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - cmd = state["service_e"] - group = str(event.group_id) - sv.control_service(cmd, False, True, group=group) - await cur_service_ena.finish(f"成功!本群已启用:{cmd}") - - -@cur_service_ena.handle() -async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: - await cur_service_ena.finish("只能在群聊中决定哦...") - - -__doc__ = """ -禁用功能,针对单群 -权限组:维护者,群管理 -用法: - 禁用 目标命令 -示例: - 禁用 以图搜图 -""" - -cur_service_dis = sv.on_command( - cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER -) - - -@cur_service_dis.args_parser # type: ignore -async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await cur_service_dis.finish("好吧...") - if not msg: - await cur_service_dis.reject("请告诉咱目标命令!") - else: - state["service_d"] = msg - - -@cur_service_dis.handle() -async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_d"] = msg - - -@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!") -async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - cmd = state["service_d"] - group = str(event.group_id) - sv.control_service(cmd, False, False, group=group) - await cur_service_dis.finish(f"成功!本群已禁用:{cmd}") - - -@cur_service_dis.handle() -async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: - await cur_service_dis.finish("只能在群聊中决定哦...") - - -__doc__ = """ -全局启用功能 -权限组:维护者 -用法: - 全局启用 目标命令 -示例: - 全局启用 以图搜图 -""" - -glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER) - - -@glo_service_ena.args_parser # type: ignore -async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await glo_service_ena.finish("好吧...") - if not msg: - await glo_service_ena.reject("请告诉咱目标命令!") - else: - state["service_e_g"] = msg - - -@glo_service_ena.handle() -async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_e_g"] = msg - - -@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!") -async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: - cmd = state["service_e_g"] - sv.control_service(cmd, True, True) - await glo_service_ena.finish(f"成功!已全局启用:{cmd}") - - -__doc__ = """ -全局禁用功能 -权限组:维护者 -用法: - 全局禁用 目标命令 -示例: - 全局禁用 以图搜图 -""" - -glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER) - - -@glo_service_dis.args_parser # type: ignore -async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await glo_service_dis.finish("好吧...") - if not msg: - await glo_service_dis.reject("请告诉咱目标命令!") - else: - state["service_d_g"] = msg - - -@glo_service_dis.handle() -async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_d_g"] = msg - - -@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!") -async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: - cmd = state["service_d_g"] - sv.control_service(cmd, True, False) - await glo_service_dis.finish(f"成功!已全局禁用:{cmd}") diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py deleted file mode 100644 index 78f23e8..0000000 --- a/ATRI/plugins/manage/modules/shutdown.py +++ /dev/null @@ -1,32 +0,0 @@ -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv - - -__doc__ = """ -紧急停机 -权限组:维护者 -用法: - @ 关机 -""" - -shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER) - - -async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["msg"] = msg - - [email protected]("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)") -async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: - t = ["y", "Y", "是"] - if state["msg"] in t: - await bot.send(event, "咱还会醒来的,一定") - exit(0) - else: - await shutdown.finish("再考虑下吧 ;w;") diff --git a/ATRI/plugins/manege/__init__.py b/ATRI/plugins/manege/__init__.py new file mode 100644 index 0000000..d0337e9 --- /dev/null +++ b/ATRI/plugins/manege/__init__.py @@ -0,0 +1,451 @@ +import re + +from nonebot.typing import T_State +from nonebot.permission import SUPERUSER +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN + +from .data_source import Manege + + +# 求1个pr把这里优化,写得我想吐了 + + +block_user = Manege().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) + +@block_user.args_parser # type: ignore +async def _get_block_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await block_user.finish("...看来有人逃过一劫呢") + if not msg: + await block_user.reject("哪位?GKD!") + else: + state["block_user"] = msg + +@block_user.handle() +async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["block_user"] = msg + +@block_user.got("block_user", "哪位?GKD!") +async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State): + user_id = state["block_user"] + is_ok = Manege().block_user(user_id) + if not is_ok: + await block_user.finish("kuso!封禁失败了...") + + await block_user.finish(f"用户 {user_id} 危!") + + +unblock_user = Manege().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER) + +@unblock_user.args_parser # type: ignore +async def _get_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") + if not msg: + await unblock_user.reject("哪位?GKD!") + else: + state["unblock_user"] = msg + +@unblock_user.handle() +async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["unblock_user"] = msg + +@unblock_user.got("unblock_user", "哪位?GKD!") +async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + user_id = state["unblock_user"] + is_ok = Manege().unblock_user(user_id) + if not is_ok: + await unblock_user.finish("kuso!解封失败了...") + + await unblock_user.finish(f"好欸!{user_id} 重获新生!") + + +block_group = Manege().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER) + +@block_group.args_parser # type: ignore +async def _get_block_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await block_group.finish("...看来有一群逃过一劫呢") + if not msg: + await block_group.reject("哪个群?GKD!") + else: + state["block_group"] = msg + +@block_group.handle() +async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["block_group"] = msg + +@block_group.got("block_group", "哪个群?GKD!") +async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State): + group_id = state["block_group"] + is_ok = Manege().block_group(group_id) + if not is_ok: + await block_group.finish("kuso!封禁失败了...") + + await block_group.finish(f"群 {group_id} 危!") + + +unblock_group = Manege().on_command("解封群", "对目标群进行解封", permission=SUPERUSER) + +@unblock_group.args_parser # type: ignore +async def _get_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") + if not msg: + await unblock_group.reject("哪个群?GKD!") + else: + state["unblock_group"] = msg + +@unblock_group.handle() +async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["unblock_group"] = msg + +@unblock_group.got("unblock_group", "哪个群?GKD!") +async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + group_id = state["unblock_group"] + is_ok = Manege().unblock_group(group_id) + if not is_ok: + await unblock_group.finish("kuso!解封失败了...") + + await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") + + +global_block_service = Manege().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER) + +@global_block_service.args_parser # type: ignore +async def _get_global_block_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await global_block_service.finish("好吧...") + if not msg: + await global_block_service.reject("阿...是哪个服务呢") + else: + state["global_block_service"] = msg + +@global_block_service.handle() +async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["global_block_service"] = msg + +@global_block_service.got("global_block_service", "阿...是哪个服务呢") +async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State): + block_service = state["global_block_service"] + is_ok = Manege().control_global_service(block_service, False) + if not is_ok: + await global_block_service.finish("kuso!禁用失败了...") + + await global_block_service.finish(f"服务 {block_service} 已被禁用") + + +global_unblock_service = Manege().on_command("全局启用", "全局启用某服务", permission=SUPERUSER) + +@global_unblock_service.args_parser # type: ignore +async def _get_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await global_unblock_service.finish("好吧...") + if not msg: + await global_unblock_service.reject("阿...是哪个服务呢") + else: + state["global_unblock_service"] = msg + +@global_unblock_service.handle() +async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["global_unblock_service"] = msg + +@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") +async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + block_service = state["global_unblock_service"] + is_ok = Manege().control_global_service(block_service, True) + if not is_ok: + await global_unblock_service.finish("kuso!启用服务失败了...") + + await global_unblock_service.finish(f"服务 {block_service} 已启用") + + +user_block_service = Manege().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER) + +@user_block_service.handle() +async def _user_block_service(bot: Bot, event: MessageEvent): + msg = str(event.message).strip() + pattern = r"对用户(.*?)禁用(.*)" + reg = re.findall(pattern, msg) + aim_user = reg[0] + aim_service = reg[1] + + is_ok = Manege().control_user_service(aim_service, aim_user, False) + if not is_ok: + await user_block_service.finish("禁用失败...请检查服务名是否正确") + await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") + + + +user_unblock_service = Manege().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER) + +@user_unblock_service.handle() +async def _user_unblock_service(bot: Bot, event: MessageEvent): + msg = str(event.message).strip() + pattern = r"对用户(.*?)启用(.*)" + reg = re.findall(pattern, msg) + aim_user = reg[0] + aim_service = reg[1] + + is_ok = Manege().control_user_service(aim_service, aim_user, True) + if not is_ok: + await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") + await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") + + +group_block_service = Manege().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) + +@group_block_service.args_parser # type: ignore +async def _get_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await group_block_service.finish("好吧...") + if not msg: + await group_block_service.reject("阿...是哪个服务呢") + else: + state["group_block_service"] = msg + +@group_block_service.handle() +async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["group_block_service"] = msg + +@group_block_service.got("group_block_service", "阿...是哪个服务呢") +async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + aim_service = state["group_block_service"] + group_id = str(event.group_id) + + is_ok = Manege().control_group_service(aim_service, group_id, False) + if not is_ok: + await group_block_service.finish("禁用失败...请检查服务名是否输入正确") + await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") + + +group_unblock_service = Manege().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) + +@group_unblock_service.args_parser # type: ignore +async def _get_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await group_unblock_service.finish("好吧...") + if not msg: + await group_unblock_service.reject("阿...是哪个服务呢") + else: + state["group_unblock_service"] = msg + +@group_unblock_service.handle() +async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["group_unblock_service"] = msg + +@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") +async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + aim_service = state["group_unblock_service"] + group_id = str(event.group_id) + + is_ok = Manege().control_group_service(aim_service, group_id, True) + if not is_ok: + await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") + await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") + + +get_friend_add_list = Manege().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER) + +@get_friend_add_list.handle() +async def _get_friend_add_list(bot: Bot, event: MessageEvent): + data = Manege().load_friend_apply_list() + temp_list = list() + for i in data: + apply_code = i + apply_user = data[i]["user_id"] + apply_comment = data[i]["comment"] + temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" + temp_list.append(temp_msg) + + msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) + msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" + await get_friend_add_list.finish(msg1) + + +approve_friend_add = Manege().on_command("同意好友", "同意好友申请", permission=SUPERUSER) + +@approve_friend_add.args_parser # type: ignore +async def _get_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await approve_friend_add.finish("好吧...") + if not msg: + await approve_friend_add.reject("申请码GKD!") + else: + state["approve_friend_add"] = msg + +@approve_friend_add.handle() +async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["approve_friend_add"] + +@approve_friend_add.got("approve_friend_add", "申请码GKD!") +async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["approve_friend_add"] + try: + await bot.set_friend_add_request(flag=apply_code, approve=True) + except BaseException: + await approve_friend_add.finish("同意失败...尝试下手动?") + data = Manege().load_friend_apply_list() + data.pop(apply_code) + Manege().save_friend_apply_list(data) + await approve_friend_add.finish("好欸!申请已通过!") + + +refuse_friend_add = Manege().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER) + +@refuse_friend_add.args_parser # type: ignore +async def _get_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await refuse_friend_add.finish("好吧...") + if not msg: + await refuse_friend_add.reject("申请码GKD!") + else: + state["refuse_friend_add"] = msg + +@refuse_friend_add.handle() +async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["refuse_friend_add"] + +@refuse_friend_add.got("refuse_friend_add", "申请码GKD!") +async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["refuse_friend_add"] + try: + await bot.set_friend_add_request(flag=apply_code, approve=False) + except BaseException: + await refuse_friend_add.finish("拒绝失败...尝试下手动?") + data = Manege().load_friend_apply_list() + data.pop(apply_code) + Manege().save_friend_apply_list(data) + await refuse_friend_add.finish("已拒绝!") + + +get_group_invite_list = Manege().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER) + +@get_group_invite_list.handle() +async def _get_group_invite_list(bot: Bot, event: MessageEvent): + data = Manege().load_invite_apply_list() + temp_list = list() + for i in data: + apply_code = i + apply_user = data[i]["user_id"] + apply_comment = data[i]["comment"] + temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" + temp_list.append(temp_msg) + + msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) + msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" + await get_friend_add_list.finish(msg1) + + +approve_group_invite = Manege().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER) + +@approve_group_invite.args_parser # type: ignore +async def _get_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await approve_group_invite.finish("好吧...") + if not msg: + await approve_group_invite.reject("申请码GKD!") + else: + state["approve_group_invite"] = msg + +@approve_group_invite.handle() +async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["approve_group_invite"] + +@approve_group_invite.got("approve_group_invite", "申请码GKD!") +async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["approve_group_invite"] + try: + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True) + except BaseException: + await approve_group_invite.finish("同意失败...尝试下手动?") + data = Manege().load_invite_apply_list() + data.pop(apply_code) + Manege().save_invite_apply_list(data) + await approve_group_invite.finish("好欸!申请已通过!") + + +refuse_group_invite = Manege().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER) + +@refuse_group_invite.args_parser # type: ignore +async def _get_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await refuse_group_invite.finish("好吧...") + if not msg: + await refuse_group_invite.reject("申请码GKD!") + else: + state["refuse_group_invite"] = msg + +@refuse_group_invite.handle() +async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["refuse_group_invite"] + +@refuse_group_invite.got("refuse_group_invite", "申请码GKD!") +async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["refuse_group_invite"] + try: + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False) + except BaseException: + await refuse_group_invite.finish("拒绝失败...尝试下手动?") + data = Manege().load_invite_apply_list() + data.pop(apply_code) + Manege().save_invite_apply_list(data) + await refuse_group_invite.finish("已拒绝!") + + +track_error = Manege().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) + +@track_error.handle() +async def _track_error(bot: Bot, event: MessageEvent): + track_id = str(event.message).strip() + repo = await Manege().track_error(track_id) + await track_error.finish(repo) diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manege/data_source.py new file mode 100644 index 0000000..933f6c6 --- /dev/null +++ b/ATRI/plugins/manege/data_source.py @@ -0,0 +1,281 @@ +import os +import json +from pathlib import Path +from aiohttp import FormData +from datetime import datetime + +from ATRI.service import Service, ServiceTools +from ATRI.utils import UbuntuPaste +from ATRI.exceptions import ReadFileError, load_error + + +MANEGE_DIR = Path(".") / "ATRI" / "data"/ "database" / "manege" +ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" +os.makedirs(MANEGE_DIR, exist_ok=True) +os.makedirs(ESSENTIAL_DIR, exist_ok=True) + + +TRACK_BACK_FORMAT = """ +Track ID:{track_id} +Prompt: {prompt} +Time: {time} +{content} +""".strip() + + +__doc__ = """ +控制bot的各项服务 +""" + + +class Manege(Service): + + def __init__(self): + Service.__init__(self, "管理", __doc__, True) + + @staticmethod + def _load_block_user_list() -> dict: + """ + 文件结构: + { + "Block user ID": { + "time": "Block time" + } + } + """ + file_name = "block_user.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + return data + + + @staticmethod + def _save_block_user_list(data: dict) -> None: + file_name = "block_user.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + def _load_block_group_list() -> dict: + """ + 文件结构: + { + "Block group ID": { + "time": "Block time" + } + } + """ + file_name = "block_group.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + return data + + @staticmethod + def _save_block_group_list(data: dict) -> None: + file_name = "block_group.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @classmethod + def block_user(cls, user_id: str) -> bool: + data = cls._load_block_user_list() + now_time = datetime.now() + data[user_id] = { + "time": now_time + } + try: + cls._save_block_user_list(data) + return True + except BaseException: + return False + + @classmethod + def unblock_user(cls, user_id: str) -> bool: + data: dict = cls._load_block_user_list() + if user_id not in data: + return False + + try: + data.pop(user_id) + cls._save_block_user_list(data) + return True + except BaseException: + return False + + @classmethod + def block_group(cls, group_id: str) -> bool: + data = cls._load_block_group_list() + now_time = datetime.now() + data[group_id] = { + "time": now_time + } + try: + cls._save_block_group_list(data) + return True + except BaseException: + return False + + @classmethod + def unblock_group(cls, group_id: str) -> bool: + data: dict = cls._load_block_group_list() + if group_id not in data: + return False + + try: + data.pop(group_id) + cls._save_block_group_list(data) + return True + except BaseException: + return False + + @staticmethod + def control_global_service(service: str, is_enabled: bool) -> bool: + """ + Only SUPERUSER. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + data["enabled"] = is_enabled + ServiceTools().save_service(data, service) + return True + + @staticmethod + def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: + """ + Only SUPERUSER. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + temp_list: list = data.get("disable_user", list()) + + if is_enabled: + try: + temp_list.remove(user_id) + except BaseException: + return False + else: + temp_list.append(user_id) + data["disable_user"] = temp_list + ServiceTools().save_service(data, service) + return True + + @staticmethod + def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: + """ + SUPERUSER and GROUPADMIN or GROUPOWNER. + Only current group. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + temp_list: list = data.get("disable_group", list()) + + if is_enabled: + try: + temp_list.remove(group_id) + except BaseException: + return False + else: + temp_list.append(group_id) + data["disable_group"] = temp_list + ServiceTools().save_service(data, service) + return True + + @staticmethod + def load_friend_apply_list() -> dict: + file_name = "friend_add.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + data = json.loads(path.read_bytes()) + return data + + @staticmethod + def save_friend_apply_list(data: dict) -> None: + file_name = "friend_add.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + def load_invite_apply_list() -> dict: + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + data = json.loads(path.read_bytes()) + return data + + @staticmethod + def save_invite_apply_list(data: dict) -> None: + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + async def track_error(track_id: str) -> str: + try: + data = load_error(track_id) + except ReadFileError: + return "请检查ID是否正确..." + + prompt = data.get("prompt", "ignore") + time = data.get("time", "ignore") + content = data.get("content", "ignore") + + msg0 = TRACK_BACK_FORMAT.format( + track_id=track_id, + prompt=prompt, + time=time, + content=content + ) + f_data = FormData() + f_data.add_field("poster", "ATRI running log") + f_data.add_field("syntax", "text") + f_data.add_field("expiration", "day") + f_data.add_field("content", msg0) + + repo = f"详细请移步此处~\n{await UbuntuPaste(data).paste()}" + return repo
\ No newline at end of file diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py deleted file mode 100644 index 692ac48..0000000 --- a/ATRI/plugins/nsfw.py +++ /dev/null @@ -1,116 +0,0 @@ -import re -import json - -from nonebot.adapters.cqhttp import Bot, GroupMessageEvent -from nonebot.typing import T_State - -from ATRI.log import logger as log -from ATRI.config import BotSelfConfig, NsfwCheck -from ATRI.service import Service as sv -from ATRI.exceptions import RequestError -from ATRI.rule import is_in_service -from ATRI.utils.request import get_bytes -from ATRI.utils.cqcode import coolq_code_check - - -nsfw_url = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" - - -nsfw_checking = sv.on_message() - - -@nsfw_checking.handle() -async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: - if NsfwCheck.enabled: - msg = str(event.message) - user = event.user_id - group = event.group_id - check = await coolq_code_check(msg, user, group) - - if check: - if "image" not in msg: - return - - url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0] - try: - data = json.loads(await get_bytes(url)) - except: - log.warning("检测涩图失败,请查阅文档以获取帮助") - return - if round(data["score"], 4) * 100 >= NsfwCheck.passing_rate: - score = "{:.2%}".format(round(data["score"], 4)) - log.debug(f"截获涩图,得分:{score}") - for sup in BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, message=f"{msg}\n涩值: {score}" - ) - await bot.send(event, f"好涩哦!涩值:{score}\n不行了咱要发给主人看!") - else: - pass - - -__doc__ = """ -检测你图片的涩值 -权限组:所有人 -用法: - /nsfw (pic) -补充: - pic: 图片 -示例: - /nsfw 然后Bot会向你索取图片 -""" - -nsfw_reading = sv.on_command(cmd="/nsfw", docs=__doc__, rule=is_in_service("nsfw")) - - -@nsfw_reading.args_parser # type: ignore -async def _nsfw(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了"] - if msg in quit_list: - await nsfw_reading.finish("好吧") - - if not msg: - await nsfw_reading.reject("图呢?") - else: - state["pic_nsfw"] = msg - - -@nsfw_reading.handle() -async def _nsfw_r(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - user = event.user_id - group = event.group_id - msg = str(event.message).strip() - check = await coolq_code_check(msg, user, group) - if check and msg: - state["pic_nsfw"] = msg - - -@nsfw_reading.got("pic_nsfw", prompt="图呢?") -async def _nsfw_reading(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = state["pic_nsfw"] - pic = re.findall(r"url=(.*?)]", msg) - if not pic: - await nsfw_reading.reject("请发送图片而不是其它东西!!") - - url = nsfw_url + pic[0] - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Time out!") - - score = round(data["score"], 4) - result = "{:.2%}".format(round(data["score"], 4)) - if score >= 0.9: - level = "hso! 我要发给主人看!" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}" - ) - elif 0.9 > score >= 0.6: - level = "嗯,可冲" - else: - level = "?能不能换张55完全冲不起来" - - repo = f"涩值:{result}\n{level}" - await nsfw_reading.finish(repo) diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py new file mode 100644 index 0000000..db68721 --- /dev/null +++ b/ATRI/plugins/repo.py @@ -0,0 +1,71 @@ +from random import choice + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.service import Service +from ATRI.config import BotSelfConfig +from ATRI.utils.limit import FreqLimiter, DailyLimiter + + +_repo_flmt = FreqLimiter(20) +_repo_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) +_repo_dlmt = DailyLimiter(5) +_repo_dlmt_notice = "阿!不能再喝了,再喝就晕过去了!" + + +REPO_FORMAT = """ +来自用户{user}反馈: +{msg} +""" + + +class Repo(Service): + + def __init__(self): + Service.__init__(self, "反馈", "向维护者发送消息") + + +repo = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"}) + [email protected]_parser # type: ignore +async def _get_repo(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了", "不搜了", "取消"] + if msg in quit_list: + await repo.finish("好吧...") + if not msg: + await repo.reject("需要反馈的内容呢?~") + else: + state["repo"] = msg + +async def _ready_repo(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _repo_flmt.check(user_id): + await repo.finish(_repo_flmt_notice) + if not _repo_dlmt.check(user_id): + await repo.finish(_repo_dlmt_notice) + + msg = str(event.message).strip() + if msg: + state["repo"] = msg + [email protected]("repo", "需要反馈的内容呢?~") +async def _deal_repo(bot: Bot, event: MessageEvent, state: T_State): + msg = state["repo"] + user_id = event.get_user_id() + repo_0 = REPO_FORMAT.format( + user=user_id, + msg=msg + ) + + for superuser in BotSelfConfig.superusers: + try: + await bot.send_private_msg(user_id=superuser, message=repo_0) + except BaseException: + await repo.finish("发送失败了呢...") + + _repo_flmt.start_cd(user_id) + _repo_dlmt.increase(user_id) + await repo.finish("吾辈的心愿已由咱转告维护者!") diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index 476eb88..87a2b02 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -1,70 +1,26 @@ -import re -import json -from random import choice -from aiohttp.client import ClientSession - from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.utils.limit import is_too_exciting - -from .data_source import dec +from ATRI.utils.limit import FreqLimiter +from .data_source import Rich -temp_list = [] -img_url = [ - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png", - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg", -] +_rich_flmt = FreqLimiter(2) -bilibili_rich = sv.on_message() +bili_rich = Rich().on_message("小程序爪巴", block=False) -@bilibili_rich.handle() -async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: - global temp_list +@bili_rich.handle() +async def _fk_bili(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _rich_flmt.check(user_id): + return + + msg = str(event.message) try: - msg = str(event.raw_message).replace("\\", "") - bv = False - - if "qqdocurl" not in msg: - if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace("av", "") - else: - bv = re.findall(r"(BV\w+)", msg) - av = str(dec(bv[0])) - else: - patt = r"(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+" - bv_url = re.findall(patt, msg) - bv_url = bv_url[3] - async with ClientSession() as session: - async with session.get(url=bv_url) as r: - bv = re.findall(r"(BV\w+)", str(r.url)) - av = dec(bv[0]) - - if not bv: - if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace("av", "") - else: - return - - user = event.user_id - check = is_too_exciting(user, 1, 10) - if not check: - return - - URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}" - data = json.loads(await get_bytes(URL))["data"] - repo = ( - f"{data['bvid']} INFO:\n" - f"Title: {data['title']}\n" - f"Link: {data['short_link']}\n" - "にまねげぴのTencent rich!" - ) - await bot.send(event, MessageSegment.image(file=choice(img_url))) - await bilibili_rich.finish(repo) + result, is_ok = await Rich().fk_bili(msg) except BaseException: return + if not is_ok: + return + _rich_flmt.start_cd(user_id) + await bili_rich.finish(result) diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py index 59474ff..3277a58 100644 --- a/ATRI/plugins/rich/data_source.py +++ b/ATRI/plugins/rich/data_source.py @@ -1,5 +1,15 @@ +import re + +from ATRI.service import Service +from ATRI.utils import request +from ATRI.rule import is_in_service +from ATRI.exceptions import RequestError + + +URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid=" + table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF" -tr = {} +tr = dict() for i in range(58): tr[table[i]] = i s = [11, 10, 3, 8, 4, 6] @@ -7,16 +17,90 @@ xor = 177451812 add = 8728348608 -def dec(x) -> int: - r = 0 - for i in range(6): - r += tr[x[s[i]]] * 58 ** i - return (r - add) ^ xor +__doc__ = """ +啥b腾讯小程序给👴爪巴 +目前只整了b站的 +""" -def enc(x) -> str: - x = (x ^ xor) + add - r = list("BV1 4 1 7 ") - for i in range(6): - r[s[i]] = table[x // 58 ** i % 58] - return "".join(r) +class Rich(Service): + + def __init__(self): + Service.__init__(self, "小程序处理", __doc__, rule=is_in_service("小程序处理")) + + @staticmethod + def _bv_dec(x) -> str: + r = 0 + for i in range(6): + r += tr[x[s[i]]] * 58 ** i + result = "av" + str((r - add) ^ xor) + return result + + @staticmethod + def _bv_enc(x) -> str: + x = (x ^ xor) + add + r = list("BV1 4 1 7 ") + for i in range(6): + r[s[i]] = table[x // 58 ** i % 58] + return "".join(r) + + @classmethod + async def fk_bili(cls, text: str) -> tuple: + """ + 为何本函数这么多 try,因为此函数被用于监听所有信息 + 如果真出现错误,就会一直刷屏 + """ + msg = text.replace("\\", "") + bv = False + + if "qqdocurl" not in msg: + if "av" in msg: + av = re.findall(r"(av\d+)", msg) + if not av: + return "Get value (av) failed!", False + av = av[0].replace("av", "") + else: + bv = re.findall(r"([Bb][Vv]\w+)", msg) + if not bv: + return "Get value (bv) failed!", False + av = str(cls._bv_dec(bv[0])).replace("av", "") + else: + pattern = r"(?:(?:https?):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+" + bv_url = re.findall(pattern, msg) + if not bv_url: + return "Get value (bv url) failed!", False + bv_url = bv_url[3] + + try: + res = await request.get(bv_url) + except RequestError: + return "Request failed!", False + bv = re.findall(r"(BV\w+)", str(res.url)) + if not bv: + return "Get value (bv) failed!", False + av = cls._bv_dec(bv[0]) + + if not bv: + if "av" in msg: + av = re.findall(r"(av\d+)", msg) + if not av: + return "Get value (av) failed!", False + av = av[0].replace("av", "") + else: + return "Not found av", False + + url = URL + av + try: + res = await request.get(url) + except RequestError: + return "Request failed!", False + res_data = await res.json() + data = res_data["data"] + + result = ( + f"{data['bvid']} INFO:\n" + f"Title: {data['title']}\n" + f"Link: {data['short_link']}" + ) + return result, True +
\ No newline at end of file diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index 6361ade..9e52d66 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -1,80 +1,51 @@ -import re -import json +from re import findall from random import choice +from nonebot.typing import T_State from nonebot.adapters.cqhttp import Bot, MessageEvent from nonebot.adapters.cqhttp.message import Message, MessageSegment -from nonebot.typing import T_State from ATRI.config import SauceNAO -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.exceptions import RequestError - -from .data_source import SauceNao +from ATRI.utils.limit import FreqLimiter +from .data_source import SaouceNao -__doc__ = """ -以图搜图 -权限组:所有人 -用法: - 以图搜图 (pic) -""" +_search_flmt = FreqLimiter(5) +_search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图")) +saucenao = SaouceNao().on_command("以图搜图", "透过一张图搜索可能的来源") @saucenao.args_parser # type: ignore -async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) +async def _get_img(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() quit_list = ["算了", "罢了", "不搜了"] if msg in quit_list: await saucenao.finish("好吧...") - if not msg: await saucenao.reject("图呢?") else: - state["pic_sau"] = msg - + state["img"] = msg @saucenao.handle() -async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _ready_search(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _search_flmt.check(user_id): + await saucenao.finish(_search_flmt_notice) + msg = str(event.message).strip() if msg: - state["pic_sau"] = msg + state["img"] = msg - [email protected]("pic_sau", prompt="图呢?") -async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["pic_sau"] - img = re.findall(r"url=(.*?)]", msg) [email protected]("img", "图呢?") +async def _deal_search(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + msg = state["img"] + img = findall(r"url=(.*?)]", msg) if not img: - await saucenao.finish("请发送图片而不是其他东西!!") - - try: - task = SauceNao(api_key=SauceNAO.key) - data = json.loads(await task.search(img[0])) - except RequestError: - raise RequestError("Request failed!") - - res = data["results"] - result = list() - for i in range(0, 3): - data = res[i] - - _result = dict() - _result["similarity"] = data["header"]["similarity"] - _result["index_name"] = data["header"]["index_name"] - _result["url"] = choice(data["data"].get("ext_urls", ["None"])) - result.append(_result) - - msg0 = f"> {MessageSegment.at(event.user_id)}" - for i in result: - msg0 = msg0 + ( - "\n——————————\n" - f"Similarity: {i['similarity']}\n" - f"Name: {i['index_name']}\n" - f"URL: {i['url'].replace('https://', '')}" - ) - - await saucenao.finish(Message(msg0)) + await saucenao.reject("请发送图片而不是其他东西!!") + + a = SaouceNao(SauceNAO.key) + result = f"> {MessageSegment.at(user_id)}" + await a.search(img[0]) + _search_flmt.start_cd(user_id) + await saucenao.finish(Message(result)) diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index efe8fe1..d948f91 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -1,13 +1,25 @@ -from ATRI.utils.request import post_bytes +from random import choice +from aiohttp import FormData + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.exceptions import RequestError +from ATRI.utils import request, UbuntuPaste URL = "https://saucenao.com/search.php" -class SauceNao: - def __init__( - self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5 - ) -> None: +__doc__ = """ +以图搜图,仅限二刺螈 +""" + + +class SaouceNao(Service): + + def __init__(self, api_key: str = None, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5): + Service.__init__(self, "以图搜图", __doc__, rule=is_in_service("以图搜图")) + params = dict() params["api_key"] = api_key params["output_type"] = output_type @@ -16,8 +28,48 @@ class SauceNao: params["db"] = db params["numres"] = numres self.params = params - - async def search(self, url: str): + + async def _request(self, url: str): self.params["url"] = url - res = await post_bytes(url=URL, params=self.params) - return res + + try: + res = await request.post(URL, params=self.params) + except RequestError: + raise RequestError("Request failed!") + data = await res.json() + return data + + async def search(self, url: str) -> str: + data = await self._request(url) + res = data["results"] + + result = list() + for i in range(len(res)): + data = res[i] + + _result = dict() + _result["similarity"] = data["header"]["similarity"] + _result["index_name"] = data["header"]["index_name"] + _result["url"] = choice(data["data"].get("ext_urls", ["None"])) + result.append(_result) + + msg0 = str() + for i in result: + msg0 += ( + "\n——————————\n" + f"Similarity: {i['similarity']}\n" + f"Name: {i['index_name']}\n" + f"URL: {i['url'].replace('https://', '')}" + ) + + if len(res) <= 3: + return msg0 + else: + data = FormData() + data.add_field("poster", "ATRI running log") + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", msg0) + + repo = f"\n详细请移步此处~\n{await UbuntuPaste(data).paste()}" + return repo diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py index b316020..7eb2c61 100644 --- a/ATRI/plugins/setu/__init__.py +++ b/ATRI/plugins/setu/__init__.py @@ -1,7 +1,82 @@ -import nonebot -from pathlib import Path +import re +import asyncio +from random import choice +from nonebot.adapters.cqhttp import Bot, MessageEvent, Message +from ATRI.utils.limit import FreqLimiter, DailyLimiter +from ATRI.utils.apscheduler import scheduler +from .data_source import Setu -_sub_plugins = set() -_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve())) +_setu_flmt = FreqLimiter(120) +_setu_dlmt = DailyLimiter(5) + + +random_setu = Setu().on_command("来张涩图", "来张随机涩图,冷却2分钟,每天限5张", aliases={"涩图来", "来点涩图", "来份涩图"}) + +@random_setu.handle() +async def _random_setu(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _setu_flmt.check(user_id): + await random_setu.finish() + if not _setu_dlmt.check(user_id): + await random_setu.finish() + + setu, title, p_id = await Setu().random_setu() + repo = ( + f"Title: {title}\n" + f"Pid: {p_id}" + ) + await bot.send(event, repo) + msg_1 = await bot.send(event, Message(setu)) + event_id = msg_1["message_id"] + _setu_flmt.start_cd(user_id) + _setu_dlmt.increase(user_id) + await asyncio.sleep(30) + await bot.delete_msg(message_id=event_id) + + +tag_setu = Setu().on_regex(r"来[张点丶份](.*?)的[涩色🐍]图", "根据提供的tag查找涩图") + +@tag_setu.handle() +async def _tag_setu(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _setu_flmt.check(user_id): + await random_setu.finish() + if not _setu_dlmt.check(user_id): + await random_setu.finish() + + msg = str(event.message).strip() + pattern = r"来[张点丶份](.*?)的[涩色🐍]图" + tag = re.findall(pattern, msg)[0] + setu, title, p_id, is_ok = await Setu().tag_setu(tag) + if not is_ok: + await tag_setu.finish(f"没有 {tag} 的涩图呢...") + repo_0 = ( + f"Title: {title}\n" + f"Pid: {p_id}" + ) + + await bot.send(event, repo_0) + msg_1 = await bot.send(event, Message(setu)) + event_id = msg_1["message_id"] + _setu_flmt.start_cd(user_id) + _setu_dlmt.increase(user_id) + await asyncio.sleep(30) + await bot.delete_msg(message_id=event_id) + + [email protected]_job("interval", hours=1, misfire_grace_time=60, args=[Bot]) +async def _scheduler_setu(bot): + try: + group_list = await bot.get_group_list() + lucky_group = choice(group_list) + group_id = lucky_group["group_id"] + setu = await Setu().scheduler() + msg_0 = await bot.send_group_msg(group_id=int(group_id), message=Message(setu)) + message_id = msg_0["message_id"] + await asyncio.sleep(60) + await bot.delete_msg(message_id=message_id) + + except BaseException: + pass diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py new file mode 100644 index 0000000..4070fd5 --- /dev/null +++ b/ATRI/plugins/setu/data_source.py @@ -0,0 +1,90 @@ +from random import choice +from nonebot.adapters.cqhttp import MessageSegment + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.utils import request + + +LOLICON_URL = "https://api.lolicon.app/setu/v2" +SETU_TEMP_FORMAT = "https://pixiv.cat/{p_id}.{ext}" # 为何要这样组,因为 i.pixiv.cat 不稳定! +SCHEDULER_FORMAT = """ +是{tag}哦~❤ +{setu} +""" + + +class Setu(Service): + + def __init__(self): + Service.__init__(self, "涩图", "hso!", rule=is_in_service("涩图")) + + @staticmethod + async def random_setu() -> tuple: + """ + 随机涩图. + """ + res = await request.get(LOLICON_URL) + data: dict = await res.json() + temp_data: dict = data.get("data", list())[0] + + title = temp_data.get("title", "木陰のねこ") + p_id = temp_data.get("pid", 88124144) + ext = temp_data.get("ext", "jpg") + url = SETU_TEMP_FORMAT.format( + p_id=p_id, + ext=ext + ) + setu = MessageSegment.image(url) + return setu, title, p_id + + @staticmethod + async def tag_setu(tag: str) -> tuple: + """ + 指定tag涩图. + """ + url = LOLICON_URL + f"?tag={tag}" + res = await request.get(url) + data: dict = await res.json() + + temp_data: dict = data.get("data", list())[0] + if not temp_data: + is_ok = False + is_ok = True + + title = temp_data.get("title", "木陰のねこ") + p_id = temp_data.get("pid", 88124144) + ext = temp_data.get("ext", "jpg") + url = SETU_TEMP_FORMAT.format( + p_id=p_id, + ext=ext + ) + setu = MessageSegment.image(url) + return setu, title, p_id, is_ok + + @staticmethod + async def scheduler() -> str: + """ + 每隔指定时间随机抽取一个群发送涩图. + 格式: + 是{tag}哦~❤ + {setu} + """ + res = await request.get(LOLICON_URL) + data: dict = await res.json() + temp_data: dict = data.get("data", list())[0] + + p_id = temp_data.get("pid", 88124144) + tag = choice(temp_data.get("tags", ["女孩子"])) + ext = temp_data.get("ext", "jpg") + + url = SETU_TEMP_FORMAT.format( + p_id=p_id, + ext=ext + ) + setu = MessageSegment.image(url) + repo = SCHEDULER_FORMAT.format( + tag=tag, + setu=setu + ) + return repo diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py deleted file mode 100644 index bda7363..0000000 --- a/ATRI/plugins/setu/modules/data_source.py +++ /dev/null @@ -1,181 +0,0 @@ -import os -import json -import string -import aiosqlite -from aiosqlite.core import Connection -from pathlib import Path -from random import sample, choice -from aiohttp import ClientSession -from nonebot.adapters.cqhttp.message import MessageSegment, Message - -from ATRI.log import logger as log -from ATRI.config import NsfwCheck -from ATRI.exceptions import RequestError, WriteError -from ATRI.utils.request import get_bytes -from ATRI.utils.img import compress_image - - -TEMP_DIR: Path = Path(".") / "ATRI" / "data" / "temp" / "setu" -SETU_DIR = Path(".") / "ATRI" / "data" / "database" / "setu" -os.makedirs(TEMP_DIR, exist_ok=True) -os.makedirs(SETU_DIR, exist_ok=True) -NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" -SIZE_REDUCE: bool = True - - -class Hso: - @staticmethod - async def nsfw_check(url: str) -> float: - url = NSFW_URL + url - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Request failed!") - return round(data["score"], 4) - - @staticmethod - async def _comp_setu(url: str) -> str: - temp_id = "".join(sample(string.ascii_letters + string.digits, 8)) - file = TEMP_DIR / f"{temp_id}.png" - - try: - async with ClientSession() as session: - async with session.get(url) as r: - data = await r.read() - except RequestError: - raise RequestError("Request img failed!") - - try: - with open(file, "wb") as r: - r.write(data) - except WriteError: - raise WriteError("Writing img failed!") - - return compress_image(os.path.abspath(file)) - - @classmethod - async def setu(cls, data: dict) -> str: - pid = data["pid"] - title = data["title"] - if SIZE_REDUCE: - img = MessageSegment.image( - "file:///" + await cls._comp_setu(data["url"]), proxy=False - ) - else: - img = MessageSegment.image(data["url"], proxy=False) - - msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}" - return msg - - @classmethod - async def acc_setu(cls, d: list) -> str: - data: dict = choice(d) - - for i in data["tags"]: - if i["name"] == "R-18": - return "太涩了不方便发w" - - pid = data["id"] - title = data["title"] - try: - pic = data["meta_single_page"]["original_image_url"].replace( - "pximg.net", "pixiv.cat" - ) - except Exception: - pic = choice(data["meta_pages"])["original"]["image_urls"].replace( - "pximg.net", "pixiv.cat" - ) - if SIZE_REDUCE: - img = MessageSegment.image( - "file:///" + await cls._comp_setu(pic), proxy=False - ) - else: - img = MessageSegment.image(pic, proxy=False) - - msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}" - return msg - - -class SetuData: - SETU_DATA = SETU_DIR / "setu.db" - - @classmethod - async def _check_database(cls) -> bool: - if not cls.SETU_DATA.exists(): - log.warning(f"未发现数据库\n-> {cls.SETU_DATA}\n将开始创建") - async with aiosqlite.connect(cls.SETU_DATA) as db: - cur = await db.cursor() - await cur.execute( - """ - CREATE TABLE setu( - pid PID, title TITLE, tags TAGS, - user_id USER_ID, user_name USER_NAME, - user_account USER_ACCOUNT, url URL, - UNIQUE( - pid, title, tags, user_id, - user_name, user_account, url - ) - ); - """ - ) - await db.commit() - log.warning(f"...创建数据库\n-> {cls.SETU_DATA}\n完成!") - return True - return True - - @classmethod - async def add_data(cls, d: dict) -> None: - data = ( - d["pid"], - d["title"], - d["tags"], - d["user_id"], - d["user_name"], - d["user_account"], - d["url"], - ) - - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - await db.execute( - """ - INSERT INTO setu( - pid, title, tags, user_id, - user_name, user_account, url - ) VALUES( - ?, ?, ?, ?, ?, ?, ? - ); - """, - data, - ) - await db.commit() - - @classmethod - async def del_data(cls, pid: int) -> None: - if not isinstance(pid, int): # 防注入 - raise ValueError("Please provide int.") - - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};") - await db.commit() - - @classmethod - async def count(cls): - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - async with db.execute("SELECT * FROM setu") as cursor: - return len(await cursor.fetchall()) # type: ignore - - @classmethod - async def get_setu(cls): - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - async with db.execute( - "SELECT * FROM setu ORDER BY RANDOM() limit 1;" - ) as cursor: - return await cursor.fetchall() diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py deleted file mode 100644 index a82ef83..0000000 --- a/ATRI/plugins/setu/modules/main_setu.py +++ /dev/null @@ -1,186 +0,0 @@ -import re -import json -from random import choice, random - -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.utils.request import get_bytes, post_bytes -from ATRI.utils.limit import is_too_exciting -from ATRI.config import Setu, BotSelfConfig -from ATRI.exceptions import RequestError - -from .data_source import Hso, SIZE_REDUCE, SetuData - - -LOLICON_URL: str = "https://api.lolicon.app/setu/" -PIXIV_URL: str = ( - "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word=" -) -R18_ENABLED: int = 0 -USE_LOCAL_DATA: bool = False -MIX_LOCAL_DATA: bool = False - - -setu = sv.on_regex( - r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]", rule=is_in_service("setu") -) - - -async def _setu(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 3, hours=1) - if not check: - return - - await bot.send(event, "别急,在找了!") - params = {"apikey": Setu.key, "r18": str(R18_ENABLED), "size1200": "true"} - try: - data = json.loads(await post_bytes(LOLICON_URL, params))["data"][0] - except RequestError: - raise RequestError("Request failed!") - - check = await Hso.nsfw_check(data["url"]) - score = "{:.2%}".format(check, 4) - - if not MIX_LOCAL_DATA: - if USE_LOCAL_DATA: - data = choice(await SetuData.get_setu()) # type: ignore - data = {"pid": data[0], "title": data[1], "url": data[6]} - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if check >= 0.9: - if random() <= 0.2: - repo = "我找到图了,但我发给主人了❤\n" f"涩值:{score}" - await bot.send(event, repo) - msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if random() <= 0.5: - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - data = choice(await SetuData.get_setu()) # type: ignore - data = {"pid": data[0], "title": data[1], "url": data[6]} - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - - -key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service("setu")) - - -@key_setu.handle() -async def _key_setu(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 10, hours=1) - if not check: - await setu.finish("休息一下吧❤") - - await bot.send(event, "别急,在找了!") - msg = str(event.message).strip() - tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0] - URL = PIXIV_URL + tag - - try: - data = json.loads(await get_bytes(URL))["illusts"] - except RequestError: - raise RequestError("Request msg failed!") - - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.acc_setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.acc_setu(data))) - - -__doc__ = """ -涩图设置 -权限组:维护者 -用法: - 涩图设置 启用/禁用r18 - 涩图设置 启用/禁用压缩 - 涩图设置 启用/禁用本地涩图 - 涩图设置 启用/禁用混合本地涩图 -""" - -setu_config = sv.on_command(cmd="涩图设置", docs=__doc__, permission=SUPERUSER) - - -@setu_config.handle() -async def _setu_config(bot: Bot, event: MessageEvent) -> None: - global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA - msg = str(event.message).split(" ") - if msg[0] == "": - repo = "可用设置如下:\n启用/禁用r18\n启用/禁用压缩\n启用/禁用本地涩图\n启用/禁用混合本地涩图" - await setu_config.finish(repo) - elif msg[0] == "启用r18": - R18_ENABLED = 1 - await setu_config.finish("已启用r18") - elif msg[0] == "禁用r18": - R18_ENABLED = 0 - await setu_config.finish("已禁用r18") - elif msg[0] == "启用压缩": - SIZE_REDUCE = True - await setu_config.finish("已启用图片压缩") - elif msg[0] == "禁用压缩": - SIZE_REDUCE = False - await setu_config.finish("已禁用图片压缩") - elif msg[0] == "启用本地涩图": - USE_LOCAL_DATA = True - await setu_config.finish("已启用本地涩图") - elif msg[0] == "禁用本地涩图": - USE_LOCAL_DATA = False - await setu_config.finish("已禁用本地涩图") - elif msg[0] == "启用混合本地涩图": - MIX_LOCAL_DATA = True - await setu_config.finish("已启用混合本地涩图") - elif msg[0] == "禁用混合本地涩图": - MIX_LOCAL_DATA = False - await setu_config.finish("已禁用混合本地涩图") - else: - await setu_config.finish("阿!请检查拼写") - - -not_get_se = sv.on_command("不够涩") - - -@not_get_se.handle() -async def _not_se(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 1, 120) - if check: - msg = choice(["那你来发", "那你来发❤"]) - await not_get_se.finish(msg) diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py deleted file mode 100644 index e66c398..0000000 --- a/ATRI/plugins/setu/modules/scheduler.py +++ /dev/null @@ -1,15 +0,0 @@ -import shutil -from ATRI.log import logger as log -from ATRI.utils.apscheduler import scheduler - -from .data_source import TEMP_DIR - - [email protected]_job("interval", days=7, misfire_grace_time=10) -async def clear_temp(): - log.info("正在清除涩图缓存") - try: - shutil.rmtree(TEMP_DIR) - log.info("清除缓存成功!") - except Exception: - log.warn("清除图片缓存失败!") diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py deleted file mode 100644 index e278c77..0000000 --- a/ATRI/plugins/setu/modules/store.py +++ /dev/null @@ -1,136 +0,0 @@ -import json -from random import choice - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.exceptions import RequestError - -from .data_source import SetuData - - -API_URL: str = "https://api.kyomotoi.moe/api/pixiv/illust?id=" - - -__doc__ = """ -为本地添加涩图! -权限组:维护者 -用法: - 添加涩图 (pid) -补充: - pid: Pixiv 作品id -""" - - -add_setu = sv.on_command(cmd="添加涩图", docs=__doc__, permission=SUPERUSER) - - -@add_setu.args_parser # type: ignore -async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await add_setu.finish("好吧...") - if not msg: - await add_setu.reject("涩图(pid)速发!") - else: - state["setu_add"] = msg - - -@add_setu.handle() -async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["setu_add"] = msg - - -@add_setu.got("setu_add", prompt="涩图(pid)速发!") -async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - pid = state["setu_add"] - - URL = API_URL + pid - try: - data = json.loads(await get_bytes(URL))["illust"] - except RequestError: - raise RequestError("Request failed!") - - try: - pic = data["meta_single_page"]["original_image_url"].replace( - "pximg.net", "pixiv.cat" - ) - except Exception: - pic = choice(data["meta_pages"])["image_urls"]["original"].replace( - "pximg.net", "pixiv.cat" - ) - - d = { - "pid": pid, - "title": data["title"], - "tags": str(data["tags"]), - "user_id": data["user"]["id"], - "user_name": data["user"]["name"], - "user_account": data["user"]["account"], - "url": pic, - } - await SetuData.add_data(d) - - show_img = data["image_urls"]["medium"].replace("pximg.net", "pixiv.cat") - msg = ( - "好欸!是新涩图:\n" - f"Pid: {pid}\n" - f"Title: {data['title']}\n" - f"{MessageSegment.image(show_img)}" - ) - await add_setu.finish(Message(msg)) - - -__doc__ = """ -删除涩图! -权限组:维护者 -用法: - 删除涩图 (pid) -补充: - pid: Pixiv 作品id -""" - - -del_setu = sv.on_command(cmd="删除涩图", docs=__doc__, permission=SUPERUSER) - - -@del_setu.args_parser # type: ignore -async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await add_setu.finish("好吧...") - if not msg: - await add_setu.reject("涩图(pid)速发!") - else: - state["setu_del"] = msg - - -@del_setu.handle() -async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["setu_del"] = msg - - -@del_setu.got("setu_del", prompt="涩图(pid)速发!") -async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - pid = int(state["setu_del"]) - await SetuData.del_data(pid) - await del_setu.finish(f"涩图({pid})已删除...") - - -count_setu = sv.on_command(cmd="涩图总量", permission=SUPERUSER) - - -@count_setu.handle() -async def _count_setu(bot: Bot, event: MessageEvent) -> None: - msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!" - await count_setu.finish(msg) diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py deleted file mode 100644 index c49570b..0000000 --- a/ATRI/plugins/status.py +++ /dev/null @@ -1,115 +0,0 @@ -import time -import psutil -from datetime import datetime -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.log import logger as log -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.exceptions import GetStatusError -from ATRI.utils.apscheduler import scheduler -from ATRI.config import BotSelfConfig - - -__doc__ = """ -测试机器人状态 -权限组:所有人 -用法: - /ping -""" - -ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping")) - - -async def _ping(bot: Bot, event: MessageEvent) -> None: - await ping.finish("I'm fine.") - - -__doc__ = """ -检查机器人性能 -权限组:所有人 -用法: - /status -""" - -status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status")) - - -async def _status(bot: Bot, event: MessageEvent) -> None: - try: - cpu = psutil.cpu_percent(interval=1) - mem = psutil.virtual_memory().percent - disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore - now = time.time() - boot = psutil.boot_time() - up_time = str( - datetime.utcfromtimestamp(now).replace(microsecond=0) - - datetime.utcfromtimestamp(boot).replace(microsecond=0) - ) - except GetStatusError: - raise GetStatusError("Failed to get status.") - - msg = "アトリは、高性能ですから!" - - if cpu > 90: # type: ignore - msg = "咱感觉有些头晕..." - if mem > 90: - msg = "咱感觉有点头晕并且有点累..." - elif mem > 90: - msg = "咱感觉有点累..." - elif disk > 90: - msg = "咱感觉身体要被塞满了..." - - msg0 = ( - "Self status:\n" - f"* CPU: {cpu}%\n" - f"* MEM: {mem}%\n" - f"* DISK: {disk}%\n" - f"* netSENT: {inteSENT}MB\n" - f"* netRECV: {inteRECV}MB\n" - f"* Runtime: {up_time}\n" - ) + msg - - await status.finish(msg0) - - [email protected]_job("interval", minutes=5, misfire_grace_time=10) -async def _(): - log.info("开始自检") - try: - cpu = psutil.cpu_percent(interval=1) - mem = psutil.virtual_memory().percent - disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore - except GetStatusError: - raise GetStatusError("Failed to get status.") - - msg = str() - if cpu > 90: # type: ignore - msg = "咱感觉有些头晕..." - if mem > 90: - msg = "咱感觉有点头晕并且有点累..." - elif mem > 90: - msg = "咱感觉有点累..." - elif disk > 90: - msg = "咱感觉身体要被塞满了..." - else: - log.info("运作正常") - return - - msg0 = ( - "Self status:\n" - f"* CPU: {cpu}%\n" - f"* MEM: {mem}%\n" - f"* DISK: {disk}%\n" - f"* netSENT: {inteSENT}MB\n" - f"* netRECV: {inteRECV}MB\n" - ) + msg - - for sup in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0) diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py new file mode 100644 index 0000000..8955cb1 --- /dev/null +++ b/ATRI/plugins/status/__init__.py @@ -0,0 +1,26 @@ +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils.apscheduler import scheduler +from .data_source import IsSurvive + + +ping = IsSurvive().on_command("/ping", "检测bot简单信息处理速度") + +async def _ping(bot: Bot, event: MessageEvent): + await ping.finish(IsSurvive.ping()) + + +status = IsSurvive().on_command("/status", "查看运行资源占用") + +async def _status(bot: Bot, event: MessageEvent): + msg, _ = IsSurvive.get_status() + await status.finish(msg) + + [email protected]_job("interval", minutes=10, misfire_grace_time=15) +async def _status_checking(): + msg, stat = IsSurvive().get_status() + if not stat: + await status.finish(msg) diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py new file mode 100644 index 0000000..c353313 --- /dev/null +++ b/ATRI/plugins/status/data_source.py @@ -0,0 +1,71 @@ +import time +import psutil +from datetime import datetime + +from ATRI.service import Service +from ATRI.log import logger as log +from ATRI.rule import is_in_service +from ATRI.exceptions import GetStatusError + + +__doc__ = """ +检查咱自身状态 +""" + + +class IsSurvive(Service): + + def __init__(self): + Service.__init__(self, "状态", __doc__, rule=is_in_service("状态")) + + @staticmethod + def ping() -> str: + return "I'm fine." + + @staticmethod + def get_status(): + log.info("开始检查资源消耗...") + try: + cpu = psutil.cpu_percent(interval=1) + mem = psutil.virtual_memory().percent + disk = psutil.disk_usage("/").percent + inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + + now = time.time() + boot = psutil.boot_time() + up_time = str( + datetime.utcfromtimestamp(now).replace(microsecond=0) + - datetime.utcfromtimestamp(boot).replace(microsecond=0) + ) + except GetStatusError: + raise GetStatusError("Failed to get status.") + + msg = "アトリは、高性能ですから!" + if cpu > 90: # type: ignore + msg = "咱感觉有些头晕..." + is_ok = False + if mem > 90: + msg = "咱感觉有点头晕并且有点累..." + is_ok = False + elif mem > 90: + msg = "咱感觉有点累..." + is_ok = False + elif disk > 90: + msg = "咱感觉身体要被塞满了..." + is_ok = False + else: + log.info("资源占用正常") + is_ok = True + + msg0 = ( + "Self status:\n" + f"* CPU: {cpu}%\n" + f"* MEM: {mem}%\n" + f"* DISK: {disk}%\n" + f"* netSENT: {inteSENT}MB\n" + f"* netRECV: {inteRECV}MB\n" + f"* Runtime: {up_time}\n" + ) + msg + + return msg0, is_ok diff --git a/ATRI/plugins/util/__init__.py b/ATRI/plugins/util/__init__.py new file mode 100644 index 0000000..1e31bff --- /dev/null +++ b/ATRI/plugins/util/__init__.py @@ -0,0 +1,134 @@ +import re +from random import choice, random + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils.limit import FreqLimiter +from .data_source import Encrypt, Utils, Yinglish + + +roll = Utils().on_command("/roll", "骰子~用法:1d10 或 2d10+2d10+more") + [email protected]_parser # type: ignore +async def _get_roll(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await roll.finish("好吧...") + if not msg: + await roll.reject("参数呢?!格式:1d10 或 2d10+2d10+more") + else: + state["roll"] = msg + +async def _ready_roll(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["roll"] = msg + [email protected]("roll", "参数呢?!格式:1d10 或 2d10+2d10+more") +async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State): + text = state["roll"] + match = re.match(r"^([\dd+\s]+?)$", text) + + if not match: + await roll.finish("阿——!参数不对!格式:1d10 或 2d10+2d10+more") + + msg = Utils().roll_dice(text) + await roll.finish(msg) + + +encrypt_en = Utils().on_command("加密", "我们之前的秘密❤") + +@encrypt_en.args_parser # type: ignore +async def _get_encr_en_text(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await roll.finish("好吧...") + if not msg: + await roll.reject("内容呢?!") + else: + state["encr_en_text"] = msg + +@encrypt_en.handle() +async def _ready_en(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["encr_en_text"] = msg + +@encrypt_en.got("encr_en_text", "内容呢?!") +async def _deal_en(bot: Bot, event: MessageEvent, state: T_State): + text = state["encr_en_text"] + is_ok = len(text) + if is_ok < 10: + await encrypt_en.reject("太短不加密!") + en = Encrypt() + result = en.encode(text) + await encrypt_en.finish(result) + + +encrypt_de = Utils().on_command("解密", "解开我们的秘密❤") + +@encrypt_de.args_parser # type: ignore +async def _get_encr_de_text(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await encrypt_de.finish("好吧...") + if not msg: + await encrypt_de.reject("内容呢?!") + else: + state["encr_de_text"] = msg + +@encrypt_de.handle() +async def _ready_de(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["encr_de_text"] = msg + +@encrypt_de.got("encr_de_text", "内容呢?!") +async def _deal_de(bot: Bot, event: MessageEvent, state: T_State): + text = state["encr_de_text"] + en = Encrypt() + result = en.decode(text) + await encrypt_de.finish(result) + + +sepi = Utils().on_command("涩批一下", "将正常的句子涩一涩~") + +_sepi_flmt = FreqLimiter(3) +_sepi_flmt_notice = ["涩批爬", "✌🥵✌"] + [email protected]_parser # type: ignore +async def _get_sepi(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了", "取消"] + if msg in quit_list: + await sepi.finish("好吧...") + if not msg: + await sepi.reject("内容呢?!") + else: + state["sepi_text"] = msg + +async def _ready_sepi(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _sepi_flmt.check(user_id): + await sepi.finish(choice(_sepi_flmt_notice)) + + msg = str(event.message).strip() + if msg: + state["sepi_text"] = msg + [email protected]("sepi_text", "内容呢?!") +async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + msg = state["sepi_text"] + if len(msg) < 4: + await sepi.finish("这么短?涩不起来!") + + result = Yinglish.deal(msg, random()) + _sepi_flmt.start_cd(user_id) + await sepi.finish(result) diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/util/data_source.py index 92ae930..a6afdf3 100644 --- a/ATRI/plugins/utils/data_source.py +++ b/ATRI/plugins/util/data_source.py @@ -4,40 +4,58 @@ import jieba.posseg as pseg from typing import Union, Optional from random import random, choice, randint +from ATRI.service import Service +from ATRI.rule import is_in_service -def roll_dice(par: str) -> str: - result = 0 - proc = "" - proc_list = [] - p = par.split("+") - for i in p: - args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i) - args = list(args[0]) +__doc__ = """ +非常实用(?)的工具们! +""" - args[0] = args[0] or 1 - if int(args[0]) >= 5000 or int(args[2]) >= 5000: - return "阿...好大......" - - for a in range(1, int(args[0]) + 1): - rd = randint(1, int(args[2])) - result = result + rd - - if len(proc_list) <= 10: - proc_list.append(rd) - - if len(proc_list) <= 10: - proc += "+".join(map(str, proc_list)) - elif len(proc_list) > 10: - proc += "太长了不展示了就酱w" - else: - proc += str(result) - - result = f"{par}=({proc})={result}" - return result +class Utils(Service): + + def __init__(self): + Service.__init__(self, "小工具", __doc__, rule=is_in_service("小工具")) + + @staticmethod + def roll_dice(par: str) -> str: + result = 0 + proc = "" + proc_list = [] + p = par.split("+") + + for i in p: + args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i) + args = list(args[0]) + + args[0] = args[0] or 1 + if int(args[0]) >= 5000 or int(args[2]) >= 5000: + return "阿...好大......" + + for a in range(1, int(args[0]) + 1): + rd = randint(1, int(args[2])) + result = result + rd + + if len(proc_list) <= 10: + proc_list.append(rd) + + if len(proc_list) <= 10: + proc += "+".join(map(str, proc_list)) + elif len(proc_list) > 10: + proc += "太长了不展示了就酱w" + else: + proc += str(result) -class Encrypt: + result = f"{par}=({proc})={result}" + return result + +class Encrypt(Utils): + """ + 某nb改的(逃 + 总之就是非常nb + """ + cr = "ĀāĂ㥹ÀÁÂÃÄÅ" cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ" cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr" @@ -168,9 +186,9 @@ class Encrypt: return self._decodeBytes(s).decode(encoding) except UnicodeDecodeError: raise ValueError("Decoding failed") - - -class Yinglish: + +class Yinglish(Utils): + @staticmethod def _to_ying(x, y, ying) -> str: if random() > ying: diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py deleted file mode 100644 index 608f1f5..0000000 --- a/ATRI/plugins/utils/__init__.py +++ /dev/null @@ -1,120 +0,0 @@ -import re -from random import random - -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from .data_source import roll_dice, Encrypt, Yinglish - - -__doc__ = """ -roll一下 -权限组:所有人 -用法: - /roll (int)d(int)+... -补充: - int: 阿拉伯数字 -示例: - /roll 1d10+10d9+4d5+2d3 -""" - -roll = sv.on_command(cmd="/roll", docs=__doc__, rule=is_in_service("roll")) - - [email protected]_parser # type: ignore -async def _load_roll(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await roll.finish("好吧...") - if not msg: - await roll.reject("点呢?(1d10+...)") - else: - state["resu"] = msg - - -async def _roll(bot: Bot, event: MessageEvent, state: T_State) -> None: - args = str(event.message).strip() - if args: - state["resu"] = args - - [email protected]("resu", prompt="roll 参数不能为空~!\ndemo:1d10 或 2d10+2d10") -async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State) -> None: - resu = state["resu"] - match = re.match(r"^([\dd+\s]+?)$", resu) - - if not match: - await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10") - - await roll.finish(roll_dice(resu)) - - -__doc__ = """ -加密传输(bushi -权限组:所有人 -用法: - /enc e,d msg -补充: - e,d:对应 编码/解码 - msg: 目标内容 -示例: - /enc e アトリは高性能ですから! -""" - -encrypt = sv.on_command(cmd="/enc", docs=__doc__, rule=is_in_service("enc")) - - -async def _encrypt(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - _type = msg[0] - s = msg[1] - e = Encrypt() - - if _type == "e": - await encrypt.finish(e.encode(s)) - elif _type == "d": - await encrypt.finish(e.decode(s)) - else: - await encrypt.finish("请检查输入~!") - - -__doc__ = """ -涩批一下! -权限组:所有人 -用法: - 涩批一下 (msg) -""" - -sepi = sv.on_command(cmd="涩批一下", docs=__doc__, rule=is_in_service("涩批一下")) - - -async def _load_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await sepi.finish("好吧...") - if not msg: - await sepi.reject("话呢?") - else: - state["sepi_msg"] = msg - - -async def _sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["sepi_msg"] = msg - - [email protected]("sepi_msg", prompt="话呢?") -async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["sepi_msg"] - if len(msg) < 4: - await sepi.finish("这么短?涩不起来!") - await sepi.finish(Yinglish.deal(msg, random())) diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py deleted file mode 100644 index 2b6f6a3..0000000 --- a/ATRI/plugins/wife/__init__.py +++ /dev/null @@ -1,116 +0,0 @@ -import re -import asyncio -from random import choice - -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import ( - Bot, - MessageEvent, - GroupMessageEvent, - PrivateMessageEvent, -) - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.utils.limit import is_too_exciting - -from .data_source import Tsuma - - -__doc__ = """ -好欸!是老婆! -权限组:所有人 -用法: - 抽老婆 # 获取一位老婆 - 查老婆 # 查询老婆,如+at对象可查询对方 - 我要离婚 # 离婚... -""" - -roll_wife = sv.on_command(cmd="抽老婆", docs=__doc__, rule=is_in_service("抽老婆")) - - -@roll_wife.handle() -async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: - user = event.user_id - gender = event.sender.sex - group = event.group_id - user_name = await bot.get_group_member_info(group_id=group, user_id=user) - user_name = user_name["nickname"] - run = is_too_exciting(user, 1, seconds=5) - if not run: - return - - check_repo, if_h = Tsuma.check_tsuma(str(user)) - if if_h: - await roll_wife.finish(check_repo) - - msg = "5秒后咱将随机抽取一位群友成为\n" f"{user_name} 的老婆!究竟是谁呢~?" - await bot.send(event, msg) - await asyncio.sleep(5) - - async def get_luck_user(): - luck_list = await bot.get_group_member_list(group_id=group) - return choice(luck_list) - - while True: - luck_user = await get_luck_user() - luck_qq = luck_user["user_id"] - if user != luck_qq: - break - - luck_gender = luck_user["sex"] - luck_user = luck_user["nickname"] - d = { - "nickname": user_name, - "gender": gender, - "lassie": {"nickname": luck_user, "qq": luck_qq, "gender": luck_gender}, - } - - if str(luck_qq) == str(event.self_id): - Tsuma.got_tsuma(str(user), d) - msg = "老婆竟是我自己~❤" - else: - msg = Tsuma.got_tsuma(str(user), d) - - await roll_wife.finish(msg) - - -@roll_wife.handle() -async def _no_pr(bot: Bot, event: PrivateMessageEvent) -> None: - await roll_wife.finish("对8起...该功能只对群聊开放(") - - -inquire_wife = sv.on_command(cmd="查老婆", rule=is_in_service("抽老婆")) - - -@inquire_wife.handle() -async def _inq_wife(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - if msg[0] == "": - user = str(event.user_id) - await inquire_wife.finish(Tsuma.inquire_tsuma(user)) - else: - aim = re.findall(r"qq=(.*?)]", msg[0])[0] - await inquire_wife.finish(Tsuma.inquire_tsuma(aim).replace("你", "ta")) - - -want_divorce = sv.on_command(cmd="我要离婚", rule=is_in_service("抽老婆")) - - -@want_divorce.handle() -async def _want_div(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["is_d"] = msg - - -@want_divorce.got("is_d", prompt="你确定吗?(是/否)") -async def _deal_div(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["is_d"] - user = str(event.user_id) - name = event.sender.nickname - - if msg in ["是", "确定"]: - await want_divorce.finish(Tsuma.divorce(user)) - else: - await want_divorce.finish(f"({name})回心转意了!") diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py deleted file mode 100644 index 9665351..0000000 --- a/ATRI/plugins/wife/data_source.py +++ /dev/null @@ -1,87 +0,0 @@ -import os -import json -from pathlib import Path - - -WIFE_DIR = Path(".") / "ATRI" / "data" / "database" / "wife" -MERRY_LIST_PATH = WIFE_DIR / "merry_list.json" -os.makedirs(WIFE_DIR, exist_ok=True) - - -class Tsuma: - @staticmethod - def _load_tsuma() -> dict: - try: - return json.loads(MERRY_LIST_PATH.read_bytes()) - except FileNotFoundError: - with open(MERRY_LIST_PATH, "w") as r: - r.write(json.dumps({}, indent=4)) - return dict() - - @staticmethod - def _store_tsuma(data: dict) -> None: - with open(MERRY_LIST_PATH, "w") as r: - r.write(json.dumps(data, indent=4)) - - @classmethod - def check_tsuma(cls, user: str): - data = cls._load_tsuma() - if user in data: - msg = "阿,你已经有老婆惹!" f"ta是:{data[user]['lassie']['nickname']}" - return msg, True - else: - return "悲——你还没老婆...", False - - @classmethod - def inquire_tsuma(cls, user: str) -> str: - data = cls._load_tsuma() - if user in data: - return f"你的老婆是:{data[user]['lassie']['nickname']} 哦~❤" - else: - return "悲——你还没老婆..." - - @classmethod - def got_tsuma(cls, user: str, d: dict) -> str: - check_repo, if_h = cls.check_tsuma(user) # 防止出现多人同时操作导致 NTR 事件 - if if_h: - return check_repo - else: - data = cls._load_tsuma() - data[user] = { - "nickname": d["nickname"], - "gender": d["gender"], - "lassie": { - "nickname": d["lassie"]["nickname"], - "qq": d["lassie"]["qq"], - "gender": d["lassie"]["gender"], - }, - } - cls._store_tsuma(data) - - data[d["lassie"]["qq"]] = { - "nickname": d["lassie"]["nickname"], - "gender": d["lassie"]["gender"], - "lassie": { - "nickname": d["nickname"], - "qq": user, - "gender": d["gender"], - }, - } - cls._store_tsuma(data) - - msg = ( - f"> {d['lassie']['nickname']}({d['lassie']['qq']})\n" - f"恭喜成为 {d['nickname']} 的老婆~⭐" - ) - return msg - - @classmethod - def divorce(cls, user: str) -> str: - data = cls._load_tsuma() - if not user in data: - return "悲——你还没老婆。。" - - msg = f"悲——,({data[user]['nickname']})抛弃了({data[user]['lassie']['nickname']})" - del data[user] - cls._store_tsuma(data) - return msg diff --git a/ATRI/rule.py b/ATRI/rule.py index 907a65d..3d75f84 100644 --- a/ATRI/rule.py +++ b/ATRI/rule.py @@ -1,17 +1,24 @@ +from nonebot.adapters.cqhttp.event import PrivateMessageEvent from nonebot.rule import Rule -from nonebot.adapters.cqhttp import GroupMessageEvent, PokeNotifyEvent +from nonebot.adapters.cqhttp import MessageEvent, GroupMessageEvent -from .service import Service as sv +from .service import ServiceTools def is_in_service(service: str) -> Rule: async def _is_in_service(bot, event, state) -> bool: - user = str(event.user_id) - if isinstance(event, GroupMessageEvent): - return sv.auth_service(service, user, str(event.group_id)) + if isinstance(event, PrivateMessageEvent): + user_id = event.get_user_id() + result = ServiceTools().auth_service(service, user_id) + return result + elif isinstance(event, GroupMessageEvent): + user_id = event.get_event_name() + group_id = str(event.group_id) + result = ServiceTools().auth_service(service, user_id, group_id) + return result else: - return sv.auth_service(service, user, None) - + return True + return Rule(_is_in_service) @@ -20,10 +27,3 @@ def to_bot() -> Rule: return event.is_tome() return Rule(_to_bot) - - -def poke(bot, event: PokeNotifyEvent, state): - if event.is_tome(): - return True - else: - return False diff --git a/ATRI/service.py b/ATRI/service.py index 989179b..4798fd5 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -2,17 +2,15 @@ import os import re import json from pathlib import Path -from datetime import datetime -from typing import Dict, Any, List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING +from pydantic import BaseModel +from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING + from nonebot.matcher import Matcher from nonebot.permission import Permission -from nonebot.plugin import on_message from nonebot.typing import T_State, T_Handler, T_RuleChecker from nonebot.rule import Rule, command, keyword, regex -from .log import logger as log -from .config import NetworkPost -from .utils.request import post_bytes +from ATRI.exceptions import ReadFileError, WriteError if TYPE_CHECKING: from nonebot.adapters import Bot, Event @@ -24,148 +22,168 @@ os.makedirs(SERVICE_DIR, exist_ok=True) os.makedirs(SERVICES_DIR, exist_ok=True) -is_sleep: bool = False -matcher_list: list = [] - - -def _load_block_list() -> dict: - file_name = "ban.json" - file = SERVICE_DIR / file_name - try: - data = json.loads(file.read_bytes()) - except: - data = {"user": {}, "group": {}} - with open(file, "w") as r: - r.write(json.dumps(data, indent=4)) - return data - - -def _save_block_list(data: dict) -> None: - file_name = "ban.json" - file = SERVICE_DIR / file_name - with open(file, "w") as r: - r.write(json.dumps(data, indent=4)) +class ServiceInfo(BaseModel): + service: str + docs: str + cmd_list: dict + enabled: bool + only_admin: bool + disable_user: list + disable_group: list -def _load_service_config(service: str, docs: str = None) -> dict: - file_name = service.replace("/", "") + ".json" - file = SERVICES_DIR / file_name - try: - data = json.loads(file.read_bytes()) - except: - service_info = { - "command": service, - "docs": docs, - "enabled": True, - "disable_user": {}, - "disable_group": {}, - } - with open(file, "w") as r: - r.write(json.dumps(service_info, indent=4)) - data = service_info - return data - - -def _save_service_config(service: str, data: dict) -> None: - file_name = service.replace("/", "") + ".json" - file = SERVICES_DIR / file_name - with open(file, "w") as r: - r.write(json.dumps(data, indent=4)) +class CommandInfo(BaseModel): + type: str + docs: str + aliases: list class Service: """ 集成一套服务管理,对功能信息进行持久化 - 计划搭配前端使用 + 服务文件结构: + { + "service": "Service name", + "docs": "Main helps and commands", + "cmd_list": { + "/cmd0": { + "type": "Command type", + "docs": "Command help", + "aliases": ["More trigger ways."] + } + }, + "enabled": True, + "only_admin": False, + "disable_user": [], + "disable_group": [] + } """ - - @staticmethod - def manual_reg_service(service: str, docs: str = None): - file_name = service.replace("/", "") + ".json" - file = SERVICES_DIR / file_name - service_info = { - "command": service, - "docs": docs, - "enabled": True, - "disable_user": {}, - "disable_group": {}, - } - with open(file, "w") as r: - r.write(json.dumps(service_info, indent=4)) - - @staticmethod - def auth_service(service: str, user: str, group: str = None) -> bool: - data = _load_service_config(service) - if user in data["disable_user"]: - return False - else: - if group in data["disable_group"]: - return False - else: - return True - - @staticmethod - def control_service( - service: str, - is_global: bool, - is_enabled: int, - user: str = None, - group: str = None, - ) -> None: - data = _load_service_config(service) - is_enabled = bool(is_enabled) - - if is_global: - status = "disabled" if is_enabled else "enabled" - data["enabled"] = is_enabled - log.info(f"\033[33mService: {service} has been {status}.\033[33m") - else: - if user: - if not is_enabled: - data["disable_user"][user] = str(datetime.now()) - log.info( - f"\033[33mNew service blocked user: {user}\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) - else: - if user in data["disable_user"]: - del data["disable_user"][user] - log.info( - f"\033[33mUser: {user} has been unblock\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) - else: - if not is_enabled: - data["disable_group"][group] = str(datetime.now()) - log.info( - f"\033[33mNew service blocked group: {group}\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) + + def __init__(self, + service: str, + docs: str = None, + only_admin: bool = False, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + handlers: Optional[List[T_Handler]] = None, + temp: bool = False, + priority: int = 1, + state: Optional[T_State] = None): + self.service = service + self.docs = docs + self.only_admin = only_admin + self.rule = rule + self.permission = permission + self.handlers = handlers + self.temp = temp + self.priority = priority + self.state = state + + def _generate_service_config(self, service: str = None, docs: str = None) -> None: + if not service: + service = self.service + if not docs: + docs = self.docs or str() + + path = SERVICES_DIR / f"{service}.json" + data = ServiceInfo( + service=service, + docs=docs, + cmd_list=dict(), + enabled=True, + only_admin=self.only_admin, + disable_user=list(), + disable_group=list() + ) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + except WriteError: + raise WriteError("Write service info failed!") + + def save_service(self, service_data: dict, service: str = None) -> None: + if not service: + service = self.service + + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + self._generate_service_config() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(service_data, indent=4)) + + def load_service(self, service: str = None) -> dict: + if not service: + service = self.service + + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + self._generate_service_config() + + try: + data = json.loads(path.read_bytes()) + except ReadFileError: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + self._generate_service_config() + data = json.loads(path.read_bytes()) + return data + + def _save_cmds(self, cmds: dict) -> None: + data = self.load_service(self.service) + temp_data: dict = data["cmd_list"] + temp_data.update(cmds) + self.save_service(data) + + def _load_cmds(self) -> dict: + path = SERVICES_DIR / f"{self.service}.json" + if not path.is_file(): + self._generate_service_config() + + data = json.loads(path.read_bytes()) + return data["cmd_list"] + + def on_message(self, + docs: str = None, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + handlers: Optional[List[T_Handler]] = None, + block: bool = True, + priority: int = None, + state: Optional[T_State] = None) -> Type[Matcher]: + if not rule: + rule = self.rule + if not permission: + permission = self.permission + if not handlers: + handlers = self.handlers + if not priority: + priority = self.priority + if not state: + state = self.state + + if docs: + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "message" + str(a) + if _type not in cmd_list: + break else: - if group in data["disable_group"]: - del data["disable_group"][group] - log.info( - f"\033[33mGroup: {group} has been unblock\033[33m" - f"\033[33m | Service: {service} | Time: {datetime.now()}\033[33m" - ) - _save_service_config(service, data) - - @staticmethod - def on_message( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = True, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + a += 1 + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + matcher = Matcher.new( "message", Rule() & rule, permission or Permission(), - temp=temp, + temp=self.temp, priority=priority, block=block, handlers=handlers, @@ -173,59 +191,86 @@ class Service: ) return matcher - @staticmethod - def on_notice( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + def on_notice(self, docs: str, block: bool = True) -> Type[Matcher]: + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "notice" + str(a) + if _type not in cmd_list: + break + else: + a += 1 + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + matcher = Matcher.new( "notice", - Rule() & rule, + Rule() & self.rule, Permission(), - temp=temp, - priority=priority, + temp=self.temp, + priority=self.priority, block=block, - handlers=handlers, - default_state=state, + handlers=self.handlers, + default_state=self.state, ) return matcher - @staticmethod - def on_request( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[T_Handler]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - ) -> Type[Matcher]: + def on_request(self, docs: str, block: bool = True) -> Type[Matcher]: + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "request" + str(a) + if _type not in cmd_list: + break + else: + a += 1 + + cmd_list[_type] =CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + matcher = Matcher.new( "request", - Rule() & rule, + Rule() & self.rule, Permission(), - temp=temp, - priority=priority, + temp=self.temp, + priority=self.priority, block=block, - handlers=handlers, - default_state=state, + handlers=self.handlers, + default_state=self.state, ) return matcher - @classmethod def on_command( - cls, + self, cmd: Union[str, Tuple[str, ...]], - docs: Optional[str] = None, + docs: str, rule: Optional[Union[Rule, T_RuleChecker]] = None, aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, **kwargs, ) -> Type[Matcher]: + _type = "command" + cmd_list = self._load_cmds() + if not rule: + rule = self.rule + if not aliases: + aliases = set() + + cmd_list[cmd] = CommandInfo( + type=_type, + docs=docs, + aliases=list(aliases) + ).dict() + self._save_cmds(cmd_list) + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): message = event.get_message() segment = message.pop(0) @@ -239,118 +284,107 @@ class Service: handlers.insert(0, _strip_cmd) commands = set([cmd]) | (aliases or set()) - _load_service_config(str(cmd), docs) - return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs) + return self.on_message(rule=command(*commands) & rule, handlers=handlers, **kwargs) - @classmethod def on_keyword( - cls, + self, keywords: Set[str], - docs: Optional[str] = None, + docs: str, rule: Optional[Union[Rule, T_RuleChecker]] = None, **kwargs, ) -> Type[Matcher]: - _load_service_config(list(keywords)[0], docs) - return cls.on_message(keyword(*keywords) & rule, **kwargs) + if not rule: + rule = self.rule + + a = 0 + cmd_list = self._load_cmds() + while True: + _type = "keyword" + str(a) + if _type not in cmd_list: + break + else: + a += 1 + + cmd_list[_type] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + + return self.on_message(rule=keyword(*keywords) & rule, **kwargs) - @classmethod def on_regex( - cls, + self, pattern: str, + docs: str, flags: Union[int, re.RegexFlag] = 0, rule: Optional[Union[Rule, T_RuleChecker]] = None, **kwargs, ) -> Type[Matcher]: - return on_message(regex(pattern, flags) & rule, **kwargs) - - class NetworkPost: - URL = f"http://{NetworkPost.host}:" f"{NetworkPost.port}/" - - @classmethod - async def send_private_msg( - cls, user_id: int, message: str, auto_escape: bool = False - ) -> Dict[str, Any]: - url = cls.URL + "send_private_msg?" - params = { - "user_id": user_id, - "message": message, - "auto_escape": f"{auto_escape}", - } - result = json.loads(await post_bytes(url, params)) - log.debug(result) - return result - - @classmethod - def send_group_msg( - cls, group_id: int, message: Union[str], auto_escape: Optional[bool] = ... - ) -> Dict[str, Any]: - ... - - @classmethod - async def send_msg( - cls, - message_type: Optional[str] = "", - user_id: Optional[int] = None, - group_id: Optional[int] = None, - message=Union[str], - auto_escape: bool = False, - ) -> Dict[str, Any]: - url = cls.URL + "send_msg?" - params = { - "message_type": "", - "user_id": user_id, - "group_id": group_id, - "message": message, - "auto_escape": str(auto_escape), - } - result = json.loads(await post_bytes(url, params)) - log.debug(result) - return result + _type = "regex" + if not rule: + rule = self.rule + + cmd_list = self._load_cmds() + cmd_list[pattern] = CommandInfo( + type=_type, + docs=docs, + aliases=list() + ).dict() + self._save_cmds(cmd_list) + + return self.on_message(rule=regex(pattern, flags) & rule, **kwargs) + + +class ServiceTools(object): + + @staticmethod + def save_service(service_data: dict, service: str) -> None: + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + raise ReadFileError( + f"Can't find service: ({service}) file.\n" + "Please delete all file in data/service/services.\n" + "Next reboot bot." + ) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(service_data, indent=4)) + + @staticmethod + def load_service(service: str) -> dict: + path = SERVICES_DIR / f"{service}.json" + if not path.is_file(): + raise ReadFileError( + f"Can't find service: ({service}) file.\n" + "Please delete all file in data/service/services.\n" + "Next reboot bot." + ) + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data - class Dormant: - @staticmethod - def is_dormant() -> bool: - return False if is_sleep else True + @classmethod + def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool: + data = cls.load_service(service) + + auth_global = data.get("enabled", True) + auth_user = data.get("disable_user", list()) + auth_group = data.get("disable_group", list()) + + if user_id: + if user_id in auth_user: + return False - @staticmethod - def control_dormant(is_enable: bool) -> None: - global is_sleep - if is_enable: - is_sleep = True + if group_id: + if group_id in auth_group: + return False else: - is_sleep = False - - class BlockSystem: - file_name = "ban.json" - path = SERVICE_DIR / file_name - - @staticmethod - def auth_user(user: str) -> bool: - return False if user in _load_block_list()["user"] else True - - @staticmethod - def auth_group(group: str) -> bool: - return False if group in _load_block_list()["group"] else True - - @staticmethod - def control_list(is_enabled: bool, user: str = None, group: str = None) -> None: - data = _load_block_list() - if user: - if is_enabled: - data["user"][user] = str(datetime.now()) - log.info( - f"\033[33mNew blocked user: {user} | Time: {datetime.now()}\033[33m" - ) - else: - del data["user"][str(user)] - log.info(f"\033[33mUser {user} has been unblock.\033[33m") - elif group: - if is_enabled: - data["group"][group] = str(datetime.now()) - log.info( - f"\033[33mNew blocked group: {group} | Time: {datetime.now()}\033[33m" - ) - else: - del data["group"][str(group)] - log.info(f"\033[33mGroup {group} has been unblock.\033[33m") - _save_block_list(data) + return True + + if not auth_global: + return False + else: + return True diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py index e69de29..35f5aad 100644 --- a/ATRI/utils/__init__.py +++ b/ATRI/utils/__init__.py @@ -0,0 +1,214 @@ +import os +import re +import yaml +import aiofiles +from pathlib import Path +from datetime import datetime +from PIL import Image, ImageFile +from aiofiles.threadpool.text import AsyncTextIOWrapper + +from . import request + + +def now_time() -> float: + """获取当前时间的整数.""" + now_ = datetime.now() + hour = now_.hour + minute = now_.minute + now = hour + minute / 60 + return now + + +def load_yml(file: Path, encoding="utf-8") -> dict: + """打开 yaml 格式的文件.""" + with open(file, "r", encoding=encoding) as f: + data = yaml.safe_load(f) + return data + + +def safe_string(value): + if isinstance(value, bytes): + return value.decode() + return str(value) + + +class ListDealer: + """ + 对列表进行处理 + """ + + def __init__(self, lst: list, aim): + self.lst = lst + self.aim = aim + + def count(self) -> int: + count = 0 + for ele in self.lst: + if ele == self.aim: + count = count + 1 + return count + + def del_aim(self) -> list: + while self.aim in self.lst: + self.lst.remove(self.aim) + return self.lst + + +class CoolqCodeChecker: + """ + 检查所传回的cq码是否存在被注入可能 + """ + tenc_gchat_url: str = "gchat.qpic.cn" + may_inject_keys: list = ["record", "video", "music", "xml", "json"] + + def __init__(self, text: str): + self.text = text + + @property + def check(self) -> bool: + _type = re.findall(r"CQ:(.*?),", self.text) + for i in _type: + if i == "image": + result = re.findall(r"url=(.*?)]", self.text) + url = "" if not result else result[0] + if self.tenc_gchat_url not in url: + return False + else: + return True + if i in self.may_inject_keys: + return False + else: + return True + else: + return True + + +class FileDealer: + """ + 打开文件 + """ + + def __init__(self, path: Path, encoding: str = "utf-8"): + self.path = path + self.encoding = encoding + + async def write(self, path: Path, content): + try: + async with aiofiles.open(path, "w", encoding=self.encoding) as target: + await target.write(content) + except Exception: + raise Exception(f"Writing file({path}) failed!") + + async def _reader(self) -> AsyncTextIOWrapper: + try: + tar = await aiofiles.open(self.path, "r", encoding=self.encoding) + except FileNotFoundError: + raise FileNotFoundError(f"File({self.path}) not find!") + return tar + + async def read(self): + tar = await self._reader() + return tar.read() + + async def readline(self): + tar = await self._reader() + return tar.readline() + + async def readlines(self): + tar = await self._reader() + return tar.readlines() + + async def readtable(self): + tar = await self._reader() + return tar.readable() + + +class ImageDealer: + """ + 对图片进行压缩处理 + """ + + def __init__(self, out_path, kb: int = 300, quality: int = 85, k: float = 0.9): + self.out_path = out_path + self.kb = kb + self.quality = quality + self.k = k + + def deal(self) -> str: + o_size = os.path.getsize(self.out_path) // 1024 + if o_size <= self.kb: + return self.out_path + + ImageFile.LOAD_TRUNCATED_IMAGES = True # type: ignore + while o_size > self.kb: + img = Image.open(self.out_path) + x, y = img.size + out = img.resize((int(x * self.k), int(y * self.k)), Image.ANTIALIAS) + try: + out.save(self.out_path, quality=self.quality) + except Exception: + raise Exception("Writing file failed!") + o_size = os.path.getsize(self.out_path) // 1024 + return self.out_path + + +class Translate: + """ + 繁 <=> 中 + """ + + SIMPLE = "万与丑专业丛东丝丢两严丧个丬丰临为丽举么义乌乐乔习乡书买乱争于亏云亘亚产亩亲亵亸亿仅从仑仓仪们价众优伙会伛伞伟传伤伥伦伧伪伫体余佣佥侠侣侥侦侧侨侩侪侬俣俦俨俩俪俭债倾偬偻偾偿傥傧储傩儿兑兖党兰关兴兹养兽冁内冈册写军农冢冯冲决况冻净凄凉凌减凑凛几凤凫凭凯击凼凿刍划刘则刚创删别刬刭刽刿剀剂剐剑剥剧劝办务劢动励劲劳势勋勐勚匀匦匮区医华协单卖卢卤卧卫却卺厂厅历厉压厌厍厕厢厣厦厨厩厮县参叆叇双发变叙叠叶号叹叽吁后吓吕吗吣吨听启吴呒呓呕呖呗员呙呛呜咏咔咙咛咝咤咴咸哌响哑哒哓哔哕哗哙哜哝哟唛唝唠唡唢唣唤唿啧啬啭啮啰啴啸喷喽喾嗫呵嗳嘘嘤嘱噜噼嚣嚯团园囱围囵国图圆圣圹场坂坏块坚坛坜坝坞坟坠垄垅垆垒垦垧垩垫垭垯垱垲垴埘埙埚埝埯堑堕塆墙壮声壳壶壸处备复够头夸夹夺奁奂奋奖奥妆妇妈妩妪妫姗姜娄娅娆娇娈娱娲娴婳婴婵婶媪嫒嫔嫱嬷孙学孪宁宝实宠审宪宫宽宾寝对寻导寿将尔尘尧尴尸尽层屃屉届属屡屦屿岁岂岖岗岘岙岚岛岭岳岽岿峃峄峡峣峤峥峦崂崃崄崭嵘嵚嵛嵝嵴巅巩巯币帅师帏帐帘帜带帧帮帱帻帼幂幞干并广庄庆庐庑库应庙庞废庼廪开异弃张弥弪弯弹强归当录彟彦彻径徕御忆忏忧忾怀态怂怃怄怅怆怜总怼怿恋恳恶恸恹恺恻恼恽悦悫悬悭悯惊惧惨惩惫惬惭惮惯愍愠愤愦愿慑慭憷懑懒懔戆戋戏戗战戬户扎扑扦执扩扪扫扬扰抚抛抟抠抡抢护报担拟拢拣拥拦拧拨择挂挚挛挜挝挞挟挠挡挢挣挤挥挦捞损捡换捣据捻掳掴掷掸掺掼揸揽揿搀搁搂搅携摄摅摆摇摈摊撄撑撵撷撸撺擞攒敌敛数斋斓斗斩断无旧时旷旸昙昼昽显晋晒晓晔晕晖暂暧札术朴机杀杂权条来杨杩杰极构枞枢枣枥枧枨枪枫枭柜柠柽栀栅标栈栉栊栋栌栎栏树栖样栾桊桠桡桢档桤桥桦桧桨桩梦梼梾检棂椁椟椠椤椭楼榄榇榈榉槚槛槟槠横樯樱橥橱橹橼檐檩欢欤欧歼殁殇残殒殓殚殡殴毁毂毕毙毡毵氇气氢氩氲汇汉污汤汹沓沟没沣沤沥沦沧沨沩沪沵泞泪泶泷泸泺泻泼泽泾洁洒洼浃浅浆浇浈浉浊测浍济浏浐浑浒浓浔浕涂涌涛涝涞涟涠涡涢涣涤润涧涨涩淀渊渌渍渎渐渑渔渖渗温游湾湿溃溅溆溇滗滚滞滟滠满滢滤滥滦滨滩滪漤潆潇潋潍潜潴澜濑濒灏灭灯灵灾灿炀炉炖炜炝点炼炽烁烂烃烛烟烦烧烨烩烫烬热焕焖焘煅煳熘爱爷牍牦牵牺犊犟状犷犸犹狈狍狝狞独狭狮狯狰狱狲猃猎猕猡猪猫猬献獭玑玙玚玛玮环现玱玺珉珏珐珑珰珲琎琏琐琼瑶瑷璇璎瓒瓮瓯电画畅畲畴疖疗疟疠疡疬疮疯疱疴痈痉痒痖痨痪痫痴瘅瘆瘗瘘瘪瘫瘾瘿癞癣癫癯皑皱皲盏盐监盖盗盘眍眦眬着睁睐睑瞒瞩矫矶矾矿砀码砖砗砚砜砺砻砾础硁硅硕硖硗硙硚确硷碍碛碜碱碹磙礼祎祢祯祷祸禀禄禅离秃秆种积称秽秾稆税稣稳穑穷窃窍窑窜窝窥窦窭竖竞笃笋笔笕笺笼笾筑筚筛筜筝筹签简箓箦箧箨箩箪箫篑篓篮篱簖籁籴类籼粜粝粤粪粮糁糇紧絷纟纠纡红纣纤纥约级纨纩纪纫纬纭纮纯纰纱纲纳纴纵纶纷纸纹纺纻纼纽纾线绀绁绂练组绅细织终绉绊绋绌绍绎经绐绑绒结绔绕绖绗绘给绚绛络绝绞统绠绡绢绣绤绥绦继绨绩绪绫绬续绮绯绰绱绲绳维绵绶绷绸绹绺绻综绽绾绿缀缁缂缃缄缅缆缇缈缉缊缋缌缍缎缏缐缑缒缓缔缕编缗缘缙缚缛缜缝缞缟缠缡缢缣缤缥缦缧缨缩缪缫缬缭缮缯缰缱缲缳缴缵罂网罗罚罢罴羁羟羡翘翙翚耢耧耸耻聂聋职聍联聩聪肃肠肤肷肾肿胀胁胆胜胧胨胪胫胶脉脍脏脐脑脓脔脚脱脶脸腊腌腘腭腻腼腽腾膑臜舆舣舰舱舻艰艳艹艺节芈芗芜芦苁苇苈苋苌苍苎苏苘苹茎茏茑茔茕茧荆荐荙荚荛荜荞荟荠荡荣荤荥荦荧荨荩荪荫荬荭荮药莅莜莱莲莳莴莶获莸莹莺莼萚萝萤营萦萧萨葱蒇蒉蒋蒌蓝蓟蓠蓣蓥蓦蔷蔹蔺蔼蕲蕴薮藁藓虏虑虚虫虬虮虽虾虿蚀蚁蚂蚕蚝蚬蛊蛎蛏蛮蛰蛱蛲蛳蛴蜕蜗蜡蝇蝈蝉蝎蝼蝾螀螨蟏衅衔补衬衮袄袅袆袜袭袯装裆裈裢裣裤裥褛褴襁襕见观觃规觅视觇览觉觊觋觌觍觎觏觐觑觞触觯詟誉誊讠计订讣认讥讦讧讨让讪讫训议讯记讱讲讳讴讵讶讷许讹论讻讼讽设访诀证诂诃评诅识诇诈诉诊诋诌词诎诏诐译诒诓诔试诖诗诘诙诚诛诜话诞诟诠诡询诣诤该详诧诨诩诪诫诬语诮误诰诱诲诳说诵诶请诸诹诺读诼诽课诿谀谁谂调谄谅谆谇谈谊谋谌谍谎谏谐谑谒谓谔谕谖谗谘谙谚谛谜谝谞谟谠谡谢谣谤谥谦谧谨谩谪谫谬谭谮谯谰谱谲谳谴谵谶谷豮贝贞负贠贡财责贤败账货质贩贪贫贬购贮贯贰贱贲贳贴贵贶贷贸费贺贻贼贽贾贿赀赁赂赃资赅赆赇赈赉赊赋赌赍赎赏赐赑赒赓赔赕赖赗赘赙赚赛赜赝赞赟赠赡赢赣赪赵赶趋趱趸跃跄跖跞践跶跷跸跹跻踊踌踪踬踯蹑蹒蹰蹿躏躜躯车轧轨轩轪轫转轭轮软轰轱轲轳轴轵轶轷轸轹轺轻轼载轾轿辀辁辂较辄辅辆辇辈辉辊辋辌辍辎辏辐辑辒输辔辕辖辗辘辙辚辞辩辫边辽达迁过迈运还这进远违连迟迩迳迹适选逊递逦逻遗遥邓邝邬邮邹邺邻郁郄郏郐郑郓郦郧郸酝酦酱酽酾酿释里鉅鉴銮錾钆钇针钉钊钋钌钍钎钏钐钑钒钓钔钕钖钗钘钙钚钛钝钞钟钠钡钢钣钤钥钦钧钨钩钪钫钬钭钮钯钰钱钲钳钴钵钶钷钸钹钺钻钼钽钾钿铀铁铂铃铄铅铆铈铉铊铋铍铎铏铐铑铒铕铗铘铙铚铛铜铝铞铟铠铡铢铣铤铥铦铧铨铪铫铬铭铮铯铰铱铲铳铴铵银铷铸铹铺铻铼铽链铿销锁锂锃锄锅锆锇锈锉锊锋锌锍锎锏锐锑锒锓锔锕锖锗错锚锜锞锟锠锡锢锣锤锥锦锨锩锫锬锭键锯锰锱锲锳锴锵锶锷锸锹锺锻锼锽锾锿镀镁镂镃镆镇镈镉镊镌镍镎镏镐镑镒镕镖镗镙镚镛镜镝镞镟镠镡镢镣镤镥镦镧镨镩镪镫镬镭镮镯镰镱镲镳镴镶长门闩闪闫闬闭问闯闰闱闲闳间闵闶闷闸闹闺闻闼闽闾闿阀阁阂阃阄阅阆阇阈阉阊阋阌阍阎阏阐阑阒阓阔阕阖阗阘阙阚阛队阳阴阵阶际陆陇陈陉陕陧陨险随隐隶隽难雏雠雳雾霁霉霭靓静靥鞑鞒鞯鞴韦韧韨韩韪韫韬韵页顶顷顸项顺须顼顽顾顿颀颁颂颃预颅领颇颈颉颊颋颌颍颎颏颐频颒颓颔颕颖颗题颙颚颛颜额颞颟颠颡颢颣颤颥颦颧风飏飐飑飒飓飔飕飖飗飘飙飚飞飨餍饤饥饦饧饨饩饪饫饬饭饮饯饰饱饲饳饴饵饶饷饸饹饺饻饼饽饾饿馀馁馂馃馄馅馆馇馈馉馊馋馌馍馎馏馐馑馒馓馔馕马驭驮驯驰驱驲驳驴驵驶驷驸驹驺驻驼驽驾驿骀骁骂骃骄骅骆骇骈骉骊骋验骍骎骏骐骑骒骓骔骕骖骗骘骙骚骛骜骝骞骟骠骡骢骣骤骥骦骧髅髋髌鬓魇魉鱼鱽鱾鱿鲀鲁鲂鲄鲅鲆鲇鲈鲉鲊鲋鲌鲍鲎鲏鲐鲑鲒鲓鲔鲕鲖鲗鲘鲙鲚鲛鲜鲝鲞鲟鲠鲡鲢鲣鲤鲥鲦鲧鲨鲩鲪鲫鲬鲭鲮鲯鲰鲱鲲鲳鲴鲵鲶鲷鲸鲹鲺鲻鲼鲽鲾鲿鳀鳁鳂鳃鳄鳅鳆鳇鳈鳉鳊鳋鳌鳍鳎鳏鳐鳑鳒鳓鳔鳕鳖鳗鳘鳙鳛鳜鳝鳞鳟鳠鳡鳢鳣鸟鸠鸡鸢鸣鸤鸥鸦鸧鸨鸩鸪鸫鸬鸭鸮鸯鸰鸱鸲鸳鸴鸵鸶鸷鸸鸹鸺鸻鸼鸽鸾鸿鹀鹁鹂鹃鹄鹅鹆鹇鹈鹉鹊鹋鹌鹍鹎鹏鹐鹑鹒鹓鹔鹕鹖鹗鹘鹚鹛鹜鹝鹞鹟鹠鹡鹢鹣鹤鹥鹦鹧鹨鹩鹪鹫鹬鹭鹯鹰鹱鹲鹳鹴鹾麦麸黄黉黡黩黪黾鼋鼌鼍鼗鼹齄齐齑齿龀龁龂龃龄龅龆龇龈龉龊龋龌龙龚龛龟志制咨只里系范松没尝尝闹面准钟别闲干尽脏拼" + TRADITION = "萬與醜專業叢東絲丟兩嚴喪個爿豐臨為麗舉麼義烏樂喬習鄉書買亂爭於虧雲亙亞產畝親褻嚲億僅從侖倉儀們價眾優夥會傴傘偉傳傷倀倫傖偽佇體餘傭僉俠侶僥偵側僑儈儕儂俁儔儼倆儷儉債傾傯僂僨償儻儐儲儺兒兌兗黨蘭關興茲養獸囅內岡冊寫軍農塚馮衝決況凍淨淒涼淩減湊凜幾鳳鳧憑凱擊氹鑿芻劃劉則剛創刪別剗剄劊劌剴劑剮劍剝劇勸辦務勱動勵勁勞勢勳猛勩勻匭匱區醫華協單賣盧鹵臥衛卻巹廠廳曆厲壓厭厙廁廂厴廈廚廄廝縣參靉靆雙發變敘疊葉號歎嘰籲後嚇呂嗎唚噸聽啟吳嘸囈嘔嚦唄員咼嗆嗚詠哢嚨嚀噝吒噅鹹呱響啞噠嘵嗶噦嘩噲嚌噥喲嘜嗊嘮啢嗩唕喚呼嘖嗇囀齧囉嘽嘯噴嘍嚳囁嗬噯噓嚶囑嚕劈囂謔團園囪圍圇國圖圓聖壙場阪壞塊堅壇壢壩塢墳墜壟壟壚壘墾坰堊墊埡墶壋塏堖塒塤堝墊垵塹墮壪牆壯聲殼壺壼處備復夠頭誇夾奪奩奐奮獎奧妝婦媽嫵嫗媯姍薑婁婭嬈嬌孌娛媧嫻嫿嬰嬋嬸媼嬡嬪嬙嬤孫學孿寧寶實寵審憲宮寬賓寢對尋導壽將爾塵堯尷屍盡層屭屜屆屬屢屨嶼歲豈嶇崗峴嶴嵐島嶺嶽崠巋嶨嶧峽嶢嶠崢巒嶗崍嶮嶄嶸嶔崳嶁脊巔鞏巰幣帥師幃帳簾幟帶幀幫幬幘幗冪襆幹並廣莊慶廬廡庫應廟龐廢廎廩開異棄張彌弳彎彈強歸當錄彠彥徹徑徠禦憶懺憂愾懷態慫憮慪悵愴憐總懟懌戀懇惡慟懨愷惻惱惲悅愨懸慳憫驚懼慘懲憊愜慚憚慣湣慍憤憒願懾憖怵懣懶懍戇戔戲戧戰戩戶紮撲扡執擴捫掃揚擾撫拋摶摳掄搶護報擔擬攏揀擁攔擰撥擇掛摯攣掗撾撻挾撓擋撟掙擠揮撏撈損撿換搗據撚擄摑擲撣摻摜摣攬撳攙擱摟攪攜攝攄擺搖擯攤攖撐攆擷擼攛擻攢敵斂數齋斕鬥斬斷無舊時曠暘曇晝曨顯晉曬曉曄暈暉暫曖劄術樸機殺雜權條來楊榪傑極構樅樞棗櫪梘棖槍楓梟櫃檸檉梔柵標棧櫛櫳棟櫨櫟欄樹棲樣欒棬椏橈楨檔榿橋樺檜槳樁夢檮棶檢欞槨櫝槧欏橢樓欖櫬櫚櫸檟檻檳櫧橫檣櫻櫫櫥櫓櫞簷檁歡歟歐殲歿殤殘殞殮殫殯毆毀轂畢斃氈毿氌氣氫氬氳彙漢汙湯洶遝溝沒灃漚瀝淪滄渢溈滬濔濘淚澩瀧瀘濼瀉潑澤涇潔灑窪浹淺漿澆湞溮濁測澮濟瀏滻渾滸濃潯濜塗湧濤澇淶漣潿渦溳渙滌潤澗漲澀澱淵淥漬瀆漸澠漁瀋滲溫遊灣濕潰濺漵漊潷滾滯灩灄滿瀅濾濫灤濱灘澦濫瀠瀟瀲濰潛瀦瀾瀨瀕灝滅燈靈災燦煬爐燉煒熗點煉熾爍爛烴燭煙煩燒燁燴燙燼熱煥燜燾煆糊溜愛爺牘犛牽犧犢強狀獷獁猶狽麅獮獰獨狹獅獪猙獄猻獫獵獼玀豬貓蝟獻獺璣璵瑒瑪瑋環現瑲璽瑉玨琺瓏璫琿璡璉瑣瓊瑤璦璿瓔瓚甕甌電畫暢佘疇癤療瘧癘瘍鬁瘡瘋皰屙癰痙癢瘂癆瘓癇癡癉瘮瘞瘺癟癱癮癭癩癬癲臒皚皺皸盞鹽監蓋盜盤瞘眥矓著睜睞瞼瞞矚矯磯礬礦碭碼磚硨硯碸礪礱礫礎硜矽碩硤磽磑礄確鹼礙磧磣堿镟滾禮禕禰禎禱禍稟祿禪離禿稈種積稱穢穠穭稅穌穩穡窮竊竅窯竄窩窺竇窶豎競篤筍筆筧箋籠籩築篳篩簹箏籌簽簡籙簀篋籜籮簞簫簣簍籃籬籪籟糴類秈糶糲粵糞糧糝餱緊縶糸糾紆紅紂纖紇約級紈纊紀紉緯紜紘純紕紗綱納紝縱綸紛紙紋紡紵紖紐紓線紺絏紱練組紳細織終縐絆紼絀紹繹經紿綁絨結絝繞絰絎繪給絢絳絡絕絞統綆綃絹繡綌綏絛繼綈績緒綾緓續綺緋綽緔緄繩維綿綬繃綢綯綹綣綜綻綰綠綴緇緙緗緘緬纜緹緲緝縕繢緦綞緞緶線緱縋緩締縷編緡緣縉縛縟縝縫縗縞纏縭縊縑繽縹縵縲纓縮繆繅纈繚繕繒韁繾繰繯繳纘罌網羅罰罷羆羈羥羨翹翽翬耮耬聳恥聶聾職聹聯聵聰肅腸膚膁腎腫脹脅膽勝朧腖臚脛膠脈膾髒臍腦膿臠腳脫腡臉臘醃膕齶膩靦膃騰臏臢輿艤艦艙艫艱豔艸藝節羋薌蕪蘆蓯葦藶莧萇蒼苧蘇檾蘋莖蘢蔦塋煢繭荊薦薘莢蕘蓽蕎薈薺蕩榮葷滎犖熒蕁藎蓀蔭蕒葒葤藥蒞蓧萊蓮蒔萵薟獲蕕瑩鶯蓴蘀蘿螢營縈蕭薩蔥蕆蕢蔣蔞藍薊蘺蕷鎣驀薔蘞藺藹蘄蘊藪槁蘚虜慮虛蟲虯蟣雖蝦蠆蝕蟻螞蠶蠔蜆蠱蠣蟶蠻蟄蛺蟯螄蠐蛻蝸蠟蠅蟈蟬蠍螻蠑螿蟎蠨釁銜補襯袞襖嫋褘襪襲襏裝襠褌褳襝褲襇褸襤繈襴見觀覎規覓視覘覽覺覬覡覿覥覦覯覲覷觴觸觶讋譽謄訁計訂訃認譏訐訌討讓訕訖訓議訊記訒講諱謳詎訝訥許訛論訩訟諷設訪訣證詁訶評詛識詗詐訴診詆謅詞詘詔詖譯詒誆誄試詿詩詰詼誠誅詵話誕詬詮詭詢詣諍該詳詫諢詡譸誡誣語誚誤誥誘誨誑說誦誒請諸諏諾讀諑誹課諉諛誰諗調諂諒諄誶談誼謀諶諜謊諫諧謔謁謂諤諭諼讒諮諳諺諦謎諞諝謨讜謖謝謠謗諡謙謐謹謾謫譾謬譚譖譙讕譜譎讞譴譫讖穀豶貝貞負貟貢財責賢敗賬貨質販貪貧貶購貯貫貳賤賁貰貼貴貺貸貿費賀貽賊贄賈賄貲賃賂贓資賅贐賕賑賚賒賦賭齎贖賞賜贔賙賡賠賧賴賵贅賻賺賽賾贗讚贇贈贍贏贛赬趙趕趨趲躉躍蹌蹠躒踐躂蹺蹕躚躋踴躊蹤躓躑躡蹣躕躥躪躦軀車軋軌軒軑軔轉軛輪軟轟軲軻轤軸軹軼軤軫轢軺輕軾載輊轎輈輇輅較輒輔輛輦輩輝輥輞輬輟輜輳輻輯轀輸轡轅轄輾轆轍轔辭辯辮邊遼達遷過邁運還這進遠違連遲邇逕跡適選遜遞邐邏遺遙鄧鄺鄔郵鄒鄴鄰鬱郤郟鄶鄭鄆酈鄖鄲醞醱醬釅釃釀釋裏钜鑒鑾鏨釓釔針釘釗釙釕釷釺釧釤鈒釩釣鍆釹鍚釵鈃鈣鈈鈦鈍鈔鍾鈉鋇鋼鈑鈐鑰欽鈞鎢鉤鈧鈁鈥鈄鈕鈀鈺錢鉦鉗鈷缽鈳鉕鈽鈸鉞鑽鉬鉭鉀鈿鈾鐵鉑鈴鑠鉛鉚鈰鉉鉈鉍鈹鐸鉶銬銠鉺銪鋏鋣鐃銍鐺銅鋁銱銦鎧鍘銖銑鋌銩銛鏵銓鉿銚鉻銘錚銫鉸銥鏟銃鐋銨銀銣鑄鐒鋪鋙錸鋱鏈鏗銷鎖鋰鋥鋤鍋鋯鋨鏽銼鋝鋒鋅鋶鐦鐧銳銻鋃鋟鋦錒錆鍺錯錨錡錁錕錩錫錮鑼錘錐錦鍁錈錇錟錠鍵鋸錳錙鍥鍈鍇鏘鍶鍔鍤鍬鍾鍛鎪鍠鍰鎄鍍鎂鏤鎡鏌鎮鎛鎘鑷鐫鎳鎿鎦鎬鎊鎰鎔鏢鏜鏍鏰鏞鏡鏑鏃鏇鏐鐔钁鐐鏷鑥鐓鑭鐠鑹鏹鐙鑊鐳鐶鐲鐮鐿鑔鑣鑞鑲長門閂閃閆閈閉問闖閏闈閑閎間閔閌悶閘鬧閨聞闥閩閭闓閥閣閡閫鬮閱閬闍閾閹閶鬩閿閽閻閼闡闌闃闠闊闋闔闐闒闕闞闤隊陽陰陣階際陸隴陳陘陝隉隕險隨隱隸雋難雛讎靂霧霽黴靄靚靜靨韃鞽韉韝韋韌韍韓韙韞韜韻頁頂頃頇項順須頊頑顧頓頎頒頌頏預顱領頗頸頡頰頲頜潁熲頦頤頻頮頹頷頴穎顆題顒顎顓顏額顳顢顛顙顥纇顫顬顰顴風颺颭颮颯颶颸颼颻飀飄飆飆飛饗饜飣饑飥餳飩餼飪飫飭飯飲餞飾飽飼飿飴餌饒餉餄餎餃餏餅餑餖餓餘餒餕餜餛餡館餷饋餶餿饞饁饃餺餾饈饉饅饊饌饢馬馭馱馴馳驅馹駁驢駔駛駟駙駒騶駐駝駑駕驛駘驍罵駰驕驊駱駭駢驫驪騁驗騂駸駿騏騎騍騅騌驌驂騙騭騤騷騖驁騮騫騸驃騾驄驏驟驥驦驤髏髖髕鬢魘魎魚魛魢魷魨魯魴魺鮁鮃鯰鱸鮋鮓鮒鮊鮑鱟鮍鮐鮭鮚鮳鮪鮞鮦鰂鮜鱠鱭鮫鮮鮺鯗鱘鯁鱺鰱鰹鯉鰣鰷鯀鯊鯇鮶鯽鯒鯖鯪鯕鯫鯡鯤鯧鯝鯢鯰鯛鯨鯵鯴鯔鱝鰈鰏鱨鯷鰮鰃鰓鱷鰍鰒鰉鰁鱂鯿鰠鼇鰭鰨鰥鰩鰟鰜鰳鰾鱈鱉鰻鰵鱅鰼鱖鱔鱗鱒鱯鱤鱧鱣鳥鳩雞鳶鳴鳲鷗鴉鶬鴇鴆鴣鶇鸕鴨鴞鴦鴒鴟鴝鴛鴬鴕鷥鷙鴯鴰鵂鴴鵃鴿鸞鴻鵐鵓鸝鵑鵠鵝鵒鷳鵜鵡鵲鶓鵪鶤鵯鵬鵮鶉鶊鵷鷫鶘鶡鶚鶻鶿鶥鶩鷊鷂鶲鶹鶺鷁鶼鶴鷖鸚鷓鷚鷯鷦鷲鷸鷺鸇鷹鸌鸏鸛鸘鹺麥麩黃黌黶黷黲黽黿鼂鼉鞀鼴齇齊齏齒齔齕齗齟齡齙齠齜齦齬齪齲齷龍龔龕龜誌製谘隻裡係範鬆冇嚐嘗鬨麵準鐘彆閒乾儘臟拚" + + def __init__(self, text: str): + self.text = text + + def to_tradition(self) -> str: + output_str_list = [] + str_len = len(self.text) + + for i in range(str_len): + found_index = self.SIMPLE.find(self.text[i]) + + if not found_index == -1: + output_str_list.append(self.TRADITION[found_index]) + else: + output_str_list.append(self.text[i]) + + return "".join(output_str_list) + + def to_simple(self) -> str: + output_str_list = [] + str_len = len(self.text) + + for i in range(str_len): + found_index = self.TRADITION.find(self.text[i]) + + if not found_index == -1: + output_str_list.append(self.SIMPLE[found_index]) + else: + output_str_list.append(self.text[i]) + + return "".join(output_str_list) + + +class UbuntuPaste: + """ + 将信息粘贴至 ubuntu pastebin + """ + + URL = "https://paste.ubuntu.com/" + + def __init__(self, form_data): + self.form_data = form_data + + async def paste(self) -> str: + try: + res = await request.post(self.URL, self.form_data) + except BaseException: + raise BaseException("Request failed!") + result = res.url + return result + + + diff --git a/ATRI/utils/cqcode.py b/ATRI/utils/cqcode.py deleted file mode 100644 index e4c6b5e..0000000 --- a/ATRI/utils/cqcode.py +++ /dev/null @@ -1,27 +0,0 @@ -import re -from typing import Optional - -from ATRI.service import Service as sv - - -tencent_gchat_url = "gchat.qpic.cn" -noob_code = ["record", "video", "music", "xml", "json"] - - -async def coolq_code_check( - cq_code: str, user: Optional[int] = None, group: Optional[int] = None -): - _type = re.findall(r"CQ:(.*?),", cq_code) - for i in _type: - if i == "image": - result = re.findall(r"url=(.*?)]", cq_code) - url = "" if not result else result[0] - if tencent_gchat_url not in url: - msg = "你注你🐎呢" - await sv.NetworkPost.send_msg(user_id=user, group_id=group, message=msg) - else: - return True - elif i in noob_code: - return False - else: - return True diff --git a/ATRI/utils/file.py b/ATRI/utils/file.py deleted file mode 100644 index 6cae2a5..0000000 --- a/ATRI/utils/file.py +++ /dev/null @@ -1,30 +0,0 @@ -import aiofiles -import urllib -from pathlib import Path - -from ATRI.exceptions import WriteError - -from .request import get_content - - -async def write_file(path: Path, text, encoding="utf-8") -> None: - try: - async with aiofiles.open(path, "w", encoding=encoding) as target: - await target.write(text) - except WriteError: - raise WriteError("Writing file failed!") - - -async def open_file(path: Path, method, encoding="utf-8"): - try: - async with aiofiles.open(path, "r", encoding=encoding) as target: - if method == "read": - return target.read() - elif method == "readlines": - return await target.readlines() - elif method == "readline": - return await target.readline() - else: - return target.readable() - except EOFError: - raise EOFError("File not fond!") diff --git a/ATRI/utils/img.py b/ATRI/utils/img.py deleted file mode 100644 index 147af79..0000000 --- a/ATRI/utils/img.py +++ /dev/null @@ -1,23 +0,0 @@ -import os -from PIL import Image, ImageFile - -from ATRI.exceptions import WriteError - - -def compress_image(out_file, kb=300, quality=85, k=0.9) -> str: - """将目标图片进行压缩""" - o_size = os.path.getsize(out_file) // 1024 - if o_size <= kb: - return out_file - - ImageFile.LOAD_TRUNCATED_IMAGES = True # type: ignore - while o_size > kb: - img = Image.open(out_file) - x, y = img.size - out = img.resize((int(x * k), int(y * k)), Image.ANTIALIAS) - try: - out.save(out_file, quality=quality) - except WriteError: - raise WriteError("Writing file failed!") - o_size = os.path.getsize(out_file) // 1024 - return out_file diff --git a/ATRI/utils/limit.py b/ATRI/utils/limit.py index bdd1d84..4b3caef 100644 --- a/ATRI/utils/limit.py +++ b/ATRI/utils/limit.py @@ -1,39 +1,138 @@ -import datetime -from random import choice +import time +import pytz +import functools +from threading import RLock +from collections import defaultdict, deque +from datetime import datetime, timedelta -from ATRI.service import Service as sv -from .list import count_list, del_list_aim -from .apscheduler import scheduler, DateTrigger +class LimitBucket: + """ + 限制某功能运行中某段在一定速率下 + """ + + def __init__(self, capacity, fill_rate, is_lock: bool = False) -> None: + """ + :param capacity: 容量总数 + :param fill_rate: 重新装填速率(单位:秒) + """ + self._capacity = float(capacity) + self._tokens = float(capacity) + self._fill_rate = float(fill_rate) + self._last_time = time() + self._is_lock = is_lock + self._lock = RLock() + + def _get_cur_tokens(self): + if self._tokens < self._capacity: + now = time() + delta = self._fill_rate * (now - self._last_time) + self._tokens = min(self._capacity, self._tokens + delta) + self._last_time = now + return self._tokens + + def get_cur_tokens(self): + if self._is_lock: + with self._lock: + return self._get_cur_tokens() + else: + return self._get_cur_tokens() + + def _consume(self, tokens) -> bool: + if tokens <= self.get_cur_tokens(): + self._tokens -= tokens + return True + return False + + def consume(self, tokens): + if self._is_lock: + with self._lock: + return self._consume(tokens) + else: + return self._consume(tokens) -exciting_user_temp = [] -exciting_user = [] +class RateLimiting: + """ + 限制该功能全体速率 + """ + + def __init__(self, max_calls, period=1.0): + if period <= 0: + raise ValueError('Rate limiting period should be > 0') + if max_calls <= 0: + raise ValueError('Rate limiting number of calls should be > 0') -def del_list(user: str) -> None: - global exciting_user - exciting_user = del_list_aim(exciting_user, user) + self.calls = deque() + self.period = period + self.max_calls = max_calls -def is_too_exciting( - user: int, times: int, seconds: float = 0, hours: float = 0, days: float = 0 -) -> bool: - global exciting_user + def __call__(self, func): + @functools.wraps(func) + def wrapped(*args, **kwargs): + with self: + return func(*args, **kwargs) + return wrapped - if user in exciting_user: - return False - else: - if count_list(exciting_user_temp, user) == times: - delta = datetime.timedelta(seconds=seconds, hours=hours, days=days) - trigger = DateTrigger(run_date=datetime.datetime.now() + delta) - - scheduler.add_job( - func=del_list, - trigger=trigger, - args=(user,), - misfire_grace_time=1, - ) - return False - else: - exciting_user_temp.append(user) - return True + def __enter__(self): + if len(self.calls) >= self.max_calls: + time.sleep(self.period - self._timespan) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.calls.append(time.time()) + while self._timespan >= self.period: + self.calls.popleft() + + @property + def _timespan(self): + return self.calls[-1] - self.calls[0] + + +class FreqLimiter: + """ + Copy from: https://github.com/Ice-Cirno/HoshinoBot/blob/master/hoshino/util/__init__.py + """ + + def __init__(self, default_cd_seconds): + self.next_time = defaultdict(float) + self.default_cd = default_cd_seconds + + def check(self, key) -> bool: + return bool(time.time() >= self.next_time[key]) + + def start_cd(self, key, cd_time=0): + self.next_time[key] = time.time() + (cd_time if cd_time > 0 else self.default_cd) + + def left_time(self, key) -> float: + return self.next_time[key] - time.time() + + +class DailyLimiter: + """ + Copy from: https://github.com/Ice-Cirno/HoshinoBot/blob/master/hoshino/util/__init__.py + """ + tz = pytz.timezone('Asia/Shanghai') + + def __init__(self, max_num): + self.today = -1 + self.count = defaultdict(int) + self.max = max_num + + def check(self, key) -> bool: + now = datetime.now(self.tz) + day = (now - timedelta(hours=6)).day + if day != self.today: + self.today = day + self.count.clear() + return bool(self.count[key] < self.max) + + def get_num(self, key): + return self.count[key] + + def increase(self, key, num=1): + self.count[key] += num + + def reset(self, key): + self.count[key] = 0 diff --git a/ATRI/utils/list.py b/ATRI/utils/list.py deleted file mode 100644 index af6fb76..0000000 --- a/ATRI/utils/list.py +++ /dev/null @@ -1,14 +0,0 @@ -def count_list(lst: list, aim) -> int: - """查看指定列表中目标元素所存在的数量""" - count = 0 - for ele in lst: - if ele == aim: - count = count + 1 - return count - - -def del_list_aim(lst: list, aim) -> list: - """删除指定列表中的所有元素""" - while aim in lst: - lst.remove(aim) - return lst diff --git a/ATRI/utils/request.py b/ATRI/utils/request.py index d16c17b..c638d63 100644 --- a/ATRI/utils/request.py +++ b/ATRI/utils/request.py @@ -1,32 +1,54 @@ -from typing import Optional -from aiohttp import ClientSession - - -async def get_text(url: str, headers: Optional[dict] = None) -> str: - async with ClientSession() as session: - async with session.get(url, headers=headers) as r: - result = await r.text() - return result - - -async def get_bytes(url: str, headers: Optional[dict] = None) -> bytes: - async with ClientSession() as session: - async with session.get(url, headers=headers) as r: - result = await r.read() - return result - - -async def get_content(url: str, headers: Optional[dict] = None): - async with ClientSession() as session: - async with session.get(url, headers=headers) as r: - result = await r.content.read() - return result - - -async def post_bytes( - url: str, params: Optional[dict] = None, json: Optional[dict] = None -) -> bytes: - async with ClientSession() as session: - async with session.post(url, params=params, json=json) as r: - result = await r.read() - return result +from aiohttp import ClientSession, ClientResponse + + +class Response: + + def __init__(self, response: ClientResponse) -> None: + self.raw_response = response + + @property + def status(self) -> int: + return self.raw_response.status + + @property + def url(self): + return self.raw_response.url + + @property + def real_url(self): + return self.raw_response.real_url + + @property + def host(self): + return self.raw_response.host + + @property + def headers(self): + return self.raw_response.headers + + @property + def raw_headers(self): + return self.raw_response.raw_headers + + @property + def request_info(self): + return self.raw_response.request_info + + @property + async def read(self): + return await self.raw_response.read() + + @property + async def text(self): + return await self.raw_response.text() + + async def json(self): + return await self.raw_response.json() + + +async def get(url, params: dict = None, **kwargs) -> Response: + return Response(await ClientSession().get(url=url, params=params, **kwargs)) + + +async def post(url, data: dict = None, **kwargs) -> Response: + return Response(await ClientSession().post(url=url, data=data, **kwargs)) diff --git a/ATRI/utils/time.py b/ATRI/utils/time.py deleted file mode 100644 index 5204ea5..0000000 --- a/ATRI/utils/time.py +++ /dev/null @@ -1,10 +0,0 @@ -from datetime import datetime - - -def now_time() -> float: - """获取当前小时整数""" - now_ = datetime.now() - hour = now_.hour - minute = now_.minute - now = hour + minute / 60 - return now diff --git a/ATRI/utils/translate.py b/ATRI/utils/translate.py deleted file mode 100644 index 7df891c..0000000 --- a/ATRI/utils/translate.py +++ /dev/null @@ -1,32 +0,0 @@ -SIMPLE = "万与丑专业丛东丝丢两严丧个丬丰临为丽举么义乌乐乔习乡书买乱争于亏云亘亚产亩亲亵亸亿仅从仑仓仪们价众优伙会伛伞伟传伤伥伦伧伪伫体余佣佥侠侣侥侦侧侨侩侪侬俣俦俨俩俪俭债倾偬偻偾偿傥傧储傩儿兑兖党兰关兴兹养兽冁内冈册写军农冢冯冲决况冻净凄凉凌减凑凛几凤凫凭凯击凼凿刍划刘则刚创删别刬刭刽刿剀剂剐剑剥剧劝办务劢动励劲劳势勋勐勚匀匦匮区医华协单卖卢卤卧卫却卺厂厅历厉压厌厍厕厢厣厦厨厩厮县参叆叇双发变叙叠叶号叹叽吁后吓吕吗吣吨听启吴呒呓呕呖呗员呙呛呜咏咔咙咛咝咤咴咸哌响哑哒哓哔哕哗哙哜哝哟唛唝唠唡唢唣唤唿啧啬啭啮啰啴啸喷喽喾嗫呵嗳嘘嘤嘱噜噼嚣嚯团园囱围囵国图圆圣圹场坂坏块坚坛坜坝坞坟坠垄垅垆垒垦垧垩垫垭垯垱垲垴埘埙埚埝埯堑堕塆墙壮声壳壶壸处备复够头夸夹夺奁奂奋奖奥妆妇妈妩妪妫姗姜娄娅娆娇娈娱娲娴婳婴婵婶媪嫒嫔嫱嬷孙学孪宁宝实宠审宪宫宽宾寝对寻导寿将尔尘尧尴尸尽层屃屉届属屡屦屿岁岂岖岗岘岙岚岛岭岳岽岿峃峄峡峣峤峥峦崂崃崄崭嵘嵚嵛嵝嵴巅巩巯币帅师帏帐帘帜带帧帮帱帻帼幂幞干并广庄庆庐庑库应庙庞废庼廪开异弃张弥弪弯弹强归当录彟彦彻径徕御忆忏忧忾怀态怂怃怄怅怆怜总怼怿恋恳恶恸恹恺恻恼恽悦悫悬悭悯惊惧惨惩惫惬惭惮惯愍愠愤愦愿慑慭憷懑懒懔戆戋戏戗战戬户扎扑扦执扩扪扫扬扰抚抛抟抠抡抢护报担拟拢拣拥拦拧拨择挂挚挛挜挝挞挟挠挡挢挣挤挥挦捞损捡换捣据捻掳掴掷掸掺掼揸揽揿搀搁搂搅携摄摅摆摇摈摊撄撑撵撷撸撺擞攒敌敛数斋斓斗斩断无旧时旷旸昙昼昽显晋晒晓晔晕晖暂暧札术朴机杀杂权条来杨杩杰极构枞枢枣枥枧枨枪枫枭柜柠柽栀栅标栈栉栊栋栌栎栏树栖样栾桊桠桡桢档桤桥桦桧桨桩梦梼梾检棂椁椟椠椤椭楼榄榇榈榉槚槛槟槠横樯樱橥橱橹橼檐檩欢欤欧歼殁殇残殒殓殚殡殴毁毂毕毙毡毵氇气氢氩氲汇汉污汤汹沓沟没沣沤沥沦沧沨沩沪沵泞泪泶泷泸泺泻泼泽泾洁洒洼浃浅浆浇浈浉浊测浍济浏浐浑浒浓浔浕涂涌涛涝涞涟涠涡涢涣涤润涧涨涩淀渊渌渍渎渐渑渔渖渗温游湾湿溃溅溆溇滗滚滞滟滠满滢滤滥滦滨滩滪漤潆潇潋潍潜潴澜濑濒灏灭灯灵灾灿炀炉炖炜炝点炼炽烁烂烃烛烟烦烧烨烩烫烬热焕焖焘煅煳熘爱爷牍牦牵牺犊犟状犷犸犹狈狍狝狞独狭狮狯狰狱狲猃猎猕猡猪猫猬献獭玑玙玚玛玮环现玱玺珉珏珐珑珰珲琎琏琐琼瑶瑷璇璎瓒瓮瓯电画畅畲畴疖疗疟疠疡疬疮疯疱疴痈痉痒痖痨痪痫痴瘅瘆瘗瘘瘪瘫瘾瘿癞癣癫癯皑皱皲盏盐监盖盗盘眍眦眬着睁睐睑瞒瞩矫矶矾矿砀码砖砗砚砜砺砻砾础硁硅硕硖硗硙硚确硷碍碛碜碱碹磙礼祎祢祯祷祸禀禄禅离秃秆种积称秽秾稆税稣稳穑穷窃窍窑窜窝窥窦窭竖竞笃笋笔笕笺笼笾筑筚筛筜筝筹签简箓箦箧箨箩箪箫篑篓篮篱簖籁籴类籼粜粝粤粪粮糁糇紧絷纟纠纡红纣纤纥约级纨纩纪纫纬纭纮纯纰纱纲纳纴纵纶纷纸纹纺纻纼纽纾线绀绁绂练组绅细织终绉绊绋绌绍绎经绐绑绒结绔绕绖绗绘给绚绛络绝绞统绠绡绢绣绤绥绦继绨绩绪绫绬续绮绯绰绱绲绳维绵绶绷绸绹绺绻综绽绾绿缀缁缂缃缄缅缆缇缈缉缊缋缌缍缎缏缐缑缒缓缔缕编缗缘缙缚缛缜缝缞缟缠缡缢缣缤缥缦缧缨缩缪缫缬缭缮缯缰缱缲缳缴缵罂网罗罚罢罴羁羟羡翘翙翚耢耧耸耻聂聋职聍联聩聪肃肠肤肷肾肿胀胁胆胜胧胨胪胫胶脉脍脏脐脑脓脔脚脱脶脸腊腌腘腭腻腼腽腾膑臜舆舣舰舱舻艰艳艹艺节芈芗芜芦苁苇苈苋苌苍苎苏苘苹茎茏茑茔茕茧荆荐荙荚荛荜荞荟荠荡荣荤荥荦荧荨荩荪荫荬荭荮药莅莜莱莲莳莴莶获莸莹莺莼萚萝萤营萦萧萨葱蒇蒉蒋蒌蓝蓟蓠蓣蓥蓦蔷蔹蔺蔼蕲蕴薮藁藓虏虑虚虫虬虮虽虾虿蚀蚁蚂蚕蚝蚬蛊蛎蛏蛮蛰蛱蛲蛳蛴蜕蜗蜡蝇蝈蝉蝎蝼蝾螀螨蟏衅衔补衬衮袄袅袆袜袭袯装裆裈裢裣裤裥褛褴襁襕见观觃规觅视觇览觉觊觋觌觍觎觏觐觑觞触觯詟誉誊讠计订讣认讥讦讧讨让讪讫训议讯记讱讲讳讴讵讶讷许讹论讻讼讽设访诀证诂诃评诅识诇诈诉诊诋诌词诎诏诐译诒诓诔试诖诗诘诙诚诛诜话诞诟诠诡询诣诤该详诧诨诩诪诫诬语诮误诰诱诲诳说诵诶请诸诹诺读诼诽课诿谀谁谂调谄谅谆谇谈谊谋谌谍谎谏谐谑谒谓谔谕谖谗谘谙谚谛谜谝谞谟谠谡谢谣谤谥谦谧谨谩谪谫谬谭谮谯谰谱谲谳谴谵谶谷豮贝贞负贠贡财责贤败账货质贩贪贫贬购贮贯贰贱贲贳贴贵贶贷贸费贺贻贼贽贾贿赀赁赂赃资赅赆赇赈赉赊赋赌赍赎赏赐赑赒赓赔赕赖赗赘赙赚赛赜赝赞赟赠赡赢赣赪赵赶趋趱趸跃跄跖跞践跶跷跸跹跻踊踌踪踬踯蹑蹒蹰蹿躏躜躯车轧轨轩轪轫转轭轮软轰轱轲轳轴轵轶轷轸轹轺轻轼载轾轿辀辁辂较辄辅辆辇辈辉辊辋辌辍辎辏辐辑辒输辔辕辖辗辘辙辚辞辩辫边辽达迁过迈运还这进远违连迟迩迳迹适选逊递逦逻遗遥邓邝邬邮邹邺邻郁郄郏郐郑郓郦郧郸酝酦酱酽酾酿释里鉅鉴銮錾钆钇针钉钊钋钌钍钎钏钐钑钒钓钔钕钖钗钘钙钚钛钝钞钟钠钡钢钣钤钥钦钧钨钩钪钫钬钭钮钯钰钱钲钳钴钵钶钷钸钹钺钻钼钽钾钿铀铁铂铃铄铅铆铈铉铊铋铍铎铏铐铑铒铕铗铘铙铚铛铜铝铞铟铠铡铢铣铤铥铦铧铨铪铫铬铭铮铯铰铱铲铳铴铵银铷铸铹铺铻铼铽链铿销锁锂锃锄锅锆锇锈锉锊锋锌锍锎锏锐锑锒锓锔锕锖锗错锚锜锞锟锠锡锢锣锤锥锦锨锩锫锬锭键锯锰锱锲锳锴锵锶锷锸锹锺锻锼锽锾锿镀镁镂镃镆镇镈镉镊镌镍镎镏镐镑镒镕镖镗镙镚镛镜镝镞镟镠镡镢镣镤镥镦镧镨镩镪镫镬镭镮镯镰镱镲镳镴镶长门闩闪闫闬闭问闯闰闱闲闳间闵闶闷闸闹闺闻闼闽闾闿阀阁阂阃阄阅阆阇阈阉阊阋阌阍阎阏阐阑阒阓阔阕阖阗阘阙阚阛队阳阴阵阶际陆陇陈陉陕陧陨险随隐隶隽难雏雠雳雾霁霉霭靓静靥鞑鞒鞯鞴韦韧韨韩韪韫韬韵页顶顷顸项顺须顼顽顾顿颀颁颂颃预颅领颇颈颉颊颋颌颍颎颏颐频颒颓颔颕颖颗题颙颚颛颜额颞颟颠颡颢颣颤颥颦颧风飏飐飑飒飓飔飕飖飗飘飙飚飞飨餍饤饥饦饧饨饩饪饫饬饭饮饯饰饱饲饳饴饵饶饷饸饹饺饻饼饽饾饿馀馁馂馃馄馅馆馇馈馉馊馋馌馍馎馏馐馑馒馓馔馕马驭驮驯驰驱驲驳驴驵驶驷驸驹驺驻驼驽驾驿骀骁骂骃骄骅骆骇骈骉骊骋验骍骎骏骐骑骒骓骔骕骖骗骘骙骚骛骜骝骞骟骠骡骢骣骤骥骦骧髅髋髌鬓魇魉鱼鱽鱾鱿鲀鲁鲂鲄鲅鲆鲇鲈鲉鲊鲋鲌鲍鲎鲏鲐鲑鲒鲓鲔鲕鲖鲗鲘鲙鲚鲛鲜鲝鲞鲟鲠鲡鲢鲣鲤鲥鲦鲧鲨鲩鲪鲫鲬鲭鲮鲯鲰鲱鲲鲳鲴鲵鲶鲷鲸鲹鲺鲻鲼鲽鲾鲿鳀鳁鳂鳃鳄鳅鳆鳇鳈鳉鳊鳋鳌鳍鳎鳏鳐鳑鳒鳓鳔鳕鳖鳗鳘鳙鳛鳜鳝鳞鳟鳠鳡鳢鳣鸟鸠鸡鸢鸣鸤鸥鸦鸧鸨鸩鸪鸫鸬鸭鸮鸯鸰鸱鸲鸳鸴鸵鸶鸷鸸鸹鸺鸻鸼鸽鸾鸿鹀鹁鹂鹃鹄鹅鹆鹇鹈鹉鹊鹋鹌鹍鹎鹏鹐鹑鹒鹓鹔鹕鹖鹗鹘鹚鹛鹜鹝鹞鹟鹠鹡鹢鹣鹤鹥鹦鹧鹨鹩鹪鹫鹬鹭鹯鹰鹱鹲鹳鹴鹾麦麸黄黉黡黩黪黾鼋鼌鼍鼗鼹齄齐齑齿龀龁龂龃龄龅龆龇龈龉龊龋龌龙龚龛龟志制咨只里系范松没尝尝闹面准钟别闲干尽脏拼" -TRADITION = "萬與醜專業叢東絲丟兩嚴喪個爿豐臨為麗舉麼義烏樂喬習鄉書買亂爭於虧雲亙亞產畝親褻嚲億僅從侖倉儀們價眾優夥會傴傘偉傳傷倀倫傖偽佇體餘傭僉俠侶僥偵側僑儈儕儂俁儔儼倆儷儉債傾傯僂僨償儻儐儲儺兒兌兗黨蘭關興茲養獸囅內岡冊寫軍農塚馮衝決況凍淨淒涼淩減湊凜幾鳳鳧憑凱擊氹鑿芻劃劉則剛創刪別剗剄劊劌剴劑剮劍剝劇勸辦務勱動勵勁勞勢勳猛勩勻匭匱區醫華協單賣盧鹵臥衛卻巹廠廳曆厲壓厭厙廁廂厴廈廚廄廝縣參靉靆雙發變敘疊葉號歎嘰籲後嚇呂嗎唚噸聽啟吳嘸囈嘔嚦唄員咼嗆嗚詠哢嚨嚀噝吒噅鹹呱響啞噠嘵嗶噦嘩噲嚌噥喲嘜嗊嘮啢嗩唕喚呼嘖嗇囀齧囉嘽嘯噴嘍嚳囁嗬噯噓嚶囑嚕劈囂謔團園囪圍圇國圖圓聖壙場阪壞塊堅壇壢壩塢墳墜壟壟壚壘墾坰堊墊埡墶壋塏堖塒塤堝墊垵塹墮壪牆壯聲殼壺壼處備復夠頭誇夾奪奩奐奮獎奧妝婦媽嫵嫗媯姍薑婁婭嬈嬌孌娛媧嫻嫿嬰嬋嬸媼嬡嬪嬙嬤孫學孿寧寶實寵審憲宮寬賓寢對尋導壽將爾塵堯尷屍盡層屭屜屆屬屢屨嶼歲豈嶇崗峴嶴嵐島嶺嶽崠巋嶨嶧峽嶢嶠崢巒嶗崍嶮嶄嶸嶔崳嶁脊巔鞏巰幣帥師幃帳簾幟帶幀幫幬幘幗冪襆幹並廣莊慶廬廡庫應廟龐廢廎廩開異棄張彌弳彎彈強歸當錄彠彥徹徑徠禦憶懺憂愾懷態慫憮慪悵愴憐總懟懌戀懇惡慟懨愷惻惱惲悅愨懸慳憫驚懼慘懲憊愜慚憚慣湣慍憤憒願懾憖怵懣懶懍戇戔戲戧戰戩戶紮撲扡執擴捫掃揚擾撫拋摶摳掄搶護報擔擬攏揀擁攔擰撥擇掛摯攣掗撾撻挾撓擋撟掙擠揮撏撈損撿換搗據撚擄摑擲撣摻摜摣攬撳攙擱摟攪攜攝攄擺搖擯攤攖撐攆擷擼攛擻攢敵斂數齋斕鬥斬斷無舊時曠暘曇晝曨顯晉曬曉曄暈暉暫曖劄術樸機殺雜權條來楊榪傑極構樅樞棗櫪梘棖槍楓梟櫃檸檉梔柵標棧櫛櫳棟櫨櫟欄樹棲樣欒棬椏橈楨檔榿橋樺檜槳樁夢檮棶檢欞槨櫝槧欏橢樓欖櫬櫚櫸檟檻檳櫧橫檣櫻櫫櫥櫓櫞簷檁歡歟歐殲歿殤殘殞殮殫殯毆毀轂畢斃氈毿氌氣氫氬氳彙漢汙湯洶遝溝沒灃漚瀝淪滄渢溈滬濔濘淚澩瀧瀘濼瀉潑澤涇潔灑窪浹淺漿澆湞溮濁測澮濟瀏滻渾滸濃潯濜塗湧濤澇淶漣潿渦溳渙滌潤澗漲澀澱淵淥漬瀆漸澠漁瀋滲溫遊灣濕潰濺漵漊潷滾滯灩灄滿瀅濾濫灤濱灘澦濫瀠瀟瀲濰潛瀦瀾瀨瀕灝滅燈靈災燦煬爐燉煒熗點煉熾爍爛烴燭煙煩燒燁燴燙燼熱煥燜燾煆糊溜愛爺牘犛牽犧犢強狀獷獁猶狽麅獮獰獨狹獅獪猙獄猻獫獵獼玀豬貓蝟獻獺璣璵瑒瑪瑋環現瑲璽瑉玨琺瓏璫琿璡璉瑣瓊瑤璦璿瓔瓚甕甌電畫暢佘疇癤療瘧癘瘍鬁瘡瘋皰屙癰痙癢瘂癆瘓癇癡癉瘮瘞瘺癟癱癮癭癩癬癲臒皚皺皸盞鹽監蓋盜盤瞘眥矓著睜睞瞼瞞矚矯磯礬礦碭碼磚硨硯碸礪礱礫礎硜矽碩硤磽磑礄確鹼礙磧磣堿镟滾禮禕禰禎禱禍稟祿禪離禿稈種積稱穢穠穭稅穌穩穡窮竊竅窯竄窩窺竇窶豎競篤筍筆筧箋籠籩築篳篩簹箏籌簽簡籙簀篋籜籮簞簫簣簍籃籬籪籟糴類秈糶糲粵糞糧糝餱緊縶糸糾紆紅紂纖紇約級紈纊紀紉緯紜紘純紕紗綱納紝縱綸紛紙紋紡紵紖紐紓線紺絏紱練組紳細織終縐絆紼絀紹繹經紿綁絨結絝繞絰絎繪給絢絳絡絕絞統綆綃絹繡綌綏絛繼綈績緒綾緓續綺緋綽緔緄繩維綿綬繃綢綯綹綣綜綻綰綠綴緇緙緗緘緬纜緹緲緝縕繢緦綞緞緶線緱縋緩締縷編緡緣縉縛縟縝縫縗縞纏縭縊縑繽縹縵縲纓縮繆繅纈繚繕繒韁繾繰繯繳纘罌網羅罰罷羆羈羥羨翹翽翬耮耬聳恥聶聾職聹聯聵聰肅腸膚膁腎腫脹脅膽勝朧腖臚脛膠脈膾髒臍腦膿臠腳脫腡臉臘醃膕齶膩靦膃騰臏臢輿艤艦艙艫艱豔艸藝節羋薌蕪蘆蓯葦藶莧萇蒼苧蘇檾蘋莖蘢蔦塋煢繭荊薦薘莢蕘蓽蕎薈薺蕩榮葷滎犖熒蕁藎蓀蔭蕒葒葤藥蒞蓧萊蓮蒔萵薟獲蕕瑩鶯蓴蘀蘿螢營縈蕭薩蔥蕆蕢蔣蔞藍薊蘺蕷鎣驀薔蘞藺藹蘄蘊藪槁蘚虜慮虛蟲虯蟣雖蝦蠆蝕蟻螞蠶蠔蜆蠱蠣蟶蠻蟄蛺蟯螄蠐蛻蝸蠟蠅蟈蟬蠍螻蠑螿蟎蠨釁銜補襯袞襖嫋褘襪襲襏裝襠褌褳襝褲襇褸襤繈襴見觀覎規覓視覘覽覺覬覡覿覥覦覯覲覷觴觸觶讋譽謄訁計訂訃認譏訐訌討讓訕訖訓議訊記訒講諱謳詎訝訥許訛論訩訟諷設訪訣證詁訶評詛識詗詐訴診詆謅詞詘詔詖譯詒誆誄試詿詩詰詼誠誅詵話誕詬詮詭詢詣諍該詳詫諢詡譸誡誣語誚誤誥誘誨誑說誦誒請諸諏諾讀諑誹課諉諛誰諗調諂諒諄誶談誼謀諶諜謊諫諧謔謁謂諤諭諼讒諮諳諺諦謎諞諝謨讜謖謝謠謗諡謙謐謹謾謫譾謬譚譖譙讕譜譎讞譴譫讖穀豶貝貞負貟貢財責賢敗賬貨質販貪貧貶購貯貫貳賤賁貰貼貴貺貸貿費賀貽賊贄賈賄貲賃賂贓資賅贐賕賑賚賒賦賭齎贖賞賜贔賙賡賠賧賴賵贅賻賺賽賾贗讚贇贈贍贏贛赬趙趕趨趲躉躍蹌蹠躒踐躂蹺蹕躚躋踴躊蹤躓躑躡蹣躕躥躪躦軀車軋軌軒軑軔轉軛輪軟轟軲軻轤軸軹軼軤軫轢軺輕軾載輊轎輈輇輅較輒輔輛輦輩輝輥輞輬輟輜輳輻輯轀輸轡轅轄輾轆轍轔辭辯辮邊遼達遷過邁運還這進遠違連遲邇逕跡適選遜遞邐邏遺遙鄧鄺鄔郵鄒鄴鄰鬱郤郟鄶鄭鄆酈鄖鄲醞醱醬釅釃釀釋裏钜鑒鑾鏨釓釔針釘釗釙釕釷釺釧釤鈒釩釣鍆釹鍚釵鈃鈣鈈鈦鈍鈔鍾鈉鋇鋼鈑鈐鑰欽鈞鎢鉤鈧鈁鈥鈄鈕鈀鈺錢鉦鉗鈷缽鈳鉕鈽鈸鉞鑽鉬鉭鉀鈿鈾鐵鉑鈴鑠鉛鉚鈰鉉鉈鉍鈹鐸鉶銬銠鉺銪鋏鋣鐃銍鐺銅鋁銱銦鎧鍘銖銑鋌銩銛鏵銓鉿銚鉻銘錚銫鉸銥鏟銃鐋銨銀銣鑄鐒鋪鋙錸鋱鏈鏗銷鎖鋰鋥鋤鍋鋯鋨鏽銼鋝鋒鋅鋶鐦鐧銳銻鋃鋟鋦錒錆鍺錯錨錡錁錕錩錫錮鑼錘錐錦鍁錈錇錟錠鍵鋸錳錙鍥鍈鍇鏘鍶鍔鍤鍬鍾鍛鎪鍠鍰鎄鍍鎂鏤鎡鏌鎮鎛鎘鑷鐫鎳鎿鎦鎬鎊鎰鎔鏢鏜鏍鏰鏞鏡鏑鏃鏇鏐鐔钁鐐鏷鑥鐓鑭鐠鑹鏹鐙鑊鐳鐶鐲鐮鐿鑔鑣鑞鑲長門閂閃閆閈閉問闖閏闈閑閎間閔閌悶閘鬧閨聞闥閩閭闓閥閣閡閫鬮閱閬闍閾閹閶鬩閿閽閻閼闡闌闃闠闊闋闔闐闒闕闞闤隊陽陰陣階際陸隴陳陘陝隉隕險隨隱隸雋難雛讎靂霧霽黴靄靚靜靨韃鞽韉韝韋韌韍韓韙韞韜韻頁頂頃頇項順須頊頑顧頓頎頒頌頏預顱領頗頸頡頰頲頜潁熲頦頤頻頮頹頷頴穎顆題顒顎顓顏額顳顢顛顙顥纇顫顬顰顴風颺颭颮颯颶颸颼颻飀飄飆飆飛饗饜飣饑飥餳飩餼飪飫飭飯飲餞飾飽飼飿飴餌饒餉餄餎餃餏餅餑餖餓餘餒餕餜餛餡館餷饋餶餿饞饁饃餺餾饈饉饅饊饌饢馬馭馱馴馳驅馹駁驢駔駛駟駙駒騶駐駝駑駕驛駘驍罵駰驕驊駱駭駢驫驪騁驗騂駸駿騏騎騍騅騌驌驂騙騭騤騷騖驁騮騫騸驃騾驄驏驟驥驦驤髏髖髕鬢魘魎魚魛魢魷魨魯魴魺鮁鮃鯰鱸鮋鮓鮒鮊鮑鱟鮍鮐鮭鮚鮳鮪鮞鮦鰂鮜鱠鱭鮫鮮鮺鯗鱘鯁鱺鰱鰹鯉鰣鰷鯀鯊鯇鮶鯽鯒鯖鯪鯕鯫鯡鯤鯧鯝鯢鯰鯛鯨鯵鯴鯔鱝鰈鰏鱨鯷鰮鰃鰓鱷鰍鰒鰉鰁鱂鯿鰠鼇鰭鰨鰥鰩鰟鰜鰳鰾鱈鱉鰻鰵鱅鰼鱖鱔鱗鱒鱯鱤鱧鱣鳥鳩雞鳶鳴鳲鷗鴉鶬鴇鴆鴣鶇鸕鴨鴞鴦鴒鴟鴝鴛鴬鴕鷥鷙鴯鴰鵂鴴鵃鴿鸞鴻鵐鵓鸝鵑鵠鵝鵒鷳鵜鵡鵲鶓鵪鶤鵯鵬鵮鶉鶊鵷鷫鶘鶡鶚鶻鶿鶥鶩鷊鷂鶲鶹鶺鷁鶼鶴鷖鸚鷓鷚鷯鷦鷲鷸鷺鸇鷹鸌鸏鸛鸘鹺麥麩黃黌黶黷黲黽黿鼂鼉鞀鼴齇齊齏齒齔齕齗齟齡齙齠齜齦齬齪齲齷龍龔龕龜誌製谘隻裡係範鬆冇嚐嘗鬨麵準鐘彆閒乾儘臟拚" - - -def to_tradition_string(s: str): - output_str_list = [] - str_len = len(s) - - for i in range(str_len): - found_index = SIMPLE.find(s[i]) - - if not (found_index == -1): - output_str_list.append(TRADITION[found_index]) - else: - output_str_list.append(s[i]) - - return "".join(output_str_list) - - -def to_simple_string(s: str): - output_str_list = [] - str_len = len(s) - - for i in range(str_len): - found_index = TRADITION.find(s[i]) - - if not (found_index == -1): - output_str_list.append(SIMPLE[found_index]) - else: - output_str_list.append(s[i]) - - return "".join(output_str_list) diff --git a/ATRI/utils/ub_paste.py b/ATRI/utils/ub_paste.py deleted file mode 100644 index 56b8261..0000000 --- a/ATRI/utils/ub_paste.py +++ /dev/null @@ -1,11 +0,0 @@ -from aiohttp import ClientSession - - -URL = "https://paste.ubuntu.com/" - - -async def paste(form_data) -> str: - async with ClientSession() as session: - async with session.post(url=URL, data=form_data) as r: - result = str(r.url).replace("https://", "") - return result diff --git a/ATRI/utils/yaml.py b/ATRI/utils/yaml.py deleted file mode 100644 index 1ad7377..0000000 --- a/ATRI/utils/yaml.py +++ /dev/null @@ -1,10 +0,0 @@ -import yaml - -from pathlib import Path - - -def load_yml(file: Path, encoding="utf-8") -> dict: - """加载 yml 文件""" - with open(file, "r", encoding=encoding) as f: - data = yaml.safe_load(f) - return data @@ -24,17 +24,17 @@ 实现方式为 `go-cqhttp 或其它遵守Onebot标准的协议` + `NoneBot2`。 -由于项目特殊性,故随机进行更新。更新日志:[传送门](changelog.md) +由于项目特殊性,会随机进行更新。更新日志:请关注commit ### 功能列表 -请在群内对机器人发送`/help`以获取帮助 +请在群内at机器人发送`菜单`以获取帮助 **TODO**: - [ ] 网页控制台 - [ ] RSS订阅 - [ ] B站动态订阅 - [ ] 冷重启 - - [ ] 词库(搭配nlp) + - [x] 词库(搭配nlp) ### 特别感谢 [Richard Chien](https://github.com/richardchien): [Onebot标准](https://github.com/howmanybots/onebot) @@ -54,7 +54,7 @@ · AfdianUser_quGy 5.0CNY<br> · 1752179928 56.14CNY<br> · Mikasa 66.0CNY<br> - · SkipM4 27.0CNY<br> + · SkipM4 32.0CNY<br> · Chunk7 33.0CNY<br> · Wwwwwwalnut 10.0CNY<br> · 演变 5.0CNY<br> @@ -8,22 +8,5 @@ BotSelfConfig: command_sep: ["."] session_expire_timeout: 60 -NetworkPost: - host: "127.0.0.1" - port: 20001 - -AdminPage: # 待定,无用 - host: "127.0.0.1" - port: 20002 - -NsfwCheck: # 详细请查看文档以获取帮助 - enabled: false - passing_rate: 85 - host: "127.0.0.1" - port: 20003 - SauceNAO: key: "" - -Setu: # https://api.lolicon.app/#/setu - key: ""
\ No newline at end of file |