summaryrefslogtreecommitdiff
path: root/ATRI/plugins/chat
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/chat')
-rw-r--r--ATRI/plugins/chat/__init__.py112
-rw-r--r--ATRI/plugins/chat/data_source.py139
2 files changed, 251 insertions, 0 deletions
diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/chat/__init__.py
new file mode 100644
index 0000000..4864be5
--- /dev/null
+++ b/ATRI/plugins/chat/__init__.py
@@ -0,0 +1,112 @@
+from random import choice
+
+from nonebot.typing import T_State
+from nonebot.adapters.cqhttp import Bot, MessageEvent
+
+from ATRI.utils import CoolqCodeChecker
+from ATRI.utils.limit import FreqLimiter
+from ATRI.utils.apscheduler import scheduler
+from .data_source import Chat
+
+
+_chat_flmt = FreqLimiter(3)
+_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."])
+
+
+chat = Chat().on_message("闲聊(文爱")
+
+async def _chat(bot: Bot, event: MessageEvent):
+ user_id = event.get_user_id()
+ if not _chat_flmt.check(user_id):
+ await chat.finish(_chat_flmt_notice)
+
+ msg = str(event.message)
+ repo = await Chat().deal(msg, user_id)
+ _chat_flmt.start_cd(user_id)
+ await chat.finish(repo)
+
+my_name_is = Chat().on_command("叫我", "更改闲聊(划掉 文爱)时的称呼", aliases={"我是"}, priority=1)
+
+@my_name_is.args_parser # type: ignore
+async def _get_name(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ quit_list = ["算了", "罢了"]
+ if msg in quit_list:
+ await my_name_is.finish("好吧...")
+ if not msg:
+ await my_name_is.reject("欧尼酱想让咱如何称呼你呢!0w0")
+ else:
+ state["name"] = msg
+
+@my_name_is.handle()
+async def _name(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = event.get_user_id()
+ if not _chat_flmt.check(user_id):
+ await my_name_is.finish(_chat_flmt_notice)
+
+ msg = str(event.message).strip()
+ if msg:
+ state["name"] = msg
+
+@my_name_is.got("name")
+async def _deal_name(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = event.get_user_id()
+ new_name = state["name"]
+ repo = choice([
+ f"好~w 那咱以后就称呼你为{new_name}!",
+ f"噢噢噢!原来你叫{new_name}阿~",
+ f"好欸!{new_name}ちゃん~~~",
+ "很不错的称呼呢w"
+ ])
+ Chat().name_is(user_id, new_name)
+ _chat_flmt.start_cd(user_id)
+ await my_name_is.finish(repo)
+
+say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1)
+
[email protected]_parser # type: ignore
+async def _get_say(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
+ quit_list = ["算了", "罢了"]
+ if msg in quit_list:
+ await say.finish("好吧...")
+ if not msg:
+ await say.reject("欧尼酱想让咱如何称呼你呢!0w0")
+ else:
+ state["say"] = msg
+
+async def _ready_say(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = event.get_user_id()
+ if not _chat_flmt.check(user_id):
+ await say.finish(_chat_flmt_notice)
+
+ msg = str(event.message)
+ if msg:
+ state["say"] = msg
+
+async def _deal_say(bot: Bot, event: MessageEvent, state: T_State):
+ msg = state["say"]
+ check = CoolqCodeChecker(msg).check
+ if not check:
+ repo = choice([
+ "不要...",
+ "这个咱不想复读!",
+ "不可以",
+ "不好!"
+ ])
+ await say.finish(repo)
+
+ user_id = event.get_user_id()
+ _chat_flmt.start_cd(user_id)
+ await say.finish(msg)
+
+
[email protected]_job("interval", hours=3, misfire_grace_time=60)
+async def _check_kimo():
+ try:
+ await Chat().update_data()
+ except BaseException:
+ pass
diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/chat/data_source.py
new file mode 100644
index 0000000..1b0eec9
--- /dev/null
+++ b/ATRI/plugins/chat/data_source.py
@@ -0,0 +1,139 @@
+import os
+import json
+from pathlib import Path
+from jieba import posseg
+from random import choice, shuffle
+
+from ATRI.service import Service
+from ATRI.rule import to_bot, is_in_service
+from ATRI.log import logger as log
+from ATRI.utils import request
+from ATRI.exceptions import ReadFileError, WriteError
+
+
+__doc__ = """
+好像有点涩?(偏文爱,需at
+"""
+
+CHAT_PATH = Path(".") / "ATRI" / "data" / "database" / "chat"
+os.makedirs(CHAT_PATH, exist_ok=True)
+KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json"
+
+
+class Chat(Service):
+
+ def __init__(self):
+ Service.__init__(self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5)
+
+ @staticmethod
+ async def _request(url: str) -> dict:
+ res = await request.get(url)
+ data = await res.json()
+ return data
+
+ @classmethod
+ async def _generate_data(cls) -> None:
+ file_name = "kimo.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ log.warning("未检测到闲聊词库,生成中")
+ data = await cls._request(KIMO_URL)
+ try:
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+ log.info("生成完成")
+ except WriteError:
+ raise WriteError("Writing kimo words failed!")
+
+ @classmethod
+ async def _load_data(cls) -> dict:
+ file_name = "kimo.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ await cls._generate_data()
+
+ with open(path, "r", encoding="utf-8") as r:
+ data = json.loads(r.read())
+ return data
+
+ @classmethod
+ async def update_data(cls) -> None:
+ log.info("更新闲聊词库ing...")
+ file_name = "kimo.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ await cls._generate_data()
+
+ updata_data = await cls._request(KIMO_URL)
+ data = json.loads(path.read_bytes())
+ for i in updata_data:
+ if i not in data:
+ data[i] = updata_data[i]
+
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+ log.info("闲聊词库更新完成")
+
+ @staticmethod
+ def name_is(user_id: str, new_name: str):
+ file_name = "users.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ data = {}
+
+ data = json.loads(path.read_bytes())
+ data[user_id] = new_name
+ try:
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps(data, indent=4))
+ except ReadFileError:
+ raise ReadFileError("Update user name failed!")
+
+ @staticmethod
+ def load_name(user_id: str) -> str:
+ file_name = "users.json"
+ path = CHAT_PATH / file_name
+ if not path.is_file():
+ with open(path, "w", encoding="utf-8") as w:
+ w.write(json.dumps({}))
+ return "你"
+
+ data = json.loads(path.read_bytes())
+ try:
+ result = data[user_id]
+ except BaseException:
+ result = "你"
+ return result
+
+ @classmethod
+ async def deal(cls, msg: str, user_id: str) -> str:
+ keywords = posseg.lcut(msg)
+ shuffle(keywords)
+
+ data = await cls._load_data()
+
+ repo = str()
+ for i in keywords:
+ a = i.word
+ b = list(a)
+ try:
+ if b[0] == b[1]:
+ a = b[0]
+ except BaseException:
+ pass
+ if a in data:
+ repo = data.get(a, str())
+
+ if not repo:
+ temp_data = list(data)
+ shuffle(temp_data)
+ for i in temp_data:
+ if i in msg:
+ repo = data.get(i, str())
+
+ a = choice(repo) if type(repo) is list else repo
+ user_name = cls.load_name(user_id)
+ repo = a.replace("你", user_name)
+ return repo