diff options
Diffstat (limited to 'ATRI/plugins/chat')
-rw-r--r-- | ATRI/plugins/chat/__init__.py | 74 | ||||
-rw-r--r-- | ATRI/plugins/chat/data_source.py | 136 |
2 files changed, 0 insertions, 210 deletions
diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py deleted file mode 100644 index 3cb5dbb..0000000 --- a/ATRI/plugins/chat/__init__.py +++ /dev/null @@ -1,74 +0,0 @@ -from random import choice - -from nonebot.matcher import Matcher -from nonebot.params import ArgPlainText, CommandArg -from nonebot.adapters.onebot.v11 import MessageEvent, Message -from nonebot.adapters.onebot.v11.helpers import Cooldown - -from ATRI.utils.apscheduler import scheduler -from .data_source import Chat - - -_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) - - -chat = Chat().on_message("文爱", "闲聊(文爱") - - [email protected]([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _chat(event: MessageEvent): - user_id = event.get_user_id() - msg = str(event.message) - repo = await Chat().deal(msg, user_id) - try: - await chat.finish(repo) - except Exception: - return - - -my_name_is = Chat().on_command("叫我", "更改闲聊(文爱)时的称呼", aliases={"我是"}, priority=1) - - -@my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _name(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("name", args) - - -@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0") -async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")): - user_id = event.get_user_id() - repo = choice( - [ - f"好~w 那咱以后就称呼你为{new_name}!", - f"噢噢噢!原来你叫{new_name}阿~", - f"好欸!{new_name}ちゃん~~~", - "很不错的称呼呢w", - ] - ) - Chat().name_is(user_id, new_name) - await my_name_is.finish(repo) - - -say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) - - [email protected]([Cooldown(3, prompt=_chat_flmt_notice)]) -async def _ready_say(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("say", args) - - [email protected]("say", "想要咱复读啥呢...") -async def _deal_say(msg: str = ArgPlainText("say")): - await say.finish(msg) - - [email protected]_job("interval", name="闲聊词库检查更新", hours=3, misfire_grace_time=60) # type: ignore -async def _check_kimo(): - try: - await Chat().update_data() - except BaseException: - pass diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py deleted file mode 100644 index 70c6345..0000000 --- a/ATRI/plugins/chat/data_source.py +++ /dev/null @@ -1,136 +0,0 @@ -import os -import json -from pathlib import Path -from jieba import posseg -from random import choice, shuffle - -from ATRI.service import Service -from ATRI.rule import to_bot, is_in_service -from ATRI.log import logger as log -from ATRI.utils import request -from ATRI.exceptions import ReadFileError, WriteError - - -CHAT_PATH = Path(".") / "data" / "database" / "chat" -os.makedirs(CHAT_PATH, exist_ok=True) -KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" - - -class Chat(Service): - def __init__(self): - Service.__init__( - self, "闲聊", "好像有点涩?", rule=to_bot() & is_in_service("闲聊"), priority=5 - ) - - @staticmethod - async def _request(url: str) -> dict: - res = await request.get(url) - data = res.json() - return data - - @classmethod - async def _generate_data(cls) -> None: - file_name = "kimo.json" - path = CHAT_PATH / file_name - if not path.is_file(): - log.warning("未检测到词库,生成中") - data = await cls._request(KIMO_URL) - try: - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - log.info("生成完成") - except WriteError: - raise WriteError("Writing kimo words failed!") - - @classmethod - async def _load_data(cls) -> dict: - file_name = "kimo.json" - path = CHAT_PATH / file_name - if not path.is_file(): - await cls._generate_data() - - with open(path, "r", encoding="utf-8") as r: - data = json.loads(r.read()) - return data - - @classmethod - async def update_data(cls) -> None: - log.info("更新闲聊词库ing...") - file_name = "kimo.json" - path = CHAT_PATH / file_name - if not path.is_file(): - await cls._generate_data() - - updata_data = await cls._request(KIMO_URL) - data = json.loads(path.read_bytes()) - for i in updata_data: - if i not in data: - data[i] = updata_data[i] - - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - log.info("闲聊词库更新完成") - - @staticmethod - def name_is(user_id: str, new_name: str): - file_name = "users.json" - path = CHAT_PATH / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - data = {} - - data = json.loads(path.read_bytes()) - data[user_id] = new_name - try: - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps(data, indent=4)) - except ReadFileError: - raise ReadFileError("Update user name failed!") - - @staticmethod - def load_name(user_id: str) -> str: - file_name = "users.json" - path = CHAT_PATH / file_name - if not path.is_file(): - with open(path, "w", encoding="utf-8") as w: - w.write(json.dumps({})) - return "你" - - data = json.loads(path.read_bytes()) - try: - result = data[user_id] - except BaseException: - result = "你" - return result - - @classmethod - async def deal(cls, msg: str, user_id: str) -> str: - keywords = posseg.lcut(msg) - shuffle(keywords) - - data = await cls._load_data() - - repo = str() - for i in keywords: - a = i.word - b = list(a) - try: - if b[0] == b[1]: - a = b[0] - except BaseException: - pass - if a in data: - repo = data.get(a, str()) - - if not repo: - temp_data = list(data) - shuffle(temp_data) - for i in temp_data: - if i in msg: - repo = data.get(i, str()) - - a = choice(repo) if type(repo) is list else repo - user_name = cls.load_name(user_id) - repo = a.replace("你", user_name) - return repo |