summaryrefslogtreecommitdiff
path: root/ATRI/plugins/kimo
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/kimo')
-rw-r--r--ATRI/plugins/kimo/__init__.py59
-rw-r--r--ATRI/plugins/kimo/data_source.py136
2 files changed, 195 insertions, 0 deletions
diff --git a/ATRI/plugins/kimo/__init__.py b/ATRI/plugins/kimo/__init__.py
new file mode 100644
index 0000000..b1c5698
--- /dev/null
+++ b/ATRI/plugins/kimo/__init__.py
@@ -0,0 +1,59 @@
+from random import choice
+
+from nonebot.matcher import Matcher
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import MessageEvent, Message
+from nonebot.adapters.onebot.v11.helpers import Cooldown
+
+from ATRI.utils.apscheduler import scheduler
+from .data_source import Kimo
+
+
+_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."])
+
+
+kimo = Kimo().on_message("文爱", "闲聊(文爱")
+
+
[email protected]([Cooldown(3, prompt=_chat_flmt_notice)])
+async def _chat(event: MessageEvent):
+ user_id = event.get_user_id()
+ msg = str(event.message)
+ repo = await Kimo().deal(msg, user_id)
+ try:
+ await kimo.finish(repo)
+ except Exception:
+ return
+
+
+my_name_is = Kimo().on_command("叫我", "更改kimo时的称呼", aliases={"我是"}, priority=1)
+
+
+@my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)])
+async def _name(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("name", args)
+
+
+@my_name_is.got("name", "欧尼酱想让咱如何称呼呢!0w0")
+async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")):
+ user_id = event.get_user_id()
+ repo = choice(
+ [
+ f"好~w 那咱以后就称呼你为{new_name}!",
+ f"噢噢噢!原来你叫{new_name}阿~",
+ f"好欸!{new_name}ちゃん~~~",
+ "很不错的称呼呢w",
+ ]
+ )
+ Kimo().name_is(user_id, new_name)
+ await my_name_is.finish(repo)
+
+
[email protected]_job("interval", name="kimo词库检查更新", hours=3, misfire_grace_time=60) # type: ignore
+async def _check_kimo():
+ try:
+ await Kimo().update_data()
+ except BaseException:
+ pass
diff --git a/ATRI/plugins/kimo/data_source.py b/ATRI/plugins/kimo/data_source.py
new file mode 100644
index 0000000..6dd448a
--- /dev/null
+++ b/ATRI/plugins/kimo/data_source.py
@@ -0,0 +1,136 @@
+import os
+import json
+from pathlib import Path
+from jieba import posseg
+from random import choice, shuffle
+
+from ATRI.service import Service
+from ATRI.rule import to_bot, is_in_service
+from ATRI.log import logger as log
+from ATRI.utils import request
+from ATRI.exceptions import ReadFileError, WriteError
+
+
+CHAT_PATH = Path(".") / "data" / "database" / "kimo"
+os.makedirs(CHAT_PATH, exist_ok=True)
+KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json"
+
+
+class Kimo(Service):
+ def __init__(self):
+ Service.__init__(
+ self, "kimo", "好像有点涩?", rule=to_bot() & is_in_service("kimo"), priority=5
+ )
+
+ @staticmethod
+ async def _request(url: str) -> dict:
+ res = await request.get(url)
+ data = res.json()
+ return data
+
+ @classmethod
+ async def _generate_data(cls) -> None:
+ file_name = "kimo.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ log.warning("未检测到词库,生成中")
+ data = await cls._request(KIMO_URL)
+ try:
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+ log.info("生成完成")
+ except WriteError:
+ raise WriteError("Writing kimo words failed!")
+
+ @classmethod
+ async def _load_data(cls) -> dict:
+ file_name = "kimo.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ await cls._generate_data()
+
+ with open(path, "r", encoding="utf-8") as r:
+ data = json.loads(r.read())
+ return data
+
+ @classmethod
+ async def update_data(cls) -> None:
+ log.info("更新闲聊词库ing...")
+ file_name = "kimo.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ await cls._generate_data()
+
+ updata_data = await cls._request(KIMO_URL)
+ data = json.loads(path.read_bytes())
+ for i in updata_data:
+ if i not in data:
+ data[i] = updata_data[i]
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+ log.info("kimo词库更新完成")
+
+ @staticmethod
+ def name_is(user_id: str, new_name: str):
+ file_name = "users.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ data = {}
+
+ data = json.loads(path.read_bytes())
+ data[user_id] = new_name
+ try:
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+ except ReadFileError:
+ raise ReadFileError("Update user name failed!")
+
+ @staticmethod
+ def load_name(user_id: str) -> str:
+ file_name = "users.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ return "你"
+
+ data = json.loads(path.read_bytes())
+ try:
+ result = data[user_id]
+ except BaseException:
+ result = "你"
+ return result
+
+ @classmethod
+ async def deal(cls, msg: str, user_id: str) -> str:
+ keywords = posseg.lcut(msg)
+ shuffle(keywords)
+
+ data = await cls._load_data()
+
+ repo = str()
+ for i in keywords:
+ a = i.word
+ b = list(a)
+ try:
+ if b[0] == b[1]:
+ a = b[0]
+ except BaseException:
+ pass
+ if a in data:
+ repo = data.get(a, str())
+
+ if not repo:
+ temp_data = list(data)
+ shuffle(temp_data)
+ for i in temp_data:
+ if i in msg:
+ repo = data.get(i, str())
+
+ a = choice(repo) if type(repo) is list else repo
+ user_name = cls.load_name(user_id)
+ repo = a.replace("你", user_name)
+ return repo