diff options
Diffstat (limited to 'ATRI/plugins/chat')
-rw-r--r-- | ATRI/plugins/chat/__init__.py | 112 | ||||
-rw-r--r-- | ATRI/plugins/chat/data_source.py | 139 |
2 files changed, 251 insertions, 0 deletions
diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py new file mode 100644 index 0000000..4864be5 --- /dev/null +++ b/ATRI/plugins/chat/__init__.py @@ -0,0 +1,112 @@ +from random import choice + +from nonebot.typing import T_State +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.utils import CoolqCodeChecker +from ATRI.utils.limit import FreqLimiter +from ATRI.utils.apscheduler import scheduler +from .data_source import Chat + + +_chat_flmt = FreqLimiter(3) +_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) + + +chat = Chat().on_message("闲聊(文爱") + +async def _chat(bot: Bot, event: MessageEvent): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await chat.finish(_chat_flmt_notice) + + msg = str(event.message) + repo = await Chat().deal(msg, user_id) + _chat_flmt.start_cd(user_id) + await chat.finish(repo) + +my_name_is = Chat().on_command("叫我", "更改闲聊(划掉 文爱)时的称呼", aliases={"我是"}, priority=1) + +@my_name_is.args_parser # type: ignore +async def _get_name(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await my_name_is.finish("好吧...") + if not msg: + await my_name_is.reject("欧尼酱想让咱如何称呼你呢!0w0") + else: + state["name"] = msg + +@my_name_is.handle() +async def _name(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await my_name_is.finish(_chat_flmt_notice) + + msg = str(event.message).strip() + if msg: + state["name"] = msg + +@my_name_is.got("name") +async def _deal_name(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + new_name = state["name"] + repo = choice([ + f"好~w 那咱以后就称呼你为{new_name}!", + f"噢噢噢!原来你叫{new_name}阿~", + f"好欸!{new_name}ちゃん~~~", + "很不错的称呼呢w" + ]) + Chat().name_is(user_id, new_name) + _chat_flmt.start_cd(user_id) + await my_name_is.finish(repo) + +say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1) + [email protected]_parser # type: ignore +async def _get_say(bot: Bot, event: MessageEvent, state: T_State): + msg = str(event.message).strip() + quit_list = ["算了", "罢了"] + if msg in quit_list: + await say.finish("好吧...") + if not msg: + await say.reject("欧尼酱想让咱如何称呼你呢!0w0") + else: + state["say"] = msg + +async def _ready_say(bot: Bot, event: MessageEvent, state: T_State): + user_id = event.get_user_id() + if not _chat_flmt.check(user_id): + await say.finish(_chat_flmt_notice) + + msg = str(event.message) + if msg: + state["say"] = msg + [email protected]("say") +async def _deal_say(bot: Bot, event: MessageEvent, state: T_State): + msg = state["say"] + check = CoolqCodeChecker(msg).check + if not check: + repo = choice([ + "不要...", + "这个咱不想复读!", + "不可以", + "不好!" + ]) + await say.finish(repo) + + user_id = event.get_user_id() + _chat_flmt.start_cd(user_id) + await say.finish(msg) + + [email protected]_job("interval", hours=3, misfire_grace_time=60) +async def _check_kimo(): + try: + await Chat().update_data() + except BaseException: + pass diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py new file mode 100644 index 0000000..1b0eec9 --- /dev/null +++ b/ATRI/plugins/chat/data_source.py @@ -0,0 +1,139 @@ +import os +import json +from pathlib import Path +from jieba import posseg +from random import choice, shuffle + +from ATRI.service import Service +from ATRI.rule import to_bot, is_in_service +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.exceptions import ReadFileError, WriteError + + +__doc__ = """ +好像有点涩?(偏文爱,需at +""" + +CHAT_PATH = Path(".") / "ATRI" / "data" / "database" / "chat" +os.makedirs(CHAT_PATH, exist_ok=True) +KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" + + +class Chat(Service): + + def __init__(self): + Service.__init__(self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5) + + @staticmethod + async def _request(url: str) -> dict: + res = await request.get(url) + data = await res.json() + return data + + @classmethod + async def _generate_data(cls) -> None: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + log.warning("未检测到闲聊词库,生成中") + data = await cls._request(KIMO_URL) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("生成完成") + except WriteError: + raise WriteError("Writing kimo words failed!") + + @classmethod + async def _load_data(cls) -> dict: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data + + @classmethod + async def update_data(cls) -> None: + log.info("更新闲聊词库ing...") + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + updata_data = await cls._request(KIMO_URL) + data = json.loads(path.read_bytes()) + for i in updata_data: + if i not in data: + data[i] = updata_data[i] + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("闲聊词库更新完成") + + @staticmethod + def name_is(user_id: str, new_name: str): + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = {} + + data = json.loads(path.read_bytes()) + data[user_id] = new_name + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + except ReadFileError: + raise ReadFileError("Update user name failed!") + + @staticmethod + def load_name(user_id: str) -> str: + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return "你" + + data = json.loads(path.read_bytes()) + try: + result = data[user_id] + except BaseException: + result = "你" + return result + + @classmethod + async def deal(cls, msg: str, user_id: str) -> str: + keywords = posseg.lcut(msg) + shuffle(keywords) + + data = await cls._load_data() + + repo = str() + for i in keywords: + a = i.word + b = list(a) + try: + if b[0] == b[1]: + a = b[0] + except BaseException: + pass + if a in data: + repo = data.get(a, str()) + + if not repo: + temp_data = list(data) + shuffle(temp_data) + for i in temp_data: + if i in msg: + repo = data.get(i, str()) + + a = choice(repo) if type(repo) is list else repo + user_name = cls.load_name(user_id) + repo = a.replace("你", user_name) + return repo |