summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-07-01 00:03:48 +0800
committerKyomotoi <[email protected]>2022-07-01 00:03:48 +0800
commitfbf7d682f92c10ad451b169a9b5f5631f5f0c5a8 (patch)
treeccbcb0d92512a6d4db7295fbc0de50c8c0bdbcf2
parentb6eda930bdb8c52a0bcaf05f83143e1f6861c7b7 (diff)
downloadATRI-fbf7d682f92c10ad451b169a9b5f5631f5f0c5a8.tar.gz
ATRI-fbf7d682f92c10ad451b169a9b5f5631f5f0c5a8.tar.bz2
ATRI-fbf7d682f92c10ad451b169a9b5f5631f5f0c5a8.zip
✨ 更新功能: 自定义词库
-rw-r--r--ATRI/database/models.py40
-rw-r--r--ATRI/exceptions.py14
-rw-r--r--ATRI/plugins/thesaurus/__init__.py835
-rw-r--r--ATRI/plugins/thesaurus/data_source.py192
-rw-r--r--ATRI/plugins/thesaurus/db.py47
-rw-r--r--ATRI/plugins/thesaurus/listener.py121
-rw-r--r--README.md2
7 files changed, 1246 insertions, 5 deletions
diff --git a/ATRI/database/models.py b/ATRI/database/models.py
index e268f01..d326151 100644
--- a/ATRI/database/models.py
+++ b/ATRI/database/models.py
@@ -10,6 +10,9 @@ class BilibiliSubscription(Model):
up_nickname = fields.TextField(null=True)
last_update = fields.DatetimeField(default=datetime.fromordinal(1))
+ class Meta:
+ app = "bilibili"
+
class TwitterSubscription(Model):
tid = fields.IntField()
@@ -17,3 +20,40 @@ class TwitterSubscription(Model):
name = fields.TextField(null=True)
screen_name = fields.TextField(null=True)
last_update = fields.DatetimeField(default=datetime.fromordinal(1))
+
+ class Meta:
+ app = "twitter"
+
+
+class ThesaurusStoragor(Model):
+ _id = fields.TextField()
+ matcher = fields.TextField(null=True)
+ result = fields.JSONField(null=True)
+ need_at = fields.IntField(null=True)
+ m_type = fields.IntField(null=True)
+ group_id = fields.IntField(null=True)
+ operator = fields.TextField(null=True)
+ operator_id = fields.IntField(null=True)
+ update_time = fields.DatetimeField(null=True)
+ is_vote = fields.IntField(null=True)
+ vote_list = fields.JSONField(null=True)
+
+ class Meta:
+ app = "ts"
+
+
+class ThesaurusAuditList(Model):
+ _id = fields.TextField()
+ matcher = fields.TextField(null=True)
+ result = fields.JSONField(null=True)
+ need_at = fields.IntField(null=True)
+ m_type = fields.IntField(null=True)
+ group_id = fields.IntField(null=True)
+ operator = fields.TextField(null=True)
+ operator_id = fields.IntField(null=True)
+ update_time = fields.DatetimeField(null=True)
+ is_vote = fields.IntField(null=True)
+ vote_list = fields.JSONField(null=True)
+
+ class Meta:
+ app = "tal"
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py
index aebf9fc..db6265f 100644
--- a/ATRI/exceptions.py
+++ b/ATRI/exceptions.py
@@ -1,18 +1,18 @@
import time
import json
-import string
from pathlib import Path
-from random import sample
from typing import Optional
from traceback import format_exc
from pydantic.main import BaseModel
+from nonebot.matcher import Matcher
from nonebot.adapters.onebot.v11 import ActionFailed
from nonebot.adapters.onebot.v11 import Bot, PrivateMessageEvent, GroupMessageEvent
from nonebot.message import run_postprocessor
from .log import logger as log
from .config import BotSelfConfig
+from .utils import gen_random_str
ERROR_DIR = Path(".") / "data" / "errors"
@@ -27,7 +27,7 @@ class ErrorInfo(BaseModel):
def _save_error(prompt: str, content: str) -> str:
- track_id = "".join(sample(string.ascii_letters + string.digits, 8))
+ track_id = gen_random_str(8)
data = ErrorInfo(
track_id=track_id,
prompt=prompt,
@@ -94,8 +94,14 @@ class TwitterDynamicError(BaseBotException):
prompt = "Twitter动态订阅错误"
+class ThesaurusError(BaseBotException):
+ prompt = "词库相关错误"
+
+
@run_postprocessor
-async def _track_error(exception: Optional[Exception], bot: Bot, event) -> None:
+async def _track_error(
+ bot: Bot, event, matcher: Matcher, exception: Optional[Exception]
+) -> None:
if not exception:
return
diff --git a/ATRI/plugins/thesaurus/__init__.py b/ATRI/plugins/thesaurus/__init__.py
new file mode 100644
index 0000000..ca0be3e
--- /dev/null
+++ b/ATRI/plugins/thesaurus/__init__.py
@@ -0,0 +1,835 @@
+import pytz
+from tabulate import tabulate
+
+from nonebot.matcher import Matcher
+from nonebot.permission import SUPERUSER
+from nonebot.params import ArgPlainText, CommandArg
+from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
+from nonebot.adapters.onebot.v11 import Bot, Message, MessageEvent, GroupMessageEvent
+
+from ATRI.utils import gen_random_str, MessageChecker
+
+from .data_source import ThesaurusManager
+
+
+add_item = ThesaurusManager().cmd_as_group("add", "添加本群词条,需审核或投票")
+
+
+@add_item.handle()
+async def _get_normal_item(matcher: Matcher, args: Message = CommandArg()):
+ raw_msg = args.extract_plain_text()
+ msg_data = raw_msg.split(" ")
+ data = dict(enumerate(msg_data))
+ if data.get(0):
+ matcher.set_arg("ts_normal_item_q", Message(msg_data[0]))
+ if data.get(1):
+ matcher.set_arg("ts_normal_item_a", Message(msg_data[1]))
+ if data.get(2):
+ matcher.set_arg("ts_normal_item_is_need_at", Message(msg_data[2]))
+ if data.get(3):
+ matcher.set_arg("ts_normal_item_t", Message(msg_data[3]))
+
+
+@add_item.got("ts_normal_item_q", "有人问:")
+@add_item.got("ts_normal_item_a", "我答: \n(支持多个回复,用','[小写]隔开)")
+@add_item.got("ts_normal_item_is_need_at", "是否需要at: (y/n)")
+async def _deal_noraml_is_need_at(
+ need_at: str = ArgPlainText("ts_normal_item_is_need_at"),
+):
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"]
+ if need_at not in agree_list and need_at not in disagree_list:
+ await add_item.reject("你的观点似乎并不相关呢...请重新输入: (y/n)")
+
+
+@add_item.got("ts_normal_item_t", "问答匹配模式: \n(全匹配、模糊匹配、正则)")
+async def _add_normal_item(
+ bot: Bot,
+ event: GroupMessageEvent,
+ item_q: str = ArgPlainText("ts_normal_item_q"),
+ item_a: str = ArgPlainText("ts_normal_item_a"),
+ _need_at: str = ArgPlainText("ts_normal_item_is_need_at"),
+ item_t: str = ArgPlainText("ts_normal_item_t"),
+):
+ type_list = ["全匹配", "模糊匹配", "正则"]
+ if item_t not in type_list:
+ await add_item.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.")
+
+ q_checker = MessageChecker(item_q).check_cq_code
+ a_checker = MessageChecker(item_a).check_cq_code
+ if not q_checker or not a_checker:
+ await add_item.finish("请不要尝试注入!")
+
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ need_at = 1 if _need_at in agree_list else 0
+
+ group_id = event.group_id
+ operator_id = event.user_id
+ operator_info = await bot.get_group_member_info(
+ group_id=group_id, user_id=operator_id
+ )
+ operator = operator_info.get("card", "unknown")
+ item_id = gen_random_str(6)
+ ans = item_a.split(",")
+ ts = ThesaurusManager()
+
+ result = await ts.add_item(
+ item_id,
+ False,
+ item_q,
+ ans,
+ need_at,
+ item_t,
+ group_id,
+ operator,
+ operator_id,
+ 1,
+ list(),
+ )
+ await add_item.finish(result)
+
+
+add_item_as_group_admin = ThesaurusManager().cmd_as_group(
+ "add.g", "添加本群词条,仅限管理,无需审核", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
+
+@add_item_as_group_admin.handle()
+async def _get_group_item(matcher: Matcher, args: Message = CommandArg()):
+ raw_msg = args.extract_plain_text()
+ msg_data = raw_msg.split(" ")
+ data = dict(enumerate(msg_data))
+ if data.get(0):
+ matcher.set_arg("ts_group_item_q", Message(msg_data[0]))
+ if data.get(1):
+ matcher.set_arg("ts_group_item_a", Message(msg_data[1]))
+ if data.get(2):
+ matcher.set_arg("ts_group_item_is_need_at", Message(msg_data[2]))
+ if data.get(3):
+ matcher.set_arg("ts_group_item_t", Message(msg_data[3]))
+
+
+@add_item_as_group_admin.got("ts_group_item_q", "有人问:")
+@add_item_as_group_admin.got("ts_group_item_a", "我答: \n(支持多个回复,用','[小写]隔开)")
+@add_item_as_group_admin.got("ts_group_item_is_need_at", "是否需要at: (y/n)")
+async def _deal_group_is_need_at(
+ need_at: str = ArgPlainText("ts_group_item_is_need_at"),
+):
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"]
+ if need_at not in agree_list and need_at not in disagree_list:
+ await add_item_as_group_admin.reject("你的观点似乎并不相关呢...请重新输入: (y/n)")
+
+
+@add_item_as_group_admin.got("ts_group_item_t", "问答匹配模式: \n(全匹配、模糊匹配、正则)")
+async def _add_group_item(
+ bot: Bot,
+ event: GroupMessageEvent,
+ item_q: str = ArgPlainText("ts_group_item_q"),
+ item_a: str = ArgPlainText("ts_group_item_a"),
+ _need_at: str = ArgPlainText("ts_group_item_is_need_at"),
+ item_t: str = ArgPlainText("ts_group_item_t"),
+):
+ type_list = ["全匹配", "模糊匹配", "正则"]
+ if item_t not in type_list:
+ await add_item_as_group_admin.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.")
+
+ q_checker = MessageChecker(item_q).check_cq_code
+ a_checker = MessageChecker(item_a).check_cq_code
+ if not q_checker or not a_checker:
+ await add_item_as_group_admin.finish("请不要尝试注入!")
+
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ need_at = 1 if _need_at in agree_list else 0
+
+ group_id = event.group_id
+ operator_id = event.user_id
+ operator_info = await bot.get_group_member_info(
+ group_id=group_id, user_id=operator_id
+ )
+ operator = operator_info.get("card", "unknown")
+ item_id = gen_random_str(6)
+ ans = item_a.split(",")
+ ts = ThesaurusManager()
+
+ result = await ts.add_item(
+ item_id,
+ True,
+ item_q,
+ ans,
+ need_at,
+ item_t,
+ group_id,
+ operator,
+ operator_id,
+ 0,
+ list(),
+ )
+ await add_item_as_group_admin.finish(result)
+
+
+add_item_for_global = ThesaurusManager().cmd_as_group(
+ "add.glo", "添加全局问答", permission=SUPERUSER
+)
+
+
+@add_item_for_global.handle()
+async def _get_global_item(matcher: Matcher, args: Message = CommandArg()):
+ raw_msg = args.extract_plain_text()
+ msg_data = raw_msg.split(" ")
+ data = dict(enumerate(msg_data))
+
+ if data.get(0):
+ matcher.set_arg("ts_global_item_q", Message(msg_data[0]))
+ if data.get(1):
+ matcher.set_arg("ts_global_item_a", Message(msg_data[1]))
+ if data.get(2):
+ matcher.set_arg("ts_global_is_need_at", Message(msg_data[2]))
+ if data.get(3):
+ matcher.set_arg("ts_global_item_type", Message(msg_data[3]))
+
+
+@add_item_for_global.got("ts_global_item_q", "有人问:")
+@add_item_for_global.got("ts_global_item_a", "我答: \n(支持多个回复,用','[小写]隔开)")
+@add_item_for_global.got("ts_global_item_is_need_at", "是否需要at: (y/n)")
+async def _deal_global_is_need_at(
+ event: MessageEvent, need_at: str = ArgPlainText("ts_global_item_is_need_at")
+):
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"]
+ if need_at not in agree_list and need_at not in disagree_list:
+ await add_item_for_global.reject("你的观点似乎并不相关呢...请重新输入: (y/n)")
+
+
+@add_item_for_global.got("ts_global_item_type", "问答匹配模式: \n(全匹配、模糊匹配、正则)")
+async def _add_global_item(
+ event: MessageEvent,
+ item_q: str = ArgPlainText("ts_global_item_q"),
+ item_a: str = ArgPlainText("ts_global_item_a"),
+ _need_at: str = ArgPlainText("ts_global_item_is_need_at"),
+ item_t: str = ArgPlainText("ts_global_item_type"),
+):
+ type_list = ["全匹配", "模糊匹配", "正则"]
+ if item_t not in type_list:
+ await add_item_for_global.finish("该类型不支持 (全匹配、模糊匹配、正则)\n请重新提交.")
+
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ need_at = 1 if _need_at in agree_list else 0
+
+ operator = "SUPERUSER"
+ opeartor_id = event.user_id
+ item_id = gen_random_str(6)
+ ans = item_a.split(",")
+ tm = ThesaurusManager()
+
+ result = await tm.add_item(
+ item_id, True, item_q, ans, need_at, item_t, 0, operator, opeartor_id, 0, list()
+ )
+ await add_item_for_global.finish(result)
+
+
+vote = ThesaurusManager().cmd_as_group("v", "对本群内审核中的词条进行投票")
+
+
+async def _get_vote_info(matcher: Matcher, args: Message = CommandArg()):
+ raw_msg = args.extract_plain_text()
+ msg_data = raw_msg.split(" ")
+ data = dict(enumerate(msg_data))
+ if data.get(0):
+ matcher.set_arg("ts_vote_id", Message(msg_data[0]))
+ if data.get(1):
+ matcher.set_arg("ts_vote_attitude", Message(msg_data[1]))
+
+
[email protected]("ts_vote_id", "要投票的词条id是:")
+async def _get_item_id(
+ event: GroupMessageEvent, item_id: str = ArgPlainText("ts_vote_id")
+):
+ tm = ThesaurusManager()
+ user_id = event.user_id
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id})
+ if not query_result:
+ await vote.finish("未找到此id相关信息...请检查是否输入正确(")
+
+ item_info = query_result[0]
+ if user_id in item_info.vote_list:
+ await vote.finish("你已经参与过啦!")
+
+ result = f"""你即将要投票的词条信息:
+ 词条ID: {item_info._id}
+ 有人问: {item_info.matcher}
+ 我答: {"、".join(map(str, item_info.result))}
+ 提交人: {item_info.operator}@{item_info.operator_id}
+ 当前赞成: {"、".join(map(str, item_info.vote_list))}
+ """.strip()
+ await vote.send(Message(result))
+
+
[email protected]("ts_vote_attitude", "你的选择是?(y/n)")
+async def _get_voter_attitude(
+ event: GroupMessageEvent,
+ item_id: str = ArgPlainText("ts_vote_id"),
+ attitude: str = ArgPlainText("ts_vote_attitude"),
+):
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"]
+ if attitude not in agree_list and attitude not in disagree_list:
+ await vote.reject("你的观点似乎不相关呢...请重新输入: (y/n)")
+
+ tm = ThesaurusManager()
+ group_id = event.group_id
+ user_id = event.user_id
+
+ if attitude in agree_list:
+ await tm.vote(item_id, group_id, user_id)
+ await vote.finish(f"已赞成此词条~ ID: {item_id}")
+ else:
+ await vote.finish("好吧...")
+
+
+del_item = ThesaurusManager().cmd_as_group(
+ "del", "删除本群词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
+
+@del_item.handle()
+async def _get_del_normal_item_info(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("ts_del_normal_item_id", args)
+
+
+@del_item.got("ts_del_item_id", "要删除词条的id是?")
+async def _deal_del_normal_item(
+ event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_item_id")
+):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ result = await tm.del_item(item_id, group_id, True)
+ await del_item.finish(result)
+
+
+del_global_item = ThesaurusManager().cmd_as_group(
+ "del.g", "删除全局词条", permission=SUPERUSER
+)
+
+
+@del_global_item.handle()
+async def _get_del_global_item_info(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("ts_del_global_item_id", args)
+
+
+@del_global_item.got("ts_del_global_item_id", "要删除词条的id是?")
+async def _deal_del_global_item(
+ event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_global_item_id")
+):
+ tm = ThesaurusManager()
+
+ result = await tm.del_item(item_id, 0, True)
+ await del_global_item.finish(result)
+
+
+del_vote_item = ThesaurusManager().cmd_as_group(
+ "del.v", "删除本群处于投票中的词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
+
+@del_vote_item.handle()
+async def _get_deal_vote_item_info(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("ts_del_vote_item_id", args)
+
+
+@del_vote_item.got("ts_del_vote_item_id", "要删除词条的id是?")
+async def _deal_del_vote_item(
+ event: GroupMessageEvent, item_id: str = ArgPlainText("ts_del_vote_item_id")
+):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ result = await tm.del_item(item_id, group_id, False)
+ await del_vote_item.finish(result)
+
+
+_LIST_SHOW_DATA: dict = dict()
+
+
+list_item = ThesaurusManager().cmd_as_group("list", "查看本群词条")
+
+
+@list_item.handle()
+async def _get_normal_item_list(event: GroupMessageEvent):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"group_id": group_id}, True)
+ if not query_result:
+ await list_item.finish("本群还没有词条呢...")
+
+ items = list()
+ for i in query_result[:10]:
+ item_matcher = i.matcher
+ item_type = i.m_type
+ if item_type == 0:
+ m_type = "全匹配"
+ elif item_type == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ items.append([i._id, item_matcher, m_type])
+
+ output = (
+ "本群已添加以下词条:\n"
+ + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain")
+ + "\n(一页仅展示10个)"
+ )
+
+ if len(query_result) > 10:
+ await list_item.send(output)
+ else:
+ await list_item.finish(output)
+
+
+@list_item.got("item_normal_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~")
+async def _get_normal_item_more(
+ event: GroupMessageEvent, is_next: str = ArgPlainText("item_normal_is_next")
+):
+ user_id = event.user_id
+ group_id = event.group_id
+
+ judge_list = ["下一页", "n", "next"]
+ if is_next not in judge_list:
+ try:
+ del _LIST_SHOW_DATA[group_id][user_id]
+ except Exception:
+ pass
+ await list_item.finish("结束查看~")
+
+ if group_id in _LIST_SHOW_DATA:
+ _LIST_SHOW_DATA[group_id][user_id] += 10
+ else:
+ _LIST_SHOW_DATA[group_id] = {user_id: 10}
+
+ tm = ThesaurusManager()
+ items = list()
+ show_item = _LIST_SHOW_DATA[group_id][user_id]
+ query_result = await tm.get_item_list({"group_id": group_id}, True)
+ for i in query_result[:show_item]:
+ item_matcher = i.matcher
+ item_type = i.m_type
+ if item_type == 0:
+ m_type = "全匹配"
+ elif item_type == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ items.append([i._id, item_matcher, m_type])
+
+ output = (
+ tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain")
+ + "\n('下一页'或'n'以继续查看,任意回复以退出)"
+ )
+ await list_item.reject(output)
+
+
+list_global_item = ThesaurusManager().cmd_as_group("list.g", "查看全局词条")
+
+
+@list_global_item.handle()
+async def _get_global_item_list(event: MessageEvent):
+ tm = ThesaurusManager()
+
+ query_result = await tm.get_item_list({"group_id": 0}, True)
+ if not query_result:
+ await list_global_item.finish("还没有给咱添加全局词条呢...")
+
+ items = list()
+ for i in query_result[:10]:
+ item_matcher = i.matcher
+ item_type = i.m_type
+ if item_type == 0:
+ m_type = "全匹配"
+ elif item_type == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ items.append([i._id, item_matcher, m_type])
+
+ output = (
+ "咱已装载以下词条:\n"
+ + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain")
+ + "\n(一页仅展示10个)"
+ )
+
+ if len(query_result) > 10:
+ await list_global_item.send(output)
+ else:
+ await list_global_item.finish(output)
+
+
+@list_global_item.got("item_global_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~")
+async def _get_global_item_more(
+ event: MessageEvent, is_next: str = ArgPlainText("item_global_is_next")
+):
+ user_id = event.user_id
+
+ judge_list = ["下一页", "n", "next"]
+ if is_next not in judge_list:
+ try:
+ del _LIST_SHOW_DATA[user_id]
+ except Exception:
+ pass
+ await list_global_item.finish("结束查看~")
+
+ if user_id in _LIST_SHOW_DATA:
+ _LIST_SHOW_DATA[user_id] += 10
+ else:
+ _LIST_SHOW_DATA[user_id] = 10
+
+ tm = ThesaurusManager()
+ items = list()
+ show_item = _LIST_SHOW_DATA[user_id]
+ query_result = await tm.get_item_list({"group_id": 0}, True)
+ for i in query_result[:show_item]:
+ item_matcher = i.matcher
+ item_type = i.m_type
+ if item_type == 0:
+ m_type = "全匹配"
+ elif item_type == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ items.append([i._id, item_matcher, m_type])
+
+ output = (
+ tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain")
+ + "\n('下一页'或'n'以继续查看,任意回复以退出)"
+ )
+ await list_global_item.reject(output)
+
+
+list_vote_item = ThesaurusManager().cmd_as_group("list.v", "查看本群待投票词条")
+
+
+@list_vote_item.handle()
+async def _get_vote_item_list(event: GroupMessageEvent):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"group_id": group_id})
+ if not query_result:
+ await list_vote_item.finish("本群暂未添加待审核词条...")
+
+ items = list()
+ for i in query_result[:10]:
+ item_matcher = i.matcher
+ item_type = i.m_type
+ if item_type == 0:
+ m_type = "全匹配"
+ elif item_type == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ items.append([i._id, item_matcher, m_type])
+
+ output = (
+ "当前待审词条概况如下:\n"
+ + tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain")
+ + "\n(一页仅展示10个)"
+ )
+
+ if len(query_result) > 10:
+ await list_vote_item.send(output)
+ else:
+ await list_vote_item.finish(output)
+
+
+@list_vote_item.got("item_vote_is_next", "回复'下一页'或'n'以继续查看,任意回复以退出~")
+async def _get_vote_item_more(
+ event: GroupMessageEvent, is_next: str = ArgPlainText("item_vote_is_next")
+):
+ user_id = event.user_id
+ group_id = event.group_id
+
+ judge_list = ["下一页", "n", "next"]
+ if is_next not in judge_list:
+ try:
+ del _LIST_SHOW_DATA[group_id][user_id]
+ except Exception:
+ pass
+ await list_vote_item.finish("结束查看~")
+
+ if group_id in _LIST_SHOW_DATA:
+ _LIST_SHOW_DATA[group_id][user_id] += 10
+ else:
+ _LIST_SHOW_DATA[group_id] = {user_id: 10}
+
+ tm = ThesaurusManager()
+ items = list()
+ show_item = _LIST_SHOW_DATA[group_id][user_id]
+ query_result = await tm.get_item_list({"group_id": group_id})
+ for i in query_result[:show_item]:
+ item_matcher = i.matcher
+ item_type = i.m_type
+ if item_type == 0:
+ m_type = "全匹配"
+ elif item_type == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ items.append([i._id, item_matcher, m_type])
+
+ output = (
+ tabulate(items, headers=["ID", "匹配词", "判断方式"], tablefmt="plain")
+ + "\n('下一页'或'n'以继续查看,任意回复以退出)"
+ )
+ await list_vote_item.reject(output)
+
+
+audit_item = ThesaurusManager().cmd_as_group(
+ "audit", "审核本群处于投票中的词条", permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN
+)
+
+
+@audit_item.handle()
+async def _get_group_item_info(matcher: Matcher, args: Message = CommandArg()):
+ raw_msg = args.extract_plain_text()
+ msg_data = raw_msg.split(" ")
+ data = dict(enumerate(msg_data))
+ if data.get(0):
+ matcher.set_arg("ts_audit_vote_id", Message(msg_data[0]))
+ if data.get(1):
+ matcher.set_arg("ts_audit_vote_attitude", Message(msg_data[1]))
+
+
+@audit_item.got("ts_audit_vote_id", "要审核的词条id是:")
+async def _get_audit_item_id(
+ event: GroupMessageEvent, item_id: str = ArgPlainText("ts_group_vote_id")
+):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id})
+ if not query_result:
+ await audit_item.finish("未找到此id相关信息...请检查是否输入正确(")
+
+ item_info = query_result[0]
+ result = f"""你即将要审核的词条信息:
+ 词条ID: {item_info._id}
+ 有人问: {item_info.matcher}
+ 我答: {"、".join(map(str, item_info.result))}
+ 提交人: {item_info.operator}@{item_info.operator_id}
+ 当前赞成: {"、".join(map(str, item_info.vote_list))}
+ """.strip()
+ await audit_item.send(Message(result))
+
+
+@audit_item.got("ts_audit_vote_attitude", "你的选择是?(y/n)")
+async def _get_audit_attitude(
+ event: GroupMessageEvent,
+ item_id: str = ArgPlainText("ts_audit_vote_id"),
+ attitude: str = ArgPlainText("ts_audit_vote_attitude"),
+):
+ agree_list = ["y", "Y", "是", "同意", "赞成"]
+ disagree_list = ["n", "N", "否", "不", "不同意", "不赞成"]
+ if attitude not in agree_list and attitude not in disagree_list:
+ await audit_item.reject("你的观点似乎不相关呢...请重新输入: (y/n)")
+
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"_id": item_id, "group_id": group_id})
+ if not query_result:
+ await audit_item.finish("未找到相关内容...请检查输入...")
+ item_info = query_result[0]
+
+ if attitude in agree_list:
+ await tm.add_item(
+ item_id,
+ True,
+ item_info.matcher,
+ item_info.result,
+ item_info.need_at,
+ item_info.m_type,
+ group_id,
+ item_info.operator,
+ item_info.operator_id,
+ 0,
+ list(),
+ )
+ await tm.del_item(item_id, group_id, False)
+ else:
+ await tm.del_item(item_id, group_id, False)
+
+ await audit_item.finish("完成~!")
+
+
+_ITEM_SHOW_FORMAT = """该词条信息如下:
+词条ID: {_id}
+有人问: {matcher}
+我答: {ans}
+判断方式: {m_type}
+提交人: {operator}@{operator_id}
+更新时间: {update_time}
+是否为投票选出: {is_vote}
+投票赞成: {vote_list}
+"""
+
+
+get_normal_item_info = ThesaurusManager().cmd_as_group("i", "查看本群的词条详情")
+
+
+@get_normal_item_info.handle()
+async def _info_normal_get_item_id(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("info_normal_item_id", args)
+
+
+@get_normal_item_info.got("info_normal_item_id", "需要查看的词条ID:")
+async def _info_normal_get_item_info(
+ event: GroupMessageEvent, _id: str = ArgPlainText("info_normal_item_id")
+):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"_id": _id, "group_id": group_id}, True)
+ if not query_result:
+ await get_normal_item_info.finish("未找到此ID相关信息...")
+
+ item_info = query_result[0]
+
+ item_type = item_info.m_type
+ if item_type == 1:
+ m_type = "模糊匹配"
+ elif item_type == 2:
+ m_type = "正则"
+ else:
+ m_type = "全匹配"
+
+ result = _ITEM_SHOW_FORMAT.format(
+ _id=_id,
+ matcher=item_info.matcher,
+ ans="、".join(map(str, item_info.result)),
+ m_type=m_type,
+ operator=item_info.operator,
+ operator_id=item_info.operator_id,
+ update_time=item_info.update_time.replace(
+ tzinfo=pytz.timezone("Asia/Shanghai")
+ ),
+ is_vote="是" if item_info.is_vote else "否",
+ vote_list=item_info.vote_list,
+ )
+ await get_normal_item_info.finish(result)
+
+
+get_global_item_info = ThesaurusManager().cmd_as_group("i.g", "查看全局的词条详情")
+
+
+@get_global_item_info.handle()
+async def _info_global_get_item_id(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("info_global_item_id", args)
+
+
+@get_global_item_info.got("info_global_item_id", "需要查看的词条ID:")
+async def _info_global_get_item_info(_id: str = ArgPlainText("info_global_item_id")):
+ tm = ThesaurusManager()
+
+ query_result = await tm.get_item_list({"_id": _id, "group_id": 0}, True)
+ if not query_result:
+ await get_global_item_info.finish("未找到此ID相关信息...")
+
+ item_info = query_result[0]
+
+ item_type = item_info.m_type
+ if item_type == 1:
+ m_type = "模糊匹配"
+ elif item_type == 2:
+ m_type = "正则"
+ else:
+ m_type = "全匹配"
+
+ result = _ITEM_SHOW_FORMAT.format(
+ _id=_id,
+ matcher=item_info.matcher,
+ ans="、".join(map(str, item_info.result)),
+ m_type=m_type,
+ operator=item_info.operator,
+ operator_id=item_info.operator_id,
+ update_time=item_info.update_time.replace(
+ tzinfo=pytz.timezone("Asia/Shanghai")
+ ),
+ is_vote="是" if item_info.is_vote else "否",
+ vote_list=item_info.vote_list,
+ )
+ await get_global_item_info.finish(result)
+
+
+get_vote_item_info = ThesaurusManager().cmd_as_group("i.v", "查看本群的待审核/投票的词条详情")
+
+
+@get_vote_item_info.handle()
+async def _info_vote_get_item_id(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("info_vote_item_id", args)
+
+
+@get_vote_item_info.got("info_vote_item_id", "需要查看的词条ID:")
+async def _info_vote_get_item_info(
+ event: GroupMessageEvent, _id: str = ArgPlainText("info_vote_item_id")
+):
+ tm = ThesaurusManager()
+ group_id = event.group_id
+
+ query_result = await tm.get_item_list({"_id": _id, "group_id": group_id})
+ if not query_result:
+ await get_vote_item_info.finish("未找到此ID相关信息...")
+
+ item_info = query_result[0]
+
+ item_type = item_info.m_type
+ if item_type == 1:
+ m_type = "模糊匹配"
+ elif item_type == 2:
+ m_type = "正则"
+ else:
+ m_type = "全匹配"
+
+ result = _ITEM_SHOW_FORMAT.format(
+ _id=_id,
+ matcher=item_info.matcher,
+ ans="、".join(map(str, item_info.result)),
+ m_type=m_type,
+ operator=item_info.operator,
+ operator_id=item_info.operator_id,
+ update_time=item_info.update_time.replace(
+ tzinfo=pytz.timezone("Asia/Shanghai")
+ ),
+ is_vote="是" if item_info.is_vote else "否",
+ vote_list=item_info.vote_list,
+ )
+ await get_vote_item_info.finish(result)
+
+
+from ATRI import driver
+
+from .listener import init_listener
+
+
+driver().on_startup(init_listener)
diff --git a/ATRI/plugins/thesaurus/data_source.py b/ATRI/plugins/thesaurus/data_source.py
new file mode 100644
index 0000000..30f2c5b
--- /dev/null
+++ b/ATRI/plugins/thesaurus/data_source.py
@@ -0,0 +1,192 @@
+import pytz
+from datetime import datetime
+
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
+
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.exceptions import ThesaurusError
+
+from .db import DBForTS, DBForTAL
+from .db import ThesaurusStoragor
+
+
+class ThesaurusManager(Service):
+ def __init__(self):
+ Service.__init__(
+ self,
+ "词库管理",
+ "支持模糊匹配、全匹配、正则的自定义回复~\n支持分群、全局管理,支持群内投票添加",
+ rule=is_in_service("词库管理"),
+ permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN,
+ main_cmd="/ts",
+ )
+
+ async def __add_item(self, _id: str, group_id: int, is_main: bool = False):
+ if is_main:
+ try:
+ async with DBForTS() as db:
+ await db.add_item(_id, group_id)
+ except Exception:
+ raise ThesaurusError(f"添加词库(ts)数据失败 目标词id: {_id}")
+ else:
+ try:
+ async with DBForTAL() as db:
+ await db.add_item(_id, group_id)
+ except Exception:
+ raise ThesaurusError(f"添加词库(tal)数据失败 目标词id: {_id}")
+
+ async def update_item(
+ self, _id: str, group_id: int, update_map: dict, is_main: bool = False
+ ):
+ if is_main:
+ try:
+ async with DBForTS() as db:
+ await db.update_item(_id, group_id, update_map)
+ except Exception:
+ raise ThesaurusError(f"更新词库(ts)数据失败 目标词id: {_id}")
+ else:
+ try:
+ async with DBForTAL() as db:
+ await db.update_item(_id, group_id, update_map)
+ except Exception:
+ raise ThesaurusError(f"更新词库(tal)数据失败 目标词id: {_id}")
+
+ async def __del_item(self, _id: str, group_id: int, is_main: bool = False):
+ if is_main:
+ try:
+ async with DBForTS() as db:
+ await db.del_item({"_id": _id, "group_id": group_id})
+ except Exception:
+ raise ThesaurusError(f"删除词库(ts)数据失败 目标词id: {_id}")
+ else:
+ try:
+ async with DBForTAL() as db:
+ await db.del_item({"_id": _id, "group_id": group_id})
+ except Exception:
+ raise ThesaurusError(f"删除词库(tal)数据失败 目标词id: {_id}")
+
+ async def get_item_list(self, query_map: dict, is_main: bool = False) -> list:
+ if is_main:
+ try:
+ async with DBForTS() as db:
+ return await db.get_item_list(query_map)
+ except Exception:
+ raise ThesaurusError("获取词库(ts)列表数据失败")
+ else:
+ try:
+ async with DBForTAL() as db:
+ return await db.get_item_list(query_map)
+ except Exception:
+ raise ThesaurusError("获取词库(tal)列表数据失败")
+
+ async def get_all_items(self, is_main: bool = False) -> list:
+ if is_main:
+ try:
+ async with DBForTS() as db:
+ return await db.get_all_items()
+ except Exception:
+ raise ThesaurusError("获取全部词库(ts)列表数据失败")
+ else:
+ try:
+ async with DBForTAL() as db:
+ return await db.get_all_items()
+ except Exception:
+ raise ThesaurusError("获取全部词库(tal)列表数据失败")
+
+ async def add_item(
+ self,
+ _id: str,
+ is_main: bool,
+ q: str,
+ a: list,
+ need_at: int,
+ t: str,
+ group_id: int,
+ operator: str,
+ operator_id: int,
+ is_vote: int,
+ vote_list: list,
+ ) -> str:
+ query_result = await self.get_item_list(
+ {"matcher": q, "group_id": group_id}, is_main
+ )
+ if query_result:
+ item_info = query_result[0]
+ return f"""{"(需审核/投票)" if not is_main else str()}该词条已存在!! ID: {item_info._id}"""
+
+ if t == "全匹配":
+ m_type = 0
+ elif t == "模糊匹配":
+ m_type = 1
+ else:
+ m_type = 2
+
+ item_meta = {
+ "matcher": q,
+ "result": a,
+ "need_at": need_at,
+ "m_type": m_type,
+ "group_id": group_id,
+ "operator": operator,
+ "operator_id": operator_id,
+ "update_time": datetime.now(pytz.timezone("Asia/Shanghai")),
+ "is_vote": is_vote,
+ "vote_list": vote_list,
+ }
+
+ await self.__add_item(_id, group_id, is_main)
+ await self.update_item(
+ _id,
+ group_id,
+ item_meta,
+ is_main,
+ )
+ return f"""{"(需审核/投票)" if not is_main else str()}成功加上新词条 ID: {_id}"""
+
+ async def del_item(self, _id: str, group_id: int, is_main: bool):
+ query_result = await self.get_item_list(
+ {"_id": _id, "group_id": group_id}, is_main
+ )
+ if not query_result:
+ return f"目标id: {_id} 没有记录呢..."
+
+ await self.__del_item(_id, group_id, is_main)
+ return f"成功删除目标id: {_id} 问答信息"
+
+ async def vote(self, _id: str, group_id: int, voter: int):
+ raw_item_info = await self.get_item_list({"_id": _id, "group_id": group_id})
+ item_info = raw_item_info[0]
+ vote_list: list = item_info.vote_list
+ vote_list.append(voter)
+
+ await self.update_item(
+ _id,
+ group_id,
+ {
+ "vote_list": vote_list,
+ "update_time": datetime.now(pytz.timezone("Asia/Shanghai")),
+ },
+ )
+
+
+class ThesaurusListener(Service):
+ def __init__(self):
+ Service.__init__(self, "词库监听", "词库监听器", rule=is_in_service("词库监听"))
+
+ async def get_item_by_id(self, _id: str) -> ThesaurusStoragor:
+ try:
+ async with DBForTS() as db:
+ data = await db.get_item_list({"_id": _id})
+ except Exception:
+ raise ThesaurusError(f"获取词库(ts)数据失败 词条ID: {_id}")
+
+ return data[0]
+
+ async def get_item_list(self, group_id: int):
+ try:
+ async with DBForTS() as db:
+ return await db.get_item_list({"group_id": group_id})
+ except Exception:
+ raise ThesaurusError(f"获取词库(ts)数据失败 目标群号: {group_id}")
diff --git a/ATRI/plugins/thesaurus/db.py b/ATRI/plugins/thesaurus/db.py
new file mode 100644
index 0000000..b5394fc
--- /dev/null
+++ b/ATRI/plugins/thesaurus/db.py
@@ -0,0 +1,47 @@
+from ATRI.database import ThesaurusStoragor, ThesaurusAuditList
+
+
+class DBForTS:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ async def add_item(self, _id: str, group_id: int):
+ await ThesaurusStoragor.create(_id=_id, group_id=group_id)
+
+ async def update_item(self, _id: str, group_id: int, update_map: dict):
+ await ThesaurusStoragor.filter(_id=_id, group_id=group_id).update(**update_map)
+
+ async def del_item(self, query_map: dict):
+ await ThesaurusStoragor.filter(**query_map).delete()
+
+ async def get_item_list(self, query_map: dict) -> list:
+ return await ThesaurusStoragor.filter(**query_map)
+
+ async def get_all_items(self) -> list:
+ return await ThesaurusStoragor.all()
+
+
+class DBForTAL:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ async def add_item(self, _id: str, group_id: int):
+ await ThesaurusAuditList.create(_id=_id, group_id=group_id)
+
+ async def update_item(self, _id: str, group_id: int, update_map: dict):
+ await ThesaurusAuditList.filter(_id=_id, group_id=group_id).update(**update_map)
+
+ async def del_item(self, query_map: dict):
+ await ThesaurusAuditList.filter(**query_map).delete()
+
+ async def get_item_list(self, query_map: dict) -> list:
+ return await ThesaurusAuditList.filter(**query_map)
+
+ async def get_all_items(self) -> list:
+ return await ThesaurusAuditList.all()
diff --git a/ATRI/plugins/thesaurus/listener.py b/ATRI/plugins/thesaurus/listener.py
new file mode 100644
index 0000000..d3c3887
--- /dev/null
+++ b/ATRI/plugins/thesaurus/listener.py
@@ -0,0 +1,121 @@
+import re
+from random import choice, shuffle
+
+from nonebot import get_bot
+from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
+from nonebot.adapters.onebot.v11.helpers import Cooldown
+
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.triggers.combining import AndTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+
+from ATRI.log import logger as log
+from ATRI.utils.apscheduler import scheduler
+
+from .data_source import ThesaurusManager, ThesaurusListener, ThesaurusStoragor
+
+
+class ThesaurusLinstenerIsEnabledChecker(BaseTrigger):
+ def get_next_fire_time(self, previous_fire_time, now):
+ tm = ThesaurusManager()
+ conf = tm.load_service("词库管理")
+ if conf.get("enabled"):
+ return now
+
+
+async def _thesaurus_vote_listener():
+ tm = ThesaurusManager()
+ try:
+ all_items = await tm.get_all_items(False)
+ except Exception:
+ return
+
+ for i in all_items:
+ data: ThesaurusStoragor = i
+ item_vote_list = data.vote_list
+ if len(item_vote_list) == 10:
+ t = data.m_type
+
+ if t == 0:
+ m_type = "全匹配"
+ elif t == 1:
+ m_type = "模糊匹配"
+ else:
+ m_type = "正则"
+
+ result = await tm.add_item(
+ data._id,
+ True,
+ data.matcher,
+ list(data.result),
+ data.need_at,
+ m_type,
+ data.group_id,
+ data.operator,
+ data.operator_id,
+ 1,
+ list(data.vote_list),
+ )
+ log.debug(result)
+
+ bot = get_bot()
+ await bot.send_group_msg(group_id=data.group_id, message=result)
+
+
+def init_listener():
+ scheduler.add_job(
+ _thesaurus_vote_listener,
+ AndTrigger([IntervalTrigger(seconds=10), ThesaurusLinstenerIsEnabledChecker()]),
+ max_instances=3, # type: ignore
+ misfire_grace_time=20, # type: ignore
+ )
+
+
+main_listener = ThesaurusListener().on_message(
+ "词库监听器", "监听所有消息判断是否满足触发词条条件", priority=4, block=False
+)
+
+
+@main_listener.handle([Cooldown(3)])
+async def _tl_listener(event: MessageEvent):
+ tl = ThesaurusListener()
+ msg = event.get_message().extract_plain_text()
+
+ group_id = 0
+ if isinstance(event, GroupMessageEvent):
+ group_id = event.group_id
+
+ query_result = await tl.get_item_list(group_id)
+ shuffle(query_result)
+ if query_result:
+ for item in query_result:
+ item_info: ThesaurusStoragor = item
+
+ if item_info.m_type == 1:
+ if item_info.matcher in msg:
+ if item_info.need_at:
+ if event.is_tome():
+ await main_listener.finish(choice(item_info.result))
+ else:
+ return
+ else:
+ await main_listener.finish(choice(item_info.result))
+ elif item_info.m_type == 2:
+ patt = item_info.matcher
+ if re.findall(patt, msg):
+ if item_info.need_at:
+ if event.is_tome():
+ await main_listener.finish(choice(item_info.result))
+ else:
+ return
+ else:
+ await main_listener.finish(choice(item_info.result))
+ else:
+ if item_info.matcher == msg:
+ if item_info.need_at:
+ if event.is_tome():
+ await main_listener.finish(choice(item_info.result))
+ else:
+ return
+ else:
+ await main_listener.finish(choice(item_info.result))
diff --git a/README.md b/README.md
index 3da11ce..fc31383 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,7 @@
> 被勾上的即已大致完成,但仍需优化
- [x] 网页控制台
- [ ] Twitter助手
-- [ ] 自定义词库(多种方式匹配)
+- [x] 自定义词库(多种方式匹配)
- [ ] RSS订阅
- [ ] 冷重启
- [ ] 进裙验证(问题可自定义)