diff options
Diffstat (limited to 'ATRI/plugins/kimo')
-rw-r--r-- | ATRI/plugins/kimo/__init__.py | 59 | ||||
-rw-r--r-- | ATRI/plugins/kimo/data_source.py | 136 |
2 files changed, 195 insertions, 0 deletions
diff --git a/ATRI/plugins/kimo/__init__.py b/ATRI/plugins/kimo/__init__.py new file mode 100644 index 0000000..b1c5698 --- /dev/null +++ b/ATRI/plugins/kimo/__init__.py @@ -0,0 +1,59 @@ +from random import choice + +from nonebot.matcher import Matcher +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import MessageEvent, Message +from nonebot.adapters.onebot.v11.helpers import Cooldown + +from ATRI.utils.apscheduler import scheduler +from .data_source import Kimo + + +_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."]) + + +kimo = Kimo().on_message("文爱", "闲聊(文爱") + + [email protected]([Cooldown(3, prompt=_chat_flmt_notice)]) +async def _chat(event: MessageEvent): + user_id = event.get_user_id() + msg = str(event.message) + repo = await Kimo().deal(msg, user_id) + try: + await kimo.finish(repo) + except Exception: + return + + +my_name_is = Kimo().on_command("叫我", "更改kimo时的称呼", aliases={"我是"}, priority=1) + + +@my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)]) +async def _name(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("name", args) + + +@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0") +async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")): + user_id = event.get_user_id() + repo = choice( + [ + f"好~w 那咱以后就称呼你为{new_name}!", + f"噢噢噢!原来你叫{new_name}阿~", + f"好欸!{new_name}ちゃん~~~", + "很不错的称呼呢w", + ] + ) + Kimo().name_is(user_id, new_name) + await my_name_is.finish(repo) + + [email protected]_job("interval", name="kimo词库检查更新", hours=3, misfire_grace_time=60) # type: ignore +async def _check_kimo(): + try: + await Kimo().update_data() + except BaseException: + pass diff --git a/ATRI/plugins/kimo/data_source.py b/ATRI/plugins/kimo/data_source.py new file mode 100644 index 0000000..6dd448a --- /dev/null +++ b/ATRI/plugins/kimo/data_source.py @@ -0,0 +1,136 @@ +import os +import json +from pathlib import Path +from jieba import posseg +from random import choice, shuffle + +from ATRI.service import Service +from ATRI.rule import to_bot, is_in_service +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.exceptions import ReadFileError, WriteError + + +CHAT_PATH = Path(".") / "data" / "database" / "kimo" +os.makedirs(CHAT_PATH, exist_ok=True) +KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json" + + +class Kimo(Service): + def __init__(self): + Service.__init__( + self, "kimo", "好像有点涩?", rule=to_bot() & is_in_service("kimo"), priority=5 + ) + + @staticmethod + async def _request(url: str) -> dict: + res = await request.get(url) + data = res.json() + return data + + @classmethod + async def _generate_data(cls) -> None: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + log.warning("未检测到词库,生成中") + data = await cls._request(KIMO_URL) + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("生成完成") + except WriteError: + raise WriteError("Writing kimo words failed!") + + @classmethod + async def _load_data(cls) -> dict: + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + with open(path, "r", encoding="utf-8") as r: + data = json.loads(r.read()) + return data + + @classmethod + async def update_data(cls) -> None: + log.info("更新闲聊词库ing...") + file_name = "kimo.json" + path = CHAT_PATH / file_name + if not path.is_file(): + await cls._generate_data() + + updata_data = await cls._request(KIMO_URL) + data = json.loads(path.read_bytes()) + for i in updata_data: + if i not in data: + data[i] = updata_data[i] + + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + log.info("kimo词库更新完成") + + @staticmethod + def name_is(user_id: str, new_name: str): + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + data = {} + + data = json.loads(path.read_bytes()) + data[user_id] = new_name + try: + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps(data, indent=4)) + except ReadFileError: + raise ReadFileError("Update user name failed!") + + @staticmethod + def load_name(user_id: str) -> str: + file_name = "users.json" + path = CHAT_PATH / file_name + if not path.is_file(): + with open(path, "w", encoding="utf-8") as w: + w.write(json.dumps({})) + return "你" + + data = json.loads(path.read_bytes()) + try: + result = data[user_id] + except BaseException: + result = "你" + return result + + @classmethod + async def deal(cls, msg: str, user_id: str) -> str: + keywords = posseg.lcut(msg) + shuffle(keywords) + + data = await cls._load_data() + + repo = str() + for i in keywords: + a = i.word + b = list(a) + try: + if b[0] == b[1]: + a = b[0] + except BaseException: + pass + if a in data: + repo = data.get(a, str()) + + if not repo: + temp_data = list(data) + shuffle(temp_data) + for i in temp_data: + if i in msg: + repo = data.get(i, str()) + + a = choice(repo) if type(repo) is list else repo + user_name = cls.load_name(user_id) + repo = a.replace("你", user_name) + return repo |