From d7176305b5658db05d2e2713c4b389d2081e3b51 Mon Sep 17 00:00:00 2001 From: Kyomotoi Date: Tue, 5 Apr 2022 15:18:00 +0800 Subject: =?UTF-8?q?=F0=9F=9A=9A=20=E6=8F=92=E4=BB=B6=20=20=E9=87=8D?= =?UTF-8?q?=E5=91=BD=E5=90=8D=E4=B8=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ATRI/plugins/chat/__init__.py | 74 --------------------- ATRI/plugins/chat/data_source.py | 136 --------------------------------------- ATRI/plugins/kimo/__init__.py | 59 +++++++++++++++++ ATRI/plugins/kimo/data_source.py | 136 +++++++++++++++++++++++++++++++++++++++ test/test_plugin_chat.py | 84 ------------------------ test/test_plugin_kimo.py | 43 +++++++++++++ 6 files changed, 238 insertions(+), 294 deletions(-) delete mode 100644 ATRI/plugins/chat/__init__.py delete mode 100644 ATRI/plugins/chat/data_source.py create mode 100644 ATRI/plugins/kimo/__init__.py create mode 100644 ATRI/plugins/kimo/data_source.py delete mode 100644 test/test_plugin_chat.py create mode 100644 test/test_plugin_kimo.py diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py deleted file mode 100644 index 3cb5dbb..0000000 --- a/ATRI/plugins/chat/__init__.py +++ /dev/null @@ -1,74 +0,0 @@ -from random import choice - -from nonebot.matcher import Matcher -from nonebot.params import ArgPlainText, CommandArg -from nonebot.adapters.onebot.v11 import MessageEvent, Message -from nonebot.adapters.onebot.v11.helpers import Cooldown - -from ATRI.utils.apscheduler import scheduler -from .data_source import Chat - - -_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) - - -chat = Chat().on_message("文爱", "闲聊(文爱") - - -@chat.handle([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _chat(event: MessageEvent): - user_id = event.get_user_id() - msg = str(event.message) - repo = await Chat().deal(msg, user_id) - try: - await chat.finish(repo) - except Exception: - return - - -my_name_is = Chat().on_command("叫我", "更改闲聊(文爱)时的称呼", aliases={"我是"}, priority=1) - - -@my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _name(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("name", args) - - -@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0") -async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")): - user_id = event.get_user_id() - repo = choice( - [ - f"好~w 那咱以后就称呼你为{new_name}!", - f"噢噢噢!原来你叫{new_name}阿~", - f"好欸!{new_name}ちゃん~~~", - "很不错的称呼呢w", - ] - ) - Chat().name_is(user_id, new_name) - await my_name_is.finish(repo) - - -say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) - - -@say.handle([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _ready_say(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("say", args) - - -@say.got("say", "想要咱复读啥呢...") -async def _deal_say(msg: str = ArgPlainText("say")): - await say.finish(msg) - - -@scheduler.scheduled_job("interval", name="闲聊词库检查更新", hours=3, misfire_grace_time=60) # type: ignore -async def _check_kimo(): - try: - await Chat().update_data() - except BaseException: - pass diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py deleted file mode 100644 index 70c6345..0000000 --- a/ATRI/plugins/chat/data_source.py +++ /dev/null @@ -1,136 +0,0 @@ -import os -import json -from pathlib import Path -from jieba import posseg -from random import choice, shuffle - -from ATRI.service import Service -from ATRI.rule import to_bot, is_in_service -from ATRI.log import logger as log -from ATRI.utils import request -from ATRI.exceptions import ReadFileError, WriteError - - -CHAT_PATH = Path(".") / "data" / "database" / "chat" -os.makedirs(CHAT_PATH, exist_ok=True) -KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" - - -class Chat(Service): - def __init__(self): - Service.__init__( - self, "闲聊", "好像有点涩?", rule=to_bot() & is_in_service("闲聊"), priority=5 - ) - - @staticmethod - async def _request(url: str) -> dict: - res = await request.get(url) - data = res.json() - return data - - @classmethod - async def _generate_data(cls) -> None: - file_name = "kimo.json" - path = CHAT_PATH / file_name - if not path.is_file(): - log.warning("未检测到词库,生成中") - data = await cls._request(KIMO_URL) - try: - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - log.info("生成完成") - except WriteError: - raise WriteError("Writing kimo words failed!") - - @classmethod - async def _load_data(cls) -> dict: - file_name = "kimo.json" - path = CHAT_PATH / file_name - if not path.is_file(): - await cls._generate_data() - - with open(path, "r", encoding="utf-8") as r: - data = json.loads(r.read()) - return data - - @classmethod - async def update_data(cls) -> None: - log.info("更新闲聊词库ing...") - file_name = "kimo.json" - path = CHAT_PATH / file_name - if not path.is_file(): - await cls._generate_data() - - updata_data = await cls._request(KIMO_URL) - data = json.loads(path.read_bytes()) - for i in updata_data: - if i not in data: - data[i] = updata_data[i] - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - log.info("闲聊词库更新完成") - - @staticmethod - def name_is(user_id: str, new_name: str): - file_name = "users.json" - path = CHAT_PATH / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - data = {} - - data = json.loads(path.read_bytes()) - data[user_id] = new_name - try: - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - except ReadFileError: - raise ReadFileError("Update user name failed!") - - @staticmethod - def load_name(user_id: str) -> str: - file_name = "users.json" - path = CHAT_PATH / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return "你" - - data = json.loads(path.read_bytes()) - try: - result = data[user_id] - except BaseException: - result = "你" - return result - - @classmethod - async def deal(cls, msg: str, user_id: str) -> str: - keywords = posseg.lcut(msg) - shuffle(keywords) - - data = await cls._load_data() - - repo = str() - for i in keywords: - a = i.word - b = list(a) - try: - if b[0] == b[1]: - a = b[0] - except BaseException: - pass - if a in data: - repo = data.get(a, str()) - - if not repo: - temp_data = list(data) - shuffle(temp_data) - for i in temp_data: - if i in msg: - repo = data.get(i, str()) - - a = choice(repo) if type(repo) is list else repo - user_name = cls.load_name(user_id) - repo = a.replace("你", user_name) - return repo diff --git a/ATRI/plugins/kimo/__init__.py b/ATRI/plugins/kimo/__init__.py new file mode 100644 index 0000000..b1c5698 --- /dev/null +++ b/ATRI/plugins/kimo/__init__.py @@ -0,0 +1,59 @@ +from random import choice + +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import MessageEvent, Message +from nonebot.adapters.onebot.v11.helpers import Cooldown + +from ATRI.utils.apscheduler import scheduler +from .data_source import Kimo + + +_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) + + +kimo = Kimo().on_message("文爱", "闲聊(文爱") + + +@kimo.handle([Cooldown(3, prompt=_chat_flmt_notice)]) +async def _chat(event: MessageEvent): + user_id = event.get_user_id() + msg = str(event.message) + repo = await Kimo().deal(msg, user_id) + try: + await kimo.finish(repo) + except Exception: + return + + +my_name_is = Kimo().on_command("叫我", "更改kimo时的称呼", aliases={"我是"}, priority=1) + + +@my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)]) +async def _name(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("name", args) + + +@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0") +async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")): + user_id = event.get_user_id() + repo = choice( + [ + f"好~w 那咱以后就称呼你为{new_name}!", + f"噢噢噢!原来你叫{new_name}阿~", + f"好欸!{new_name}ちゃん~~~", + "很不错的称呼呢w", + ] + ) + Kimo().name_is(user_id, new_name) + await my_name_is.finish(repo) + + +@scheduler.scheduled_job("interval", name="kimo词库检查更新", hours=3, misfire_grace_time=60) # type: ignore +async def _check_kimo(): + try: + await Kimo().update_data() + except BaseException: + pass diff --git a/ATRI/plugins/kimo/data_source.py b/ATRI/plugins/kimo/data_source.py new file mode 100644 index 0000000..6dd448a --- /dev/null +++ b/ATRI/plugins/kimo/data_source.py @@ -0,0 +1,136 @@ +import os +import json +from pathlib import Path +from jieba import posseg +from random import choice, shuffle + +from ATRI.service import Service +from ATRI.rule import to_bot, is_in_service +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.exceptions import ReadFileError, WriteError + + +CHAT_PATH = Path(".") / "data" / "database" / "kimo" +os.makedirs(CHAT_PATH, exist_ok=True) +KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" + + +class Kimo(Service): + def __init__(self): + Service.__init__( + self, "kimo", "好像有点涩?", rule=to_bot() & is_in_service("kimo"), priority=5 + ) + + @staticmethod + async def _request(url: str) -> dict: + res = await request.get(url) + data = res.json() + return data + + @classmethod + async def _generate_data(cls) -> None: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + log.warning("未检测到词库,生成中") + data = await cls._request(KIMO_URL) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("生成完成") + except WriteError: + raise WriteError("Writing kimo words failed!") + + @classmethod + async def _load_data(cls) -> dict: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data + + @classmethod + async def update_data(cls) -> None: + log.info("更新闲聊词库ing...") + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + updata_data = await cls._request(KIMO_URL) + data = json.loads(path.read_bytes()) + for i in updata_data: + if i not in data: + data[i] = updata_data[i] + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("kimo词库更新完成") + + @staticmethod + def name_is(user_id: str, new_name: str): + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = {} + + data = json.loads(path.read_bytes()) + data[user_id] = new_name + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + except ReadFileError: + raise ReadFileError("Update user name failed!") + + @staticmethod + def load_name(user_id: str) -> str: + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return "你" + + data = json.loads(path.read_bytes()) + try: + result = data[user_id] + except BaseException: + result = "你" + return result + + @classmethod + async def deal(cls, msg: str, user_id: str) -> str: + keywords = posseg.lcut(msg) + shuffle(keywords) + + data = await cls._load_data() + + repo = str() + for i in keywords: + a = i.word + b = list(a) + try: + if b[0] == b[1]: + a = b[0] + except BaseException: + pass + if a in data: + repo = data.get(a, str()) + + if not repo: + temp_data = list(data) + shuffle(temp_data) + for i in temp_data: + if i in msg: + repo = data.get(i, str()) + + a = choice(repo) if type(repo) is list else repo + user_name = cls.load_name(user_id) + repo = a.replace("你", user_name) + return repo diff --git a/test/test_plugin_chat.py b/test/test_plugin_chat.py deleted file mode 100644 index e04f1d3..0000000 --- a/test/test_plugin_chat.py +++ /dev/null @@ -1,84 +0,0 @@ -import pytest -from nonebug import App -from nonebot.adapters.onebot.v11 import MessageSegment - -from .utils import make_fake_message, make_fake_event - - -@pytest.mark.asyncio -async def test_chat(app: App): - from ATRI.plugins.chat import chat - - Message = make_fake_message() - - async with app.test_matcher(chat) as ctx: - bot = ctx.create_bot() - - msg = Message("爱你") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True) - - -@pytest.mark.asyncio -async def test_my_name_is(app: App): - from ATRI.plugins.chat import my_name_is - - Message = make_fake_message() - - async with app.test_matcher(my_name_is) as ctx: - bot = ctx.create_bot() - - msg = Message("叫我") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True) - - msg = Message("欧尼酱") - event = make_fake_event(_message=msg)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True) - - -@pytest.mark.asyncio -async def test_say(app: App): - from ATRI.plugins.chat import say - - Message = make_fake_message() - - async with app.test_matcher(say) as ctx: - bot = ctx.create_bot() - - msg = Message("说") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "想要咱复读啥呢...", True) - - msg = Message("nya~") - event = make_fake_event(_message=msg)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "nya~", True) - - async with app.test_matcher(say) as ctx: - bot = ctx.create_bot() - - msg = Message("说") - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "想要咱复读啥呢...", True) - - msg = Message( - MessageSegment.image( - "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png" - ) - ) - event = make_fake_event(_message=msg, _to_me=True)() - - ctx.receive_event(bot, event) - ctx.should_call_send(event, "不要...", True) diff --git a/test/test_plugin_kimo.py b/test/test_plugin_kimo.py new file mode 100644 index 0000000..ad6ff0b --- /dev/null +++ b/test/test_plugin_kimo.py @@ -0,0 +1,43 @@ +import pytest +from nonebug import App +from nonebot.adapters.onebot.v11 import MessageSegment + +from .utils import make_fake_message, make_fake_event + + +@pytest.mark.asyncio +async def test_chat(app: App): + from ATRI.plugins.kimo import kimo + + Message = make_fake_message() + + async with app.test_matcher(kimo) as ctx: + bot = ctx.create_bot() + + msg = Message("爱你") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True) + + +@pytest.mark.asyncio +async def test_my_name_is(app: App): + from ATRI.plugins.kimo import my_name_is + + Message = make_fake_message() + + async with app.test_matcher(my_name_is) as ctx: + bot = ctx.create_bot() + + msg = Message("叫我") + event = make_fake_event(_message=msg, _to_me=True)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True) + + msg = Message("欧尼酱") + event = make_fake_event(_message=msg)() + + ctx.receive_event(bot, event) + ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True) -- cgit v1.2.3