diff options
-rw-r--r-- | ATRI/database/models.py | 40 | ||||
-rw-r--r-- | ATRI/exceptions.py | 14 | ||||
-rw-r--r-- | ATRI/plugins/thesaurus/__init__.py | 835 | ||||
-rw-r--r-- | ATRI/plugins/thesaurus/data_source.py | 192 | ||||
-rw-r--r-- | ATRI/plugins/thesaurus/db.py | 47 | ||||
-rw-r--r-- | ATRI/plugins/thesaurus/listener.py | 121 | ||||
-rw-r--r-- | README.md | 2 |
7 files changed, 1246 insertions, 5 deletions
diff --git a/ATRI/database/models.py b/ATRI/database/models.py index e268f01..d326151 100644 --- a/ATRI/database/models.py +++ b/ATRI/database/models.py @@ -10,6 +10,9 @@ class BilibiliSubscription(Model): up_nickname = fields.TextField(null=True) last_update = fields.DatetimeField(default=datetime.fromordinal(1)) + class Meta: + app = "bilibili" + class TwitterSubscription(Model): tid = fields.IntField() @@ -17,3 +20,40 @@ class TwitterSubscription(Model): name = fields.TextField(null=True) screen_name = fields.TextField(null=True) last_update = fields.DatetimeField(default=datetime.fromordinal(1)) + + class Meta: + app = "twitter" + + +class ThesaurusStoragor(Model): + _id = fields.TextField() + matcher = fields.TextField(null=True) + result = fields.JSONField(null=True) + need_at = fields.IntField(null=True) + m_type = fields.IntField(null=True) + group_id = fields.IntField(null=True) + operator = fields.TextField(null=True) + operator_id = fields.IntField(null=True) + update_time = fields.DatetimeField(null=True) + is_vote = fields.IntField(null=True) + vote_list = fields.JSONField(null=True) + + class Meta: + app = "ts" + + +class ThesaurusAuditList(Model): + _id = fields.TextField() + matcher = fields.TextField(null=True) + result = fields.JSONField(null=True) + need_at = fields.IntField(null=True) + m_type = fields.IntField(null=True) + group_id = fields.IntField(null=True) + operator = fields.TextField(null=True) + operator_id = fields.IntField(null=True) + update_time = fields.DatetimeField(null=True) + is_vote = fields.IntField(null=True) + vote_list = fields.JSONField(null=True) + + class Meta: + app = "tal" diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index aebf9fc..db6265f 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -1,18 +1,18 @@ import time import json -import string from pathlib import Path -from random import sample from typing import Optional from traceback import format_exc from pydantic.main import BaseModel +from nonebot.matcher import Matcher from nonebot.adapters.onebot.v11 import ActionFailed from nonebot.adapters.onebot.v11 import Bot, PrivateMessageEvent, GroupMessageEvent from nonebot.message import run_postprocessor from .log import logger as log from .config import BotSelfConfig +from .utils import gen_random_str ERROR_DIR = Path(".") / "data" / "errors" @@ -27,7 +27,7 @@ class ErrorInfo(BaseModel): def _save_error(prompt: str, content: str) -> str: - track_id = "".join(sample(string.ascii_letters + string.digits, 8)) + track_id = gen_random_str(8) data = ErrorInfo( track_id=track_id, prompt=prompt, @@ -94,8 +94,14 @@ class TwitterDynamicError(BaseBotException): prompt = "Twitter动态订阅错误" +class ThesaurusError(BaseBotException): + prompt = "词库相关错误" + + @run_postprocessor -async def _track_error(exception: Optional[Exception], bot: Bot, event) -> None: +async def _track_error( + bot: Bot, event, matcher: Matcher, exception: Optional[Exception] +) -> None: if not exception: return diff --git a/ATRI/plugins/thesaurus/__init__.py b/ATRI/plugins/thesaurus/__init__.py new file mode 100644 index 0000000..ca0be3e --- /dev/null +++ b/ATRI/plugins/thesaurus/__init__.py @@ -0,0 +1,835 @@ +import pytz +from tabulate import tabulate + +from nonebot.matcher import Matcher +from nonebot.permission import SUPERUSER +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN +from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent + +from ATRI.utils import gen_random_str, MessageChecker + +from .data_source import ThesaurusManager + + +add_item = ThesaurusManager().cmd_as_group("add", "添加本群词条,需审核或投票") + + +@add_item.handle() +async def _get_normal_item(matcher: Matcher, args: Message = CommandArg()): + raw_msg = args.extract_plain_text() + msg_data = raw_msg.split(" ") + data = dict(enumerate(msg_data)) + if data.get(0): + matcher.set_arg("ts_normal_item_q", Message(msg_data[0])) + if data.get(1): + matcher.set_arg("ts_normal_item_a", Message(msg_data[1])) + if data.get(2): + matcher.set_arg("ts_normal_item_is_need_at", Message(msg_data[2])) + if data.get(3): + matcher.set_arg("ts_normal_item_t", Message(msg_data[3])) + + +@add_item.got("ts_normal_item_q", "有人问:") +@add_item.got("ts_normal_item_a", "我答: \n(支持多个回复,用','[小写]隔开)") +@add_item.got("ts_normal_item_is_need_at", "是否需要at: (y/n)") +async def _deal_noraml_is_need_at( + need_at: str = ArgPlainText("ts_normal_item_is_need_at"), +): + agree_list = ["y", "Y", "是", "同意", "赞成"] + disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] + if need_at not in agree_list and need_at not in disagree_list: + await add_item.reject("你的观点似乎并不相关呢...请重新输入: (y/n)") + + +@add_item.got("ts_normal_item_t", "问答匹配模式: \n(全匹配、模糊匹配、正则)") +async def _add_normal_item( + bot: Bot, + event: GroupMessageEvent, + item_q: str = ArgPlainText("ts_normal_item_q"), + item_a: str = ArgPlainText("ts_normal_item_a"), + _need_at: str = ArgPlainText("ts_normal_item_is_need_at"), + item_t: str = ArgPlainText("ts_normal_item_t"), +): + type_list = ["全匹配", "模糊匹配", "正则"] + if item_t not in type_list: + await add_item.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.") + + q_checker = MessageChecker(item_q).check_cq_code + a_checker = MessageChecker(item_a).check_cq_code + if not q_checker or not a_checker: + await add_item.finish("请不要尝试注入!") + + agree_list = ["y", "Y", "是", "同意", "赞成"] + need_at = 1 if _need_at in agree_list else 0 + + group_id = event.group_id + operator_id = event.user_id + operator_info = await bot.get_group_member_info( + group_id=group_id, user_id=operator_id + ) + operator = operator_info.get("card", "unknown") + item_id = gen_random_str(6) + ans = item_a.split(",") + ts = ThesaurusManager() + + result = await ts.add_item( + item_id, + False, + item_q, + ans, + need_at, + item_t, + group_id, + operator, + operator_id, + 1, + list(), + ) + await add_item.finish(result) + + +add_item_as_group_admin = ThesaurusManager().cmd_as_group( + "add.g", "添加本群词条,仅限管理,无需审核", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@add_item_as_group_admin.handle() +async def _get_group_item(matcher: Matcher, args: Message = CommandArg()): + raw_msg = args.extract_plain_text() + msg_data = raw_msg.split(" ") + data = dict(enumerate(msg_data)) + if data.get(0): + matcher.set_arg("ts_group_item_q", Message(msg_data[0])) + if data.get(1): + matcher.set_arg("ts_group_item_a", Message(msg_data[1])) + if data.get(2): + matcher.set_arg("ts_group_item_is_need_at", Message(msg_data[2])) + if data.get(3): + matcher.set_arg("ts_group_item_t", Message(msg_data[3])) + + +@add_item_as_group_admin.got("ts_group_item_q", "有人问:") +@add_item_as_group_admin.got("ts_group_item_a", "我答: \n(支持多个回复,用','[小写]隔开)") +@add_item_as_group_admin.got("ts_group_item_is_need_at", "是否需要at: (y/n)") +async def _deal_group_is_need_at( + need_at: str = ArgPlainText("ts_group_item_is_need_at"), +): + agree_list = ["y", "Y", "是", "同意", "赞成"] + disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] + if need_at not in agree_list and need_at not in disagree_list: + await add_item_as_group_admin.reject("你的观点似乎并不相关呢...请重新输入: (y/n)") + + +@add_item_as_group_admin.got("ts_group_item_t", "问答匹配模式: \n(全匹配、模糊匹配、正则)") +async def _add_group_item( + bot: Bot, + event: GroupMessageEvent, + item_q: str = ArgPlainText("ts_group_item_q"), + item_a: str = ArgPlainText("ts_group_item_a"), + _need_at: str = ArgPlainText("ts_group_item_is_need_at"), + item_t: str = ArgPlainText("ts_group_item_t"), +): + type_list = ["全匹配", "模糊匹配", "正则"] + if item_t not in type_list: + await add_item_as_group_admin.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.") + + q_checker = MessageChecker(item_q).check_cq_code + a_checker = MessageChecker(item_a).check_cq_code + if not q_checker or not a_checker: + await add_item_as_group_admin.finish("请不要尝试注入!") + + agree_list = ["y", "Y", "是", "同意", "赞成"] + need_at = 1 if _need_at in agree_list else 0 + + group_id = event.group_id + operator_id = event.user_id + operator_info = await bot.get_group_member_info( + group_id=group_id, user_id=operator_id + ) + operator = operator_info.get("card", "unknown") + item_id = gen_random_str(6) + ans = item_a.split(",") + ts = ThesaurusManager() + + result = await ts.add_item( + item_id, + True, + item_q, + ans, + need_at, + item_t, + group_id, + operator, + operator_id, + 0, + list(), + ) + await add_item_as_group_admin.finish(result) + + +add_item_for_global = ThesaurusManager().cmd_as_group( + "add.glo", "添加全局问答", permission=SUPERUSER +) + + +@add_item_for_global.handle() +async def _get_global_item(matcher: Matcher, args: Message = CommandArg()): + raw_msg = args.extract_plain_text() + msg_data = raw_msg.split(" ") + data = dict(enumerate(msg_data)) + + if data.get(0): + matcher.set_arg("ts_global_item_q", Message(msg_data[0])) + if data.get(1): + matcher.set_arg("ts_global_item_a", Message(msg_data[1])) + if data.get(2): + matcher.set_arg("ts_global_is_need_at", Message(msg_data[2])) + if data.get(3): + matcher.set_arg("ts_global_item_type", Message(msg_data[3])) + + +@add_item_for_global.got("ts_global_item_q", "有人问:") +@add_item_for_global.got("ts_global_item_a", "我答: \n(支持多个回复,用','[小写]隔开)") +@add_item_for_global.got("ts_global_item_is_need_at", "是否需要at: (y/n)") +async def _deal_global_is_need_at( + event: MessageEvent, need_at: str = ArgPlainText("ts_global_item_is_need_at") +): + agree_list = ["y", "Y", "是", "同意", "赞成"] + disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] + if need_at not in agree_list and need_at not in disagree_list: + await add_item_for_global.reject("你的观点似乎并不相关呢...请重新输入: (y/n)") + + +@add_item_for_global.got("ts_global_item_type", "问答匹配模式: \n(全匹配、模糊匹配、正则)") +async def _add_global_item( + event: MessageEvent, + item_q: str = ArgPlainText("ts_global_item_q"), + item_a: str = ArgPlainText("ts_global_item_a"), + _need_at: str = ArgPlainText("ts_global_item_is_need_at"), + item_t: str = ArgPlainText("ts_global_item_type"), +): + type_list = ["全匹配", "模糊匹配", "正则"] + if item_t not in type_list: + await add_item_for_global.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.") + + agree_list = ["y", "Y", "是", "同意", "赞成"] + need_at = 1 if _need_at in agree_list else 0 + + operator = "SUPERUSER" + opeartor_id = event.user_id + item_id = gen_random_str(6) + ans = item_a.split(",") + tm = ThesaurusManager() + + result = await tm.add_item( + item_id, True, item_q, ans, need_at, item_t, 0, operator, opeartor_id, 0, list() + ) + await add_item_for_global.finish(result) + + +vote = ThesaurusManager().cmd_as_group("v", "对本群内审核中的词条进行投票") + + +async def _get_vote_info(matcher: Matcher, args: Message = CommandArg()): + raw_msg = args.extract_plain_text() + msg_data = raw_msg.split(" ") + data = dict(enumerate(msg_data)) + if data.get(0): + matcher.set_arg("ts_vote_id", Message(msg_data[0])) + if data.get(1): + matcher.set_arg("ts_vote_attitude", Message(msg_data[1])) + + [email protected]("ts_vote_id", "要投票的词条id是:") +async def _get_item_id( + event: GroupMessageEvent, item_id: str = ArgPlainText("ts_vote_id") +): + tm = ThesaurusManager() + user_id = event.user_id + group_id = event.group_id + + query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id}) + if not query_result: + await vote.finish("未找到此id相关信息...请检查是否输入正确(") + + item_info = query_result[0] + if user_id in item_info.vote_list: + await vote.finish("你已经参与过啦!") + + result = f"""你即将要投票的词条信息: + 词条ID: {item_info._id} + 有人问: {item_info.matcher} + 我答: {"、".join(map(str, item_info.result))} + 提交人: {item_info.operator}@{item_info.operator_id} + 当前赞成: {"、".join(map(str, item_info.vote_list))} + """.strip() + await vote.send(Message(result)) + + [email protected]("ts_vote_attitude", "你的选择是?(y/n)") +async def _get_voter_attitude( + event: GroupMessageEvent, + item_id: str = ArgPlainText("ts_vote_id"), + attitude: str = ArgPlainText("ts_vote_attitude"), +): + agree_list = ["y", "Y", "是", "同意", "赞成"] + disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] + if attitude not in agree_list and attitude not in disagree_list: + await vote.reject("你的观点似乎不相关呢...请重新输入: (y/n)") + + tm = ThesaurusManager() + group_id = event.group_id + user_id = event.user_id + + if attitude in agree_list: + await tm.vote(item_id, group_id, user_id) + await vote.finish(f"已赞成此词条~ ID: {item_id}") + else: + await vote.finish("好吧...") + + +del_item = ThesaurusManager().cmd_as_group( + "del", "删除本群词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@del_item.handle() +async def _get_del_normal_item_info(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("ts_del_normal_item_id", args) + + +@del_item.got("ts_del_item_id", "要删除词条的id是?") +async def _deal_del_normal_item( + event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_item_id") +): + tm = ThesaurusManager() + group_id = event.group_id + + result = await tm.del_item(item_id, group_id, True) + await del_item.finish(result) + + +del_global_item = ThesaurusManager().cmd_as_group( + "del.g", "删除全局词条", permission=SUPERUSER +) + + +@del_global_item.handle() +async def _get_del_global_item_info(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("ts_del_global_item_id", args) + + +@del_global_item.got("ts_del_global_item_id", "要删除词条的id是?") +async def _deal_del_global_item( + event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_global_item_id") +): + tm = ThesaurusManager() + + result = await tm.del_item(item_id, 0, True) + await del_global_item.finish(result) + + +del_vote_item = ThesaurusManager().cmd_as_group( + "del.v", "删除本群处于投票中的词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@del_vote_item.handle() +async def _get_deal_vote_item_info(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("ts_del_vote_item_id", args) + + +@del_vote_item.got("ts_del_vote_item_id", "要删除词条的id是?") +async def _deal_del_vote_item( + event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_vote_item_id") +): + tm = ThesaurusManager() + group_id = event.group_id + + result = await tm.del_item(item_id, group_id, False) + await del_vote_item.finish(result) + + +_LIST_SHOW_DATA: dict = dict() + + +list_item = ThesaurusManager().cmd_as_group("list", "查看本群词条") + + +@list_item.handle() +async def _get_normal_item_list(event: GroupMessageEvent): + tm = ThesaurusManager() + group_id = event.group_id + + query_result = await tm.get_item_list({"group_id": group_id}, True) + if not query_result: + await list_item.finish("本群还没有词条呢...") + + items = list() + for i in query_result[:10]: + item_matcher = i.matcher + item_type = i.m_type + if item_type == 0: + m_type = "全匹配" + elif item_type == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + items.append([i._id, item_matcher, m_type]) + + output = ( + "本群已添加以下词条:\n" + + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") + + "\n(一页仅展示10个)" + ) + + if len(query_result) > 10: + await list_item.send(output) + else: + await list_item.finish(output) + + +@list_item.got("item_normal_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~") +async def _get_normal_item_more( + event: GroupMessageEvent, is_next: str = ArgPlainText("item_normal_is_next") +): + user_id = event.user_id + group_id = event.group_id + + judge_list = ["下一页", "n", "next"] + if is_next not in judge_list: + try: + del _LIST_SHOW_DATA[group_id][user_id] + except Exception: + pass + await list_item.finish("结束查看~") + + if group_id in _LIST_SHOW_DATA: + _LIST_SHOW_DATA[group_id][user_id] += 10 + else: + _LIST_SHOW_DATA[group_id] = {user_id: 10} + + tm = ThesaurusManager() + items = list() + show_item = _LIST_SHOW_DATA[group_id][user_id] + query_result = await tm.get_item_list({"group_id": group_id}, True) + for i in query_result[:show_item]: + item_matcher = i.matcher + item_type = i.m_type + if item_type == 0: + m_type = "全匹配" + elif item_type == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + items.append([i._id, item_matcher, m_type]) + + output = ( + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") + + "\n('下一页'或'n'以继续查看,任意回复以退出)" + ) + await list_item.reject(output) + + +list_global_item = ThesaurusManager().cmd_as_group("list.g", "查看全局词条") + + +@list_global_item.handle() +async def _get_global_item_list(event: MessageEvent): + tm = ThesaurusManager() + + query_result = await tm.get_item_list({"group_id": 0}, True) + if not query_result: + await list_global_item.finish("还没有给咱添加全局词条呢...") + + items = list() + for i in query_result[:10]: + item_matcher = i.matcher + item_type = i.m_type + if item_type == 0: + m_type = "全匹配" + elif item_type == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + items.append([i._id, item_matcher, m_type]) + + output = ( + "咱已装载以下词条:\n" + + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") + + "\n(一页仅展示10个)" + ) + + if len(query_result) > 10: + await list_global_item.send(output) + else: + await list_global_item.finish(output) + + +@list_global_item.got("item_global_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~") +async def _get_global_item_more( + event: MessageEvent, is_next: str = ArgPlainText("item_global_is_next") +): + user_id = event.user_id + + judge_list = ["下一页", "n", "next"] + if is_next not in judge_list: + try: + del _LIST_SHOW_DATA[user_id] + except Exception: + pass + await list_global_item.finish("结束查看~") + + if user_id in _LIST_SHOW_DATA: + _LIST_SHOW_DATA[user_id] += 10 + else: + _LIST_SHOW_DATA[user_id] = 10 + + tm = ThesaurusManager() + items = list() + show_item = _LIST_SHOW_DATA[user_id] + query_result = await tm.get_item_list({"group_id": 0}, True) + for i in query_result[:show_item]: + item_matcher = i.matcher + item_type = i.m_type + if item_type == 0: + m_type = "全匹配" + elif item_type == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + items.append([i._id, item_matcher, m_type]) + + output = ( + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") + + "\n('下一页'或'n'以继续查看,任意回复以退出)" + ) + await list_global_item.reject(output) + + +list_vote_item = ThesaurusManager().cmd_as_group("list.v", "查看本群待投票词条") + + +@list_vote_item.handle() +async def _get_vote_item_list(event: GroupMessageEvent): + tm = ThesaurusManager() + group_id = event.group_id + + query_result = await tm.get_item_list({"group_id": group_id}) + if not query_result: + await list_vote_item.finish("本群暂未添加待审核词条...") + + items = list() + for i in query_result[:10]: + item_matcher = i.matcher + item_type = i.m_type + if item_type == 0: + m_type = "全匹配" + elif item_type == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + items.append([i._id, item_matcher, m_type]) + + output = ( + "当前待审词条概况如下:\n" + + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") + + "\n(一页仅展示10个)" + ) + + if len(query_result) > 10: + await list_vote_item.send(output) + else: + await list_vote_item.finish(output) + + +@list_vote_item.got("item_vote_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~") +async def _get_vote_item_more( + event: GroupMessageEvent, is_next: str = ArgPlainText("item_vote_is_next") +): + user_id = event.user_id + group_id = event.group_id + + judge_list = ["下一页", "n", "next"] + if is_next not in judge_list: + try: + del _LIST_SHOW_DATA[group_id][user_id] + except Exception: + pass + await list_vote_item.finish("结束查看~") + + if group_id in _LIST_SHOW_DATA: + _LIST_SHOW_DATA[group_id][user_id] += 10 + else: + _LIST_SHOW_DATA[group_id] = {user_id: 10} + + tm = ThesaurusManager() + items = list() + show_item = _LIST_SHOW_DATA[group_id][user_id] + query_result = await tm.get_item_list({"group_id": group_id}) + for i in query_result[:show_item]: + item_matcher = i.matcher + item_type = i.m_type + if item_type == 0: + m_type = "全匹配" + elif item_type == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + items.append([i._id, item_matcher, m_type]) + + output = ( + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") + + "\n('下一页'或'n'以继续查看,任意回复以退出)" + ) + await list_vote_item.reject(output) + + +audit_item = ThesaurusManager().cmd_as_group( + "audit", "审核本群处于投票中的词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@audit_item.handle() +async def _get_group_item_info(matcher: Matcher, args: Message = CommandArg()): + raw_msg = args.extract_plain_text() + msg_data = raw_msg.split(" ") + data = dict(enumerate(msg_data)) + if data.get(0): + matcher.set_arg("ts_audit_vote_id", Message(msg_data[0])) + if data.get(1): + matcher.set_arg("ts_audit_vote_attitude", Message(msg_data[1])) + + +@audit_item.got("ts_audit_vote_id", "要审核的词条id是:") +async def _get_audit_item_id( + event: GroupMessageEvent, item_id: str = ArgPlainText("ts_group_vote_id") +): + tm = ThesaurusManager() + group_id = event.group_id + + query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id}) + if not query_result: + await audit_item.finish("未找到此id相关信息...请检查是否输入正确(") + + item_info = query_result[0] + result = f"""你即将要审核的词条信息: + 词条ID: {item_info._id} + 有人问: {item_info.matcher} + 我答: {"、".join(map(str, item_info.result))} + 提交人: {item_info.operator}@{item_info.operator_id} + 当前赞成: {"、".join(map(str, item_info.vote_list))} + """.strip() + await audit_item.send(Message(result)) + + +@audit_item.got("ts_audit_vote_attitude", "你的选择是?(y/n)") +async def _get_audit_attitude( + event: GroupMessageEvent, + item_id: str = ArgPlainText("ts_audit_vote_id"), + attitude: str = ArgPlainText("ts_audit_vote_attitude"), +): + agree_list = ["y", "Y", "是", "同意", "赞成"] + disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] + if attitude not in agree_list and attitude not in disagree_list: + await audit_item.reject("你的观点似乎不相关呢...请重新输入: (y/n)") + + tm = ThesaurusManager() + group_id = event.group_id + + query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id}) + if not query_result: + await audit_item.finish("未找到相关内容...请检查输入...") + item_info = query_result[0] + + if attitude in agree_list: + await tm.add_item( + item_id, + True, + item_info.matcher, + item_info.result, + item_info.need_at, + item_info.m_type, + group_id, + item_info.operator, + item_info.operator_id, + 0, + list(), + ) + await tm.del_item(item_id, group_id, False) + else: + await tm.del_item(item_id, group_id, False) + + await audit_item.finish("完成~!") + + +_ITEM_SHOW_FORMAT = """该词条信息如下: +词条ID: {_id} +有人问: {matcher} +我答: {ans} +判断方式: {m_type} +提交人: {operator}@{operator_id} +更新时间: {update_time} +是否为投票选出: {is_vote} +投票赞成: {vote_list} +""" + + +get_normal_item_info = ThesaurusManager().cmd_as_group("i", "查看本群的词条详情") + + +@get_normal_item_info.handle() +async def _info_normal_get_item_id(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("info_normal_item_id", args) + + +@get_normal_item_info.got("info_normal_item_id", "需要查看的词条ID:") +async def _info_normal_get_item_info( + event: GroupMessageEvent, _id: str = ArgPlainText("info_normal_item_id") +): + tm = ThesaurusManager() + group_id = event.group_id + + query_result = await tm.get_item_list({"_id": _id, "group_id": group_id}, True) + if not query_result: + await get_normal_item_info.finish("未找到此ID相关信息...") + + item_info = query_result[0] + + item_type = item_info.m_type + if item_type == 1: + m_type = "模糊匹配" + elif item_type == 2: + m_type = "正则" + else: + m_type = "全匹配" + + result = _ITEM_SHOW_FORMAT.format( + _id=_id, + matcher=item_info.matcher, + ans="、".join(map(str, item_info.result)), + m_type=m_type, + operator=item_info.operator, + operator_id=item_info.operator_id, + update_time=item_info.update_time.replace( + tzinfo=pytz.timezone("Asia/Shanghai") + ), + is_vote="是" if item_info.is_vote else "否", + vote_list=item_info.vote_list, + ) + await get_normal_item_info.finish(result) + + +get_global_item_info = ThesaurusManager().cmd_as_group("i.g", "查看全局的词条详情") + + +@get_global_item_info.handle() +async def _info_global_get_item_id(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("info_global_item_id", args) + + +@get_global_item_info.got("info_global_item_id", "需要查看的词条ID:") +async def _info_global_get_item_info(_id: str = ArgPlainText("info_global_item_id")): + tm = ThesaurusManager() + + query_result = await tm.get_item_list({"_id": _id, "group_id": 0}, True) + if not query_result: + await get_global_item_info.finish("未找到此ID相关信息...") + + item_info = query_result[0] + + item_type = item_info.m_type + if item_type == 1: + m_type = "模糊匹配" + elif item_type == 2: + m_type = "正则" + else: + m_type = "全匹配" + + result = _ITEM_SHOW_FORMAT.format( + _id=_id, + matcher=item_info.matcher, + ans="、".join(map(str, item_info.result)), + m_type=m_type, + operator=item_info.operator, + operator_id=item_info.operator_id, + update_time=item_info.update_time.replace( + tzinfo=pytz.timezone("Asia/Shanghai") + ), + is_vote="是" if item_info.is_vote else "否", + vote_list=item_info.vote_list, + ) + await get_global_item_info.finish(result) + + +get_vote_item_info = ThesaurusManager().cmd_as_group("i.v", "查看本群的待审核/投票的词条详情") + + +@get_vote_item_info.handle() +async def _info_vote_get_item_id(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("info_vote_item_id", args) + + +@get_vote_item_info.got("info_vote_item_id", "需要查看的词条ID:") +async def _info_vote_get_item_info( + event: GroupMessageEvent, _id: str = ArgPlainText("info_vote_item_id") +): + tm = ThesaurusManager() + group_id = event.group_id + + query_result = await tm.get_item_list({"_id": _id, "group_id": group_id}) + if not query_result: + await get_vote_item_info.finish("未找到此ID相关信息...") + + item_info = query_result[0] + + item_type = item_info.m_type + if item_type == 1: + m_type = "模糊匹配" + elif item_type == 2: + m_type = "正则" + else: + m_type = "全匹配" + + result = _ITEM_SHOW_FORMAT.format( + _id=_id, + matcher=item_info.matcher, + ans="、".join(map(str, item_info.result)), + m_type=m_type, + operator=item_info.operator, + operator_id=item_info.operator_id, + update_time=item_info.update_time.replace( + tzinfo=pytz.timezone("Asia/Shanghai") + ), + is_vote="是" if item_info.is_vote else "否", + vote_list=item_info.vote_list, + ) + await get_vote_item_info.finish(result) + + +from ATRI import driver + +from .listener import init_listener + + +driver().on_startup(init_listener) diff --git a/ATRI/plugins/thesaurus/data_source.py b/ATRI/plugins/thesaurus/data_source.py new file mode 100644 index 0000000..30f2c5b --- /dev/null +++ b/ATRI/plugins/thesaurus/data_source.py @@ -0,0 +1,192 @@ +import pytz +from datetime import datetime + +from nonebot.permission import SUPERUSER +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.exceptions import ThesaurusError + +from .db import DBForTS, DBForTAL +from .db import ThesaurusStoragor + + +class ThesaurusManager(Service): + def __init__(self): + Service.__init__( + self, + "词库管理", + "支持模糊匹配、全匹配、正则的自定义回复~\n支持分群、全局管理,支持群内投票添加", + rule=is_in_service("词库管理"), + permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN, + main_cmd="/ts", + ) + + async def __add_item(self, _id: str, group_id: int, is_main: bool = False): + if is_main: + try: + async with DBForTS() as db: + await db.add_item(_id, group_id) + except Exception: + raise ThesaurusError(f"添加词库(ts)数据失败 目标词id: {_id}") + else: + try: + async with DBForTAL() as db: + await db.add_item(_id, group_id) + except Exception: + raise ThesaurusError(f"添加词库(tal)数据失败 目标词id: {_id}") + + async def update_item( + self, _id: str, group_id: int, update_map: dict, is_main: bool = False + ): + if is_main: + try: + async with DBForTS() as db: + await db.update_item(_id, group_id, update_map) + except Exception: + raise ThesaurusError(f"更新词库(ts)数据失败 目标词id: {_id}") + else: + try: + async with DBForTAL() as db: + await db.update_item(_id, group_id, update_map) + except Exception: + raise ThesaurusError(f"更新词库(tal)数据失败 目标词id: {_id}") + + async def __del_item(self, _id: str, group_id: int, is_main: bool = False): + if is_main: + try: + async with DBForTS() as db: + await db.del_item({"_id": _id, "group_id": group_id}) + except Exception: + raise ThesaurusError(f"删除词库(ts)数据失败 目标词id: {_id}") + else: + try: + async with DBForTAL() as db: + await db.del_item({"_id": _id, "group_id": group_id}) + except Exception: + raise ThesaurusError(f"删除词库(tal)数据失败 目标词id: {_id}") + + async def get_item_list(self, query_map: dict, is_main: bool = False) -> list: + if is_main: + try: + async with DBForTS() as db: + return await db.get_item_list(query_map) + except Exception: + raise ThesaurusError("获取词库(ts)列表数据失败") + else: + try: + async with DBForTAL() as db: + return await db.get_item_list(query_map) + except Exception: + raise ThesaurusError("获取词库(tal)列表数据失败") + + async def get_all_items(self, is_main: bool = False) -> list: + if is_main: + try: + async with DBForTS() as db: + return await db.get_all_items() + except Exception: + raise ThesaurusError("获取全部词库(ts)列表数据失败") + else: + try: + async with DBForTAL() as db: + return await db.get_all_items() + except Exception: + raise ThesaurusError("获取全部词库(tal)列表数据失败") + + async def add_item( + self, + _id: str, + is_main: bool, + q: str, + a: list, + need_at: int, + t: str, + group_id: int, + operator: str, + operator_id: int, + is_vote: int, + vote_list: list, + ) -> str: + query_result = await self.get_item_list( + {"matcher": q, "group_id": group_id}, is_main + ) + if query_result: + item_info = query_result[0] + return f"""{"(需审核/投票)" if not is_main else str()}该词条已存在!! ID: {item_info._id}""" + + if t == "全匹配": + m_type = 0 + elif t == "模糊匹配": + m_type = 1 + else: + m_type = 2 + + item_meta = { + "matcher": q, + "result": a, + "need_at": need_at, + "m_type": m_type, + "group_id": group_id, + "operator": operator, + "operator_id": operator_id, + "update_time": datetime.now(pytz.timezone("Asia/Shanghai")), + "is_vote": is_vote, + "vote_list": vote_list, + } + + await self.__add_item(_id, group_id, is_main) + await self.update_item( + _id, + group_id, + item_meta, + is_main, + ) + return f"""{"(需审核/投票)" if not is_main else str()}成功加上新词条 ID: {_id}""" + + async def del_item(self, _id: str, group_id: int, is_main: bool): + query_result = await self.get_item_list( + {"_id": _id, "group_id": group_id}, is_main + ) + if not query_result: + return f"目标id: {_id} 没有记录呢..." + + await self.__del_item(_id, group_id, is_main) + return f"成功删除目标id: {_id} 问答信息" + + async def vote(self, _id: str, group_id: int, voter: int): + raw_item_info = await self.get_item_list({"_id": _id, "group_id": group_id}) + item_info = raw_item_info[0] + vote_list: list = item_info.vote_list + vote_list.append(voter) + + await self.update_item( + _id, + group_id, + { + "vote_list": vote_list, + "update_time": datetime.now(pytz.timezone("Asia/Shanghai")), + }, + ) + + +class ThesaurusListener(Service): + def __init__(self): + Service.__init__(self, "词库监听", "词库监听器", rule=is_in_service("词库监听")) + + async def get_item_by_id(self, _id: str) -> ThesaurusStoragor: + try: + async with DBForTS() as db: + data = await db.get_item_list({"_id": _id}) + except Exception: + raise ThesaurusError(f"获取词库(ts)数据失败 词条ID: {_id}") + + return data[0] + + async def get_item_list(self, group_id: int): + try: + async with DBForTS() as db: + return await db.get_item_list({"group_id": group_id}) + except Exception: + raise ThesaurusError(f"获取词库(ts)数据失败 目标群号: {group_id}") diff --git a/ATRI/plugins/thesaurus/db.py b/ATRI/plugins/thesaurus/db.py new file mode 100644 index 0000000..b5394fc --- /dev/null +++ b/ATRI/plugins/thesaurus/db.py @@ -0,0 +1,47 @@ +from ATRI.database import ThesaurusStoragor, ThesaurusAuditList + + +class DBForTS: + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + async def add_item(self, _id: str, group_id: int): + await ThesaurusStoragor.create(_id=_id, group_id=group_id) + + async def update_item(self, _id: str, group_id: int, update_map: dict): + await ThesaurusStoragor.filter(_id=_id, group_id=group_id).update(**update_map) + + async def del_item(self, query_map: dict): + await ThesaurusStoragor.filter(**query_map).delete() + + async def get_item_list(self, query_map: dict) -> list: + return await ThesaurusStoragor.filter(**query_map) + + async def get_all_items(self) -> list: + return await ThesaurusStoragor.all() + + +class DBForTAL: + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + async def add_item(self, _id: str, group_id: int): + await ThesaurusAuditList.create(_id=_id, group_id=group_id) + + async def update_item(self, _id: str, group_id: int, update_map: dict): + await ThesaurusAuditList.filter(_id=_id, group_id=group_id).update(**update_map) + + async def del_item(self, query_map: dict): + await ThesaurusAuditList.filter(**query_map).delete() + + async def get_item_list(self, query_map: dict) -> list: + return await ThesaurusAuditList.filter(**query_map) + + async def get_all_items(self) -> list: + return await ThesaurusAuditList.all() diff --git a/ATRI/plugins/thesaurus/listener.py b/ATRI/plugins/thesaurus/listener.py new file mode 100644 index 0000000..d3c3887 --- /dev/null +++ b/ATRI/plugins/thesaurus/listener.py @@ -0,0 +1,121 @@ +import re +from random import choice, shuffle + +from nonebot import get_bot +from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent +from nonebot.adapters.onebot.v11.helpers import Cooldown + +from apscheduler.triggers.base import BaseTrigger +from apscheduler.triggers.combining import AndTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from ATRI.log import logger as log +from ATRI.utils.apscheduler import scheduler + +from .data_source import ThesaurusManager, ThesaurusListener, ThesaurusStoragor + + +class ThesaurusLinstenerIsEnabledChecker(BaseTrigger): + def get_next_fire_time(self, previous_fire_time, now): + tm = ThesaurusManager() + conf = tm.load_service("词库管理") + if conf.get("enabled"): + return now + + +async def _thesaurus_vote_listener(): + tm = ThesaurusManager() + try: + all_items = await tm.get_all_items(False) + except Exception: + return + + for i in all_items: + data: ThesaurusStoragor = i + item_vote_list = data.vote_list + if len(item_vote_list) == 10: + t = data.m_type + + if t == 0: + m_type = "全匹配" + elif t == 1: + m_type = "模糊匹配" + else: + m_type = "正则" + + result = await tm.add_item( + data._id, + True, + data.matcher, + list(data.result), + data.need_at, + m_type, + data.group_id, + data.operator, + data.operator_id, + 1, + list(data.vote_list), + ) + log.debug(result) + + bot = get_bot() + await bot.send_group_msg(group_id=data.group_id, message=result) + + +def init_listener(): + scheduler.add_job( + _thesaurus_vote_listener, + AndTrigger([IntervalTrigger(seconds=10), ThesaurusLinstenerIsEnabledChecker()]), + max_instances=3, # type: ignore + misfire_grace_time=20, # type: ignore + ) + + +main_listener = ThesaurusListener().on_message( + "词库监听器", "监听所有消息判断是否满足触发词条条件", priority=4, block=False +) + + +@main_listener.handle([Cooldown(3)]) +async def _tl_listener(event: MessageEvent): + tl = ThesaurusListener() + msg = event.get_message().extract_plain_text() + + group_id = 0 + if isinstance(event, GroupMessageEvent): + group_id = event.group_id + + query_result = await tl.get_item_list(group_id) + shuffle(query_result) + if query_result: + for item in query_result: + item_info: ThesaurusStoragor = item + + if item_info.m_type == 1: + if item_info.matcher in msg: + if item_info.need_at: + if event.is_tome(): + await main_listener.finish(choice(item_info.result)) + else: + return + else: + await main_listener.finish(choice(item_info.result)) + elif item_info.m_type == 2: + patt = item_info.matcher + if re.findall(patt, msg): + if item_info.need_at: + if event.is_tome(): + await main_listener.finish(choice(item_info.result)) + else: + return + else: + await main_listener.finish(choice(item_info.result)) + else: + if item_info.matcher == msg: + if item_info.need_at: + if event.is_tome(): + await main_listener.finish(choice(item_info.result)) + else: + return + else: + await main_listener.finish(choice(item_info.result)) @@ -65,7 +65,7 @@ > 被勾上的即已大致完成,但仍需优化 - [x] 网页控制台 - [ ] Twitter助手 -- [ ] 自定义词库(多种方式匹配) +- [x] 自定义词库(多种方式匹配) - [ ] RSS订阅 - [ ] 冷重启 - [ ] 进裙验证(问题可自定义) |