diff options
Diffstat (limited to 'ATRI')
| -rw-r--r-- | ATRI/database/models.py | 40 | ||||
| -rw-r--r-- | ATRI/exceptions.py | 14 | ||||
| -rw-r--r-- | ATRI/plugins/thesaurus/__init__.py | 835 | ||||
| -rw-r--r-- | ATRI/plugins/thesaurus/data_source.py | 192 | ||||
| -rw-r--r-- | ATRI/plugins/thesaurus/db.py | 47 | ||||
| -rw-r--r-- | ATRI/plugins/thesaurus/listener.py | 121 | 
6 files changed, 1245 insertions, 4 deletions
| diff --git a/ATRI/database/models.py b/ATRI/database/models.py index e268f01..d326151 100644 --- a/ATRI/database/models.py +++ b/ATRI/database/models.py @@ -10,6 +10,9 @@ class BilibiliSubscription(Model):      up_nickname = fields.TextField(null=True)      last_update = fields.DatetimeField(default=datetime.fromordinal(1)) +    class Meta: +        app = "bilibili" +  class TwitterSubscription(Model):      tid = fields.IntField() @@ -17,3 +20,40 @@ class TwitterSubscription(Model):      name = fields.TextField(null=True)      screen_name = fields.TextField(null=True)      last_update = fields.DatetimeField(default=datetime.fromordinal(1)) + +    class Meta: +        app = "twitter" + + +class ThesaurusStoragor(Model): +    _id = fields.TextField() +    matcher = fields.TextField(null=True) +    result = fields.JSONField(null=True) +    need_at = fields.IntField(null=True) +    m_type = fields.IntField(null=True) +    group_id = fields.IntField(null=True) +    operator = fields.TextField(null=True) +    operator_id = fields.IntField(null=True) +    update_time = fields.DatetimeField(null=True) +    is_vote = fields.IntField(null=True) +    vote_list = fields.JSONField(null=True) + +    class Meta: +        app = "ts" + + +class ThesaurusAuditList(Model): +    _id = fields.TextField() +    matcher = fields.TextField(null=True) +    result = fields.JSONField(null=True) +    need_at = fields.IntField(null=True) +    m_type = fields.IntField(null=True) +    group_id = fields.IntField(null=True) +    operator = fields.TextField(null=True) +    operator_id = fields.IntField(null=True) +    update_time = fields.DatetimeField(null=True) +    is_vote = fields.IntField(null=True) +    vote_list = fields.JSONField(null=True) + +    class Meta: +        app = "tal" diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index aebf9fc..db6265f 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -1,18 +1,18 @@  import time  import json -import string  from pathlib import Path -from random import sample  from typing import Optional  from traceback import format_exc  from pydantic.main import BaseModel +from nonebot.matcher import Matcher  from nonebot.adapters.onebot.v11 import ActionFailed  from nonebot.adapters.onebot.v11 import Bot, PrivateMessageEvent, GroupMessageEvent  from nonebot.message import run_postprocessor  from .log import logger as log  from .config import BotSelfConfig +from .utils import gen_random_str  ERROR_DIR = Path(".") / "data" / "errors" @@ -27,7 +27,7 @@ class ErrorInfo(BaseModel):  def _save_error(prompt: str, content: str) -> str: -    track_id = "".join(sample(string.ascii_letters + string.digits, 8)) +    track_id = gen_random_str(8)      data = ErrorInfo(          track_id=track_id,          prompt=prompt, @@ -94,8 +94,14 @@ class TwitterDynamicError(BaseBotException):      prompt = "Twitter动态订阅错误" +class ThesaurusError(BaseBotException): +    prompt = "词库相关错误" + +  @run_postprocessor -async def _track_error(exception: Optional[Exception], bot: Bot, event) -> None: +async def _track_error( +    bot: Bot, event, matcher: Matcher, exception: Optional[Exception] +) -> None:      if not exception:          return diff --git a/ATRI/plugins/thesaurus/__init__.py b/ATRI/plugins/thesaurus/__init__.py new file mode 100644 index 0000000..ca0be3e --- /dev/null +++ b/ATRI/plugins/thesaurus/__init__.py @@ -0,0 +1,835 @@ +import pytz +from tabulate import tabulate + +from nonebot.matcher import Matcher +from nonebot.permission import SUPERUSER +from nonebot.params import ArgPlainText, CommandArg +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN +from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent + +from ATRI.utils import gen_random_str, MessageChecker + +from .data_source import ThesaurusManager + + +add_item = ThesaurusManager().cmd_as_group("add", "添加本群词条,需审核或投票") + + +@add_item.handle() +async def _get_normal_item(matcher: Matcher, args: Message = CommandArg()): +    raw_msg = args.extract_plain_text() +    msg_data = raw_msg.split(" ") +    data = dict(enumerate(msg_data)) +    if data.get(0): +        matcher.set_arg("ts_normal_item_q", Message(msg_data[0])) +    if data.get(1): +        matcher.set_arg("ts_normal_item_a", Message(msg_data[1])) +    if data.get(2): +        matcher.set_arg("ts_normal_item_is_need_at", Message(msg_data[2])) +    if data.get(3): +        matcher.set_arg("ts_normal_item_t", Message(msg_data[3])) + + +@add_item.got("ts_normal_item_q", "有人问:") +@add_item.got("ts_normal_item_a", "我答: \n(支持多个回复,用','[小写]隔开)") +@add_item.got("ts_normal_item_is_need_at", "是否需要at: (y/n)") +async def _deal_noraml_is_need_at( +    need_at: str = ArgPlainText("ts_normal_item_is_need_at"), +): +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] +    if need_at not in agree_list and need_at not in disagree_list: +        await add_item.reject("你的观点似乎并不相关呢...请重新输入: (y/n)") + + +@add_item.got("ts_normal_item_t", "问答匹配模式: \n(全匹配、模糊匹配、正则)") +async def _add_normal_item( +    bot: Bot, +    event: GroupMessageEvent, +    item_q: str = ArgPlainText("ts_normal_item_q"), +    item_a: str = ArgPlainText("ts_normal_item_a"), +    _need_at: str = ArgPlainText("ts_normal_item_is_need_at"), +    item_t: str = ArgPlainText("ts_normal_item_t"), +): +    type_list = ["全匹配", "模糊匹配", "正则"] +    if item_t not in type_list: +        await add_item.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.") + +    q_checker = MessageChecker(item_q).check_cq_code +    a_checker = MessageChecker(item_a).check_cq_code +    if not q_checker or not a_checker: +        await add_item.finish("请不要尝试注入!") + +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    need_at = 1 if _need_at in agree_list else 0 + +    group_id = event.group_id +    operator_id = event.user_id +    operator_info = await bot.get_group_member_info( +        group_id=group_id, user_id=operator_id +    ) +    operator = operator_info.get("card", "unknown") +    item_id = gen_random_str(6) +    ans = item_a.split(",") +    ts = ThesaurusManager() + +    result = await ts.add_item( +        item_id, +        False, +        item_q, +        ans, +        need_at, +        item_t, +        group_id, +        operator, +        operator_id, +        1, +        list(), +    ) +    await add_item.finish(result) + + +add_item_as_group_admin = ThesaurusManager().cmd_as_group( +    "add.g", "添加本群词条,仅限管理,无需审核", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@add_item_as_group_admin.handle() +async def _get_group_item(matcher: Matcher, args: Message = CommandArg()): +    raw_msg = args.extract_plain_text() +    msg_data = raw_msg.split(" ") +    data = dict(enumerate(msg_data)) +    if data.get(0): +        matcher.set_arg("ts_group_item_q", Message(msg_data[0])) +    if data.get(1): +        matcher.set_arg("ts_group_item_a", Message(msg_data[1])) +    if data.get(2): +        matcher.set_arg("ts_group_item_is_need_at", Message(msg_data[2])) +    if data.get(3): +        matcher.set_arg("ts_group_item_t", Message(msg_data[3])) + + +@add_item_as_group_admin.got("ts_group_item_q", "有人问:") +@add_item_as_group_admin.got("ts_group_item_a", "我答: \n(支持多个回复,用','[小写]隔开)") +@add_item_as_group_admin.got("ts_group_item_is_need_at", "是否需要at: (y/n)") +async def _deal_group_is_need_at( +    need_at: str = ArgPlainText("ts_group_item_is_need_at"), +): +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] +    if need_at not in agree_list and need_at not in disagree_list: +        await add_item_as_group_admin.reject("你的观点似乎并不相关呢...请重新输入: (y/n)") + + +@add_item_as_group_admin.got("ts_group_item_t", "问答匹配模式: \n(全匹配、模糊匹配、正则)") +async def _add_group_item( +    bot: Bot, +    event: GroupMessageEvent, +    item_q: str = ArgPlainText("ts_group_item_q"), +    item_a: str = ArgPlainText("ts_group_item_a"), +    _need_at: str = ArgPlainText("ts_group_item_is_need_at"), +    item_t: str = ArgPlainText("ts_group_item_t"), +): +    type_list = ["全匹配", "模糊匹配", "正则"] +    if item_t not in type_list: +        await add_item_as_group_admin.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.") + +    q_checker = MessageChecker(item_q).check_cq_code +    a_checker = MessageChecker(item_a).check_cq_code +    if not q_checker or not a_checker: +        await add_item_as_group_admin.finish("请不要尝试注入!") + +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    need_at = 1 if _need_at in agree_list else 0 + +    group_id = event.group_id +    operator_id = event.user_id +    operator_info = await bot.get_group_member_info( +        group_id=group_id, user_id=operator_id +    ) +    operator = operator_info.get("card", "unknown") +    item_id = gen_random_str(6) +    ans = item_a.split(",") +    ts = ThesaurusManager() + +    result = await ts.add_item( +        item_id, +        True, +        item_q, +        ans, +        need_at, +        item_t, +        group_id, +        operator, +        operator_id, +        0, +        list(), +    ) +    await add_item_as_group_admin.finish(result) + + +add_item_for_global = ThesaurusManager().cmd_as_group( +    "add.glo", "添加全局问答", permission=SUPERUSER +) + + +@add_item_for_global.handle() +async def _get_global_item(matcher: Matcher, args: Message = CommandArg()): +    raw_msg = args.extract_plain_text() +    msg_data = raw_msg.split(" ") +    data = dict(enumerate(msg_data)) + +    if data.get(0): +        matcher.set_arg("ts_global_item_q", Message(msg_data[0])) +    if data.get(1): +        matcher.set_arg("ts_global_item_a", Message(msg_data[1])) +    if data.get(2): +        matcher.set_arg("ts_global_is_need_at", Message(msg_data[2])) +    if data.get(3): +        matcher.set_arg("ts_global_item_type", Message(msg_data[3])) + + +@add_item_for_global.got("ts_global_item_q", "有人问:") +@add_item_for_global.got("ts_global_item_a", "我答: \n(支持多个回复,用','[小写]隔开)") +@add_item_for_global.got("ts_global_item_is_need_at", "是否需要at: (y/n)") +async def _deal_global_is_need_at( +    event: MessageEvent, need_at: str = ArgPlainText("ts_global_item_is_need_at") +): +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] +    if need_at not in agree_list and need_at not in disagree_list: +        await add_item_for_global.reject("你的观点似乎并不相关呢...请重新输入: (y/n)") + + +@add_item_for_global.got("ts_global_item_type", "问答匹配模式: \n(全匹配、模糊匹配、正则)") +async def _add_global_item( +    event: MessageEvent, +    item_q: str = ArgPlainText("ts_global_item_q"), +    item_a: str = ArgPlainText("ts_global_item_a"), +    _need_at: str = ArgPlainText("ts_global_item_is_need_at"), +    item_t: str = ArgPlainText("ts_global_item_type"), +): +    type_list = ["全匹配", "模糊匹配", "正则"] +    if item_t not in type_list: +        await add_item_for_global.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.") + +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    need_at = 1 if _need_at in agree_list else 0 + +    operator = "SUPERUSER" +    opeartor_id = event.user_id +    item_id = gen_random_str(6) +    ans = item_a.split(",") +    tm = ThesaurusManager() + +    result = await tm.add_item( +        item_id, True, item_q, ans, need_at, item_t, 0, operator, opeartor_id, 0, list() +    ) +    await add_item_for_global.finish(result) + + +vote = ThesaurusManager().cmd_as_group("v", "对本群内审核中的词条进行投票") + + +@vote.handle() +async def _get_vote_info(matcher: Matcher, args: Message = CommandArg()): +    raw_msg = args.extract_plain_text() +    msg_data = raw_msg.split(" ") +    data = dict(enumerate(msg_data)) +    if data.get(0): +        matcher.set_arg("ts_vote_id", Message(msg_data[0])) +    if data.get(1): +        matcher.set_arg("ts_vote_attitude", Message(msg_data[1])) + + +@vote.got("ts_vote_id", "要投票的词条id是:") +async def _get_item_id( +    event: GroupMessageEvent, item_id: str = ArgPlainText("ts_vote_id") +): +    tm = ThesaurusManager() +    user_id = event.user_id +    group_id = event.group_id + +    query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id}) +    if not query_result: +        await vote.finish("未找到此id相关信息...请检查是否输入正确(") + +    item_info = query_result[0] +    if user_id in item_info.vote_list: +        await vote.finish("你已经参与过啦!") + +    result = f"""你即将要投票的词条信息: +    词条ID: {item_info._id} +    有人问: {item_info.matcher} +    我答: {"、".join(map(str, item_info.result))} +    提交人: {item_info.operator}@{item_info.operator_id} +    当前赞成: {"、".join(map(str, item_info.vote_list))} +    """.strip() +    await vote.send(Message(result)) + + +@vote.got("ts_vote_attitude", "你的选择是?(y/n)") +async def _get_voter_attitude( +    event: GroupMessageEvent, +    item_id: str = ArgPlainText("ts_vote_id"), +    attitude: str = ArgPlainText("ts_vote_attitude"), +): +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] +    if attitude not in agree_list and attitude not in disagree_list: +        await vote.reject("你的观点似乎不相关呢...请重新输入: (y/n)") + +    tm = ThesaurusManager() +    group_id = event.group_id +    user_id = event.user_id + +    if attitude in agree_list: +        await tm.vote(item_id, group_id, user_id) +        await vote.finish(f"已赞成此词条~ ID: {item_id}") +    else: +        await vote.finish("好吧...") + + +del_item = ThesaurusManager().cmd_as_group( +    "del", "删除本群词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@del_item.handle() +async def _get_del_normal_item_info(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("ts_del_normal_item_id", args) + + +@del_item.got("ts_del_item_id", "要删除词条的id是?") +async def _deal_del_normal_item( +    event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_item_id") +): +    tm = ThesaurusManager() +    group_id = event.group_id + +    result = await tm.del_item(item_id, group_id, True) +    await del_item.finish(result) + + +del_global_item = ThesaurusManager().cmd_as_group( +    "del.g", "删除全局词条", permission=SUPERUSER +) + + +@del_global_item.handle() +async def _get_del_global_item_info(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("ts_del_global_item_id", args) + + +@del_global_item.got("ts_del_global_item_id", "要删除词条的id是?") +async def _deal_del_global_item( +    event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_global_item_id") +): +    tm = ThesaurusManager() + +    result = await tm.del_item(item_id, 0, True) +    await del_global_item.finish(result) + + +del_vote_item = ThesaurusManager().cmd_as_group( +    "del.v", "删除本群处于投票中的词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@del_vote_item.handle() +async def _get_deal_vote_item_info(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("ts_del_vote_item_id", args) + + +@del_vote_item.got("ts_del_vote_item_id", "要删除词条的id是?") +async def _deal_del_vote_item( +    event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_vote_item_id") +): +    tm = ThesaurusManager() +    group_id = event.group_id + +    result = await tm.del_item(item_id, group_id, False) +    await del_vote_item.finish(result) + + +_LIST_SHOW_DATA: dict = dict() + + +list_item = ThesaurusManager().cmd_as_group("list", "查看本群词条") + + +@list_item.handle() +async def _get_normal_item_list(event: GroupMessageEvent): +    tm = ThesaurusManager() +    group_id = event.group_id + +    query_result = await tm.get_item_list({"group_id": group_id}, True) +    if not query_result: +        await list_item.finish("本群还没有词条呢...") + +    items = list() +    for i in query_result[:10]: +        item_matcher = i.matcher +        item_type = i.m_type +        if item_type == 0: +            m_type = "全匹配" +        elif item_type == 1: +            m_type = "模糊匹配" +        else: +            m_type = "正则" + +        items.append([i._id, item_matcher, m_type]) + +    output = ( +        "本群已添加以下词条:\n" +        + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") +        + "\n(一页仅展示10个)" +    ) + +    if len(query_result) > 10: +        await list_item.send(output) +    else: +        await list_item.finish(output) + + +@list_item.got("item_normal_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~") +async def _get_normal_item_more( +    event: GroupMessageEvent, is_next: str = ArgPlainText("item_normal_is_next") +): +    user_id = event.user_id +    group_id = event.group_id + +    judge_list = ["下一页", "n", "next"] +    if is_next not in judge_list: +        try: +            del _LIST_SHOW_DATA[group_id][user_id] +        except Exception: +            pass +        await list_item.finish("结束查看~") + +    if group_id in _LIST_SHOW_DATA: +        _LIST_SHOW_DATA[group_id][user_id] += 10 +    else: +        _LIST_SHOW_DATA[group_id] = {user_id: 10} + +    tm = ThesaurusManager() +    items = list() +    show_item = _LIST_SHOW_DATA[group_id][user_id] +    query_result = await tm.get_item_list({"group_id": group_id}, True) +    for i in query_result[:show_item]: +        item_matcher = i.matcher +        item_type = i.m_type +        if item_type == 0: +            m_type = "全匹配" +        elif item_type == 1: +            m_type = "模糊匹配" +        else: +            m_type = "正则" + +        items.append([i._id, item_matcher, m_type]) + +    output = ( +        tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") +        + "\n('下一页'或'n'以继续查看,任意回复以退出)" +    ) +    await list_item.reject(output) + + +list_global_item = ThesaurusManager().cmd_as_group("list.g", "查看全局词条") + + +@list_global_item.handle() +async def _get_global_item_list(event: MessageEvent): +    tm = ThesaurusManager() + +    query_result = await tm.get_item_list({"group_id": 0}, True) +    if not query_result: +        await list_global_item.finish("还没有给咱添加全局词条呢...") + +    items = list() +    for i in query_result[:10]: +        item_matcher = i.matcher +        item_type = i.m_type +        if item_type == 0: +            m_type = "全匹配" +        elif item_type == 1: +            m_type = "模糊匹配" +        else: +            m_type = "正则" + +        items.append([i._id, item_matcher, m_type]) + +    output = ( +        "咱已装载以下词条:\n" +        + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") +        + "\n(一页仅展示10个)" +    ) + +    if len(query_result) > 10: +        await list_global_item.send(output) +    else: +        await list_global_item.finish(output) + + +@list_global_item.got("item_global_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~") +async def _get_global_item_more( +    event: MessageEvent, is_next: str = ArgPlainText("item_global_is_next") +): +    user_id = event.user_id + +    judge_list = ["下一页", "n", "next"] +    if is_next not in judge_list: +        try: +            del _LIST_SHOW_DATA[user_id] +        except Exception: +            pass +        await list_global_item.finish("结束查看~") + +    if user_id in _LIST_SHOW_DATA: +        _LIST_SHOW_DATA[user_id] += 10 +    else: +        _LIST_SHOW_DATA[user_id] = 10 + +    tm = ThesaurusManager() +    items = list() +    show_item = _LIST_SHOW_DATA[user_id] +    query_result = await tm.get_item_list({"group_id": 0}, True) +    for i in query_result[:show_item]: +        item_matcher = i.matcher +        item_type = i.m_type +        if item_type == 0: +            m_type = "全匹配" +        elif item_type == 1: +            m_type = "模糊匹配" +        else: +            m_type = "正则" + +        items.append([i._id, item_matcher, m_type]) + +    output = ( +        tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") +        + "\n('下一页'或'n'以继续查看,任意回复以退出)" +    ) +    await list_global_item.reject(output) + + +list_vote_item = ThesaurusManager().cmd_as_group("list.v", "查看本群待投票词条") + + +@list_vote_item.handle() +async def _get_vote_item_list(event: GroupMessageEvent): +    tm = ThesaurusManager() +    group_id = event.group_id + +    query_result = await tm.get_item_list({"group_id": group_id}) +    if not query_result: +        await list_vote_item.finish("本群暂未添加待审核词条...") + +    items = list() +    for i in query_result[:10]: +        item_matcher = i.matcher +        item_type = i.m_type +        if item_type == 0: +            m_type = "全匹配" +        elif item_type == 1: +            m_type = "模糊匹配" +        else: +            m_type = "正则" + +        items.append([i._id, item_matcher, m_type]) + +    output = ( +        "当前待审词条概况如下:\n" +        + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") +        + "\n(一页仅展示10个)" +    ) + +    if len(query_result) > 10: +        await list_vote_item.send(output) +    else: +        await list_vote_item.finish(output) + + +@list_vote_item.got("item_vote_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~") +async def _get_vote_item_more( +    event: GroupMessageEvent, is_next: str = ArgPlainText("item_vote_is_next") +): +    user_id = event.user_id +    group_id = event.group_id + +    judge_list = ["下一页", "n", "next"] +    if is_next not in judge_list: +        try: +            del _LIST_SHOW_DATA[group_id][user_id] +        except Exception: +            pass +        await list_vote_item.finish("结束查看~") + +    if group_id in _LIST_SHOW_DATA: +        _LIST_SHOW_DATA[group_id][user_id] += 10 +    else: +        _LIST_SHOW_DATA[group_id] = {user_id: 10} + +    tm = ThesaurusManager() +    items = list() +    show_item = _LIST_SHOW_DATA[group_id][user_id] +    query_result = await tm.get_item_list({"group_id": group_id}) +    for i in query_result[:show_item]: +        item_matcher = i.matcher +        item_type = i.m_type +        if item_type == 0: +            m_type = "全匹配" +        elif item_type == 1: +            m_type = "模糊匹配" +        else: +            m_type = "正则" + +        items.append([i._id, item_matcher, m_type]) + +    output = ( +        tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain") +        + "\n('下一页'或'n'以继续查看,任意回复以退出)" +    ) +    await list_vote_item.reject(output) + + +audit_item = ThesaurusManager().cmd_as_group( +    "audit", "审核本群处于投票中的词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN +) + + +@audit_item.handle() +async def _get_group_item_info(matcher: Matcher, args: Message = CommandArg()): +    raw_msg = args.extract_plain_text() +    msg_data = raw_msg.split(" ") +    data = dict(enumerate(msg_data)) +    if data.get(0): +        matcher.set_arg("ts_audit_vote_id", Message(msg_data[0])) +    if data.get(1): +        matcher.set_arg("ts_audit_vote_attitude", Message(msg_data[1])) + + +@audit_item.got("ts_audit_vote_id", "要审核的词条id是:") +async def _get_audit_item_id( +    event: GroupMessageEvent, item_id: str = ArgPlainText("ts_group_vote_id") +): +    tm = ThesaurusManager() +    group_id = event.group_id + +    query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id}) +    if not query_result: +        await audit_item.finish("未找到此id相关信息...请检查是否输入正确(") + +    item_info = query_result[0] +    result = f"""你即将要审核的词条信息: +    词条ID: {item_info._id} +    有人问: {item_info.matcher} +    我答: {"、".join(map(str, item_info.result))} +    提交人: {item_info.operator}@{item_info.operator_id} +    当前赞成: {"、".join(map(str, item_info.vote_list))} +    """.strip() +    await audit_item.send(Message(result)) + + +@audit_item.got("ts_audit_vote_attitude", "你的选择是?(y/n)") +async def _get_audit_attitude( +    event: GroupMessageEvent, +    item_id: str = ArgPlainText("ts_audit_vote_id"), +    attitude: str = ArgPlainText("ts_audit_vote_attitude"), +): +    agree_list = ["y", "Y", "是", "同意", "赞成"] +    disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"] +    if attitude not in agree_list and attitude not in disagree_list: +        await audit_item.reject("你的观点似乎不相关呢...请重新输入: (y/n)") + +    tm = ThesaurusManager() +    group_id = event.group_id + +    query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id}) +    if not query_result: +        await audit_item.finish("未找到相关内容...请检查输入...") +    item_info = query_result[0] + +    if attitude in agree_list: +        await tm.add_item( +            item_id, +            True, +            item_info.matcher, +            item_info.result, +            item_info.need_at, +            item_info.m_type, +            group_id, +            item_info.operator, +            item_info.operator_id, +            0, +            list(), +        ) +        await tm.del_item(item_id, group_id, False) +    else: +        await tm.del_item(item_id, group_id, False) + +    await audit_item.finish("完成~!") + + +_ITEM_SHOW_FORMAT = """该词条信息如下: +词条ID: {_id} +有人问: {matcher} +我答: {ans} +判断方式: {m_type} +提交人: {operator}@{operator_id} +更新时间: {update_time} +是否为投票选出: {is_vote} +投票赞成: {vote_list} +""" + + +get_normal_item_info = ThesaurusManager().cmd_as_group("i", "查看本群的词条详情") + + +@get_normal_item_info.handle() +async def _info_normal_get_item_id(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("info_normal_item_id", args) + + +@get_normal_item_info.got("info_normal_item_id", "需要查看的词条ID:") +async def _info_normal_get_item_info( +    event: GroupMessageEvent, _id: str = ArgPlainText("info_normal_item_id") +): +    tm = ThesaurusManager() +    group_id = event.group_id + +    query_result = await tm.get_item_list({"_id": _id, "group_id": group_id}, True) +    if not query_result: +        await get_normal_item_info.finish("未找到此ID相关信息...") + +    item_info = query_result[0] + +    item_type = item_info.m_type +    if item_type == 1: +        m_type = "模糊匹配" +    elif item_type == 2: +        m_type = "正则" +    else: +        m_type = "全匹配" + +    result = _ITEM_SHOW_FORMAT.format( +        _id=_id, +        matcher=item_info.matcher, +        ans="、".join(map(str, item_info.result)), +        m_type=m_type, +        operator=item_info.operator, +        operator_id=item_info.operator_id, +        update_time=item_info.update_time.replace( +            tzinfo=pytz.timezone("Asia/Shanghai") +        ), +        is_vote="是" if item_info.is_vote else "否", +        vote_list=item_info.vote_list, +    ) +    await get_normal_item_info.finish(result) + + +get_global_item_info = ThesaurusManager().cmd_as_group("i.g", "查看全局的词条详情") + + +@get_global_item_info.handle() +async def _info_global_get_item_id(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("info_global_item_id", args) + + +@get_global_item_info.got("info_global_item_id", "需要查看的词条ID:") +async def _info_global_get_item_info(_id: str = ArgPlainText("info_global_item_id")): +    tm = ThesaurusManager() + +    query_result = await tm.get_item_list({"_id": _id, "group_id": 0}, True) +    if not query_result: +        await get_global_item_info.finish("未找到此ID相关信息...") + +    item_info = query_result[0] + +    item_type = item_info.m_type +    if item_type == 1: +        m_type = "模糊匹配" +    elif item_type == 2: +        m_type = "正则" +    else: +        m_type = "全匹配" + +    result = _ITEM_SHOW_FORMAT.format( +        _id=_id, +        matcher=item_info.matcher, +        ans="、".join(map(str, item_info.result)), +        m_type=m_type, +        operator=item_info.operator, +        operator_id=item_info.operator_id, +        update_time=item_info.update_time.replace( +            tzinfo=pytz.timezone("Asia/Shanghai") +        ), +        is_vote="是" if item_info.is_vote else "否", +        vote_list=item_info.vote_list, +    ) +    await get_global_item_info.finish(result) + + +get_vote_item_info = ThesaurusManager().cmd_as_group("i.v", "查看本群的待审核/投票的词条详情") + + +@get_vote_item_info.handle() +async def _info_vote_get_item_id(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("info_vote_item_id", args) + + +@get_vote_item_info.got("info_vote_item_id", "需要查看的词条ID:") +async def _info_vote_get_item_info( +    event: GroupMessageEvent, _id: str = ArgPlainText("info_vote_item_id") +): +    tm = ThesaurusManager() +    group_id = event.group_id + +    query_result = await tm.get_item_list({"_id": _id, "group_id": group_id}) +    if not query_result: +        await get_vote_item_info.finish("未找到此ID相关信息...") + +    item_info = query_result[0] + +    item_type = item_info.m_type +    if item_type == 1: +        m_type = "模糊匹配" +    elif item_type == 2: +        m_type = "正则" +    else: +        m_type = "全匹配" + +    result = _ITEM_SHOW_FORMAT.format( +        _id=_id, +        matcher=item_info.matcher, +        ans="、".join(map(str, item_info.result)), +        m_type=m_type, +        operator=item_info.operator, +        operator_id=item_info.operator_id, +        update_time=item_info.update_time.replace( +            tzinfo=pytz.timezone("Asia/Shanghai") +        ), +        is_vote="是" if item_info.is_vote else "否", +        vote_list=item_info.vote_list, +    ) +    await get_vote_item_info.finish(result) + + +from ATRI import driver + +from .listener import init_listener + + +driver().on_startup(init_listener) diff --git a/ATRI/plugins/thesaurus/data_source.py b/ATRI/plugins/thesaurus/data_source.py new file mode 100644 index 0000000..30f2c5b --- /dev/null +++ b/ATRI/plugins/thesaurus/data_source.py @@ -0,0 +1,192 @@ +import pytz +from datetime import datetime + +from nonebot.permission import SUPERUSER +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.exceptions import ThesaurusError + +from .db import DBForTS, DBForTAL +from .db import ThesaurusStoragor + + +class ThesaurusManager(Service): +    def __init__(self): +        Service.__init__( +            self, +            "词库管理", +            "支持模糊匹配、全匹配、正则的自定义回复~\n支持分群、全局管理,支持群内投票添加", +            rule=is_in_service("词库管理"), +            permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN, +            main_cmd="/ts", +        ) + +    async def __add_item(self, _id: str, group_id: int, is_main: bool = False): +        if is_main: +            try: +                async with DBForTS() as db: +                    await db.add_item(_id, group_id) +            except Exception: +                raise ThesaurusError(f"添加词库(ts)数据失败 目标词id: {_id}") +        else: +            try: +                async with DBForTAL() as db: +                    await db.add_item(_id, group_id) +            except Exception: +                raise ThesaurusError(f"添加词库(tal)数据失败 目标词id: {_id}") + +    async def update_item( +        self, _id: str, group_id: int, update_map: dict, is_main: bool = False +    ): +        if is_main: +            try: +                async with DBForTS() as db: +                    await db.update_item(_id, group_id, update_map) +            except Exception: +                raise ThesaurusError(f"更新词库(ts)数据失败 目标词id: {_id}") +        else: +            try: +                async with DBForTAL() as db: +                    await db.update_item(_id, group_id, update_map) +            except Exception: +                raise ThesaurusError(f"更新词库(tal)数据失败 目标词id: {_id}") + +    async def __del_item(self, _id: str, group_id: int, is_main: bool = False): +        if is_main: +            try: +                async with DBForTS() as db: +                    await db.del_item({"_id": _id, "group_id": group_id}) +            except Exception: +                raise ThesaurusError(f"删除词库(ts)数据失败 目标词id: {_id}") +        else: +            try: +                async with DBForTAL() as db: +                    await db.del_item({"_id": _id, "group_id": group_id}) +            except Exception: +                raise ThesaurusError(f"删除词库(tal)数据失败 目标词id: {_id}") + +    async def get_item_list(self, query_map: dict, is_main: bool = False) -> list: +        if is_main: +            try: +                async with DBForTS() as db: +                    return await db.get_item_list(query_map) +            except Exception: +                raise ThesaurusError("获取词库(ts)列表数据失败") +        else: +            try: +                async with DBForTAL() as db: +                    return await db.get_item_list(query_map) +            except Exception: +                raise ThesaurusError("获取词库(tal)列表数据失败") + +    async def get_all_items(self, is_main: bool = False) -> list: +        if is_main: +            try: +                async with DBForTS() as db: +                    return await db.get_all_items() +            except Exception: +                raise ThesaurusError("获取全部词库(ts)列表数据失败") +        else: +            try: +                async with DBForTAL() as db: +                    return await db.get_all_items() +            except Exception: +                raise ThesaurusError("获取全部词库(tal)列表数据失败") + +    async def add_item( +        self, +        _id: str, +        is_main: bool, +        q: str, +        a: list, +        need_at: int, +        t: str, +        group_id: int, +        operator: str, +        operator_id: int, +        is_vote: int, +        vote_list: list, +    ) -> str: +        query_result = await self.get_item_list( +            {"matcher": q, "group_id": group_id}, is_main +        ) +        if query_result: +            item_info = query_result[0] +            return f"""{"(需审核/投票)" if not is_main else str()}该词条已存在!! ID: {item_info._id}""" + +        if t == "全匹配": +            m_type = 0 +        elif t == "模糊匹配": +            m_type = 1 +        else: +            m_type = 2 + +        item_meta = { +            "matcher": q, +            "result": a, +            "need_at": need_at, +            "m_type": m_type, +            "group_id": group_id, +            "operator": operator, +            "operator_id": operator_id, +            "update_time": datetime.now(pytz.timezone("Asia/Shanghai")), +            "is_vote": is_vote, +            "vote_list": vote_list, +        } + +        await self.__add_item(_id, group_id, is_main) +        await self.update_item( +            _id, +            group_id, +            item_meta, +            is_main, +        ) +        return f"""{"(需审核/投票)" if not is_main else str()}成功加上新词条 ID: {_id}""" + +    async def del_item(self, _id: str, group_id: int, is_main: bool): +        query_result = await self.get_item_list( +            {"_id": _id, "group_id": group_id}, is_main +        ) +        if not query_result: +            return f"目标id: {_id} 没有记录呢..." + +        await self.__del_item(_id, group_id, is_main) +        return f"成功删除目标id: {_id} 问答信息" + +    async def vote(self, _id: str, group_id: int, voter: int): +        raw_item_info = await self.get_item_list({"_id": _id, "group_id": group_id}) +        item_info = raw_item_info[0] +        vote_list: list = item_info.vote_list +        vote_list.append(voter) + +        await self.update_item( +            _id, +            group_id, +            { +                "vote_list": vote_list, +                "update_time": datetime.now(pytz.timezone("Asia/Shanghai")), +            }, +        ) + + +class ThesaurusListener(Service): +    def __init__(self): +        Service.__init__(self, "词库监听", "词库监听器", rule=is_in_service("词库监听")) + +    async def get_item_by_id(self, _id: str) -> ThesaurusStoragor: +        try: +            async with DBForTS() as db: +                data = await db.get_item_list({"_id": _id}) +        except Exception: +            raise ThesaurusError(f"获取词库(ts)数据失败 词条ID: {_id}") + +        return data[0] + +    async def get_item_list(self, group_id: int): +        try: +            async with DBForTS() as db: +                return await db.get_item_list({"group_id": group_id}) +        except Exception: +            raise ThesaurusError(f"获取词库(ts)数据失败 目标群号: {group_id}") diff --git a/ATRI/plugins/thesaurus/db.py b/ATRI/plugins/thesaurus/db.py new file mode 100644 index 0000000..b5394fc --- /dev/null +++ b/ATRI/plugins/thesaurus/db.py @@ -0,0 +1,47 @@ +from ATRI.database import ThesaurusStoragor, ThesaurusAuditList + + +class DBForTS: +    async def __aenter__(self): +        return self + +    async def __aexit__(self, exc_type, exc_val, exc_tb): +        pass + +    async def add_item(self, _id: str, group_id: int): +        await ThesaurusStoragor.create(_id=_id, group_id=group_id) + +    async def update_item(self, _id: str, group_id: int, update_map: dict): +        await ThesaurusStoragor.filter(_id=_id, group_id=group_id).update(**update_map) + +    async def del_item(self, query_map: dict): +        await ThesaurusStoragor.filter(**query_map).delete() + +    async def get_item_list(self, query_map: dict) -> list: +        return await ThesaurusStoragor.filter(**query_map) + +    async def get_all_items(self) -> list: +        return await ThesaurusStoragor.all() + + +class DBForTAL: +    async def __aenter__(self): +        return self + +    async def __aexit__(self, exc_type, exc_val, exc_tb): +        pass + +    async def add_item(self, _id: str, group_id: int): +        await ThesaurusAuditList.create(_id=_id, group_id=group_id) + +    async def update_item(self, _id: str, group_id: int, update_map: dict): +        await ThesaurusAuditList.filter(_id=_id, group_id=group_id).update(**update_map) + +    async def del_item(self, query_map: dict): +        await ThesaurusAuditList.filter(**query_map).delete() + +    async def get_item_list(self, query_map: dict) -> list: +        return await ThesaurusAuditList.filter(**query_map) + +    async def get_all_items(self) -> list: +        return await ThesaurusAuditList.all() diff --git a/ATRI/plugins/thesaurus/listener.py b/ATRI/plugins/thesaurus/listener.py new file mode 100644 index 0000000..d3c3887 --- /dev/null +++ b/ATRI/plugins/thesaurus/listener.py @@ -0,0 +1,121 @@ +import re +from random import choice, shuffle + +from nonebot import get_bot +from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent +from nonebot.adapters.onebot.v11.helpers import Cooldown + +from apscheduler.triggers.base import BaseTrigger +from apscheduler.triggers.combining import AndTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from ATRI.log import logger as log +from ATRI.utils.apscheduler import scheduler + +from .data_source import ThesaurusManager, ThesaurusListener, ThesaurusStoragor + + +class ThesaurusLinstenerIsEnabledChecker(BaseTrigger): +    def get_next_fire_time(self, previous_fire_time, now): +        tm = ThesaurusManager() +        conf = tm.load_service("词库管理") +        if conf.get("enabled"): +            return now + + +async def _thesaurus_vote_listener(): +    tm = ThesaurusManager() +    try: +        all_items = await tm.get_all_items(False) +    except Exception: +        return + +    for i in all_items: +        data: ThesaurusStoragor = i +        item_vote_list = data.vote_list +        if len(item_vote_list) == 10: +            t = data.m_type + +            if t == 0: +                m_type = "全匹配" +            elif t == 1: +                m_type = "模糊匹配" +            else: +                m_type = "正则" + +            result = await tm.add_item( +                data._id, +                True, +                data.matcher, +                list(data.result), +                data.need_at, +                m_type, +                data.group_id, +                data.operator, +                data.operator_id, +                1, +                list(data.vote_list), +            ) +            log.debug(result) + +            bot = get_bot() +            await bot.send_group_msg(group_id=data.group_id, message=result) + + +def init_listener(): +    scheduler.add_job( +        _thesaurus_vote_listener, +        AndTrigger([IntervalTrigger(seconds=10), ThesaurusLinstenerIsEnabledChecker()]), +        max_instances=3,  # type: ignore +        misfire_grace_time=20,  # type: ignore +    ) + + +main_listener = ThesaurusListener().on_message( +    "词库监听器", "监听所有消息判断是否满足触发词条条件", priority=4, block=False +) + + +@main_listener.handle([Cooldown(3)]) +async def _tl_listener(event: MessageEvent): +    tl = ThesaurusListener() +    msg = event.get_message().extract_plain_text() + +    group_id = 0 +    if isinstance(event, GroupMessageEvent): +        group_id = event.group_id + +    query_result = await tl.get_item_list(group_id) +    shuffle(query_result) +    if query_result: +        for item in query_result: +            item_info: ThesaurusStoragor = item + +            if item_info.m_type == 1: +                if item_info.matcher in msg: +                    if item_info.need_at: +                        if event.is_tome(): +                            await main_listener.finish(choice(item_info.result)) +                        else: +                            return +                    else: +                        await main_listener.finish(choice(item_info.result)) +            elif item_info.m_type == 2: +                patt = item_info.matcher +                if re.findall(patt, msg): +                    if item_info.need_at: +                        if event.is_tome(): +                            await main_listener.finish(choice(item_info.result)) +                        else: +                            return +                    else: +                        await main_listener.finish(choice(item_info.result)) +            else: +                if item_info.matcher == msg: +                    if item_info.need_at: +                        if event.is_tome(): +                            await main_listener.finish(choice(item_info.result)) +                        else: +                            return +                    else: +                        await main_listener.finish(choice(item_info.result)) | 
