diff options
Diffstat (limited to 'ATRI/plugins')
48 files changed, 2550 insertions, 2804 deletions
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py index 6bb11e8..de64501 100644 --- a/ATRI/plugins/anime_search.py +++ b/ATRI/plugins/anime_search.py @@ -1,65 +1,48 @@ import re -import json from aiohttp import FormData +from random import choice -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp.message import Message, MessageSegment -from ATRI.service import Service as sv +from ATRI.service import Service from ATRI.rule import is_in_service +from ATRI.utils import request, UbuntuPaste, Translate +from ATRI.utils.limit import FreqLimiter from ATRI.exceptions import RequestError -from ATRI.utils.request import get_bytes -from ATRI.utils.translate import to_simple_string -from ATRI.utils.ub_paste import paste URL = "https://trace.moe/api/search?url=" +_anime_flmt = FreqLimiter(10) +_anime_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) __doc__ = """ -以图搜番 -权限组:所有人 -用法: - 以图搜番 (pic) +通过一张图片搜索你需要的番!据说里*也可以 """ -anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番")) - - -@anime_search.args_parser # type: ignore -async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了", "取消"] - if msg in quit_list: - await anime_search.finish("好吧...") - if not msg: - await anime_search.reject("图呢?") - else: - state["pic_anime"] = msg - - -@anime_search.handle() -async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["pic_anime"] = msg - - -@anime_search.got("pic_anime", prompt="图呢?") -async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["pic_anime"] - img = re.findall(r"url=(.*?)]", msg) - if not img: - await anime_search.reject("请发送图片而不是其它东西!!") - - try: - req = await get_bytes(URL + img[0]) - except RequestError: - raise RequestError("Request failed!") - data = json.loads(req)["docs"] - try: +class Anime(Service): + + def __init__(self): + Service.__init__(self, "以图搜番", __doc__, rule=is_in_service("以图搜番")) + + @staticmethod + async def _request(url: str) -> dict: + aim = URL + url + try: + res = await request.get(aim) + except RequestError: + raise RequestError("Request failed!") + result = await res.json() + return result + + @classmethod + async def search(cls, url: str) -> str: + data = await cls._request(url) + data = data["docs"] + d = dict() for i in range(len(data)): if data[i]["title_chinese"] in d.keys(): @@ -73,38 +56,70 @@ async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None: else: n = data[i]["episode"] - d[to_simple_string(data[i]["title_chinese"])] = [ + d[Translate(data[i]["title_chinese"]).to_simple()] = [ data[i]["similarity"], f"第{n}集", f"{int(m)}分{int(s)}秒处", ] - except Exception as err: - raise Exception(f"Invalid data.\n{err}") - - result = sorted(d.items(), key=lambda x: x[1], reverse=True) - - t = 0 - - msg0 = f"> {event.sender.nickname}" - for i in result: - t += 1 - s = "%.2f%%" % (i[1][0] * 100) - msg0 = msg0 + ( - "\n——————————\n" - f"({t}) Similarity: {s}\n" - f"Name: {i[0]}\n" - f"Time: {i[1][1]} {i[1][2]}" - ) - - if len(result) == 2: - await anime_search.finish(Message(msg0)) + + result = sorted(d.items(), key=lambda x: x[1], reverse=True) + t = 0 + msg0 = str() + for i in result: + t += 1 + s = "%.2f%%" % (i[1][0] * 100) + msg0 = msg0 + ( + "\n——————————\n" + f"({t}) Similarity: {s}\n" + f"Name: {i[0]}\n" + f"Time: {i[1][1]} {i[1][2]}" + ) + + if len(result) == 2: + return msg0 + else: + data = FormData() + data.add_field("poster", "ATRI running log") + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", msg0) + + repo = f"详细请移步此处~\n{await UbuntuPaste(data).paste()}" + return repo + + +anime_search = Anime().on_command("以图搜番", "发送一张图以搜索可能的番剧") + +@anime_search.args_parser # type: ignore +async def _get_anime(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了", "不搜了", "取消"] + if msg in quit_list: + await anime_search.finish("好吧...") + if not msg: + await anime_search.reject("图呢?") else: - data = FormData() - data.add_field("poster", "ATRI running log") - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", msg0) - - repo = f"> {event.sender.nickname}\n" - repo = repo + f"详细请移步此处~\n{await paste(data)}" - await anime_search.finish(repo) + state["anime"] = msg + +@anime_search.handle() +async def _ready_sear(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _anime_flmt.check(user_id): + await anime_search.finish(_anime_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["anime"] = msg + +@anime_search.got("anime", "图呢?") +async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + msg = state["anime"] + img = re.findall(r"url=(.*?)]", msg) + if not img: + await anime_search.reject("请发送图片而不是其它东西!!") + + a = await Anime().search(img[0]) + result = f"> {MessageSegment.at(user_id)}\n" + a + _anime_flmt.start_cd(user_id) + await anime_search.finish(Message(result)) diff --git a/ATRI/plugins/call_owner.py b/ATRI/plugins/call_owner.py deleted file mode 100644 index bcbae73..0000000 --- a/ATRI/plugins/call_owner.py +++ /dev/null @@ -1,78 +0,0 @@ -from nonebot.permission import SUPERUSER -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.config import BotSelfConfig -from ATRI.utils.apscheduler import scheduler -from ATRI.utils.list import count_list - - -repo_list = [] -__doc__ = """ -给维护者留言 -权限组:所有人 -用法: - 来杯红茶 (msg) -""" - -repo = sv.on_command(cmd="来杯红茶", docs=__doc__) - - [email protected]_parser # type: ignore -async def _repo_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) - if msg == "算了": - await repo.finish("好吧") - - if not msg: - await repo.reject("话呢?") - else: - state["msg_repo"] = msg - - -async def _repo(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["msg_repo"] = msg - - [email protected]("msg_repo", prompt="请告诉咱需要反馈的内容~!") -async def _repo_deal(bot: Bot, event: MessageEvent, state: T_State) -> None: - global repo_list - msg = state["msg_repo"] - user = event.user_id - - if count_list(repo_list, user) == 5: - await repo.finish("吾辈已经喝了五杯红茶啦!明天再来吧。") - - repo_list.append(user) - - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}") - - await repo.finish("吾辈的心愿已由咱转告给咱的维护者了~!") - - [email protected]_job("cron", hour=0, misfire_grace_time=60) -async def _() -> None: - global repo_list - repo_list.clear() - - -__doc__ = """ -重置给维护者的留言次数 -权限组:维护者 -用法: - /重置红茶 -""" - -reset_repo = sv.on_command(cmd="重置红茶", docs=__doc__, permission=SUPERUSER) - - -@reset_repo.handle() -async def _reset_repo(bot: Bot, event: MessageEvent) -> None: - global repo_list - repo_list.clear() - await reset_repo.finish("红茶重置完成~!") diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py new file mode 100644 index 0000000..4864be5 --- /dev/null +++ b/ATRI/plugins/chat/__init__.py @@ -0,0 +1,112 @@ +from random import choice + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils import CoolqCodeChecker +from ATRI.utils.limit import FreqLimiter +from ATRI.utils.apscheduler import scheduler +from .data_source import Chat + + +_chat_flmt = FreqLimiter(3) +_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) + + +chat = Chat().on_message("闲聊(文爱") + +async def _chat(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await chat.finish(_chat_flmt_notice) + + msg = str(event.message) + repo = await Chat().deal(msg, user_id) + _chat_flmt.start_cd(user_id) + await chat.finish(repo) + +my_name_is = Chat().on_command("叫我", "更改闲聊(划掉 文爱)时的称呼", aliases={"我是"}, priority=1) + +@my_name_is.args_parser # type: ignore +async def _get_name(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await my_name_is.finish("好吧...") + if not msg: + await my_name_is.reject("欧尼酱想让咱如何称呼你呢!0w0") + else: + state["name"] = msg + +@my_name_is.handle() +async def _name(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await my_name_is.finish(_chat_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["name"] = msg + +@my_name_is.got("name") +async def _deal_name(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + new_name = state["name"] + repo = choice([ + f"好~w 那咱以后就称呼你为{new_name}!", + f"噢噢噢!原来你叫{new_name}阿~", + f"好欸!{new_name}ちゃん~~~", + "很不错的称呼呢w" + ]) + Chat().name_is(user_id, new_name) + _chat_flmt.start_cd(user_id) + await my_name_is.finish(repo) + +say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) + [email protected]_parser # type: ignore +async def _get_say(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await say.finish("好吧...") + if not msg: + await say.reject("欧尼酱想让咱如何称呼你呢!0w0") + else: + state["say"] = msg + +async def _ready_say(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await say.finish(_chat_flmt_notice) + + msg = str(event.message) + if msg: + state["say"] = msg + [email protected]("say") +async def _deal_say(bot: Bot, event: MessageEvent, state: T_State): + msg = state["say"] + check = CoolqCodeChecker(msg).check + if not check: + repo = choice([ + "不要...", + "这个咱不想复读!", + "不可以", + "不好!" + ]) + await say.finish(repo) + + user_id = event.get_user_id() + _chat_flmt.start_cd(user_id) + await say.finish(msg) + + [email protected]_job("interval", hours=3, misfire_grace_time=60) +async def _check_kimo(): + try: + await Chat().update_data() + except BaseException: + pass diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py new file mode 100644 index 0000000..1b0eec9 --- /dev/null +++ b/ATRI/plugins/chat/data_source.py @@ -0,0 +1,139 @@ +import os +import json +from pathlib import Path +from jieba import posseg +from random import choice, shuffle + +from ATRI.service import Service +from ATRI.rule import to_bot, is_in_service +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.exceptions import ReadFileError, WriteError + + +__doc__ = """ +好像有点涩?(偏文爱,需at +""" + +CHAT_PATH = Path(".") / "ATRI" / "data" / "database" / "chat" +os.makedirs(CHAT_PATH, exist_ok=True) +KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" + + +class Chat(Service): + + def __init__(self): + Service.__init__(self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5) + + @staticmethod + async def _request(url: str) -> dict: + res = await request.get(url) + data = await res.json() + return data + + @classmethod + async def _generate_data(cls) -> None: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + log.warning("未检测到闲聊词库,生成中") + data = await cls._request(KIMO_URL) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("生成完成") + except WriteError: + raise WriteError("Writing kimo words failed!") + + @classmethod + async def _load_data(cls) -> dict: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data + + @classmethod + async def update_data(cls) -> None: + log.info("更新闲聊词库ing...") + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + updata_data = await cls._request(KIMO_URL) + data = json.loads(path.read_bytes()) + for i in updata_data: + if i not in data: + data[i] = updata_data[i] + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("闲聊词库更新完成") + + @staticmethod + def name_is(user_id: str, new_name: str): + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = {} + + data = json.loads(path.read_bytes()) + data[user_id] = new_name + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + except ReadFileError: + raise ReadFileError("Update user name failed!") + + @staticmethod + def load_name(user_id: str) -> str: + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return "你" + + data = json.loads(path.read_bytes()) + try: + result = data[user_id] + except BaseException: + result = "你" + return result + + @classmethod + async def deal(cls, msg: str, user_id: str) -> str: + keywords = posseg.lcut(msg) + shuffle(keywords) + + data = await cls._load_data() + + repo = str() + for i in keywords: + a = i.word + b = list(a) + try: + if b[0] == b[1]: + a = b[0] + except BaseException: + pass + if a in data: + repo = data.get(a, str()) + + if not repo: + temp_data = list(data) + shuffle(temp_data) + for i in temp_data: + if i in msg: + repo = data.get(i, str()) + + a = choice(repo) if type(repo) is list else repo + user_name = cls.load_name(user_id) + repo = a.replace("你", user_name) + return repo diff --git a/ATRI/plugins/code_runner.py b/ATRI/plugins/code_runner.py deleted file mode 100644 index 363815f..0000000 --- a/ATRI/plugins/code_runner.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -Idea from: https://github.com/cczu-osa/aki -""" -import json -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.rule import is_in_service -from ATRI.service import Service as sv -from ATRI.utils.request import post_bytes -from ATRI.exceptions import RequestError - - -RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest" -SUPPORTED_LANGUAGES = { - "assembly": {"ext": "asm"}, - "bash": {"ext": "sh"}, - "c": {"ext": "c"}, - "clojure": {"ext": "clj"}, - "coffeescript": {"ext": "coffe"}, - "cpp": {"ext": "cpp"}, - "csharp": {"ext": "cs"}, - "erlang": {"ext": "erl"}, - "fsharp": {"ext": "fs"}, - "go": {"ext": "go"}, - "groovy": {"ext": "groovy"}, - "haskell": {"ext": "hs"}, - "java": {"ext": "java", "name": "Main"}, - "javascript": {"ext": "js"}, - "julia": {"ext": "jl"}, - "kotlin": {"ext": "kt"}, - "lua": {"ext": "lua"}, - "perl": {"ext": "pl"}, - "php": {"ext": "php"}, - "python": {"ext": "py"}, - "ruby": {"ext": "rb"}, - "rust": {"ext": "rs"}, - "scala": {"ext": "scala"}, - "swift": {"ext": "swift"}, - "typescript": {"ext": "ts"}, -} - - -__doc__ = """ -在线运行代码 -权限组:所有人 -用法: - /code (lang) (code) -示例: - /code python - print('Hello world!') -""" - -code_runner = sv.on_command(cmd="/code", docs=__doc__, rule=is_in_service("code")) - - -@code_runner.handle() -async def _code_runner(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split("\n") - - if msg[0] == "list": - msg0 = "咱现在支持的语言如下:\n" - msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys())) - - await code_runner.finish(msg0) - elif not msg[0]: - await code_runner.finish("请键入/help以获取更多支持...") - - laug = msg[0].replace("\r", "") - if laug not in SUPPORTED_LANGUAGES: - await code_runner.finish("该语言暂不支持...") - - del msg[0] - code = "\n".join(map(str, msg)) - try: - req = await post_bytes( - RUN_API_URL_FORMAT.format(laug), - json={ - "files": [ - { - "name": ( - SUPPORTED_LANGUAGES[laug].get("name", "main") - + f".{SUPPORTED_LANGUAGES[laug]['ext']}" - ), - "content": code, - } - ], - "stdin": "", - "command": "", - }, - ) - except RequestError: - raise RequestError("Failed to request!") - - payload = json.loads(req) - sent = False - for k in ["stdout", "stderr", "error"]: - v = payload.get(k) - lines = v.splitlines() - lines, remained_lines = lines[:10], lines[10:] - out = "\n".join(lines) - out, remained_out = out[: 60 * 10], out[60 * 10 :] - - if remained_lines or remained_out: - out += f"\n(太多了太多了...)" - - if out: - await bot.send(event, f"{k}:\n\n{out}") - sent = True - - if not sent: - await code_runner.finish("Running success! Nothing print.") diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py new file mode 100644 index 0000000..dfe6162 --- /dev/null +++ b/ATRI/plugins/code_runner/__init__.py @@ -0,0 +1,35 @@ +from random import choice + +from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp.message import Message, MessageSegment + +from ATRI.utils.limit import FreqLimiter +from .data_source import CodeRunner + + +_flmt = FreqLimiter(5) +_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) + + +code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮助:/code help") + +@code_runner.handle() +async def _code_runner(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _flmt.check(user_id): + await code_runner.finish(_flmt_notice) + + msg = str(event.get_message()) + args = msg.split("\n") + + if not args: + content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!" + elif args[0] == "help": + content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().help() + elif args[0] == "list": + content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang() + else: + content = MessageSegment.at(user_id) + await CodeRunner().runner(msg) + + _flmt.start_cd(user_id) + await code_runner.finish(Message(content)) diff --git a/ATRI/plugins/code_runner/data_source.py b/ATRI/plugins/code_runner/data_source.py new file mode 100644 index 0000000..bbe0d2e --- /dev/null +++ b/ATRI/plugins/code_runner/data_source.py @@ -0,0 +1,111 @@ +from ATRI.rule import is_in_service +from ATRI.service import Service +from ATRI.utils import request +from ATRI.exceptions import RequestError + + +RUN_API_URL_FORMAT = "https://glot.io/run/{}?version=latest" +SUPPORTED_LANGUAGES = { + "assembly": {"ext": "asm"}, + "bash": {"ext": "sh"}, + "c": {"ext": "c"}, + "clojure": {"ext": "clj"}, + "coffeescript": {"ext": "coffe"}, + "cpp": {"ext": "cpp"}, + "csharp": {"ext": "cs"}, + "erlang": {"ext": "erl"}, + "fsharp": {"ext": "fs"}, + "go": {"ext": "go"}, + "groovy": {"ext": "groovy"}, + "haskell": {"ext": "hs"}, + "java": {"ext": "java", "name": "Main"}, + "javascript": {"ext": "js"}, + "julia": {"ext": "jl"}, + "kotlin": {"ext": "kt"}, + "lua": {"ext": "lua"}, + "perl": {"ext": "pl"}, + "php": {"ext": "php"}, + "python": {"ext": "py"}, + "ruby": {"ext": "rb"}, + "rust": {"ext": "rs"}, + "scala": {"ext": "scala"}, + "swift": {"ext": "swift"}, + "typescript": {"ext": "ts"}, +} + + +__doc__ = """ +在线跑代码 +""" + + +class CodeRunner(Service): + + def __init__(self): + Service.__init__(self, "在线跑代码", __doc__, rule=is_in_service("在线跑代码")) + + @staticmethod + def help() -> str: + return ( + "/code {语言}\n" + "{代码}\n" + "For example:\n" + "/code python\n" + "print('hello world')" + ) + + @staticmethod + def list_supp_lang() -> str: + msg0 = "咱现在支持的语言如下:\n" + msg0 += ", ".join(map(str, SUPPORTED_LANGUAGES.keys())) + return msg0 + + @staticmethod + async def runner(msg: str): + args = msg.split("\n") + if not args: + return "请检查键入内容..." + + lang = args[0].replace("\r", "") + if lang not in SUPPORTED_LANGUAGES: + return "该语言暂不支持..." + + del args[0] + code = "\n".join(map(str, args)) + url = RUN_API_URL_FORMAT.format(lang) + js = { + "files": [ + { + "name": ( + SUPPORTED_LANGUAGES[lang].get("name", "main") + + f".{SUPPORTED_LANGUAGES[lang]['ext']}" + ), + "content": code, + } + ], + "stdin": "", + "command": "" + } + + try: + res = await request.post(url, json=js) + except RequestError: + raise RequestError("Request failed!") + + payload = await res.json() + sent = False + for k in ["stdout", "stderr", "error"]: + v = payload.get(k) + lines = v.splitlines() + lines, remained_lines = lines[:10], lines[10:] + out = "\n".join(lines) + out, remained_out = out[: 60 * 10], out[60 * 10 :] + + if remained_lines or remained_out: + out += f"\n(太多了太多了...)" + + if out: + return f"\n{k}:\n{out}" + + if not sent: + return "\n运行完成,没任何输出呢..."
\ No newline at end of file diff --git a/ATRI/plugins/curse.py b/ATRI/plugins/curse.py new file mode 100644 index 0000000..b2dbc05 --- /dev/null +++ b/ATRI/plugins/curse.py @@ -0,0 +1,57 @@ +from random import choice + +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.rule import is_in_service, to_bot +from ATRI.service import Service +from ATRI.utils import request +from ATRI.utils.limit import FreqLimiter + + +URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn" + +_curse_flmt = FreqLimiter(3) +_curse_flmt_notice = choice(["我看你是找🔨是吧", "给我适可而止阿!?", "扎布多得了😅", "z?是m吗?我凑那也太恐怖了", "?"]) + + +__doc__ = """ +口臭!你急了你急了! +""" + + +class Curse(Service): + + def __init__(self): + Service.__init__(self, "口臭", __doc__, rule=is_in_service("口臭")) + + @staticmethod + async def now() -> str: + res = await request.get(URL) + result = await res.text # type: ignore + return result + + +normal_curse = Curse().on_command("口臭一下", "主命令,骂你一下", aliases={"骂我", "口臭"}, rule=to_bot()) + +@normal_curse.handle() +async def _deal_n_curse(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _curse_flmt.check(user_id): + await normal_curse.finish(_curse_flmt_notice) + + result = await Curse().now() + _curse_flmt.start_cd(user_id) + await normal_curse.finish(result) + + +super_curse = Curse().on_regex(r"[来求有](.*?)骂我吗?", "有求必应") + +@super_curse.handle() +async def _deal_s_curse(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _curse_flmt.check(user_id): + await normal_curse.finish(_curse_flmt_notice) + + result = await Curse().now() + _curse_flmt.start_cd(user_id) + await normal_curse.finish(result) diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py deleted file mode 100644 index 702e5ac..0000000 --- a/ATRI/plugins/curse/__init__.py +++ /dev/null @@ -1,42 +0,0 @@ -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service, to_bot -from ATRI.utils.list import count_list, del_list_aim -from ATRI.utils.request import get_text -from ATRI.exceptions import RequestError - - -URL = "https://zuanbot.com/api.php?level=min&lang=zh_cn" -sick_list = [] - - -__doc__ = """ -口臭一下 -权限组:所有人 -用法: - 口臭,口臭一下,骂我 -""" - -curse = sv.on_command( - cmd="口臭", docs=__doc__, aliases={"口臭一下,骂我"}, rule=is_in_service("口臭") & to_bot() -) - - -async def _curse(bot: Bot, event: MessageEvent) -> None: - global sick_list - user = event.get_user_id() - if count_list(sick_list, user) == 3: - sick_list.append(user) - repo = "不是??你这么想被咱骂的嘛??" "被咱骂就这么舒服的吗?!" "该......你该不会是.....M吧!" - await curse.finish(repo) - elif count_list(sick_list, user) == 6: - sick_list = del_list_aim(sick_list, user) - await curse.finish("给我适可而止阿!?") - else: - sick_list.append(user) - try: - await curse.finish(await get_text(URL)) - except RequestError: - raise RequestError("Time out!") diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 6a14f1a..5369c9b 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -1,16 +1,16 @@ import os -import time import json -import shutil -from pathlib import Path -from random import choice +import asyncio from datetime import datetime +from pydantic.main import BaseModel +from random import choice, randint +from pathlib import Path +import nonebot from nonebot.typing import T_State from nonebot.matcher import Matcher from nonebot.message import run_preprocessor from nonebot.exception import IgnoredException -from nonebot.adapters.cqhttp.message import Message from nonebot.adapters.cqhttp import ( Bot, MessageEvent, @@ -21,246 +21,224 @@ from nonebot.adapters.cqhttp import ( GroupDecreaseNoticeEvent, GroupAdminNoticeEvent, GroupBanNoticeEvent, - LuckyKingNotifyEvent, - GroupUploadNoticeEvent, GroupRecallNoticeEvent, FriendRecallNoticeEvent, ) import ATRI -from ATRI.log import logger -from ATRI.exceptions import WriteError +from ATRI.service import Service +from ATRI.log import logger as log +from ATRI.rule import is_in_service from ATRI.config import BotSelfConfig -from ATRI.service import Service as sv -from ATRI.utils.cqcode import coolq_code_check +from ATRI.utils import CoolqCodeChecker -PLUGIN_INFO_DIR = Path(".") / "ATRI" / "data" / "service" / "services" +driver = ATRI.driver() +bots = nonebot.get_bots() + ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" -os.makedirs(PLUGIN_INFO_DIR, exist_ok=True) +MANEGE_DIR = Path(".") / "ATRI" / "data" / "database" / "manege" os.makedirs(ESSENTIAL_DIR, exist_ok=True) - - -driver = ATRI.driver() +os.makedirs(MANEGE_DIR, exist_ok=True) @driver.on_startup -async def startup() -> None: - logger.info("アトリは、高性能ですから!") +async def startup(): + log.info("アトリは、高性能ですから!") @driver.on_shutdown -async def shutdown() -> None: - logger.info("Thanks for using.") - logger.debug("bot已停止运行,正在清理插件信息...") - try: - shutil.rmtree(PLUGIN_INFO_DIR) - logger.debug("成功!") - except Exception: - repo = ("清理插件信息失败", "请前往 ATRI/data/service/services 下", "将 services 整个文件夹删除") - time.sleep(10) - raise Exception(repo) - - [email protected]_bot_connect -async def connect(bot) -> None: - for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 成功连接,数据开始传输。") - - [email protected]_bot_disconnect -async def disconnect(bot) -> None: - for superuser in BotSelfConfig.superusers: - try: - await sv.NetworkPost.send_private_msg(int(superuser), "WebSocket 貌似断开了呢...") - except: - logger.error("WebSocket 已断开,等待重连") +async def shutdown(): + log.info("Thanks for using.") @run_preprocessor # type: ignore async def _check_block( matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State ) -> None: - user = str(event.user_id) - try: - msg = str(event.message) - except: - msg = "" - if not sv.BlockSystem.auth_user(user): - raise IgnoredException(f"Block user: {user}") - - if not sv.Dormant.is_dormant(): - if "/dormant" not in msg: - raise IgnoredException("Bot has been dormant.") + user_file = "block_user.json" + path = MANEGE_DIR / user_file + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + + user_id = event.get_user_id() + if user_id in data: + raise IgnoredException(f"Block user: {user_id}") if isinstance(event, GroupMessageEvent): - group = str(event.group_id) - if not sv.BlockSystem.auth_group(group): - raise IgnoredException(f"Block group: {group}") - + group_file = "block_group.json" + path = MANEGE_DIR / group_file + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + + group_id = str(event.group_id) + if group_id in data: + raise IgnoredException(f"Block group: {user_id}") -@run_preprocessor # type: ignore -async def _store_message(matcher: Matcher, bot: Bot, event, state: T_State) -> None: - if isinstance(event, GroupMessageEvent): - if event.sub_type == "normal": - now_time = datetime.now().strftime("%Y-%m-%d") - GROUP_DIR = ESSENTIAL_DIR / "chat_history" / f"{event.group_id}" - os.makedirs(GROUP_DIR, exist_ok=True) - path = GROUP_DIR / f"{now_time}.chat.json" - now_time = datetime.now().strftime("%Y%m%d-%H%M%S") - - try: - data = json.loads(path.read_bytes()) - except: - data = dict() - data[str(event.message_id)] = { - "date": now_time, - "time": str(time.time()), - "post_type": str(event.post_type), - "sub_type": str(event.sub_type), - "user_id": str(event.user_id), - "group_id": str(event.group_id), - "message_type": str(event.message_type), - "message": str(event.message), - "raw_message": event.raw_message, - "font": str(event.font), - "sender": { - "user_id": str(event.sender.user_id), - "nickname": event.sender.nickname, - "sex": event.sender.sex, - "age": str(event.sender.age), - "card": event.sender.card, - "area": event.sender.area, - "level": event.sender.level, - "role": event.sender.role, - "title": event.sender.title, - }, - "to_me": str(event.to_me), - } - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - logger.debug(f"写入消息成功,id: {event.message_id}") - except WriteError: - logger.error("消息记录失败,可能是缺少文件的原因!") - else: - pass - else: - pass - else: - pass +class FriendRequestInfo(BaseModel): + user_id: str + comment: str + time: str + is_approve: bool -# 处理:好友请求 -request_friend_event = sv.on_request() +class GroupRequestInfo(BaseModel): + user_id: str + comment: str + time: str + is_approve: bool -@request_friend_event.handle() -async def _request_friend_event(bot, event: FriendRequestEvent) -> None: - file_name = "request_friend.json" - path = ESSENTIAL_DIR / file_name - path.parent.mkdir(exist_ok=True, parents=True) - try: - data = json.loads(path.read_bytes()) - except: - data = dict() - data[event.flag] = {"user_id": event.user_id, "comment": event.comment} - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - except WriteError: - raise WriteError("Writing file failed!") +__doc__ = """ +对bot基础/必须请求进行处理 +""" - for superuser in BotSelfConfig.superusers: - msg = ( - "主人,收到一条好友请求:\n" - f"请求人:{event.get_user_id()}\n" - f"申请信息:{event.comment}\n" - f"申请码:{event.flag}" - ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) +class Essential(Service): + + def __init__(self): + Service.__init__(self, "基础部件", __doc__) -# 处理:邀请入群,如身为管理,还附有入群请求 -request_group_event = sv.on_request() +friend_add_event = Essential().on_request("好友添加") -@request_group_event.handle() -async def _request_group_event(bot, event: GroupRequestEvent) -> None: - file_name = "request_group.json" +@friend_add_event.handle() +async def _friend_add(bot: Bot, event: FriendRequestEvent): + """ + 存储文件结构: + { + "Apply code": { + "user_id": "User ID", + "comment": "Comment content" + "time": "Time", + "is_approve": bool # Default: False + } + } + """ + file_name = "friend_add.json" path = ESSENTIAL_DIR / file_name - path.parent.mkdir(exist_ok=True, parents=True) - - try: - data = json.loads(path.read_bytes()) - except: + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) data = dict() - data[event.flag] = { - "user_id": event.user_id, - "group_id": event.group_id, - "sub_type": event.sub_type, - "comment": event.comment, + + apply_code = event.flag + apply_comment = event.comment + user_id = event.get_user_id() + now_time = datetime.now() + + data = json.loads(path.read_bytes()) + data[apply_code] = FriendRequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + is_approve=False + ) + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + + repo = ( + "咱收到一条好友请求...\n" + f"请求人:{user_id}\n" + f"申请信息:{apply_comment}\n" + f"申请码:{apply_code}\n" + "Tip:好友申请 帮助" + ) + for superuser in BotSelfConfig.superusers: + await bot.send_private_msg(user_id=superuser, message=repo) + + +group_invite_event = Essential().on_request("邀请入群") + +@group_invite_event.handle() +async def _group_invite(bot: Bot, event: GroupRequestEvent): + """ + 存储文件结构: + { + "Apply code": { + "user_id": "User ID", + "comment": "Comment content" + "time": "Time", + "is_approve": bool # Default: False + } } - try: - with open(path, "w", encoding="utf-8") as r: - r.write(json.dumps(data, indent=4)) - except WriteError: - raise WriteError("Writing file failed!") - + """ + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + apply_code = event.flag + apply_comment = event.comment + user_id = event.get_user_id() + now_time = datetime.now() + + data = json.loads(path.read_bytes()) + data[apply_code] = GroupRequestInfo( + user_id=user_id, + comment=apply_comment, + time=now_time, + is_approve=False + ) + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data.dict(), indent=4)) + + repo = ( + "咱收到一条群聊邀请请求...\n" + f"请求人:{user_id}\n" + f"申请信息:{apply_comment}\n" + f"申请码:{apply_code}\n" + "Tip:群聊邀请 帮助" + ) for superuser in BotSelfConfig.superusers: - msg = ( - "主人,收到一条入群请求:\n" - f"请求人:{event.get_user_id()}\n" - f"申请信息:{event.comment}\n" - f"申请码:{event.flag}" - ) - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - + await bot.send_private_msg(user_id=superuser, message=repo) -# 处理群成员变动 -group_member_event = sv.on_notice() +group_member_event = Essential().on_notice("群成员变动") @group_member_event.handle() -async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: - msg = "好欸!事新人!\n" f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" +async def _group_member_join(bot: Bot, event: GroupIncreaseNoticeEvent): + await asyncio.sleep(randint(1, 6)) + msg = ( + "好欸!事新人!\n" + f"在下 {choice(list(BotSelfConfig.nickname))} 哒!w!" + ) await group_member_event.finish(msg) - @group_member_event.handle() -async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: - if event.is_tome(): - if event.user_id != event.self_id: - return - msg = "呜呜呜,主人" f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." - for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) - else: - await group_member_event.finish("阿!有人离开了我们...") +async def _group_member_left(bot: Bot, event: GroupDecreaseNoticeEvent): + await asyncio.sleep(randint(1, 6)) + await group_member_event.finish("呜——有人跑了...") -# 处理群管理事件 -group_admin_event = sv.on_notice() - +group_admin_event = Essential().on_notice("群管理变动") @group_admin_event.handle() -async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: +async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent): if not event.is_tome(): return for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg( + await bot.send_private_msg( user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" ) -# 处理群禁言事件 -group_ban_event = sv.on_notice() - +group_ban_event = Essential().on_notice("群禁言变动") @group_ban_event.handle() -async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: +async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent): if not event.is_tome(): return @@ -271,70 +249,63 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: f"时长...是 {event.duration} 秒" ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) else: msg = "好欸!主人\n" f"咱在群 {event.group_id} 的口球被 {event.operator_id} 解除了!" for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) -# 处理群红包运气王事件 -lucky_read_bag_event = sv.on_notice() - - -@lucky_read_bag_event.handle() -async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: - msg = "8行,这可忍?" f"gkd [CQ:at,qq={event.user_id}] 发一个!" - await lucky_read_bag_event.finish(Message(msg)) - - -# 处理群文件上传事件 -group_file_upload_event = sv.on_notice() - - -@group_file_upload_event.handle() -async def _group_file_upload_event(bot, event: GroupUploadNoticeEvent) -> None: - await group_file_upload_event.finish("让我康康传了啥好东西") - - -# 处理撤回事件 -recall_event = sv.on_notice() - +recall_event = Essential().on_notice("撤回事件") @recall_event.handle() -async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: +async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent): + if event.is_tome(): + return + try: repo = await bot.get_msg(message_id=event.message_id) - except: + except BaseException: return + user = event.user_id group = event.group_id repo = str(repo["message"]) - check = await coolq_code_check(repo, group=group) + check = CoolqCodeChecker(repo).check if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[群:{event.group_id}]\n" "撤回了\n" f"{repo}" - + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{user}@[群:{group}]\n" + "撤回了\n" + f"{repo}" + ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) @recall_event.handle() -async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: +async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent): + if event.is_tome(): + return + try: repo = await bot.get_msg(message_id=event.message_id) - except: + except BaseException: return user = event.user_id repo = str(repo["message"]) - check = await coolq_code_check(repo, user) + check = CoolqCodeChecker(repo).check if not check: repo = repo.replace("CQ", "QC") - msg = "主人,咱拿到了一条撤回信息!\n" f"{event.user_id}@[私聊]" "撤回了\n" f"{repo}" - - await bot.send(event, "咱看到惹~!") + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{user}@[私聊]" + "撤回了\n" + f"{repo}" + ) for superuser in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=int(superuser), message=msg) + await bot.send_private_msg(user_id=int(superuser), message=msg) diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py deleted file mode 100644 index a2092e3..0000000 --- a/ATRI/plugins/funny.py +++ /dev/null @@ -1,148 +0,0 @@ -import json -import re -import asyncio -from pathlib import Path -from random import choice, randint - -from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.limit import is_too_exciting -from ATRI.rule import is_in_service -from ATRI.utils.request import post_bytes -from ATRI.utils.translate import to_simple_string -from ATRI.exceptions import RequestError - - -__doc__ = """ -看不懂的笑话 -权限组:所有人 -用法: - 来句笑话 -""" - -get_laugh = sv.on_command(cmd="来句笑话", docs=__doc__, rule=is_in_service("来句笑话")) - - -@get_laugh.handle() -async def _get_laugh(bot: Bot, event: MessageEvent) -> None: - user_name = event.sender.nickname - laugh_list = [] - - FILE = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt" - with open(FILE, "r", encoding="utf-8") as r: - for line in r: - laugh_list.append(line.strip("\n")) - - result = choice(laugh_list) - await get_laugh.finish(result.replace("%name", user_name)) - - -me_to_you = sv.on_message(priority=5) - - -@me_to_you.handle() -async def _me_to_you(bot: Bot, event: MessageEvent) -> None: - if randint(0, 15) == 5: - msg = str(event.message) - if "我" in msg and "CQ" not in msg: - await me_to_you.finish(msg.replace("我", "你")) - - -__doc__ = """ -伪造转发 -权限组:所有人 -用法: - /fakemsg qq*name*msg... -补充: - qq: QQ号 - name: 消息中的ID - msg: 对应信息 -示例: - /fakemsg 123456789*生草人*草 114514*仙贝*臭死了 -""" - -fake_msg = sv.on_command(cmd="/fakemsg", docs=__doc__, rule=is_in_service("fakemsg")) - - -@fake_msg.handle() -async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None: - msg = str(event.message).split(" ") - user = event.user_id - group = event.group_id - node = list() - check = is_too_exciting(user, 1, seconds=600) - - if check: - for i in msg: - args = i.split("*") - qq = args[0] - name = args[1].replace("[", "[") - name = name.replace("]", "]") - repo = args[2].replace("[", "[") - repo = repo.replace("]", "]") - dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}} - node.append(dic) - await bot.send_group_forward_msg(group_id=group, messages=node) - - -EAT_URL = "https://wtf.hiigara.net/api/run/{}" - -eat_wat = sv.on_regex(r"[今|明|后|大后]天(.*?)吃什么", rule=is_in_service("今天吃什么")) - - -@eat_wat.handle() -async def _eat(bot: Bot, event: MessageEvent) -> None: - msg = str(event.raw_message).strip() - msg = re.search(r"大?[今|明|后]天(.*?)吃什么", msg).group() - user = event.user_id - user_n = event.sender.nickname - arg = re.findall(r"大?[今|明|后]天(.*?)吃什么", msg)[0] - nd = re.match(r"大?[今|明|后]天", msg)[0] - - if arg == "中午": - a = f"LdS4K6/{randint(0, 999999)}" - url = EAT_URL.format(a) - params = {"event": "ManualRun"} - try: - data = json.loads(await post_bytes(url, params)) - except RequestError: - raise RequestError("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", nd) - get_a = re.search(r"非常(.*?)的", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace(get_a, "") - - elif arg == "晚上": - a = f"KaTMS/{randint(0, 999999)}" - url = EAT_URL.format(a) - params = {"event": "ManualRun"} - try: - data = json.loads(await post_bytes(url, params)) - except RequestError: - raise RequestError("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", "") - result = f"> {MessageSegment.at(user)}\n" + text - - else: - rd = randint(1, 10) - if rd == 5: - result = "吃我吧 ❤" - else: - a = f"JJr1hJ/{randint(0, 999999)}" - url = EAT_URL.format(a) - params = {"event": "ManualRun"} - try: - data = json.loads(await post_bytes(url, params)) - except RequestError: - raise RequestError("Request failed!") - - text = to_simple_string(data["text"]).replace("今天", nd) - get_a = re.match(r"(.*?)的智商", text)[0] - result = f"> {MessageSegment.at(user)}\n" + text.replace( - get_a, f"{user_n}的智商" - ) - - await eat_wat.finish(Message(result)) diff --git a/ATRI/plugins/funny/__init__.py b/ATRI/plugins/funny/__init__.py new file mode 100644 index 0000000..de98381 --- /dev/null +++ b/ATRI/plugins/funny/__init__.py @@ -0,0 +1,86 @@ +from random import choice, randint + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.message import Message + +from ATRI.utils.limit import FreqLimiter, DailyLimiter +from.data_source import Funny + + +get_laugh = Funny().on_command("来句笑话", "隐晦的笑话...") + +@get_laugh.handle() +async def _get_laugh(bot: Bot, event: MessageEvent): + user_name = event.sender.nickname or "该裙友" + await get_laugh.finish(Funny().idk_laugh(user_name)) + + +me_re_you = Funny().on_regex(r"我", "我也不懂咋解释", block=False) + +@me_re_you.handle() +async def _me_re_you(bot: Bot, event: MessageEvent): + if randint(0, 15) == 5: + msg = str(event.get_message()) + content, is_ok = Funny().me_re_you(msg) + if is_ok: + await me_re_you.finish(content) + + +fake_msg = Funny().on_command("/fakemsg", "伪造假转发内容,格式:qq-name-content\n可构造多条,使用空格隔开,仅限群聊") + +_fake_daliy_max = DailyLimiter(3) +_fake_max_notice = "不能继续下去了!明早再来" +_fake_flmt = FreqLimiter(60) +_fake_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) + +@fake_msg.args_parser # type: ignore +async def _perp_fake(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await fake_msg.finish("好吧...") + if not msg: + await fake_msg.reject("内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开") + else: + state["content"] = msg + +@fake_msg.handle() +async def _ready_fake(bot: Bot, event: GroupMessageEvent, state: T_State): + user_id = event.get_user_id() + if not _fake_daliy_max.check(user_id): + await fake_msg.finish(_fake_max_notice) + if not _fake_flmt.check(user_id): + await fake_msg.finish(_fake_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["content"] = msg + +@fake_msg.got("content", "内容呢?格式:qq-name-content\n可构造多条,以上仅为一条,使用空格隔开") +async def _deal_fake(bot: Bot, event: GroupMessageEvent, state: T_State): + content = state["content"] + group_id = event.group_id + user_id = event.get_user_id() + node = Funny().fake_msg(content) + await bot.send_group_forward_msg(group_id=group_id, messages=node) + + _fake_flmt.start_cd(user_id) + _fake_daliy_max.increase(user_id) + + +eat_what = Funny().on_regex(r"大?[今明后]天(.*?)吃[什啥]么?", "我来决定你吃什么!") + +_eat_flmt = FreqLimiter(15) + +@eat_what.handle() +async def _eat_what(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _eat_flmt.check(user_id): + return + + msg = str(event.get_message()) + user_name = event.sender.nickname or "裙友" + eat = await Funny().eat_what(user_name, msg) + _eat_flmt.start_cd(user_id) + await eat_what.finish(Message(eat)) diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py new file mode 100644 index 0000000..a27cdb5 --- /dev/null +++ b/ATRI/plugins/funny/data_source.py @@ -0,0 +1,113 @@ +import re + +from pathlib import Path +from random import choice, randint +from nonebot.adapters.cqhttp.utils import unescape +from nonebot.adapters.cqhttp.message import MessageSegment + +from ATRI.service import Service +from ATRI.exceptions import RequestError +from ATRI.utils import request, Translate +from ATRI.rule import is_in_service + + +__doc__ = """ +乐1乐,莫当真 +""" + + +class Funny(Service): + + def __init__(self): + Service.__init__(self, "乐", __doc__, rule=is_in_service("乐")) + + @staticmethod + def idk_laugh(name: str) -> str: + laugh_list = list() + + file_path = Path(".") / "ATRI" / "data" / "database" / "funny" / "laugh.txt" + with open(file_path, encoding="utf-8") as r: + for line in r: + laugh_list.append(line.strip("\n")) + + rd: str = choice(laugh_list) + result = rd.replace("%name", name) + return result + + @staticmethod + def me_re_you(msg: str) -> tuple: + if "我" in msg and "[CQ" not in msg: + return msg.replace("我", "你"), True + else: + return msg, False + + @staticmethod + def fake_msg(text: str) -> list: + arg = text.split(" ") + node = list() + + for i in arg: + args = i.split("-") + qq = args[0] + name = unescape(args[1]) + repo = unescape(args[2]) + dic = {"type": "node", "data": {"name": name, "uin": qq, "content": repo}} + node.append(dic) + return node + + @staticmethod + async def eat_what(name: str, msg: str) -> str: + EAT_URL = "https://wtf.hiigara.net/api/run/" + params = {"event": "ManualRun"} + pattern_0 = r"大?[今明后]天(.*?)吃[什啥]么?" + pattern_1 = r"(今|明|后|大后)天" + arg = re.findall(pattern_0, msg)[0] + day = re.match(pattern_1, msg).group(0) # type: ignore + + if arg == "中午": + a = f"LdS4K6/{randint(0, 1145141919810)}" + url = EAT_URL + a + try: + data = await request.post(url, params=params) + data = await data.json() + except RequestError: + raise RequestError("Request failed!") + + text = Translate(data["text"]).to_simple().replace("今天", day) + get_a = re.search(r"非常(.*?)的", text).group(0) # type: ignore + result = text.replace(get_a, "") + + elif arg == "晚上": + a = f"KaTMS/{randint(0, 1145141919810)}" + url = EAT_URL + a + try: + data = await request.post(url, params=params) + data = await data.json() + except RequestError: + raise RequestError("Request failed!") + + result = Translate(data["text"]).to_simple().replace("今天", day) + + else: + rd = randint(1, 10) + if rd == 5: + result = [ + "吃我吧 ❤", + "(脸红)请...请享用咱吧......", + "都可以哦~不能挑食呢~" + ] + return choice(result) + else: + a = f"JJr1hJ/{randint(0, 1145141919810)}" + url = EAT_URL + a + try: + data = await request.post(url, params=params) + data = await data.json() + except RequestError: + raise RequestError("Request failed!") + + text = Translate(data["text"]).to_simple().replace("今天", day) + get_a = re.match(r"(.*?)的智商", text).group(0) # type: ignore + result = text.replace(get_a, f"{name}的智商") + + return result
\ No newline at end of file diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py deleted file mode 100644 index c8c8f2b..0000000 --- a/ATRI/plugins/github.py +++ /dev/null @@ -1,43 +0,0 @@ -import re -import json -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.exceptions import RequestError - - -URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" - - -github_issues = sv.on_message() - - -@github_issues.handle() -async def _github_issues(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message) - patt = r"https://github.com/(.*)/(.*)/issues/(.*)" - need_info = re.findall(patt, msg) - if not need_info: - return - - for i in need_info: - need_info = list(i) - owner = need_info[0] - repo = need_info[1] - issue_number = need_info[2] - url = URL.format(owner=owner, repo=repo, issue_number=issue_number) - - try: - data = await get_bytes(url) - except RequestError: - return - - data = json.loads(data) - msg0 = ( - f"{repo}: #{issue_number} {data['state']}\n" - f"comments: {data['comments']}\n" - f"update: {data['updated_at']}\n" - f"{data['body']}" - ) - await github_issues.finish(msg0) diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py deleted file mode 100644 index d2754b1..0000000 --- a/ATRI/plugins/help.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -import json - -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import SERVICE_DIR -from ATRI.service import Service as sv - - -SERVICE_DIR = SERVICE_DIR / "services" - - -__doc__ = """ -查询命令用法 -权限组:所有人 -用法: - /help - /help list - /help info (cmd) -""" - -help = sv.on_command(cmd="/help", docs=__doc__) - - -async def _help(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - if msg[0] == "": - msg = ( - "呀?找不到路了?\n" - "/help list 查看可用命令列表\n" - "/help info (cmd) 查看命令具体帮助\n" - "项目地址:github.com/Kyomotoi/ATRI\n" - "咱只能帮你这么多了qwq" - ) - await help.finish(msg) - elif msg[0] == "list": - files = [] - for _, _, i in os.walk(SERVICE_DIR): - for a in i: - f = SERVICE_DIR / a - files.append(json.loads(f.read_bytes())["command"]) - cmds = " | ".join(map(str, files)) - msg = "咱能做很多事!比如:\n" + cmds - msg0 = msg + "\n没反应可能是没权限...或者为探测类型...不属于可直接触发命令..." - await help.finish(msg0) - elif msg[0] == "info": - cmd = msg[1] - data = {} - path = SERVICE_DIR / f"{cmd.replace('/', '')}.json" - try: - data = json.loads(path.read_bytes()) - except: - await help.finish("未找到相关命令...") - - msg = f"{cmd} INFO:\n" f"Enabled: {data['enabled']}\n" f"{data['docs']}" - await help.finish(msg) - else: - await help.finish("请检查输入...") diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py new file mode 100644 index 0000000..1d9af19 --- /dev/null +++ b/ATRI/plugins/help/__init__.py @@ -0,0 +1,48 @@ +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.rule import to_bot +from .data_source import Helper + + +main_help = Helper().on_command("菜单", "获取食用bot的方法", rule=to_bot(), aliases={"/help", "menu"}) + +@main_help.handle() +async def _main_help(bot: Bot, event: MessageEvent): + repo = Helper().menu() + await main_help.finish(repo) + + +about_me = Helper().on_command("关于", "获取关于bot的信息", rule=to_bot(), aliases={"about"}) + +@about_me.handle() +async def _about_me(bot: Bot, event: MessageEvent): + repo = Helper().about() + await about_me.finish(repo) + + +service_list = Helper().on_command("服务列表", "查看所有可用服务", rule=to_bot(), aliases={"功能列表"}) + +@service_list.handle() +async def _service_list(bot: Bot, event: MessageEvent): + repo = Helper().service_list() + await service_list.finish(repo) + + +service_info = Helper().on_command("帮助", "获取服务详细帮助") + +@service_info.handle() +async def _ready_service_info(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).split(" ") + service = msg[0] + try: + cmd = msg[1] + except BaseException: + cmd = str() + + if not cmd: + repo = Helper().service_info(service) + await service_info.finish(repo) + + repo = Helper().cmd_info(service, cmd) + await service_info.finish(repo) diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py new file mode 100644 index 0000000..930f4bb --- /dev/null +++ b/ATRI/plugins/help/data_source.py @@ -0,0 +1,112 @@ +import os + +from ATRI import __version__ +from ATRI.service import Service, SERVICES_DIR, ServiceTools +from ATRI.config import BotSelfConfig +from ATRI.exceptions import ReadFileError + + +SERVICE_INFO_FORMAT = """ +服务名:{service} +说明:{docs} +可用命令:\n{cmd_list} +是否全局启用:{enabled} +Tip: 帮助 [服务] [命令] 以查看对应命令详细信息 +""".strip() + + +COMMAND_INFO_FORMAT = """ +命令:{cmd} +类型:{cmd_type} +说明:{docs} +更多触发方式:{aliases} +""".strip() + + +class Helper(Service): + + def __init__(self): + Service.__init__(self, "帮助", "bot的食用指南~") + + @staticmethod + def menu() -> str: + return ( + "哦呀?~需要帮助?\n" + "关于 -查看bot基本信息\n" + "服务列表 -以查看所有可用服务\n" + "帮助 [服务] -以查看对应服务帮助\n" + "Tip: 均需要at触发。菜单 打开此页面" + ) + + @staticmethod + def about() -> str: + temp_list = list() + for i in BotSelfConfig.nickname: + temp_list.append(i) + nickname = "、".join(map(str, temp_list)) + return ( + "唔...是来认识咱的么\n" + f"可以称呼咱:{nickname}\n" + f"咱的型号是:{__version__}\n" + "想进一步了解:\n" + "https://github.com/Kyomotoi/ATRI" + ) + + @staticmethod + def service_list() -> str: + files = os.listdir(SERVICES_DIR) + temp_list = list() + for i in files: + service = i.replace(".json", "") + temp_list.append(service) + + msg0 = "咱搭载了以下服务~\n" + services = " | ".join(map(str, temp_list)) + msg0 = msg0 + services + repo = msg0 + "\n帮助 [服务] -以查看对应服务帮助" + return repo + + @staticmethod + def service_info(service: str) -> str: + try: + data = ServiceTools().load_service(service) + except ReadFileError: + return "请检查是否输入错误呢..." + + service_name = data.get("service", "error") + service_docs = data.get("docs", "error") + service_enabled = data.get("enabled", True) + + _service_cmd_list = list(data.get("cmd_list", {"error"})) + service_cmd_list = "\n".join(map(str, _service_cmd_list)) + + repo = SERVICE_INFO_FORMAT.format( + service=service_name, + docs=service_docs, + cmd_list=service_cmd_list, + enabled=service_enabled + ) + return repo + + @staticmethod + def cmd_info(service: str, cmd: str) -> str: + try: + data = ServiceTools().load_service(service) + except ReadFileError: + return "请检查是否输入错误..." + + cmd_list: dict = data["cmd_list"] + cmd_info = cmd_list.get(cmd, dict()) + if not cmd_info: + return "请检查命令是否输入错误..." + cmd_type = cmd_info.get("type", "ignore") + docs = cmd_info.get("docs", "ignore") + aliases = cmd_info.get("aliases", "ignore") + + repo = COMMAND_INFO_FORMAT.format( + cmd=cmd, + cmd_type=cmd_type, + docs=docs, + aliases=aliases + ) + return repo diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py deleted file mode 100644 index 19c96f6..0000000 --- a/ATRI/plugins/hitokoto.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -from random import choice, randint -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.rule import is_in_service, to_bot -from ATRI.service import Service as sv -from ATRI.exceptions import RequestError -from ATRI.utils.list import count_list, del_list_aim -from ATRI.utils.request import get_bytes - -URL = [ - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/a.json", - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/b.json", - "https://cdn.jsdelivr.net/gh/hitokoto-osc/[email protected]/sentences/c.json", -] -sick_list = [] - - -__doc__ = """ -抑郁一下 -权限组:所有人 -用法: - @ 一言 -""" - -hitokoto = sv.on_command( - cmd="一言", aliases={"抑郁一下", "网抑云"}, docs=__doc__, rule=is_in_service("一言") & to_bot() -) - - -async def _hitokoto(bot: Bot, event: MessageEvent) -> None: - global sick_list - msg = str(event.message) - user = event.get_user_id() - - if count_list(sick_list, user) == 3: - sick_list.append(user) - await hitokoto.finish("额......需要咱安慰一下嘛~?") - elif count_list(sick_list, user) == 6: - sick_list = del_list_aim(sick_list, user) - msg = "如果心里感到难受就赶快去睡觉!别再憋自己了!\n" "我...我会守在你身边的!...嗯..一定" - await hitokoto.finish(msg) - else: - sick_list.append(user) - url = choice(URL) - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Request failed!") - await hitokoto.finish(data[randint(1, len(data) - 1)]["hitokoto"]) diff --git a/ATRI/plugins/manage/__init__.py b/ATRI/plugins/manage/__init__.py deleted file mode 100644 index b316020..0000000 --- a/ATRI/plugins/manage/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -import nonebot -from pathlib import Path - - -_sub_plugins = set() - -_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve())) diff --git a/ATRI/plugins/manage/modules/block.py b/ATRI/plugins/manage/modules/block.py deleted file mode 100644 index b439ba1..0000000 --- a/ATRI/plugins/manage/modules/block.py +++ /dev/null @@ -1,153 +0,0 @@ -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv - - -__doc__ = """ -封禁用户 -权限组:维护者 -用法: - 封禁用户 QQ号 -""" - -block_user = sv.on_command(cmd="封禁用户", docs=__doc__, permission=SUPERUSER) - - -@block_user.args_parser # type: ignore -async def _block_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await block_user.finish("好吧...") - if not msg: - await block_user.reject("是谁呢?!GKD!") - else: - state["noob"] = msg - - -@block_user.handle() -async def _block_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["noob"] = msg - - -@block_user.got("noob", prompt="是谁呢?!GKD!") -async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob = state["noob"] - sv.BlockSystem.control_list(True, user=noob) - msg = f"用户[{noob}]已被封禁(;′⌒`)" - await block_user.finish(msg) - - -__doc__ = """ -解封用户 -权限组:维护者 -用法: - 解封用户 QQ号 -""" - -unblock_user = sv.on_command(cmd="解封用户", docs=__doc__, permission=SUPERUSER) - - -@unblock_user.args_parser # type: ignore -async def _unblock_user_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await unblock_user.finish("好吧...") - if not msg: - await unblock_user.reject("要原谅谁呢...") - else: - state["forgive"] = msg - - -@unblock_user.handle() -async def _unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["forgive"] = msg - - -@unblock_user.got("forgive", prompt="要原谅谁呢...") -async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive = state["forgive"] - sv.BlockSystem.control_list(False, user=forgive) - msg = f"用户[{forgive}]已被解封ヾ(´・ω・`)ノ" - await unblock_user.finish(msg) - - -__doc__ = """ -封禁群 -权限组:维护者 -用法: - 封禁群 群号 -""" - -block_group = sv.on_command(cmd="封禁群", docs=__doc__, permission=SUPERUSER) - - -@block_group.args_parser # type: ignore -async def _block_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await block_user.finish("好吧...") - if not msg: - await block_user.reject("是哪个群?!GKD!") - else: - state["noob_g"] = msg - - -@block_group.handle() -async def _block_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["noob_g"] = msg - - -@block_group.got("noob_g", prompt="是哪个群?!GKD!") -async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - noob_g = state["noob_g"] - sv.BlockSystem.control_list(True, group=noob_g) - msg = f"群[{noob_g}]已被封禁(;′⌒`)" - await block_user.finish(msg) - - -__doc__ = """ -解封群 -权限组:维护者 -用法: - 解封 群号 -""" - -unblock_group = sv.on_command(cmd="解封群", docs=__doc__, permission=SUPERUSER) - - -@unblock_group.args_parser # type: ignore -async def _unblock_group_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await block_user.finish("好吧...") - if not msg: - await block_user.reject("要原谅哪个群呢...") - else: - state["forgive_g"] = msg - - -@unblock_group.handle() -async def _unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["forgive_g"] = msg - - -@unblock_group.got("forgive_g", prompt="要原谅哪个群呢...") -async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State) -> None: - forgive_g = state["forgive_g"] - sv.BlockSystem.control_list(False, group=forgive_g) - msg = f"群[{forgive_g}]已被解封ヾ(´・ω・`)ノ" - await unblock_user.finish(msg) diff --git a/ATRI/plugins/manage/modules/broadcast.py b/ATRI/plugins/manage/modules/broadcast.py deleted file mode 100644 index 7f7816d..0000000 --- a/ATRI/plugins/manage/modules/broadcast.py +++ /dev/null @@ -1,66 +0,0 @@ -import asyncio -from random import randint - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv - - -__doc__ = """ -群发内容 -权限组:维护者 -用法: - 广播 内容 -""" - -broadcast = sv.on_command(cmd="广播", docs=__doc__, permission=SUPERUSER) - - [email protected]_parser # type: ignore -async def _broadcast_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await broadcast.finish("好吧...") - if not msg: - await broadcast.reject("想群发啥呢0w0") - else: - state["msg"] = msg - - -async def _broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["msg"] = msg - - [email protected]("msg", prompt="请告诉咱需要群发的内容~!") -async def _deal_broadcast(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["msg"] - group_list = await bot.get_group_list() - succ_list = [] - err_list = [] - - for group in group_list: - await asyncio.sleep(randint(0, 2)) - try: - await bot.send_group_msg(group_id=group["group_id"], message=msg) - except BaseException: - err_list.append(group["group_id"]) - - msg0 = "" - for i in err_list: - msg0 += f" {i}\n" - - repo_msg = ( - f"推送消息:\n{msg}\n" - "————————\n" - f"总共:{len(group_list)}\n" - f"成功推送:{len(succ_list)}\n" - f"失败[{len(err_list)}]个:\n" - ) + msg0 - - await broadcast.finish(repo_msg) diff --git a/ATRI/plugins/manage/modules/debug.py b/ATRI/plugins/manage/modules/debug.py deleted file mode 100644 index fa21c5b..0000000 --- a/ATRI/plugins/manage/modules/debug.py +++ /dev/null @@ -1,135 +0,0 @@ -from aiohttp import FormData - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.log import LOGGER_DIR, NOW_TIME -from ATRI.utils.file import open_file -from ATRI.utils.ub_paste import paste -from ATRI.exceptions import load_error - - -level_list = ["info", "warning", "error", "debug"] - - -__doc__ = """ -获取控制台信息 -权限组:维护者 -用法: - 获取log 等级 行数 -示例: - 获取log info -20(最新20行) -""" - -get_console = sv.on_command( - cmd="获取log", - aliases={"获取LOG", "获取控制台", "获取控制台信息"}, - docs=__doc__, - permission=SUPERUSER, -) - - -@get_console.handle() -async def _get_console(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).split(" ") - if msg: - state["level"] = msg[0] - try: - state["line"] = msg[1] - except Exception: - pass - - -@get_console.got("level", prompt="需要获取的等级是?") -async def _got(bot: Bot, event: MessageEvent, state: T_State) -> None: - quit_list = ["算了", "罢了", "不了"] - if state["level"] in quit_list: - await get_console.finish("好吧...") - - -@get_console.got("line", prompt="需要获取的行数是?") -async def _deal_get(bot: Bot, event: MessageEvent, state: T_State) -> None: - level = state["level"] - line = state["line"] - repo = str() - - path = LOGGER_DIR / f"{level}" / f"{NOW_TIME}.log" - logs = await open_file(path, "readlines") - - try: - content = logs[int(line) :] # type: ignore - repo = "\n".join(content).replace("[36mATRI[0m", "ATRI") - except IndexError: - await get_console.finish(f"行数错误...max: {len(logs)}") # type: ignore - - data = FormData() - data.add_field("poster", "ATRI running log") - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", repo) - - msg0 = f"> {event.sender.nickname}\n" - msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" - await track_error.finish(msg0) - - -__doc__ = """ -追踪错误 -权限组:维护者 -用法: - track 追踪ID -""" - -track_error = sv.on_command( - cmd="track", aliases={"追踪"}, docs=__doc__, permission=SUPERUSER -) - - -@track_error.args_parser # type: ignore -async def _track_error_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await track_error.finish("好吧...") - if not msg: - await track_error.reject("欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") - else: - state["track"] = msg - - -@track_error.handle() -async def _track_error(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["track"] = msg - - -@track_error.got("track", prompt="欸?!要开始debug了吗,请提供追踪ID...Ծ‸Ծ") -async def _deal_track(bot: Bot, event: MessageEvent, state: T_State) -> None: - track_id = state["track"] - data = dict() - - try: - data = load_error(track_id) - except BaseException: - await track_error.finish("未发现对应ID呢...(⇀‸↼‶)") - - msg = ( - f"ID: [{track_id}]\n" - f"Time: [{data['time']}]\n" - f"Prompt: [{data['prompt']}]\n" - "——————\n" - f"{data['content']}" - ) - - data = FormData() - data.add_field("poster", track_id) - data.add_field("syntax", "text") - data.add_field("expiration", "day") - data.add_field("content", msg) - - msg0 = f"> {event.sender.nickname}\n" - msg0 = msg0 + f"详细请移步此处~\n{await paste(data)}" - await track_error.finish(msg0) diff --git a/ATRI/plugins/manage/modules/dormant.py b/ATRI/plugins/manage/modules/dormant.py deleted file mode 100644 index cd72d47..0000000 --- a/ATRI/plugins/manage/modules/dormant.py +++ /dev/null @@ -1,43 +0,0 @@ -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.rule import to_bot - - -__doc__ = """ -休眠,不处理任何信息 -权限组:维护者 -用法: - @ 休眠 -""" - -dormant_enabled = sv.on_command( - cmd="休眠", docs=__doc__, rule=to_bot(), permission=SUPERUSER -) - - -@dormant_enabled.handle() -async def _dormant_enabled(bot: Bot, event: MessageEvent) -> None: - sv.Dormant.control_dormant(True) - msg = "已进入休眠状态...期间咱不会回应任何人的消息哦..." - await dormant_enabled.finish(msg) - - -__doc__ = """ -苏醒,开始处理信息 -权限组:维护者 -用法: - @ 苏醒 -""" - -dormant_disabled = sv.on_command( - cmd="苏醒", docs=__doc__, rule=to_bot(), permission=SUPERUSER -) - - -@dormant_disabled.handle() -async def _dormant_disabled(bot: Bot, event: MessageEvent) -> None: - sv.Dormant.control_dormant(False) - msg = "唔...早上好...——哇哈哈" - await dormant_disabled.finish(msg) diff --git a/ATRI/plugins/manage/modules/request.py b/ATRI/plugins/manage/modules/request.py deleted file mode 100644 index 26100b1..0000000 --- a/ATRI/plugins/manage/modules/request.py +++ /dev/null @@ -1,114 +0,0 @@ -import re -import json -from pathlib import Path - -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.exceptions import ReadFileError, FormatError - - -ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" - - -__doc__ = """ -查看好友/群申请列表 -权限组:维护者 -用法: - 查看申请列表 -""" - -req_list = sv.on_command(cmd="申请列表", docs=__doc__, permission=SUPERUSER) - - -@req_list.handle() -async def _req_list(bot: Bot, event: MessageEvent) -> None: - path_f = ESSENTIAL_DIR / "request_friend.json" - path_g = ESSENTIAL_DIR / "request_group.json" - data_f, data_g = dict() - try: - data_f = json.loads(path_f.read_bytes()) - except ReadFileError: - msg_f = "[读取文件失败]" - try: - data_g = json.loads(path_g.read_bytes()) - except ReadFileError: - msg_g = "[读取文件失败]" - - msg_f = str() - for i in data_f.keys(): - msg_f += f"{i} | {data_f[i]['user_id']} | {data_f[i]['comment']}\n" - - msg_g = str() - for i in data_g.keys(): - msg_g += f"{i} | {data_g[i]['sub_type']} | {data_g[i]['user_id']} | {data_g[i]['comment']}\n" - - msg = "好友/群申请列表如下:\n" "· 好友:\n" f"{msg_f}" "· 群:\n" f"{msg_g}" - await req_list.finish(msg) - - -req_deal = sv.on_regex(r"(同意|拒绝)(好友|群)申请", permission=SUPERUSER) - - -@req_deal.handle() -async def _req_deal(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - arg = re.findall(r"(同意|拒绝)(好友|群)申请", msg[0])[0] - app = arg[0] - _type = arg[1] - - if not msg[1]: - await req_deal.finish(f"正确用法!速看\n{app}{_type}申请 (reqid)") - - reqid = msg[1] - if app == "同意": - if _type == "好友": - try: - await bot.set_friend_add_request(flag=reqid, approve=True) - await req_deal.finish("完成~!已同意申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - elif _type == "群": - path = ESSENTIAL_DIR / "request_group.json" - data_g = dict() - try: - data_g = json.loads(path.read_bytes()) - except FileExistsError: - await req_deal.finish("读取群数据失败,可能并没有请求...") - - try: - await bot.set_group_add_request( - flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=True - ) - await req_deal.finish("完成~!已同意申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - else: - await req_deal.finish("请检查输入的值是否正确——!") - elif app == "拒绝": - if _type == "好友": - try: - await bot.set_friend_add_request(flag=reqid, approve=False) - await req_deal.finish("完成~!已拒绝申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - elif _type == "群": - path = ESSENTIAL_DIR / "request_group.json" - data_g = dict() - try: - data_g = json.loads(path.read_bytes()) - except FileExistsError: - await req_deal.finish("读取群数据失败,可能并没有请求...") - - try: - await bot.set_group_add_request( - flag=reqid, sub_type=data_g[reqid]["sub_type"], approve=False - ) - await req_deal.finish("完成~!已拒绝申请") - except FormatError: - await req_deal.finish("请检查输入的值是否正确——!") - else: - await req_deal.finish("请检查输入的值是否正确——!") - else: - await req_deal.finish("请检查输入的值是否正确——!") diff --git a/ATRI/plugins/manage/modules/service.py b/ATRI/plugins/manage/modules/service.py deleted file mode 100644 index a3624df..0000000 --- a/ATRI/plugins/manage/modules/service.py +++ /dev/null @@ -1,179 +0,0 @@ -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp.permission import GROUP_ADMIN, GROUP_OWNER -from nonebot.adapters.cqhttp import ( - Bot, - MessageEvent, - GroupMessageEvent, - PrivateMessageEvent, -) - -from ATRI.service import Service as sv - - -__doc__ = """ -启用功能,针对单群 -权限组:维护者,群管理 -用法: - 启用 目标命令 -示例: - 启用 以图搜图 -""" - -cur_service_ena = sv.on_command( - cmd="启用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER -) - - -@cur_service_ena.args_parser # type: ignore -async def _cur_ena_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await cur_service_ena.finish("好吧...") - if not msg: - await cur_service_ena.reject("请告诉咱目标命令!") - else: - state["service_e"] = msg - - -@cur_service_ena.handle() -async def _cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_e"] = msg - - -@cur_service_ena.got("service_e", prompt="请告诉咱目标命令!") -async def _deal_cur_ena(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - cmd = state["service_e"] - group = str(event.group_id) - sv.control_service(cmd, False, True, group=group) - await cur_service_ena.finish(f"成功!本群已启用:{cmd}") - - -@cur_service_ena.handle() -async def _refuse_cur_ena(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: - await cur_service_ena.finish("只能在群聊中决定哦...") - - -__doc__ = """ -禁用功能,针对单群 -权限组:维护者,群管理 -用法: - 禁用 目标命令 -示例: - 禁用 以图搜图 -""" - -cur_service_dis = sv.on_command( - cmd="禁用功能", docs=__doc__, permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER -) - - -@cur_service_dis.args_parser # type: ignore -async def _cur_dis_load(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await cur_service_dis.finish("好吧...") - if not msg: - await cur_service_dis.reject("请告诉咱目标命令!") - else: - state["service_d"] = msg - - -@cur_service_dis.handle() -async def _cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_d"] = msg - - -@cur_service_dis.got("service_d", prompt="请告诉咱目标命令!") -async def _deal_cur_dis(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - cmd = state["service_d"] - group = str(event.group_id) - sv.control_service(cmd, False, False, group=group) - await cur_service_dis.finish(f"成功!本群已禁用:{cmd}") - - -@cur_service_dis.handle() -async def _refuse_cur_dis(bot: Bot, event: PrivateMessageEvent, state: T_State) -> None: - await cur_service_dis.finish("只能在群聊中决定哦...") - - -__doc__ = """ -全局启用功能 -权限组:维护者 -用法: - 全局启用 目标命令 -示例: - 全局启用 以图搜图 -""" - -glo_service_ena = sv.on_command(cmd="全局启用", docs=__doc__, permission=SUPERUSER) - - -@glo_service_ena.args_parser # type: ignore -async def _glo_ena_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await glo_service_ena.finish("好吧...") - if not msg: - await glo_service_ena.reject("请告诉咱目标命令!") - else: - state["service_e_g"] = msg - - -@glo_service_ena.handle() -async def _glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_e_g"] = msg - - -@glo_service_ena.got("service_e_g", prompt="请告诉咱目标命令!") -async def _deal_glo_ena(bot: Bot, event: MessageEvent, state: T_State) -> None: - cmd = state["service_e_g"] - sv.control_service(cmd, True, True) - await glo_service_ena.finish(f"成功!已全局启用:{cmd}") - - -__doc__ = """ -全局禁用功能 -权限组:维护者 -用法: - 全局禁用 目标命令 -示例: - 全局禁用 以图搜图 -""" - -glo_service_dis = sv.on_command(cmd="全局禁用", docs=__doc__, permission=SUPERUSER) - - -@glo_service_dis.args_parser # type: ignore -async def _glo_dis_load(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await glo_service_dis.finish("好吧...") - if not msg: - await glo_service_dis.reject("请告诉咱目标命令!") - else: - state["service_d_g"] = msg - - -@glo_service_dis.handle() -async def _glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["service_d_g"] = msg - - -@glo_service_dis.got("service_d_g", prompt="请告诉咱目标命令!") -async def _deal_glo_dis(bot: Bot, event: MessageEvent, state: T_State) -> None: - cmd = state["service_d_g"] - sv.control_service(cmd, True, False) - await glo_service_dis.finish(f"成功!已全局禁用:{cmd}") diff --git a/ATRI/plugins/manage/modules/shutdown.py b/ATRI/plugins/manage/modules/shutdown.py deleted file mode 100644 index 78f23e8..0000000 --- a/ATRI/plugins/manage/modules/shutdown.py +++ /dev/null @@ -1,32 +0,0 @@ -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv - - -__doc__ = """ -紧急停机 -权限组:维护者 -用法: - @ 关机 -""" - -shutdown = sv.on_command(cmd="关机", docs=__doc__, permission=SUPERUSER) - - -async def _shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["msg"] = msg - - [email protected]("msg", prompt="[WARNING]此项操作将强行终止bot运行,是否继续(y/n)") -async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: - t = ["y", "Y", "是"] - if state["msg"] in t: - await bot.send(event, "咱还会醒来的,一定") - exit(0) - else: - await shutdown.finish("再考虑下吧 ;w;") diff --git a/ATRI/plugins/manege/__init__.py b/ATRI/plugins/manege/__init__.py new file mode 100644 index 0000000..d0337e9 --- /dev/null +++ b/ATRI/plugins/manege/__init__.py @@ -0,0 +1,451 @@ +import re + +from nonebot.typing import T_State +from nonebot.permission import SUPERUSER +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.permission import GROUP_OWNER, GROUP_ADMIN + +from .data_source import Manege + + +# 求1个pr把这里优化,写得我想吐了 + + +block_user = Manege().on_command("封禁用户", "对目标用户进行封禁", permission=SUPERUSER) + +@block_user.args_parser # type: ignore +async def _get_block_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await block_user.finish("...看来有人逃过一劫呢") + if not msg: + await block_user.reject("哪位?GKD!") + else: + state["block_user"] = msg + +@block_user.handle() +async def _ready_block_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["block_user"] = msg + +@block_user.got("block_user", "哪位?GKD!") +async def _deal_block_user(bot: Bot, event: MessageEvent, state: T_State): + user_id = state["block_user"] + is_ok = Manege().block_user(user_id) + if not is_ok: + await block_user.finish("kuso!封禁失败了...") + + await block_user.finish(f"用户 {user_id} 危!") + + +unblock_user = Manege().on_command("解封用户", "对目标用户进行解封", permission=SUPERUSER) + +@unblock_user.args_parser # type: ignore +async def _get_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await unblock_user.finish("...有人又得继续在小黑屋呆一阵子了") + if not msg: + await unblock_user.reject("哪位?GKD!") + else: + state["unblock_user"] = msg + +@unblock_user.handle() +async def _ready_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["unblock_user"] = msg + +@unblock_user.got("unblock_user", "哪位?GKD!") +async def _deal_unblock_user(bot: Bot, event: MessageEvent, state: T_State): + user_id = state["unblock_user"] + is_ok = Manege().unblock_user(user_id) + if not is_ok: + await unblock_user.finish("kuso!解封失败了...") + + await unblock_user.finish(f"好欸!{user_id} 重获新生!") + + +block_group = Manege().on_command("封禁群", "对目标群进行封禁", permission=SUPERUSER) + +@block_group.args_parser # type: ignore +async def _get_block_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await block_group.finish("...看来有一群逃过一劫呢") + if not msg: + await block_group.reject("哪个群?GKD!") + else: + state["block_group"] = msg + +@block_group.handle() +async def _ready_block_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["block_group"] = msg + +@block_group.got("block_group", "哪个群?GKD!") +async def _deal_block_group(bot: Bot, event: MessageEvent, state: T_State): + group_id = state["block_group"] + is_ok = Manege().block_group(group_id) + if not is_ok: + await block_group.finish("kuso!封禁失败了...") + + await block_group.finish(f"群 {group_id} 危!") + + +unblock_group = Manege().on_command("解封群", "对目标群进行解封", permission=SUPERUSER) + +@unblock_group.args_parser # type: ignore +async def _get_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await unblock_group.finish("...有一群又得继续在小黑屋呆一阵子了") + if not msg: + await unblock_group.reject("哪个群?GKD!") + else: + state["unblock_group"] = msg + +@unblock_group.handle() +async def _ready_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["unblock_group"] = msg + +@unblock_group.got("unblock_group", "哪个群?GKD!") +async def _deal_unblock_group(bot: Bot, event: MessageEvent, state: T_State): + group_id = state["unblock_group"] + is_ok = Manege().unblock_group(group_id) + if not is_ok: + await unblock_group.finish("kuso!解封失败了...") + + await unblock_group.finish(f"好欸!群 {group_id} 重获新生!") + + +global_block_service = Manege().on_command("全局禁用", "全局禁用某服务", permission=SUPERUSER) + +@global_block_service.args_parser # type: ignore +async def _get_global_block_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await global_block_service.finish("好吧...") + if not msg: + await global_block_service.reject("阿...是哪个服务呢") + else: + state["global_block_service"] = msg + +@global_block_service.handle() +async def _ready_block_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["global_block_service"] = msg + +@global_block_service.got("global_block_service", "阿...是哪个服务呢") +async def _deal_global_block_service(bot: Bot, event: MessageEvent, state: T_State): + block_service = state["global_block_service"] + is_ok = Manege().control_global_service(block_service, False) + if not is_ok: + await global_block_service.finish("kuso!禁用失败了...") + + await global_block_service.finish(f"服务 {block_service} 已被禁用") + + +global_unblock_service = Manege().on_command("全局启用", "全局启用某服务", permission=SUPERUSER) + +@global_unblock_service.args_parser # type: ignore +async def _get_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await global_unblock_service.finish("好吧...") + if not msg: + await global_unblock_service.reject("阿...是哪个服务呢") + else: + state["global_unblock_service"] = msg + +@global_unblock_service.handle() +async def _ready_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["global_unblock_service"] = msg + +@global_unblock_service.got("global_unblock_service", "阿...是哪个服务呢") +async def _deal_global_unblock_service(bot: Bot, event: MessageEvent, state: T_State): + block_service = state["global_unblock_service"] + is_ok = Manege().control_global_service(block_service, True) + if not is_ok: + await global_unblock_service.finish("kuso!启用服务失败了...") + + await global_unblock_service.finish(f"服务 {block_service} 已启用") + + +user_block_service = Manege().on_regex(r"对用户(.*?)禁用(.*)", "针对某一用户禁用服务", permission=SUPERUSER) + +@user_block_service.handle() +async def _user_block_service(bot: Bot, event: MessageEvent): + msg = str(event.message).strip() + pattern = r"对用户(.*?)禁用(.*)" + reg = re.findall(pattern, msg) + aim_user = reg[0] + aim_service = reg[1] + + is_ok = Manege().control_user_service(aim_service, aim_user, False) + if not is_ok: + await user_block_service.finish("禁用失败...请检查服务名是否正确") + await user_block_service.finish(f"完成~已禁止用户 {aim_user} 使用 {aim_service}") + + + +user_unblock_service = Manege().on_regex(r"对用户(.*?)启用(.*)", "针对某一用户启用服务", permission=SUPERUSER) + +@user_unblock_service.handle() +async def _user_unblock_service(bot: Bot, event: MessageEvent): + msg = str(event.message).strip() + pattern = r"对用户(.*?)启用(.*)" + reg = re.findall(pattern, msg) + aim_user = reg[0] + aim_service = reg[1] + + is_ok = Manege().control_user_service(aim_service, aim_user, True) + if not is_ok: + await user_unblock_service.finish("启用失败...请检查服务名是否正确,或者此人并不存在于名单中") + await user_unblock_service.finish(f"完成~已允许用户 {aim_user} 使用 {aim_service}") + + +group_block_service = Manege().on_command("禁用", "针对所在群禁用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) + +@group_block_service.args_parser # type: ignore +async def _get_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await group_block_service.finish("好吧...") + if not msg: + await group_block_service.reject("阿...是哪个服务呢") + else: + state["group_block_service"] = msg + +@group_block_service.handle() +async def _ready_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["group_block_service"] = msg + +@group_block_service.got("group_block_service", "阿...是哪个服务呢") +async def _deal_group_block_service(bot: Bot, event: GroupMessageEvent, state: T_State): + aim_service = state["group_block_service"] + group_id = str(event.group_id) + + is_ok = Manege().control_group_service(aim_service, group_id, False) + if not is_ok: + await group_block_service.finish("禁用失败...请检查服务名是否输入正确") + await group_block_service.finish(f"完成!~已禁止本群使用服务:{aim_service}") + + +group_unblock_service = Manege().on_command("启用", "针对所在群启用某服务", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN) + +@group_unblock_service.args_parser # type: ignore +async def _get_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await group_unblock_service.finish("好吧...") + if not msg: + await group_unblock_service.reject("阿...是哪个服务呢") + else: + state["group_unblock_service"] = msg + +@group_unblock_service.handle() +async def _ready_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["group_unblock_service"] = msg + +@group_unblock_service.got("group_unblock_service", "阿...是哪个服务呢") +async def _deal_group_unblock_service(bot: Bot, event: GroupMessageEvent, state: T_State): + aim_service = state["group_unblock_service"] + group_id = str(event.group_id) + + is_ok = Manege().control_group_service(aim_service, group_id, True) + if not is_ok: + await group_unblock_service.finish("启用失败...请检查服务名是否输入正确,或群不存在于名单中") + await group_unblock_service.finish(f"完成!~已允许本群使用服务:{aim_service}") + + +get_friend_add_list = Manege().on_command("获取好友申请", "获取好友申请列表", permission=SUPERUSER) + +@get_friend_add_list.handle() +async def _get_friend_add_list(bot: Bot, event: MessageEvent): + data = Manege().load_friend_apply_list() + temp_list = list() + for i in data: + apply_code = i + apply_user = data[i]["user_id"] + apply_comment = data[i]["comment"] + temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" + temp_list.append(temp_msg) + + msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) + msg1 = msg0 + "\nTip: 使用 同意/拒绝好友 [申请码] 以决定" + await get_friend_add_list.finish(msg1) + + +approve_friend_add = Manege().on_command("同意好友", "同意好友申请", permission=SUPERUSER) + +@approve_friend_add.args_parser # type: ignore +async def _get_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await approve_friend_add.finish("好吧...") + if not msg: + await approve_friend_add.reject("申请码GKD!") + else: + state["approve_friend_add"] = msg + +@approve_friend_add.handle() +async def _ready_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["approve_friend_add"] + +@approve_friend_add.got("approve_friend_add", "申请码GKD!") +async def _deal_approve_friend_add(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["approve_friend_add"] + try: + await bot.set_friend_add_request(flag=apply_code, approve=True) + except BaseException: + await approve_friend_add.finish("同意失败...尝试下手动?") + data = Manege().load_friend_apply_list() + data.pop(apply_code) + Manege().save_friend_apply_list(data) + await approve_friend_add.finish("好欸!申请已通过!") + + +refuse_friend_add = Manege().on_command("拒绝好友", "拒绝好友申请", permission=SUPERUSER) + +@refuse_friend_add.args_parser # type: ignore +async def _get_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await refuse_friend_add.finish("好吧...") + if not msg: + await refuse_friend_add.reject("申请码GKD!") + else: + state["refuse_friend_add"] = msg + +@refuse_friend_add.handle() +async def _ready_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["refuse_friend_add"] + +@refuse_friend_add.got("refuse_friend_add", "申请码GKD!") +async def _deal_refuse_friend_add(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["refuse_friend_add"] + try: + await bot.set_friend_add_request(flag=apply_code, approve=False) + except BaseException: + await refuse_friend_add.finish("拒绝失败...尝试下手动?") + data = Manege().load_friend_apply_list() + data.pop(apply_code) + Manege().save_friend_apply_list(data) + await refuse_friend_add.finish("已拒绝!") + + +get_group_invite_list = Manege().on_command("获取邀请列表", "获取群邀请列表", permission=SUPERUSER) + +@get_group_invite_list.handle() +async def _get_group_invite_list(bot: Bot, event: MessageEvent): + data = Manege().load_invite_apply_list() + temp_list = list() + for i in data: + apply_code = i + apply_user = data[i]["user_id"] + apply_comment = data[i]["comment"] + temp_msg = f"{apply_user} | {apply_comment} | {apply_code}" + temp_list.append(temp_msg) + + msg0 = "申请人ID | 申请信息 | 申请码\n" + "\n".join(map(str, temp_list)) + msg1 = msg0 + "\nTip: 使用 同意/拒绝邀请 [申请码] 以决定" + await get_friend_add_list.finish(msg1) + + +approve_group_invite = Manege().on_command("同意邀请", "同意群聊邀请", permission=SUPERUSER) + +@approve_group_invite.args_parser # type: ignore +async def _get_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await approve_group_invite.finish("好吧...") + if not msg: + await approve_group_invite.reject("申请码GKD!") + else: + state["approve_group_invite"] = msg + +@approve_group_invite.handle() +async def _ready_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["approve_group_invite"] + +@approve_group_invite.got("approve_group_invite", "申请码GKD!") +async def _deal_approve_group_invite(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["approve_group_invite"] + try: + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=True) + except BaseException: + await approve_group_invite.finish("同意失败...尝试下手动?") + data = Manege().load_invite_apply_list() + data.pop(apply_code) + Manege().save_invite_apply_list(data) + await approve_group_invite.finish("好欸!申请已通过!") + + +refuse_group_invite = Manege().on_command("拒绝邀请", "拒绝群聊邀请", permission=SUPERUSER) + +@refuse_group_invite.args_parser # type: ignore +async def _get_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await refuse_group_invite.finish("好吧...") + if not msg: + await refuse_group_invite.reject("申请码GKD!") + else: + state["refuse_group_invite"] = msg + +@refuse_group_invite.handle() +async def _ready_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["refuse_group_invite"] + +@refuse_group_invite.got("refuse_group_invite", "申请码GKD!") +async def _deal_refuse_group_invite(bot: Bot, event: MessageEvent, state: T_State): + apply_code = state["refuse_group_invite"] + try: + await bot.set_group_add_request(flag=apply_code, sub_type="invite", approve=False) + except BaseException: + await refuse_group_invite.finish("拒绝失败...尝试下手动?") + data = Manege().load_invite_apply_list() + data.pop(apply_code) + Manege().save_invite_apply_list(data) + await refuse_group_invite.finish("已拒绝!") + + +track_error = Manege().on_command("追踪", "获取报错信息,传入追踪码", aliases={"/track"}) + +@track_error.handle() +async def _track_error(bot: Bot, event: MessageEvent): + track_id = str(event.message).strip() + repo = await Manege().track_error(track_id) + await track_error.finish(repo) diff --git a/ATRI/plugins/manege/data_source.py b/ATRI/plugins/manege/data_source.py new file mode 100644 index 0000000..933f6c6 --- /dev/null +++ b/ATRI/plugins/manege/data_source.py @@ -0,0 +1,281 @@ +import os +import json +from pathlib import Path +from aiohttp import FormData +from datetime import datetime + +from ATRI.service import Service, ServiceTools +from ATRI.utils import UbuntuPaste +from ATRI.exceptions import ReadFileError, load_error + + +MANEGE_DIR = Path(".") / "ATRI" / "data"/ "database" / "manege" +ESSENTIAL_DIR = Path(".") / "ATRI" / "data" / "database" / "essential" +os.makedirs(MANEGE_DIR, exist_ok=True) +os.makedirs(ESSENTIAL_DIR, exist_ok=True) + + +TRACK_BACK_FORMAT = """ +Track ID:{track_id} +Prompt: {prompt} +Time: {time} +{content} +""".strip() + + +__doc__ = """ +控制bot的各项服务 +""" + + +class Manege(Service): + + def __init__(self): + Service.__init__(self, "管理", __doc__, True) + + @staticmethod + def _load_block_user_list() -> dict: + """ + 文件结构: + { + "Block user ID": { + "time": "Block time" + } + } + """ + file_name = "block_user.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + return data + + + @staticmethod + def _save_block_user_list(data: dict) -> None: + file_name = "block_user.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + def _load_block_group_list() -> dict: + """ + 文件结构: + { + "Block group ID": { + "time": "Block time" + } + } + """ + file_name = "block_group.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + data = json.loads(path.read_bytes()) + return data + + @staticmethod + def _save_block_group_list(data: dict) -> None: + file_name = "block_group.json" + path = MANEGE_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = dict() + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @classmethod + def block_user(cls, user_id: str) -> bool: + data = cls._load_block_user_list() + now_time = datetime.now() + data[user_id] = { + "time": now_time + } + try: + cls._save_block_user_list(data) + return True + except BaseException: + return False + + @classmethod + def unblock_user(cls, user_id: str) -> bool: + data: dict = cls._load_block_user_list() + if user_id not in data: + return False + + try: + data.pop(user_id) + cls._save_block_user_list(data) + return True + except BaseException: + return False + + @classmethod + def block_group(cls, group_id: str) -> bool: + data = cls._load_block_group_list() + now_time = datetime.now() + data[group_id] = { + "time": now_time + } + try: + cls._save_block_group_list(data) + return True + except BaseException: + return False + + @classmethod + def unblock_group(cls, group_id: str) -> bool: + data: dict = cls._load_block_group_list() + if group_id not in data: + return False + + try: + data.pop(group_id) + cls._save_block_group_list(data) + return True + except BaseException: + return False + + @staticmethod + def control_global_service(service: str, is_enabled: bool) -> bool: + """ + Only SUPERUSER. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + data["enabled"] = is_enabled + ServiceTools().save_service(data, service) + return True + + @staticmethod + def control_user_service(service: str, user_id: str, is_enabled: bool) -> bool: + """ + Only SUPERUSER. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + temp_list: list = data.get("disable_user", list()) + + if is_enabled: + try: + temp_list.remove(user_id) + except BaseException: + return False + else: + temp_list.append(user_id) + data["disable_user"] = temp_list + ServiceTools().save_service(data, service) + return True + + @staticmethod + def control_group_service(service: str, group_id: str, is_enabled: bool) -> bool: + """ + SUPERUSER and GROUPADMIN or GROUPOWNER. + Only current group. + """ + try: + data = ServiceTools().load_service(service) + except BaseException: + return False + temp_list: list = data.get("disable_group", list()) + + if is_enabled: + try: + temp_list.remove(group_id) + except BaseException: + return False + else: + temp_list.append(group_id) + data["disable_group"] = temp_list + ServiceTools().save_service(data, service) + return True + + @staticmethod + def load_friend_apply_list() -> dict: + file_name = "friend_add.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + data = json.loads(path.read_bytes()) + return data + + @staticmethod + def save_friend_apply_list(data: dict) -> None: + file_name = "friend_add.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + def load_invite_apply_list() -> dict: + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return dict() + + data = json.loads(path.read_bytes()) + return data + + @staticmethod + def save_invite_apply_list(data: dict) -> None: + file_name = "group_invite.json" + path = ESSENTIAL_DIR / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + + @staticmethod + async def track_error(track_id: str) -> str: + try: + data = load_error(track_id) + except ReadFileError: + return "请检查ID是否正确..." + + prompt = data.get("prompt", "ignore") + time = data.get("time", "ignore") + content = data.get("content", "ignore") + + msg0 = TRACK_BACK_FORMAT.format( + track_id=track_id, + prompt=prompt, + time=time, + content=content + ) + f_data = FormData() + f_data.add_field("poster", "ATRI running log") + f_data.add_field("syntax", "text") + f_data.add_field("expiration", "day") + f_data.add_field("content", msg0) + + repo = f"详细请移步此处~\n{await UbuntuPaste(data).paste()}" + return repo
\ No newline at end of file diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py deleted file mode 100644 index 692ac48..0000000 --- a/ATRI/plugins/nsfw.py +++ /dev/null @@ -1,116 +0,0 @@ -import re -import json - -from nonebot.adapters.cqhttp import Bot, GroupMessageEvent -from nonebot.typing import T_State - -from ATRI.log import logger as log -from ATRI.config import BotSelfConfig, NsfwCheck -from ATRI.service import Service as sv -from ATRI.exceptions import RequestError -from ATRI.rule import is_in_service -from ATRI.utils.request import get_bytes -from ATRI.utils.cqcode import coolq_code_check - - -nsfw_url = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" - - -nsfw_checking = sv.on_message() - - -@nsfw_checking.handle() -async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: - if NsfwCheck.enabled: - msg = str(event.message) - user = event.user_id - group = event.group_id - check = await coolq_code_check(msg, user, group) - - if check: - if "image" not in msg: - return - - url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0] - try: - data = json.loads(await get_bytes(url)) - except: - log.warning("检测涩图失败,请查阅文档以获取帮助") - return - if round(data["score"], 4) * 100 >= NsfwCheck.passing_rate: - score = "{:.2%}".format(round(data["score"], 4)) - log.debug(f"截获涩图,得分:{score}") - for sup in BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, message=f"{msg}\n涩值: {score}" - ) - await bot.send(event, f"好涩哦!涩值:{score}\n不行了咱要发给主人看!") - else: - pass - - -__doc__ = """ -检测你图片的涩值 -权限组:所有人 -用法: - /nsfw (pic) -补充: - pic: 图片 -示例: - /nsfw 然后Bot会向你索取图片 -""" - -nsfw_reading = sv.on_command(cmd="/nsfw", docs=__doc__, rule=is_in_service("nsfw")) - - -@nsfw_reading.args_parser # type: ignore -async def _nsfw(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = str(event.message) - quit_list = ["算了", "罢了", "不搜了"] - if msg in quit_list: - await nsfw_reading.finish("好吧") - - if not msg: - await nsfw_reading.reject("图呢?") - else: - state["pic_nsfw"] = msg - - -@nsfw_reading.handle() -async def _nsfw_r(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - user = event.user_id - group = event.group_id - msg = str(event.message).strip() - check = await coolq_code_check(msg, user, group) - if check and msg: - state["pic_nsfw"] = msg - - -@nsfw_reading.got("pic_nsfw", prompt="图呢?") -async def _nsfw_reading(bot: Bot, event: GroupMessageEvent, state: T_State) -> None: - msg = state["pic_nsfw"] - pic = re.findall(r"url=(.*?)]", msg) - if not pic: - await nsfw_reading.reject("请发送图片而不是其它东西!!") - - url = nsfw_url + pic[0] - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Time out!") - - score = round(data["score"], 4) - result = "{:.2%}".format(round(data["score"], 4)) - if score >= 0.9: - level = "hso! 我要发给主人看!" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg( - user_id=sup, message=f"{state['pic_nsfw']}\n涩值: {result}" - ) - elif 0.9 > score >= 0.6: - level = "嗯,可冲" - else: - level = "?能不能换张55完全冲不起来" - - repo = f"涩值:{result}\n{level}" - await nsfw_reading.finish(repo) diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py new file mode 100644 index 0000000..db68721 --- /dev/null +++ b/ATRI/plugins/repo.py @@ -0,0 +1,71 @@ +from random import choice + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.service import Service +from ATRI.config import BotSelfConfig +from ATRI.utils.limit import FreqLimiter, DailyLimiter + + +_repo_flmt = FreqLimiter(20) +_repo_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) +_repo_dlmt = DailyLimiter(5) +_repo_dlmt_notice = "阿!不能再喝了,再喝就晕过去了!" + + +REPO_FORMAT = """ +来自用户{user}反馈: +{msg} +""" + + +class Repo(Service): + + def __init__(self): + Service.__init__(self, "反馈", "向维护者发送消息") + + +repo = Repo().on_command("来杯红茶", "向维护者发送消息", aliases={"反馈", "报告"}) + [email protected]_parser # type: ignore +async def _get_repo(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了", "不搜了", "取消"] + if msg in quit_list: + await repo.finish("好吧...") + if not msg: + await repo.reject("需要反馈的内容呢?~") + else: + state["repo"] = msg + +async def _ready_repo(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _repo_flmt.check(user_id): + await repo.finish(_repo_flmt_notice) + if not _repo_dlmt.check(user_id): + await repo.finish(_repo_dlmt_notice) + + msg = str(event.message).strip() + if msg: + state["repo"] = msg + [email protected]("repo", "需要反馈的内容呢?~") +async def _deal_repo(bot: Bot, event: MessageEvent, state: T_State): + msg = state["repo"] + user_id = event.get_user_id() + repo_0 = REPO_FORMAT.format( + user=user_id, + msg=msg + ) + + for superuser in BotSelfConfig.superusers: + try: + await bot.send_private_msg(user_id=superuser, message=repo_0) + except BaseException: + await repo.finish("发送失败了呢...") + + _repo_flmt.start_cd(user_id) + _repo_dlmt.increase(user_id) + await repo.finish("吾辈的心愿已由咱转告维护者!") diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index 476eb88..87a2b02 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -1,70 +1,26 @@ -import re -import json -from random import choice -from aiohttp.client import ClientSession - from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.utils.limit import is_too_exciting - -from .data_source import dec +from ATRI.utils.limit import FreqLimiter +from .data_source import Rich -temp_list = [] -img_url = [ - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png", - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg", -] +_rich_flmt = FreqLimiter(2) -bilibili_rich = sv.on_message() +bili_rich = Rich().on_message("小程序爪巴", block=False) -@bilibili_rich.handle() -async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: - global temp_list +@bili_rich.handle() +async def _fk_bili(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _rich_flmt.check(user_id): + return + + msg = str(event.message) try: - msg = str(event.raw_message).replace("\\", "") - bv = False - - if "qqdocurl" not in msg: - if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace("av", "") - else: - bv = re.findall(r"(BV\w+)", msg) - av = str(dec(bv[0])) - else: - patt = r"(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+" - bv_url = re.findall(patt, msg) - bv_url = bv_url[3] - async with ClientSession() as session: - async with session.get(url=bv_url) as r: - bv = re.findall(r"(BV\w+)", str(r.url)) - av = dec(bv[0]) - - if not bv: - if "av" in msg: - av = re.findall(r"(av\d+)", msg)[0].replace("av", "") - else: - return - - user = event.user_id - check = is_too_exciting(user, 1, 10) - if not check: - return - - URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}" - data = json.loads(await get_bytes(URL))["data"] - repo = ( - f"{data['bvid']} INFO:\n" - f"Title: {data['title']}\n" - f"Link: {data['short_link']}\n" - "にまねげぴのTencent rich!" - ) - await bot.send(event, MessageSegment.image(file=choice(img_url))) - await bilibili_rich.finish(repo) + result, is_ok = await Rich().fk_bili(msg) except BaseException: return + if not is_ok: + return + _rich_flmt.start_cd(user_id) + await bili_rich.finish(result) diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py index 59474ff..3277a58 100644 --- a/ATRI/plugins/rich/data_source.py +++ b/ATRI/plugins/rich/data_source.py @@ -1,5 +1,15 @@ +import re + +from ATRI.service import Service +from ATRI.utils import request +from ATRI.rule import is_in_service +from ATRI.exceptions import RequestError + + +URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid=" + table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF" -tr = {} +tr = dict() for i in range(58): tr[table[i]] = i s = [11, 10, 3, 8, 4, 6] @@ -7,16 +17,90 @@ xor = 177451812 add = 8728348608 -def dec(x) -> int: - r = 0 - for i in range(6): - r += tr[x[s[i]]] * 58 ** i - return (r - add) ^ xor +__doc__ = """ +啥b腾讯小程序给👴爪巴 +目前只整了b站的 +""" -def enc(x) -> str: - x = (x ^ xor) + add - r = list("BV1 4 1 7 ") - for i in range(6): - r[s[i]] = table[x // 58 ** i % 58] - return "".join(r) +class Rich(Service): + + def __init__(self): + Service.__init__(self, "小程序处理", __doc__, rule=is_in_service("小程序处理")) + + @staticmethod + def _bv_dec(x) -> str: + r = 0 + for i in range(6): + r += tr[x[s[i]]] * 58 ** i + result = "av" + str((r - add) ^ xor) + return result + + @staticmethod + def _bv_enc(x) -> str: + x = (x ^ xor) + add + r = list("BV1 4 1 7 ") + for i in range(6): + r[s[i]] = table[x // 58 ** i % 58] + return "".join(r) + + @classmethod + async def fk_bili(cls, text: str) -> tuple: + """ + 为何本函数这么多 try,因为此函数被用于监听所有信息 + 如果真出现错误,就会一直刷屏 + """ + msg = text.replace("\\", "") + bv = False + + if "qqdocurl" not in msg: + if "av" in msg: + av = re.findall(r"(av\d+)", msg) + if not av: + return "Get value (av) failed!", False + av = av[0].replace("av", "") + else: + bv = re.findall(r"([Bb][Vv]\w+)", msg) + if not bv: + return "Get value (bv) failed!", False + av = str(cls._bv_dec(bv[0])).replace("av", "") + else: + pattern = r"(?:(?:https?):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+" + bv_url = re.findall(pattern, msg) + if not bv_url: + return "Get value (bv url) failed!", False + bv_url = bv_url[3] + + try: + res = await request.get(bv_url) + except RequestError: + return "Request failed!", False + bv = re.findall(r"(BV\w+)", str(res.url)) + if not bv: + return "Get value (bv) failed!", False + av = cls._bv_dec(bv[0]) + + if not bv: + if "av" in msg: + av = re.findall(r"(av\d+)", msg) + if not av: + return "Get value (av) failed!", False + av = av[0].replace("av", "") + else: + return "Not found av", False + + url = URL + av + try: + res = await request.get(url) + except RequestError: + return "Request failed!", False + res_data = await res.json() + data = res_data["data"] + + result = ( + f"{data['bvid']} INFO:\n" + f"Title: {data['title']}\n" + f"Link: {data['short_link']}" + ) + return result, True +
\ No newline at end of file diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index 6361ade..9e52d66 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -1,80 +1,51 @@ -import re -import json +from re import findall from random import choice +from nonebot.typing import T_State from nonebot.adapters.cqhttp import Bot, MessageEvent from nonebot.adapters.cqhttp.message import Message, MessageSegment -from nonebot.typing import T_State from ATRI.config import SauceNAO -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.exceptions import RequestError - -from .data_source import SauceNao +from ATRI.utils.limit import FreqLimiter +from .data_source import SaouceNao -__doc__ = """ -以图搜图 -权限组:所有人 -用法: - 以图搜图 (pic) -""" +_search_flmt = FreqLimiter(5) +_search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图")) +saucenao = SaouceNao().on_command("以图搜图", "透过一张图搜索可能的来源") @saucenao.args_parser # type: ignore -async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message) +async def _get_img(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() quit_list = ["算了", "罢了", "不搜了"] if msg in quit_list: await saucenao.finish("好吧...") - if not msg: await saucenao.reject("图呢?") else: - state["pic_sau"] = msg - + state["img"] = msg @saucenao.handle() -async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None: +async def _ready_search(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _search_flmt.check(user_id): + await saucenao.finish(_search_flmt_notice) + msg = str(event.message).strip() if msg: - state["pic_sau"] = msg + state["img"] = msg - [email protected]("pic_sau", prompt="图呢?") -async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["pic_sau"] - img = re.findall(r"url=(.*?)]", msg) [email protected]("img", "图呢?") +async def _deal_search(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + msg = state["img"] + img = findall(r"url=(.*?)]", msg) if not img: - await saucenao.finish("请发送图片而不是其他东西!!") - - try: - task = SauceNao(api_key=SauceNAO.key) - data = json.loads(await task.search(img[0])) - except RequestError: - raise RequestError("Request failed!") - - res = data["results"] - result = list() - for i in range(0, 3): - data = res[i] - - _result = dict() - _result["similarity"] = data["header"]["similarity"] - _result["index_name"] = data["header"]["index_name"] - _result["url"] = choice(data["data"].get("ext_urls", ["None"])) - result.append(_result) - - msg0 = f"> {MessageSegment.at(event.user_id)}" - for i in result: - msg0 = msg0 + ( - "\n——————————\n" - f"Similarity: {i['similarity']}\n" - f"Name: {i['index_name']}\n" - f"URL: {i['url'].replace('https://', '')}" - ) - - await saucenao.finish(Message(msg0)) + await saucenao.reject("请发送图片而不是其他东西!!") + + a = SaouceNao(SauceNAO.key) + result = f"> {MessageSegment.at(user_id)}" + await a.search(img[0]) + _search_flmt.start_cd(user_id) + await saucenao.finish(Message(result)) diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index efe8fe1..d948f91 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -1,13 +1,25 @@ -from ATRI.utils.request import post_bytes +from random import choice +from aiohttp import FormData + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.exceptions import RequestError +from ATRI.utils import request, UbuntuPaste URL = "https://saucenao.com/search.php" -class SauceNao: - def __init__( - self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5 - ) -> None: +__doc__ = """ +以图搜图,仅限二刺螈 +""" + + +class SaouceNao(Service): + + def __init__(self, api_key: str = None, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5): + Service.__init__(self, "以图搜图", __doc__, rule=is_in_service("以图搜图")) + params = dict() params["api_key"] = api_key params["output_type"] = output_type @@ -16,8 +28,48 @@ class SauceNao: params["db"] = db params["numres"] = numres self.params = params - - async def search(self, url: str): + + async def _request(self, url: str): self.params["url"] = url - res = await post_bytes(url=URL, params=self.params) - return res + + try: + res = await request.post(URL, params=self.params) + except RequestError: + raise RequestError("Request failed!") + data = await res.json() + return data + + async def search(self, url: str) -> str: + data = await self._request(url) + res = data["results"] + + result = list() + for i in range(len(res)): + data = res[i] + + _result = dict() + _result["similarity"] = data["header"]["similarity"] + _result["index_name"] = data["header"]["index_name"] + _result["url"] = choice(data["data"].get("ext_urls", ["None"])) + result.append(_result) + + msg0 = str() + for i in result: + msg0 += ( + "\n——————————\n" + f"Similarity: {i['similarity']}\n" + f"Name: {i['index_name']}\n" + f"URL: {i['url'].replace('https://', '')}" + ) + + if len(res) <= 3: + return msg0 + else: + data = FormData() + data.add_field("poster", "ATRI running log") + data.add_field("syntax", "text") + data.add_field("expiration", "day") + data.add_field("content", msg0) + + repo = f"\n详细请移步此处~\n{await UbuntuPaste(data).paste()}" + return repo diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py index b316020..7eb2c61 100644 --- a/ATRI/plugins/setu/__init__.py +++ b/ATRI/plugins/setu/__init__.py @@ -1,7 +1,82 @@ -import nonebot -from pathlib import Path +import re +import asyncio +from random import choice +from nonebot.adapters.cqhttp import Bot, MessageEvent, Message +from ATRI.utils.limit import FreqLimiter, DailyLimiter +from ATRI.utils.apscheduler import scheduler +from .data_source import Setu -_sub_plugins = set() -_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve())) +_setu_flmt = FreqLimiter(120) +_setu_dlmt = DailyLimiter(5) + + +random_setu = Setu().on_command("来张涩图", "来张随机涩图,冷却2分钟,每天限5张", aliases={"涩图来", "来点涩图", "来份涩图"}) + +@random_setu.handle() +async def _random_setu(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _setu_flmt.check(user_id): + await random_setu.finish() + if not _setu_dlmt.check(user_id): + await random_setu.finish() + + setu, title, p_id = await Setu().random_setu() + repo = ( + f"Title: {title}\n" + f"Pid: {p_id}" + ) + await bot.send(event, repo) + msg_1 = await bot.send(event, Message(setu)) + event_id = msg_1["message_id"] + _setu_flmt.start_cd(user_id) + _setu_dlmt.increase(user_id) + await asyncio.sleep(30) + await bot.delete_msg(message_id=event_id) + + +tag_setu = Setu().on_regex(r"来[张点丶份](.*?)的[涩色🐍]图", "根据提供的tag查找涩图") + +@tag_setu.handle() +async def _tag_setu(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _setu_flmt.check(user_id): + await random_setu.finish() + if not _setu_dlmt.check(user_id): + await random_setu.finish() + + msg = str(event.message).strip() + pattern = r"来[张点丶份](.*?)的[涩色🐍]图" + tag = re.findall(pattern, msg)[0] + setu, title, p_id, is_ok = await Setu().tag_setu(tag) + if not is_ok: + await tag_setu.finish(f"没有 {tag} 的涩图呢...") + repo_0 = ( + f"Title: {title}\n" + f"Pid: {p_id}" + ) + + await bot.send(event, repo_0) + msg_1 = await bot.send(event, Message(setu)) + event_id = msg_1["message_id"] + _setu_flmt.start_cd(user_id) + _setu_dlmt.increase(user_id) + await asyncio.sleep(30) + await bot.delete_msg(message_id=event_id) + + [email protected]_job("interval", hours=1, misfire_grace_time=60, args=[Bot]) +async def _scheduler_setu(bot): + try: + group_list = await bot.get_group_list() + lucky_group = choice(group_list) + group_id = lucky_group["group_id"] + setu = await Setu().scheduler() + msg_0 = await bot.send_group_msg(group_id=int(group_id), message=Message(setu)) + message_id = msg_0["message_id"] + await asyncio.sleep(60) + await bot.delete_msg(message_id=message_id) + + except BaseException: + pass diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py new file mode 100644 index 0000000..4070fd5 --- /dev/null +++ b/ATRI/plugins/setu/data_source.py @@ -0,0 +1,90 @@ +from random import choice +from nonebot.adapters.cqhttp import MessageSegment + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.utils import request + + +LOLICON_URL = "https://api.lolicon.app/setu/v2" +SETU_TEMP_FORMAT = "https://pixiv.cat/{p_id}.{ext}" # 为何要这样组,因为 i.pixiv.cat 不稳定! +SCHEDULER_FORMAT = """ +是{tag}哦~❤ +{setu} +""" + + +class Setu(Service): + + def __init__(self): + Service.__init__(self, "涩图", "hso!", rule=is_in_service("涩图")) + + @staticmethod + async def random_setu() -> tuple: + """ + 随机涩图. + """ + res = await request.get(LOLICON_URL) + data: dict = await res.json() + temp_data: dict = data.get("data", list())[0] + + title = temp_data.get("title", "木陰のねこ") + p_id = temp_data.get("pid", 88124144) + ext = temp_data.get("ext", "jpg") + url = SETU_TEMP_FORMAT.format( + p_id=p_id, + ext=ext + ) + setu = MessageSegment.image(url) + return setu, title, p_id + + @staticmethod + async def tag_setu(tag: str) -> tuple: + """ + 指定tag涩图. + """ + url = LOLICON_URL + f"?tag={tag}" + res = await request.get(url) + data: dict = await res.json() + + temp_data: dict = data.get("data", list())[0] + if not temp_data: + is_ok = False + is_ok = True + + title = temp_data.get("title", "木陰のねこ") + p_id = temp_data.get("pid", 88124144) + ext = temp_data.get("ext", "jpg") + url = SETU_TEMP_FORMAT.format( + p_id=p_id, + ext=ext + ) + setu = MessageSegment.image(url) + return setu, title, p_id, is_ok + + @staticmethod + async def scheduler() -> str: + """ + 每隔指定时间随机抽取一个群发送涩图. + 格式: + 是{tag}哦~❤ + {setu} + """ + res = await request.get(LOLICON_URL) + data: dict = await res.json() + temp_data: dict = data.get("data", list())[0] + + p_id = temp_data.get("pid", 88124144) + tag = choice(temp_data.get("tags", ["女孩子"])) + ext = temp_data.get("ext", "jpg") + + url = SETU_TEMP_FORMAT.format( + p_id=p_id, + ext=ext + ) + setu = MessageSegment.image(url) + repo = SCHEDULER_FORMAT.format( + tag=tag, + setu=setu + ) + return repo diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py deleted file mode 100644 index bda7363..0000000 --- a/ATRI/plugins/setu/modules/data_source.py +++ /dev/null @@ -1,181 +0,0 @@ -import os -import json -import string -import aiosqlite -from aiosqlite.core import Connection -from pathlib import Path -from random import sample, choice -from aiohttp import ClientSession -from nonebot.adapters.cqhttp.message import MessageSegment, Message - -from ATRI.log import logger as log -from ATRI.config import NsfwCheck -from ATRI.exceptions import RequestError, WriteError -from ATRI.utils.request import get_bytes -from ATRI.utils.img import compress_image - - -TEMP_DIR: Path = Path(".") / "ATRI" / "data" / "temp" / "setu" -SETU_DIR = Path(".") / "ATRI" / "data" / "database" / "setu" -os.makedirs(TEMP_DIR, exist_ok=True) -os.makedirs(SETU_DIR, exist_ok=True) -NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" -SIZE_REDUCE: bool = True - - -class Hso: - @staticmethod - async def nsfw_check(url: str) -> float: - url = NSFW_URL + url - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Request failed!") - return round(data["score"], 4) - - @staticmethod - async def _comp_setu(url: str) -> str: - temp_id = "".join(sample(string.ascii_letters + string.digits, 8)) - file = TEMP_DIR / f"{temp_id}.png" - - try: - async with ClientSession() as session: - async with session.get(url) as r: - data = await r.read() - except RequestError: - raise RequestError("Request img failed!") - - try: - with open(file, "wb") as r: - r.write(data) - except WriteError: - raise WriteError("Writing img failed!") - - return compress_image(os.path.abspath(file)) - - @classmethod - async def setu(cls, data: dict) -> str: - pid = data["pid"] - title = data["title"] - if SIZE_REDUCE: - img = MessageSegment.image( - "file:///" + await cls._comp_setu(data["url"]), proxy=False - ) - else: - img = MessageSegment.image(data["url"], proxy=False) - - msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}" - return msg - - @classmethod - async def acc_setu(cls, d: list) -> str: - data: dict = choice(d) - - for i in data["tags"]: - if i["name"] == "R-18": - return "太涩了不方便发w" - - pid = data["id"] - title = data["title"] - try: - pic = data["meta_single_page"]["original_image_url"].replace( - "pximg.net", "pixiv.cat" - ) - except Exception: - pic = choice(data["meta_pages"])["original"]["image_urls"].replace( - "pximg.net", "pixiv.cat" - ) - if SIZE_REDUCE: - img = MessageSegment.image( - "file:///" + await cls._comp_setu(pic), proxy=False - ) - else: - img = MessageSegment.image(pic, proxy=False) - - msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}" - return msg - - -class SetuData: - SETU_DATA = SETU_DIR / "setu.db" - - @classmethod - async def _check_database(cls) -> bool: - if not cls.SETU_DATA.exists(): - log.warning(f"未发现数据库\n-> {cls.SETU_DATA}\n将开始创建") - async with aiosqlite.connect(cls.SETU_DATA) as db: - cur = await db.cursor() - await cur.execute( - """ - CREATE TABLE setu( - pid PID, title TITLE, tags TAGS, - user_id USER_ID, user_name USER_NAME, - user_account USER_ACCOUNT, url URL, - UNIQUE( - pid, title, tags, user_id, - user_name, user_account, url - ) - ); - """ - ) - await db.commit() - log.warning(f"...创建数据库\n-> {cls.SETU_DATA}\n完成!") - return True - return True - - @classmethod - async def add_data(cls, d: dict) -> None: - data = ( - d["pid"], - d["title"], - d["tags"], - d["user_id"], - d["user_name"], - d["user_account"], - d["url"], - ) - - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - await db.execute( - """ - INSERT INTO setu( - pid, title, tags, user_id, - user_name, user_account, url - ) VALUES( - ?, ?, ?, ?, ?, ?, ? - ); - """, - data, - ) - await db.commit() - - @classmethod - async def del_data(cls, pid: int) -> None: - if not isinstance(pid, int): # 防注入 - raise ValueError("Please provide int.") - - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};") - await db.commit() - - @classmethod - async def count(cls): - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - async with db.execute("SELECT * FROM setu") as cursor: - return len(await cursor.fetchall()) # type: ignore - - @classmethod - async def get_setu(cls): - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - async with db.execute( - "SELECT * FROM setu ORDER BY RANDOM() limit 1;" - ) as cursor: - return await cursor.fetchall() diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py deleted file mode 100644 index a82ef83..0000000 --- a/ATRI/plugins/setu/modules/main_setu.py +++ /dev/null @@ -1,186 +0,0 @@ -import re -import json -from random import choice, random - -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.utils.request import get_bytes, post_bytes -from ATRI.utils.limit import is_too_exciting -from ATRI.config import Setu, BotSelfConfig -from ATRI.exceptions import RequestError - -from .data_source import Hso, SIZE_REDUCE, SetuData - - -LOLICON_URL: str = "https://api.lolicon.app/setu/" -PIXIV_URL: str = ( - "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word=" -) -R18_ENABLED: int = 0 -USE_LOCAL_DATA: bool = False -MIX_LOCAL_DATA: bool = False - - -setu = sv.on_regex( - r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]", rule=is_in_service("setu") -) - - -async def _setu(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 3, hours=1) - if not check: - return - - await bot.send(event, "别急,在找了!") - params = {"apikey": Setu.key, "r18": str(R18_ENABLED), "size1200": "true"} - try: - data = json.loads(await post_bytes(LOLICON_URL, params))["data"][0] - except RequestError: - raise RequestError("Request failed!") - - check = await Hso.nsfw_check(data["url"]) - score = "{:.2%}".format(check, 4) - - if not MIX_LOCAL_DATA: - if USE_LOCAL_DATA: - data = choice(await SetuData.get_setu()) # type: ignore - data = {"pid": data[0], "title": data[1], "url": data[6]} - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if check >= 0.9: - if random() <= 0.2: - repo = "我找到图了,但我发给主人了❤\n" f"涩值:{score}" - await bot.send(event, repo) - msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if random() <= 0.5: - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - data = choice(await SetuData.get_setu()) # type: ignore - data = {"pid": data[0], "title": data[1], "url": data[6]} - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - - -key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service("setu")) - - -@key_setu.handle() -async def _key_setu(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 10, hours=1) - if not check: - await setu.finish("休息一下吧❤") - - await bot.send(event, "别急,在找了!") - msg = str(event.message).strip() - tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0] - URL = PIXIV_URL + tag - - try: - data = json.loads(await get_bytes(URL))["illusts"] - except RequestError: - raise RequestError("Request msg failed!") - - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.acc_setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.acc_setu(data))) - - -__doc__ = """ -涩图设置 -权限组:维护者 -用法: - 涩图设置 启用/禁用r18 - 涩图设置 启用/禁用压缩 - 涩图设置 启用/禁用本地涩图 - 涩图设置 启用/禁用混合本地涩图 -""" - -setu_config = sv.on_command(cmd="涩图设置", docs=__doc__, permission=SUPERUSER) - - -@setu_config.handle() -async def _setu_config(bot: Bot, event: MessageEvent) -> None: - global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA - msg = str(event.message).split(" ") - if msg[0] == "": - repo = "可用设置如下:\n启用/禁用r18\n启用/禁用压缩\n启用/禁用本地涩图\n启用/禁用混合本地涩图" - await setu_config.finish(repo) - elif msg[0] == "启用r18": - R18_ENABLED = 1 - await setu_config.finish("已启用r18") - elif msg[0] == "禁用r18": - R18_ENABLED = 0 - await setu_config.finish("已禁用r18") - elif msg[0] == "启用压缩": - SIZE_REDUCE = True - await setu_config.finish("已启用图片压缩") - elif msg[0] == "禁用压缩": - SIZE_REDUCE = False - await setu_config.finish("已禁用图片压缩") - elif msg[0] == "启用本地涩图": - USE_LOCAL_DATA = True - await setu_config.finish("已启用本地涩图") - elif msg[0] == "禁用本地涩图": - USE_LOCAL_DATA = False - await setu_config.finish("已禁用本地涩图") - elif msg[0] == "启用混合本地涩图": - MIX_LOCAL_DATA = True - await setu_config.finish("已启用混合本地涩图") - elif msg[0] == "禁用混合本地涩图": - MIX_LOCAL_DATA = False - await setu_config.finish("已禁用混合本地涩图") - else: - await setu_config.finish("阿!请检查拼写") - - -not_get_se = sv.on_command("不够涩") - - -@not_get_se.handle() -async def _not_se(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 1, 120) - if check: - msg = choice(["那你来发", "那你来发❤"]) - await not_get_se.finish(msg) diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py deleted file mode 100644 index e66c398..0000000 --- a/ATRI/plugins/setu/modules/scheduler.py +++ /dev/null @@ -1,15 +0,0 @@ -import shutil -from ATRI.log import logger as log -from ATRI.utils.apscheduler import scheduler - -from .data_source import TEMP_DIR - - [email protected]_job("interval", days=7, misfire_grace_time=10) -async def clear_temp(): - log.info("正在清除涩图缓存") - try: - shutil.rmtree(TEMP_DIR) - log.info("清除缓存成功!") - except Exception: - log.warn("清除图片缓存失败!") diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py deleted file mode 100644 index e278c77..0000000 --- a/ATRI/plugins/setu/modules/store.py +++ /dev/null @@ -1,136 +0,0 @@ -import json -from random import choice - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.exceptions import RequestError - -from .data_source import SetuData - - -API_URL: str = "https://api.kyomotoi.moe/api/pixiv/illust?id=" - - -__doc__ = """ -为本地添加涩图! -权限组:维护者 -用法: - 添加涩图 (pid) -补充: - pid: Pixiv 作品id -""" - - -add_setu = sv.on_command(cmd="添加涩图", docs=__doc__, permission=SUPERUSER) - - -@add_setu.args_parser # type: ignore -async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await add_setu.finish("好吧...") - if not msg: - await add_setu.reject("涩图(pid)速发!") - else: - state["setu_add"] = msg - - -@add_setu.handle() -async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["setu_add"] = msg - - -@add_setu.got("setu_add", prompt="涩图(pid)速发!") -async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - pid = state["setu_add"] - - URL = API_URL + pid - try: - data = json.loads(await get_bytes(URL))["illust"] - except RequestError: - raise RequestError("Request failed!") - - try: - pic = data["meta_single_page"]["original_image_url"].replace( - "pximg.net", "pixiv.cat" - ) - except Exception: - pic = choice(data["meta_pages"])["image_urls"]["original"].replace( - "pximg.net", "pixiv.cat" - ) - - d = { - "pid": pid, - "title": data["title"], - "tags": str(data["tags"]), - "user_id": data["user"]["id"], - "user_name": data["user"]["name"], - "user_account": data["user"]["account"], - "url": pic, - } - await SetuData.add_data(d) - - show_img = data["image_urls"]["medium"].replace("pximg.net", "pixiv.cat") - msg = ( - "好欸!是新涩图:\n" - f"Pid: {pid}\n" - f"Title: {data['title']}\n" - f"{MessageSegment.image(show_img)}" - ) - await add_setu.finish(Message(msg)) - - -__doc__ = """ -删除涩图! -权限组:维护者 -用法: - 删除涩图 (pid) -补充: - pid: Pixiv 作品id -""" - - -del_setu = sv.on_command(cmd="删除涩图", docs=__doc__, permission=SUPERUSER) - - -@del_setu.args_parser # type: ignore -async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await add_setu.finish("好吧...") - if not msg: - await add_setu.reject("涩图(pid)速发!") - else: - state["setu_del"] = msg - - -@del_setu.handle() -async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["setu_del"] = msg - - -@del_setu.got("setu_del", prompt="涩图(pid)速发!") -async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - pid = int(state["setu_del"]) - await SetuData.del_data(pid) - await del_setu.finish(f"涩图({pid})已删除...") - - -count_setu = sv.on_command(cmd="涩图总量", permission=SUPERUSER) - - -@count_setu.handle() -async def _count_setu(bot: Bot, event: MessageEvent) -> None: - msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!" - await count_setu.finish(msg) diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py deleted file mode 100644 index c49570b..0000000 --- a/ATRI/plugins/status.py +++ /dev/null @@ -1,115 +0,0 @@ -import time -import psutil -from datetime import datetime -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.log import logger as log -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.exceptions import GetStatusError -from ATRI.utils.apscheduler import scheduler -from ATRI.config import BotSelfConfig - - -__doc__ = """ -测试机器人状态 -权限组:所有人 -用法: - /ping -""" - -ping = sv.on_command(cmd="/ping", docs="测试机器人", rule=is_in_service("ping")) - - -async def _ping(bot: Bot, event: MessageEvent) -> None: - await ping.finish("I'm fine.") - - -__doc__ = """ -检查机器人性能 -权限组:所有人 -用法: - /status -""" - -status = sv.on_command(cmd="/status", docs=__doc__, rule=is_in_service("status")) - - -async def _status(bot: Bot, event: MessageEvent) -> None: - try: - cpu = psutil.cpu_percent(interval=1) - mem = psutil.virtual_memory().percent - disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore - now = time.time() - boot = psutil.boot_time() - up_time = str( - datetime.utcfromtimestamp(now).replace(microsecond=0) - - datetime.utcfromtimestamp(boot).replace(microsecond=0) - ) - except GetStatusError: - raise GetStatusError("Failed to get status.") - - msg = "アトリは、高性能ですから!" - - if cpu > 90: # type: ignore - msg = "咱感觉有些头晕..." - if mem > 90: - msg = "咱感觉有点头晕并且有点累..." - elif mem > 90: - msg = "咱感觉有点累..." - elif disk > 90: - msg = "咱感觉身体要被塞满了..." - - msg0 = ( - "Self status:\n" - f"* CPU: {cpu}%\n" - f"* MEM: {mem}%\n" - f"* DISK: {disk}%\n" - f"* netSENT: {inteSENT}MB\n" - f"* netRECV: {inteRECV}MB\n" - f"* Runtime: {up_time}\n" - ) + msg - - await status.finish(msg0) - - [email protected]_job("interval", minutes=5, misfire_grace_time=10) -async def _(): - log.info("开始自检") - try: - cpu = psutil.cpu_percent(interval=1) - mem = psutil.virtual_memory().percent - disk = psutil.disk_usage("/").percent - inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore - inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore - except GetStatusError: - raise GetStatusError("Failed to get status.") - - msg = str() - if cpu > 90: # type: ignore - msg = "咱感觉有些头晕..." - if mem > 90: - msg = "咱感觉有点头晕并且有点累..." - elif mem > 90: - msg = "咱感觉有点累..." - elif disk > 90: - msg = "咱感觉身体要被塞满了..." - else: - log.info("运作正常") - return - - msg0 = ( - "Self status:\n" - f"* CPU: {cpu}%\n" - f"* MEM: {mem}%\n" - f"* DISK: {disk}%\n" - f"* netSENT: {inteSENT}MB\n" - f"* netRECV: {inteRECV}MB\n" - ) + msg - - for sup in BotSelfConfig.superusers: - await sv.NetworkPost.send_private_msg(user_id=sup, message=msg0) diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py new file mode 100644 index 0000000..8955cb1 --- /dev/null +++ b/ATRI/plugins/status/__init__.py @@ -0,0 +1,26 @@ +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils.apscheduler import scheduler +from .data_source import IsSurvive + + +ping = IsSurvive().on_command("/ping", "检测bot简单信息处理速度") + +async def _ping(bot: Bot, event: MessageEvent): + await ping.finish(IsSurvive.ping()) + + +status = IsSurvive().on_command("/status", "查看运行资源占用") + +async def _status(bot: Bot, event: MessageEvent): + msg, _ = IsSurvive.get_status() + await status.finish(msg) + + [email protected]_job("interval", minutes=10, misfire_grace_time=15) +async def _status_checking(): + msg, stat = IsSurvive().get_status() + if not stat: + await status.finish(msg) diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py new file mode 100644 index 0000000..c353313 --- /dev/null +++ b/ATRI/plugins/status/data_source.py @@ -0,0 +1,71 @@ +import time +import psutil +from datetime import datetime + +from ATRI.service import Service +from ATRI.log import logger as log +from ATRI.rule import is_in_service +from ATRI.exceptions import GetStatusError + + +__doc__ = """ +检查咱自身状态 +""" + + +class IsSurvive(Service): + + def __init__(self): + Service.__init__(self, "状态", __doc__, rule=is_in_service("状态")) + + @staticmethod + def ping() -> str: + return "I'm fine." + + @staticmethod + def get_status(): + log.info("开始检查资源消耗...") + try: + cpu = psutil.cpu_percent(interval=1) + mem = psutil.virtual_memory().percent + disk = psutil.disk_usage("/").percent + inteSENT = psutil.net_io_counters().bytes_sent / 1000000 # type: ignore + inteRECV = psutil.net_io_counters().bytes_recv / 1000000 # type: ignore + + now = time.time() + boot = psutil.boot_time() + up_time = str( + datetime.utcfromtimestamp(now).replace(microsecond=0) + - datetime.utcfromtimestamp(boot).replace(microsecond=0) + ) + except GetStatusError: + raise GetStatusError("Failed to get status.") + + msg = "アトリは、高性能ですから!" + if cpu > 90: # type: ignore + msg = "咱感觉有些头晕..." + is_ok = False + if mem > 90: + msg = "咱感觉有点头晕并且有点累..." + is_ok = False + elif mem > 90: + msg = "咱感觉有点累..." + is_ok = False + elif disk > 90: + msg = "咱感觉身体要被塞满了..." + is_ok = False + else: + log.info("资源占用正常") + is_ok = True + + msg0 = ( + "Self status:\n" + f"* CPU: {cpu}%\n" + f"* MEM: {mem}%\n" + f"* DISK: {disk}%\n" + f"* netSENT: {inteSENT}MB\n" + f"* netRECV: {inteRECV}MB\n" + f"* Runtime: {up_time}\n" + ) + msg + + return msg0, is_ok diff --git a/ATRI/plugins/util/__init__.py b/ATRI/plugins/util/__init__.py new file mode 100644 index 0000000..1e31bff --- /dev/null +++ b/ATRI/plugins/util/__init__.py @@ -0,0 +1,134 @@ +import re +from random import choice, random + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils.limit import FreqLimiter +from .data_source import Encrypt, Utils, Yinglish + + +roll = Utils().on_command("/roll", "骰子~用法:1d10 或 2d10+2d10+more") + [email protected]_parser # type: ignore +async def _get_roll(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await roll.finish("好吧...") + if not msg: + await roll.reject("参数呢?!格式:1d10 或 2d10+2d10+more") + else: + state["roll"] = msg + +async def _ready_roll(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["roll"] = msg + [email protected]("roll", "参数呢?!格式:1d10 或 2d10+2d10+more") +async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State): + text = state["roll"] + match = re.match(r"^([\dd+\s]+?)$", text) + + if not match: + await roll.finish("阿——!参数不对!格式:1d10 或 2d10+2d10+more") + + msg = Utils().roll_dice(text) + await roll.finish(msg) + + +encrypt_en = Utils().on_command("加密", "我们之前的秘密❤") + +@encrypt_en.args_parser # type: ignore +async def _get_encr_en_text(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await roll.finish("好吧...") + if not msg: + await roll.reject("内容呢?!") + else: + state["encr_en_text"] = msg + +@encrypt_en.handle() +async def _ready_en(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["encr_en_text"] = msg + +@encrypt_en.got("encr_en_text", "内容呢?!") +async def _deal_en(bot: Bot, event: MessageEvent, state: T_State): + text = state["encr_en_text"] + is_ok = len(text) + if is_ok < 10: + await encrypt_en.reject("太短不加密!") + en = Encrypt() + result = en.encode(text) + await encrypt_en.finish(result) + + +encrypt_de = Utils().on_command("解密", "解开我们的秘密❤") + +@encrypt_de.args_parser # type: ignore +async def _get_encr_de_text(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await encrypt_de.finish("好吧...") + if not msg: + await encrypt_de.reject("内容呢?!") + else: + state["encr_de_text"] = msg + +@encrypt_de.handle() +async def _ready_de(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + if msg: + state["encr_de_text"] = msg + +@encrypt_de.got("encr_de_text", "内容呢?!") +async def _deal_de(bot: Bot, event: MessageEvent, state: T_State): + text = state["encr_de_text"] + en = Encrypt() + result = en.decode(text) + await encrypt_de.finish(result) + + +sepi = Utils().on_command("涩批一下", "将正常的句子涩一涩~") + +_sepi_flmt = FreqLimiter(3) +_sepi_flmt_notice = ["涩批爬", "✌🥵✌"] + [email protected]_parser # type: ignore +async def _get_sepi(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了", "取消"] + if msg in quit_list: + await sepi.finish("好吧...") + if not msg: + await sepi.reject("内容呢?!") + else: + state["sepi_text"] = msg + +async def _ready_sepi(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _sepi_flmt.check(user_id): + await sepi.finish(choice(_sepi_flmt_notice)) + + msg = str(event.message).strip() + if msg: + state["sepi_text"] = msg + [email protected]("sepi_text", "内容呢?!") +async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + msg = state["sepi_text"] + if len(msg) < 4: + await sepi.finish("这么短?涩不起来!") + + result = Yinglish.deal(msg, random()) + _sepi_flmt.start_cd(user_id) + await sepi.finish(result) diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/util/data_source.py index 92ae930..a6afdf3 100644 --- a/ATRI/plugins/utils/data_source.py +++ b/ATRI/plugins/util/data_source.py @@ -4,40 +4,58 @@ import jieba.posseg as pseg from typing import Union, Optional from random import random, choice, randint +from ATRI.service import Service +from ATRI.rule import is_in_service -def roll_dice(par: str) -> str: - result = 0 - proc = "" - proc_list = [] - p = par.split("+") - for i in p: - args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i) - args = list(args[0]) +__doc__ = """ +非常实用(?)的工具们! +""" - args[0] = args[0] or 1 - if int(args[0]) >= 5000 or int(args[2]) >= 5000: - return "阿...好大......" - - for a in range(1, int(args[0]) + 1): - rd = randint(1, int(args[2])) - result = result + rd - - if len(proc_list) <= 10: - proc_list.append(rd) - - if len(proc_list) <= 10: - proc += "+".join(map(str, proc_list)) - elif len(proc_list) > 10: - proc += "太长了不展示了就酱w" - else: - proc += str(result) - - result = f"{par}=({proc})={result}" - return result +class Utils(Service): + + def __init__(self): + Service.__init__(self, "小工具", __doc__, rule=is_in_service("小工具")) + + @staticmethod + def roll_dice(par: str) -> str: + result = 0 + proc = "" + proc_list = [] + p = par.split("+") + + for i in p: + args = re.findall(r"(\d{0,10})(?:(d)(\d{1,10}))", i) + args = list(args[0]) + + args[0] = args[0] or 1 + if int(args[0]) >= 5000 or int(args[2]) >= 5000: + return "阿...好大......" + + for a in range(1, int(args[0]) + 1): + rd = randint(1, int(args[2])) + result = result + rd + + if len(proc_list) <= 10: + proc_list.append(rd) + + if len(proc_list) <= 10: + proc += "+".join(map(str, proc_list)) + elif len(proc_list) > 10: + proc += "太长了不展示了就酱w" + else: + proc += str(result) -class Encrypt: + result = f"{par}=({proc})={result}" + return result + +class Encrypt(Utils): + """ + 某nb改的(逃 + 总之就是非常nb + """ + cr = "ĀāĂ㥹ÀÁÂÃÄÅ" cc = "ŢţŤťŦŧṪṫṬṭṮṯṰṱ" cn = "ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr" @@ -168,9 +186,9 @@ class Encrypt: return self._decodeBytes(s).decode(encoding) except UnicodeDecodeError: raise ValueError("Decoding failed") - - -class Yinglish: + +class Yinglish(Utils): + @staticmethod def _to_ying(x, y, ying) -> str: if random() > ying: diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py deleted file mode 100644 index 608f1f5..0000000 --- a/ATRI/plugins/utils/__init__.py +++ /dev/null @@ -1,120 +0,0 @@ -import re -from random import random - -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from .data_source import roll_dice, Encrypt, Yinglish - - -__doc__ = """ -roll一下 -权限组:所有人 -用法: - /roll (int)d(int)+... -补充: - int: 阿拉伯数字 -示例: - /roll 1d10+10d9+4d5+2d3 -""" - -roll = sv.on_command(cmd="/roll", docs=__doc__, rule=is_in_service("roll")) - - [email protected]_parser # type: ignore -async def _load_roll(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await roll.finish("好吧...") - if not msg: - await roll.reject("点呢?(1d10+...)") - else: - state["resu"] = msg - - -async def _roll(bot: Bot, event: MessageEvent, state: T_State) -> None: - args = str(event.message).strip() - if args: - state["resu"] = args - - [email protected]("resu", prompt="roll 参数不能为空~!\ndemo:1d10 或 2d10+2d10") -async def _deal_roll(bot: Bot, event: MessageEvent, state: T_State) -> None: - resu = state["resu"] - match = re.match(r"^([\dd+\s]+?)$", resu) - - if not match: - await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10") - - await roll.finish(roll_dice(resu)) - - -__doc__ = """ -加密传输(bushi -权限组:所有人 -用法: - /enc e,d msg -补充: - e,d:对应 编码/解码 - msg: 目标内容 -示例: - /enc e アトリは高性能ですから! -""" - -encrypt = sv.on_command(cmd="/enc", docs=__doc__, rule=is_in_service("enc")) - - -async def _encrypt(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - _type = msg[0] - s = msg[1] - e = Encrypt() - - if _type == "e": - await encrypt.finish(e.encode(s)) - elif _type == "d": - await encrypt.finish(e.decode(s)) - else: - await encrypt.finish("请检查输入~!") - - -__doc__ = """ -涩批一下! -权限组:所有人 -用法: - 涩批一下 (msg) -""" - -sepi = sv.on_command(cmd="涩批一下", docs=__doc__, rule=is_in_service("涩批一下")) - - -async def _load_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - quit_list = ["算了", "罢了", "取消"] - if msg in quit_list: - await sepi.finish("好吧...") - if not msg: - await sepi.reject("话呢?") - else: - state["sepi_msg"] = msg - - -async def _sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["sepi_msg"] = msg - - [email protected]("sepi_msg", prompt="话呢?") -async def _deal_sepi(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["sepi_msg"] - if len(msg) < 4: - await sepi.finish("这么短?涩不起来!") - await sepi.finish(Yinglish.deal(msg, random())) diff --git a/ATRI/plugins/wife/__init__.py b/ATRI/plugins/wife/__init__.py deleted file mode 100644 index 2b6f6a3..0000000 --- a/ATRI/plugins/wife/__init__.py +++ /dev/null @@ -1,116 +0,0 @@ -import re -import asyncio -from random import choice - -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import ( - Bot, - MessageEvent, - GroupMessageEvent, - PrivateMessageEvent, -) - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.utils.limit import is_too_exciting - -from .data_source import Tsuma - - -__doc__ = """ -好欸!是老婆! -权限组:所有人 -用法: - 抽老婆 # 获取一位老婆 - 查老婆 # 查询老婆,如+at对象可查询对方 - 我要离婚 # 离婚... -""" - -roll_wife = sv.on_command(cmd="抽老婆", docs=__doc__, rule=is_in_service("抽老婆")) - - -@roll_wife.handle() -async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: - user = event.user_id - gender = event.sender.sex - group = event.group_id - user_name = await bot.get_group_member_info(group_id=group, user_id=user) - user_name = user_name["nickname"] - run = is_too_exciting(user, 1, seconds=5) - if not run: - return - - check_repo, if_h = Tsuma.check_tsuma(str(user)) - if if_h: - await roll_wife.finish(check_repo) - - msg = "5秒后咱将随机抽取一位群友成为\n" f"{user_name} 的老婆!究竟是谁呢~?" - await bot.send(event, msg) - await asyncio.sleep(5) - - async def get_luck_user(): - luck_list = await bot.get_group_member_list(group_id=group) - return choice(luck_list) - - while True: - luck_user = await get_luck_user() - luck_qq = luck_user["user_id"] - if user != luck_qq: - break - - luck_gender = luck_user["sex"] - luck_user = luck_user["nickname"] - d = { - "nickname": user_name, - "gender": gender, - "lassie": {"nickname": luck_user, "qq": luck_qq, "gender": luck_gender}, - } - - if str(luck_qq) == str(event.self_id): - Tsuma.got_tsuma(str(user), d) - msg = "老婆竟是我自己~❤" - else: - msg = Tsuma.got_tsuma(str(user), d) - - await roll_wife.finish(msg) - - -@roll_wife.handle() -async def _no_pr(bot: Bot, event: PrivateMessageEvent) -> None: - await roll_wife.finish("对8起...该功能只对群聊开放(") - - -inquire_wife = sv.on_command(cmd="查老婆", rule=is_in_service("抽老婆")) - - -@inquire_wife.handle() -async def _inq_wife(bot: Bot, event: MessageEvent) -> None: - msg = str(event.message).split(" ") - if msg[0] == "": - user = str(event.user_id) - await inquire_wife.finish(Tsuma.inquire_tsuma(user)) - else: - aim = re.findall(r"qq=(.*?)]", msg[0])[0] - await inquire_wife.finish(Tsuma.inquire_tsuma(aim).replace("你", "ta")) - - -want_divorce = sv.on_command(cmd="我要离婚", rule=is_in_service("抽老婆")) - - -@want_divorce.handle() -async def _want_div(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["is_d"] = msg - - -@want_divorce.got("is_d", prompt="你确定吗?(是/否)") -async def _deal_div(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = state["is_d"] - user = str(event.user_id) - name = event.sender.nickname - - if msg in ["是", "确定"]: - await want_divorce.finish(Tsuma.divorce(user)) - else: - await want_divorce.finish(f"({name})回心转意了!") diff --git a/ATRI/plugins/wife/data_source.py b/ATRI/plugins/wife/data_source.py deleted file mode 100644 index 9665351..0000000 --- a/ATRI/plugins/wife/data_source.py +++ /dev/null @@ -1,87 +0,0 @@ -import os -import json -from pathlib import Path - - -WIFE_DIR = Path(".") / "ATRI" / "data" / "database" / "wife" -MERRY_LIST_PATH = WIFE_DIR / "merry_list.json" -os.makedirs(WIFE_DIR, exist_ok=True) - - -class Tsuma: - @staticmethod - def _load_tsuma() -> dict: - try: - return json.loads(MERRY_LIST_PATH.read_bytes()) - except FileNotFoundError: - with open(MERRY_LIST_PATH, "w") as r: - r.write(json.dumps({}, indent=4)) - return dict() - - @staticmethod - def _store_tsuma(data: dict) -> None: - with open(MERRY_LIST_PATH, "w") as r: - r.write(json.dumps(data, indent=4)) - - @classmethod - def check_tsuma(cls, user: str): - data = cls._load_tsuma() - if user in data: - msg = "阿,你已经有老婆惹!" f"ta是:{data[user]['lassie']['nickname']}" - return msg, True - else: - return "悲——你还没老婆...", False - - @classmethod - def inquire_tsuma(cls, user: str) -> str: - data = cls._load_tsuma() - if user in data: - return f"你的老婆是:{data[user]['lassie']['nickname']} 哦~❤" - else: - return "悲——你还没老婆..." - - @classmethod - def got_tsuma(cls, user: str, d: dict) -> str: - check_repo, if_h = cls.check_tsuma(user) # 防止出现多人同时操作导致 NTR 事件 - if if_h: - return check_repo - else: - data = cls._load_tsuma() - data[user] = { - "nickname": d["nickname"], - "gender": d["gender"], - "lassie": { - "nickname": d["lassie"]["nickname"], - "qq": d["lassie"]["qq"], - "gender": d["lassie"]["gender"], - }, - } - cls._store_tsuma(data) - - data[d["lassie"]["qq"]] = { - "nickname": d["lassie"]["nickname"], - "gender": d["lassie"]["gender"], - "lassie": { - "nickname": d["nickname"], - "qq": user, - "gender": d["gender"], - }, - } - cls._store_tsuma(data) - - msg = ( - f"> {d['lassie']['nickname']}({d['lassie']['qq']})\n" - f"恭喜成为 {d['nickname']} 的老婆~⭐" - ) - return msg - - @classmethod - def divorce(cls, user: str) -> str: - data = cls._load_tsuma() - if not user in data: - return "悲——你还没老婆。。" - - msg = f"悲——,({data[user]['nickname']})抛弃了({data[user]['lassie']['nickname']})" - del data[user] - cls._store_tsuma(data) - return msg |