summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--ATRI/database/db.py9
-rw-r--r--ATRI/plugins/anime_search.py9
-rw-r--r--ATRI/plugins/applet/data_source.py6
-rw-r--r--ATRI/plugins/bilibili_dynamic/__init__.py42
-rw-r--r--ATRI/plugins/bilibili_dynamic/data_source.py32
-rw-r--r--ATRI/plugins/code_runner/__init__.py38
-rw-r--r--ATRI/plugins/code_runner/data_source.py8
-rw-r--r--ATRI/plugins/essential.py31
-rw-r--r--ATRI/plugins/funny/data_source.py7
-rw-r--r--ATRI/plugins/help/__init__.py27
-rw-r--r--ATRI/plugins/help/data_source.py4
-rw-r--r--ATRI/plugins/kimo/__init__.py (renamed from ATRI/plugins/chat/__init__.py)33
-rw-r--r--ATRI/plugins/kimo/data_source.py (renamed from ATRI/plugins/chat/data_source.py)12
-rw-r--r--ATRI/plugins/manage/data_source.py5
-rw-r--r--ATRI/plugins/polaroid/__init__.py27
-rw-r--r--ATRI/plugins/polaroid/data_source.py60
-rw-r--r--ATRI/plugins/polaroid/image_dealer.py46
-rw-r--r--ATRI/plugins/repo.py2
-rw-r--r--ATRI/plugins/saucenao/__init__.py1
-rw-r--r--ATRI/plugins/saucenao/data_source.py19
-rw-r--r--ATRI/plugins/setu/__init__.py22
-rw-r--r--ATRI/plugins/setu/data_source.py25
-rw-r--r--ATRI/plugins/status/__init__.py4
-rw-r--r--ATRI/plugins/status/data_source.py34
-rw-r--r--ATRI/plugins/util/data_source.py7
-rw-r--r--ATRI/service.py54
-rw-r--r--ATRI/utils/__init__.py7
-rw-r--r--ATRI/utils/apscheduler.py23
-rw-r--r--ATRI/utils/check_update.py54
-rw-r--r--README.md5
-rw-r--r--test/test_plugin_chat.py84
-rw-r--r--test/test_plugin_code_runner.py10
-rw-r--r--test/test_plugin_help.py23
-rw-r--r--test/test_plugin_kimo.py43
35 files changed, 453 insertions, 363 deletions
diff --git a/.gitignore b/.gitignore
index cad6e71..b152ff0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -144,3 +144,6 @@ dmypy.json
/accounts/*
/data/*
+
+# for MacOS
+.DS_Store
diff --git a/ATRI/database/db.py b/ATRI/database/db.py
index c2aa015..68db8cf 100644
--- a/ATRI/database/db.py
+++ b/ATRI/database/db.py
@@ -1,7 +1,12 @@
+from pathlib import Path
from tortoise import Tortoise
+from nonebot import get_driver
from ATRI.database import models
-from nonebot import get_driver
+
+
+DB_DIR = Path(".") / "data" / "database" / "sql"
+DB_DIR.mkdir(exist_ok=True)
# 关于数据库的操作类,只实现与数据库有关的CRUD
@@ -17,7 +22,7 @@ class DB:
from ATRI.database import models
await Tortoise.init(
- db_url="sqlite://ATRI/database/db.sqlite3",
+ db_url=f"sqlite://{DB_DIR}/db.sqlite3",
modules={"models": [locals()["models"]]},
)
# Generate the schema
diff --git a/ATRI/plugins/anime_search.py b/ATRI/plugins/anime_search.py
index 3bfeedb..773748d 100644
--- a/ATRI/plugins/anime_search.py
+++ b/ATRI/plugins/anime_search.py
@@ -8,17 +8,16 @@ from ATRI.rule import is_in_service
from ATRI.utils import request, Translate
from ATRI.exceptions import RequestError
+
URL = "https://api.trace.moe/search?anilistInfo=true&url="
_anime_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
-__doc__ = """
-通过一张图片搜索你需要的番!据说里*也可以
-"""
-
class Anime(Service):
def __init__(self):
- Service.__init__(self, "以图搜番", __doc__, rule=is_in_service("以图搜番"))
+ Service.__init__(
+ self, "以图搜番", "通过一张图片搜索你需要的番!据说里*也可以", rule=is_in_service("以图搜番")
+ )
@staticmethod
async def _request(url: str) -> dict:
diff --git a/ATRI/plugins/applet/data_source.py b/ATRI/plugins/applet/data_source.py
index 3fc1bc5..60ca581 100644
--- a/ATRI/plugins/applet/data_source.py
+++ b/ATRI/plugins/applet/data_source.py
@@ -15,12 +15,12 @@ s = [11, 10, 3, 8, 4, 6]
xor = 177451812
add = 8728348608
-__doc__ = "啥b腾讯小程序给👴爪巴\n目前只整了b站的"
-
class Applet(Service):
def __init__(self):
- Service.__init__(self, "小程序处理", __doc__, rule=is_in_service("小程序处理"))
+ Service.__init__(
+ self, "小程序处理", "啥b腾讯小程序给👴爪巴\n目前只整了b站的", rule=is_in_service("小程序处理")
+ )
@staticmethod
def _bv_dec(x) -> str:
diff --git a/ATRI/plugins/bilibili_dynamic/__init__.py b/ATRI/plugins/bilibili_dynamic/__init__.py
index 82fbb4d..a42a0e3 100644
--- a/ATRI/plugins/bilibili_dynamic/__init__.py
+++ b/ATRI/plugins/bilibili_dynamic/__init__.py
@@ -1,35 +1,37 @@
+import re
+from tabulate import tabulate
+from datetime import datetime, timedelta
+
import pytz
from apscheduler.triggers.base import BaseTrigger
from apscheduler.triggers.combining import AndTrigger
from apscheduler.triggers.interval import IntervalTrigger
-from ATRI.utils.apscheduler import scheduler
-from ATRI.utils import timestamp2datetime
-
from nonebot.params import State
from nonebot.adapters.onebot.v11 import MessageSegment, GroupMessageEvent, Message
from nonebot.typing import T_State
from nonebot import get_bot
-from .data_source import BilibiliDynamicSubscriptor
-import re
-from tabulate import tabulate
-from datetime import datetime, timedelta
+from ATRI.utils.apscheduler import scheduler
+from ATRI.utils import timestamp2datetime
from ATRI.log import logger
+from .data_source import BilibiliDynamicSubscriptor
+
+
bilibili_dynamic = BilibiliDynamicSubscriptor().on_command(
- "/bilibili_dynamic", "b站动态订阅助手", aliases={"b站动态"}
+ "/bilibili_dynamic", "b站动态订阅助手", aliases={"/bd", "b站动态"}
)
-__help__ = """欢迎使用【b站动态订阅助手】~
-目前支持的功能如下...请选择:
+__help__ = """好哦!是b站动态订阅诶~
+目前支持的功能如下...请键入对应关键词:
1.添加订阅
2.取消订阅
3.订阅列表
-----------------------------------
-用法示例1:/bilibili_dynamic 添加订阅
-用法示例2:/bilibili_dynamic 取消订阅 401742377(数字uid)
-用法示例3:/bilibili_dynamic 订阅列表"""
+用法示例1:/bd 添加订阅
+用法示例2:/bd 取消订阅 401742377(数字uid)
+用法示例3:/bd 订阅列表"""
def help() -> str:
@@ -39,7 +41,6 @@ def help() -> str:
@bilibili_dynamic.handle()
async def _menu(event: GroupMessageEvent, state: T_State = State()):
args = str(event.get_plaintext()).strip().lower().split()[1:]
- # print(args)
if not args:
await bilibili_dynamic.finish(help())
elif args and len(args) == 1:
@@ -58,7 +59,6 @@ async def handle_subcommand(event: GroupMessageEvent, state: T_State = State()):
if state["sub_command"] == "订阅列表":
subscriptor = BilibiliDynamicSubscriptor()
- # print(event.group_id)
r = await subscriptor.get_subscriptions(query_map={"groupid": event.group_id})
subs = []
for s in r:
@@ -80,8 +80,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()):
if uid == "-1":
await bilibili_dynamic.finish("已经成功退出订阅~")
- # print(state)
- # print(uid)
if not re.match(r"^\d+$", uid):
await bilibili_dynamic.reject("这似乎不是UID呢, 请重新输入:")
uid = int(uid)
@@ -107,7 +105,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()):
)
)
success = await subscriptor.add_subscription(uid, event.group_id)
- print(success)
success = success and (
await subscriptor.update_subscription_by_uid(
uid=uid,
@@ -143,7 +140,7 @@ class BilibiliDynamicCheckEnabledTrigger(BaseTrigger):
# 实现abstract方法 <get_next_fire_time>
def get_next_fire_time(self, previous_fire_time, now):
subscriptor = BilibiliDynamicSubscriptor()
- config = subscriptor.load_service()
+ config = subscriptor.load_service("b站动态订阅")
if config["enabled"] == False:
return None
else:
@@ -156,8 +153,8 @@ class BilibiliDynamicCheckEnabledTrigger(BaseTrigger):
@scheduler.scheduled_job(
AndTrigger([IntervalTrigger(seconds=10), BilibiliDynamicCheckEnabledTrigger()]),
name="b站动态检查",
- max_instances=3,
- misfire_grace_time=60,
+ max_instances=3, # type: ignore
+ misfire_grace_time=60, # type: ignore
)
async def _check_dynamic():
from ATRI.database.models import Subscription
@@ -171,7 +168,6 @@ async def _check_dynamic():
d: Subscription = tq.get()
logger.info("准备查询UP【{up}】的动态 队列剩余{size}".format(up=d.nickname, size=tq.qsize()))
ts = int(d.last_update.timestamp())
- # logger.info("上一次更新的时间戳{time}".format(time=ts))
info: dict = await subscriptor.get_recent_dynamic_by_uid(d.uid)
res = []
if info:
@@ -181,11 +177,9 @@ async def _check_dynamic():
if len(res) == 0:
logger.warning("获取UP【{up}】的动态为空".format(up=d.nickname))
for i in res:
- # logger.info("获取UP【{up}】的动态,时间{time},内容{content}".format(up=d.nickname, time=i["timestamp"],content=i["content"][:20]))
i["name"] = d.nickname
if ts < i["timestamp"]:
text, pic_url = subscriptor.generate_output(pattern=i)
- # print(text,pic_url)
output = Message(
[MessageSegment.text(text), MessageSegment.image(pic_url)]
)
diff --git a/ATRI/plugins/bilibili_dynamic/data_source.py b/ATRI/plugins/bilibili_dynamic/data_source.py
index 49ffce7..a419b45 100644
--- a/ATRI/plugins/bilibili_dynamic/data_source.py
+++ b/ATRI/plugins/bilibili_dynamic/data_source.py
@@ -11,13 +11,11 @@ import asyncio
from typing import Any
from operator import itemgetter
-__doc__ = """b站订阅动态助手
-"""
__session_pool = {}
-def get_api(field: str):
+def get_api(field: str) -> dict:
"""
获取 API。
@@ -33,9 +31,11 @@ def get_api(field: str):
if os.path.exists(path):
with open(path, encoding="utf8") as f:
return json.loads(f.read())
+ else:
+ return dict()
-API = get_api("user")
+API: dict = get_api("user")
def get_session():
@@ -57,12 +57,12 @@ def get_session():
async def bilibili_request(
method: str,
url: str,
- params: dict = None,
+ params: dict = dict(),
data: Any = None,
no_csrf: bool = False,
json_body: bool = False,
**kwargs,
-):
+) -> dict:
"""
向接口发送请求。
@@ -130,21 +130,21 @@ async def bilibili_request(
# 检查响应头 Content-Length
content_length = resp.headers.get("content-length")
if content_length and int(content_length) == 0:
- return None
+ return dict()
# 检查响应头 Content-Type
content_type = resp.headers.get("content-type")
# 不是 application/json
- if content_type.lower().index("application/json") == -1:
+ if content_type.lower().index("application/json") == -1: # type: ignore
raise Exception("响应不是 application/json 类型")
raw_data = await resp.text()
- resp_data: dict
+ resp_data: dict = dict()
if "callback" in params:
# JSONP 请求
- resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1))
+ resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) # type: ignore
else:
# JSON
resp_data = json.loads(raw_data)
@@ -181,9 +181,9 @@ class User:
"""
self.uid = uid
- self.__self_info = None
+ self.__self_info = None # 暂时无用
- async def get_user_info(self):
+ async def get_user_info(self) -> dict:
"""
获取用户信息(昵称,性别,生日,签名,头像 URL,空间横幅 URL 等)
@@ -216,7 +216,7 @@ class User:
"offset_dynamic_id": offset,
"need_top": 1 if need_top else 0,
}
- data = await bilibili_request("GET", url=api["url"], params=params)
+ data: dict = await bilibili_request("GET", url=api["url"], params=params)
# card 字段自动转换成 JSON。
if "cards" in data:
for card in data["cards"]:
@@ -227,7 +227,7 @@ class User:
class BilibiliDynamicSubscriptor(Service):
def __init__(self):
- Service.__init__(self, "b站动态订阅", __doc__, rule=is_in_service("b站动态订阅"))
+ Service.__init__(self, "b站动态订阅", "b站订阅动态助手", rule=is_in_service("b站动态订阅"))
async def add_subscription(self, uid: int, groupid: int) -> bool:
async with DB() as db:
@@ -261,7 +261,7 @@ class BilibiliDynamicSubscriptor(Service):
async def get_upname_by_uid(self, uid: int) -> str:
try:
u = User(uid)
- info = await u.get_user_info()
+ info: dict = await u.get_user_info()
return info.get("name")
except:
return ""
@@ -334,7 +334,7 @@ class BilibiliDynamicSubscriptor(Service):
ret = sorted(ret, key=itemgetter("timestamp"))
return ret
- def generate_output(self, pattern: dict) -> (str, str):
+ def generate_output(self, pattern: dict) -> tuple:
# 限制摘要的字数
abstractLimit = 40
text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【时间】{time}\n【内容摘要】{content}\n""".format(
diff --git a/ATRI/plugins/code_runner/__init__.py b/ATRI/plugins/code_runner/__init__.py
index adf6407..77240ec 100644
--- a/ATRI/plugins/code_runner/__init__.py
+++ b/ATRI/plugins/code_runner/__init__.py
@@ -11,33 +11,43 @@ from .data_source import CodeRunner
_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
-code_runner = CodeRunner().on_command("/code", "在线运行一段代码,帮助:/code help")
+code_runner = CodeRunner().on_command("/code", "在线运行一段代码,获取帮助:/code.help")
@code_runner.handle([Cooldown(5, prompt=_flmt_notice)])
-async def _code_runner(
- matcher: Matcher, event: MessageEvent, args: Message = CommandArg()
-):
- user_id = event.get_user_id()
+async def _code_runner(matcher: Matcher, args: Message = CommandArg()):
msg = args.extract_plain_text()
if msg:
matcher.set_arg("opt", args)
else:
- content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!"
+ content = "请键入 /code.help 以获取帮助~!"
await code_runner.finish(Message(content))
-@code_runner.got("opt")
+@code_runner.got("opt", prompt="需要运行的语言及代码?\n获取帮助:/code.help")
async def _(event: MessageEvent, opt: str = ArgPlainText("opt")):
user_id = event.get_user_id()
- msg = opt.split("\n")
- if msg[0] == "help":
- content = f"> {MessageSegment.at(user_id)}\n" + "请键入 /code help 以获取帮助~!"
- elif msg[0] == "list":
- content = f"> {MessageSegment.at(user_id)}\n" + CodeRunner().list_supp_lang()
- else:
- content = MessageSegment.at(user_id) + await CodeRunner().runner(unescape(opt))
+ # 拯救傻瓜用户
+ if opt == "/code.help":
+ await code_runner.finish(CodeRunner().help())
+ content = MessageSegment.at(user_id) + str(await CodeRunner().runner(unescape(opt)))
await code_runner.finish(Message(content))
+
+
+code_runner_helper = CodeRunner().cmd_as_group("help", "使用说明")
+
+
+@code_runner_helper.handle()
+async def _():
+ await code_runner_helper.finish(CodeRunner().help())
+
+
+code_supp_list = CodeRunner().cmd_as_group("list", "查看支持的语言")
+
+
+@code_supp_list.handle()
+async def _():
+ await code_supp_list.finish(CodeRunner().list_supp_lang())
diff --git a/ATRI/plugins/code_runner/data_source.py b/ATRI/plugins/code_runner/data_source.py
index bf3d8b2..87ae792 100644
--- a/ATRI/plugins/code_runner/data_source.py
+++ b/ATRI/plugins/code_runner/data_source.py
@@ -34,13 +34,11 @@ SUPPORTED_LANGUAGES = {
}
-__doc__ = """在线跑代码
-"""
-
-
class CodeRunner(Service):
def __init__(self):
- Service.__init__(self, "在线跑代码", __doc__, rule=is_in_service("在线跑代码"))
+ Service.__init__(
+ self, "在线跑代码", "在线跑代码", rule=is_in_service("在线跑代码"), main_cmd="/code"
+ )
@staticmethod
def help() -> str:
diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py
index 6ec5401..37c18a4 100644
--- a/ATRI/plugins/essential.py
+++ b/ATRI/plugins/essential.py
@@ -2,6 +2,7 @@ import os
import json
import shutil
import asyncio
+from time import sleep
from datetime import datetime
from pydantic.main import BaseModel
from random import choice, randint
@@ -33,6 +34,7 @@ from ATRI.log import logger as log
from ATRI.config import BotSelfConfig
from ATRI.utils import MessageChecker
from ATRI.utils.apscheduler import scheduler
+from ATRI.utils.check_update import CheckUpdate
driver = ATRI.driver()
@@ -50,6 +52,24 @@ os.makedirs(TEMP_PATH, exist_ok=True)
@driver.on_startup
async def startup():
log.info(f"Now running: {ATRI.__version__}")
+
+ try:
+ log.info("Starting to check update...")
+ log.info(await CheckUpdate.show_latest_commit_info())
+ sleep(1)
+
+ l_v, l_v_t = await CheckUpdate.show_latest_version()
+ if l_v != ATRI.__version__:
+ log.warning("New version has been released, please update.")
+ log.warning(f"Latest version: {l_v} Update time: {l_v_t}")
+ sleep(3)
+ except Exception:
+ log.error("检查 更新/最新推送 失败...")
+
+ if not scheduler.running:
+ scheduler.start()
+ log.info("Scheduler Started.")
+
log.info("アトリは、高性能ですから!")
@@ -106,14 +126,9 @@ class GroupRequestInfo(BaseModel):
is_approve: bool
-__doc__ = """
-对bot基础/必须请求进行处理
-"""
-
-
class Essential(Service):
def __init__(self):
- Service.__init__(self, "基础部件", __doc__)
+ Service.__init__(self, "基础部件", "对bot基础/必须请求进行处理")
friend_add_event = Essential().on_request("好友添加", "好友添加检测")
@@ -279,6 +294,7 @@ async def _recall_group_event(bot: Bot, event: GroupRecallNoticeEvent):
except BaseException:
return
+ log.debug(f"Recall raw msg:\n{repo}")
user = event.user_id
group = event.group_id
repo = repo["message"]
@@ -310,6 +326,7 @@ async def _recall_private_event(bot: Bot, event: FriendRecallNoticeEvent):
except BaseException:
return
+ log.debug(f"Recall raw msg:\n{repo}")
user = event.user_id
repo = repo["message"]
@@ -347,7 +364,7 @@ async def _():
await acc_recall.finish("现在可以接受撤回信息啦!")
[email protected]_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5)
[email protected]_job("interval", name="清除缓存", minutes=30, misfire_grace_time=5) # type: ignore
async def _clear_cache():
try:
shutil.rmtree(TEMP_PATH)
diff --git a/ATRI/plugins/funny/data_source.py b/ATRI/plugins/funny/data_source.py
index 8edc88a..75fa4ee 100644
--- a/ATRI/plugins/funny/data_source.py
+++ b/ATRI/plugins/funny/data_source.py
@@ -17,14 +17,9 @@ FUNNY_DIR = Path(".") / "data"
os.makedirs(FUNNY_DIR, exist_ok=True)
-__doc__ = """
-乐1乐,莫当真
-"""
-
-
class Funny(Service):
def __init__(self):
- Service.__init__(self, "乐", __doc__, rule=is_in_service("乐"))
+ Service.__init__(self, "乐", "乐1乐,莫当真", rule=is_in_service("乐"))
@staticmethod
async def idk_laugh(name: str) -> str:
diff --git a/ATRI/plugins/help/__init__.py b/ATRI/plugins/help/__init__.py
index d4b8b36..c6f915e 100644
--- a/ATRI/plugins/help/__init__.py
+++ b/ATRI/plugins/help/__init__.py
@@ -3,34 +3,31 @@ from nonebot.adapters.onebot.v11 import MessageEvent
from .data_source import Helper
-main_help = Helper().on_command("菜单", "获取食用bot的方法", aliases={"menu"})
+menu = Helper().on_command("菜单", "获取食用bot的方法", aliases={"menu"})
-@main_help.handle()
-async def _main_help():
- repo = Helper().menu()
- await main_help.finish(repo)
+async def _():
+ await menu.finish(Helper().menu())
-about_me = Helper().on_command("关于", "获取关于bot的信息", aliases={"about"})
+about = Helper().on_command("关于", "获取关于bot的信息", aliases={"about"})
-@about_me.handle()
-async def _about_me():
- repo = Helper().about()
- await about_me.finish(repo)
+async def _():
+ await about.finish(Helper().about())
-service_list = Helper().on_command("服务列表", "查看所有可用服务", aliases={"功能列表"})
+service_list = Helper().on_command("服务列表", "获取服务列表", aliases={"功能列表"})
@service_list.handle()
-async def _service_list():
- repo = Helper().service_list()
- await service_list.finish(repo)
+async def _():
+ await service_list.finish(Helper().service_list())
-service_info = Helper().on_command("帮助", "获取服务详细帮助", aliases={"help"})
+service_info = Helper().on_command("帮助", "获取对应服务详细信息", aliases={"help"})
@service_info.handle()
diff --git a/ATRI/plugins/help/data_source.py b/ATRI/plugins/help/data_source.py
index f96d59e..e3dd1ff 100644
--- a/ATRI/plugins/help/data_source.py
+++ b/ATRI/plugins/help/data_source.py
@@ -72,7 +72,7 @@ class Helper(Service):
)
table = tabulate(
services,
- headers=["服务名称", "开启状态", "仅支持管理员"],
+ headers=["服务名称", "开启状态(全局)", "仅支持管理员"],
tablefmt="plain",
showindex=True,
)
@@ -91,7 +91,7 @@ class Helper(Service):
service_enabled = data.get("enabled", True)
_service_cmd_list = list(data.get("cmd_list", {"error"}))
- service_cmd_list = "、".join(map(str, _service_cmd_list))
+ service_cmd_list = "\n".join(map(str, _service_cmd_list))
repo = SERVICE_INFO_FORMAT.format(
service=service_name,
diff --git a/ATRI/plugins/chat/__init__.py b/ATRI/plugins/kimo/__init__.py
index 4219d56..b1c5698 100644
--- a/ATRI/plugins/chat/__init__.py
+++ b/ATRI/plugins/kimo/__init__.py
@@ -6,27 +6,27 @@ from nonebot.adapters.onebot.v11 import MessageEvent, Message
from nonebot.adapters.onebot.v11.helpers import Cooldown
from ATRI.utils.apscheduler import scheduler
-from .data_source import Chat
+from .data_source import Kimo
_chat_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~", "我开始为你以后的伴侣担心了..."])
-chat = Chat().on_message("文爱", "闲聊(文爱")
+kimo = Kimo().on_message("文爱", "闲聊(文爱")
[email protected]([Cooldown(3, prompt=_chat_flmt_notice)])
[email protected]([Cooldown(3, prompt=_chat_flmt_notice)])
async def _chat(event: MessageEvent):
user_id = event.get_user_id()
msg = str(event.message)
- repo = await Chat().deal(msg, user_id)
+ repo = await Kimo().deal(msg, user_id)
try:
- await chat.finish(repo)
+ await kimo.finish(repo)
except Exception:
return
-my_name_is = Chat().on_command("叫我", "更改闲聊(文爱)时的称呼", aliases={"我是"}, priority=1)
+my_name_is = Kimo().on_command("叫我", "更改kimo时的称呼", aliases={"我是"}, priority=1)
@my_name_is.handle([Cooldown(3, prompt=_chat_flmt_notice)])
@@ -47,28 +47,13 @@ async def _deal_name(event: MessageEvent, new_name: str = ArgPlainText("name")):
"很不错的称呼呢w",
]
)
- Chat().name_is(user_id, new_name)
+ Kimo().name_is(user_id, new_name)
await my_name_is.finish(repo)
-say = Chat().on_command("说", "别人让我说啥就说啥(", priority=1)
-
-
[email protected]([Cooldown(3, prompt=_chat_flmt_notice)])
-async def _ready_say(matcher: Matcher, args: Message = CommandArg()):
- msg = args.extract_plain_text()
- if msg:
- matcher.set_arg("say", args)
-
-
[email protected]("say", "想要咱复读啥呢...")
-async def _deal_say(msg: str = ArgPlainText("say")):
- await say.finish(msg)
-
-
[email protected]_job("interval", name="闲聊词库检查更新", hours=3, misfire_grace_time=60)
[email protected]_job("interval", name="kimo词库检查更新", hours=3, misfire_grace_time=60) # type: ignore
async def _check_kimo():
try:
- await Chat().update_data()
+ await Kimo().update_data()
except BaseException:
pass
diff --git a/ATRI/plugins/chat/data_source.py b/ATRI/plugins/kimo/data_source.py
index 576db03..6dd448a 100644
--- a/ATRI/plugins/chat/data_source.py
+++ b/ATRI/plugins/kimo/data_source.py
@@ -11,19 +11,15 @@ from ATRI.utils import request
from ATRI.exceptions import ReadFileError, WriteError
-__doc__ = """
-好像有点涩?
-"""
-
-CHAT_PATH = Path(".") / "data" / "database" / "chat"
+CHAT_PATH = Path(".") / "data" / "database" / "kimo"
os.makedirs(CHAT_PATH, exist_ok=True)
KIMO_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json"
-class Chat(Service):
+class Kimo(Service):
def __init__(self):
Service.__init__(
- self, "闲聊", __doc__, rule=to_bot() & is_in_service("闲聊"), priority=5
+ self, "kimo", "好像有点涩?", rule=to_bot() & is_in_service("kimo"), priority=5
)
@staticmethod
@@ -73,7 +69,7 @@ class Chat(Service):
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(data, indent=4))
- log.info("闲聊词库更新完成")
+ log.info("kimo词库更新完成")
@staticmethod
def name_is(user_id: str, new_name: str):
diff --git a/ATRI/plugins/manage/data_source.py b/ATRI/plugins/manage/data_source.py
index bc9f801..395a165 100644
--- a/ATRI/plugins/manage/data_source.py
+++ b/ATRI/plugins/manage/data_source.py
@@ -6,6 +6,7 @@ from datetime import datetime
from ATRI.service import Service, ServiceTools
from ATRI.exceptions import ReadFileError, load_error
+
MANAGE_DIR = Path(".") / "data" / "database" / "manege"
ESSENTIAL_DIR = Path(".") / "data" / "database" / "essential"
os.makedirs(MANAGE_DIR, exist_ok=True)
@@ -17,12 +18,10 @@ Time: {time}
{content}
""".strip()
-__doc__ = """控制bot的各项服务"""
-
class Manage(Service):
def __init__(self):
- Service.__init__(self, "管理", __doc__, True)
+ Service.__init__(self, "管理", "控制bot的各项服务", True)
@staticmethod
def _load_block_user_list() -> dict:
diff --git a/ATRI/plugins/polaroid/__init__.py b/ATRI/plugins/polaroid/__init__.py
new file mode 100644
index 0000000..a3d2d61
--- /dev/null
+++ b/ATRI/plugins/polaroid/__init__.py
@@ -0,0 +1,27 @@
+from random import choice
+
+from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment, Message
+from nonebot.adapters.onebot.v11.helpers import Cooldown
+
+from ATRI.rule import to_bot
+from .data_source import Polaroid, TEMP_PATH
+
+
+_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
+
+
+polaroid = Polaroid().on_command("拍立得", "获取一张以自己头像的拍立得图片!需at", rule=to_bot())
+
+
[email protected]([Cooldown(15, prompt=_flmt_notice)])
+async def _(event: MessageEvent):
+ user_id = event.get_user_id()
+
+ temp_p = TEMP_PATH / f"{user_id}.png"
+ if not temp_p.is_file():
+ pol = await Polaroid().generate(user_id)
+ result = MessageSegment.image(pol)
+ else:
+ result = MessageSegment.image(temp_p)
+
+ await polaroid.finish(Message(result))
diff --git a/ATRI/plugins/polaroid/data_source.py b/ATRI/plugins/polaroid/data_source.py
new file mode 100644
index 0000000..82e0337
--- /dev/null
+++ b/ATRI/plugins/polaroid/data_source.py
@@ -0,0 +1,60 @@
+import asyncio
+
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.utils import request
+from ATRI.log import logger as log
+from ATRI.exceptions import RequestError, WriteError
+from .image_dealer import image_dealer
+
+
+TENCENT_AVATER_URL = "https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
+SOURCE_URL = "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/"
+
+
+class Polaroid(Service):
+ def __init__(self):
+ Service.__init__(self, "拍立得", "根据头像生成拍立得风格照片!", rule=is_in_service("拍立得"))
+
+ @classmethod
+ async def _request(cls, user_id: str) -> bytes:
+ try:
+ res = await request.get(TENCENT_AVATER_URL.format(user_id=user_id))
+ except RequestError:
+ raise RequestError("Request failed!")
+ data = res.read()
+ return data
+
+ @classmethod
+ async def generate(cls, user_id: str):
+ await init_source()
+
+ user_avater = await cls._request(user_id)
+
+ result = image_dealer(user_avater, user_id)
+ return f"file:///{result}"
+
+
+from .image_dealer import TEMP_PATH, POLAROID_DIR
+
+
+async def init_source():
+ files = ["frame-0.PNG", "frame-1.PNG", "font-0.ttf"]
+
+ for i in files:
+ path = POLAROID_DIR / i
+ if not path.is_file():
+ log.warning("插件 polaroid 缺少所需资源,装载中")
+
+ url = SOURCE_URL + i
+ data = await request.get(url)
+ try:
+ with open(path, "wb") as w:
+ w.write(data.read())
+ log.info("所需资源装载完成")
+ except WriteError:
+ raise WriteError("装载资源失败")
+
+
+loop = asyncio.get_event_loop()
+loop.create_task(init_source())
diff --git a/ATRI/plugins/polaroid/image_dealer.py b/ATRI/plugins/polaroid/image_dealer.py
new file mode 100644
index 0000000..c1ab50b
--- /dev/null
+++ b/ATRI/plugins/polaroid/image_dealer.py
@@ -0,0 +1,46 @@
+from random import choice
+from pathlib import Path
+from datetime import datetime
+
+from PIL import Image, ImageFont, ImageDraw
+
+
+POLAROID_DIR = Path(".") / "data" / "database" / "polaroid"
+TEMP_PATH = Path(".") / "data" / "temp"
+POLAROID_DIR.mkdir(exist_ok=True)
+
+
+def image_dealer(user_img: bytes, user_id):
+ user_p = str((TEMP_PATH / f"{user_id}.png").absolute())
+
+ with open(user_p, "wb") as w:
+ w.write(user_img)
+
+ bg = Image.new("RGBA", (689, 1097), color=(0, 0, 0, 0))
+
+ pol_frame = Image.open(POLAROID_DIR / f"frame-{choice([0, 1])}.PNG").convert("RGBA")
+ user = Image.open(user_p).convert("RGBA").resize((800, 800), Image.ANTIALIAS)
+
+ _, _, _, a = pol_frame.split()
+
+ f_path = str(POLAROID_DIR / "font-0.ttf")
+ f_date = ImageFont.truetype(f_path, 100) # type: ignore
+ f_msg = ImageFont.truetype(f_path, 110) # type: ignore
+
+ bg.paste(user, (0, 53))
+ bg.paste(pol_frame, (0, 0), a)
+
+ msg = f"今天辛苦了{choice(['!', '❤️', '✨'])}"
+ now = datetime.now().strftime("%m / %d")
+
+ img = ImageDraw.Draw(bg)
+ img.text((25, 805), now, font=f_date, fill=(99, 184, 255))
+ img.text(
+ (15, 915),
+ msg,
+ font=f_msg,
+ fill=(255, 100, 97),
+ )
+
+ bg.save(user_p)
+ return user_p
diff --git a/ATRI/plugins/repo.py b/ATRI/plugins/repo.py
index 9c5818a..7087afa 100644
--- a/ATRI/plugins/repo.py
+++ b/ATRI/plugins/repo.py
@@ -15,7 +15,7 @@ _repo_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~
REPO_FORMAT = """
来自用户{user}反馈:
{msg}
-"""
+""".strip()
class Repo(Service):
diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py
index d3ad38f..071af98 100644
--- a/ATRI/plugins/saucenao/__init__.py
+++ b/ATRI/plugins/saucenao/__init__.py
@@ -1,4 +1,3 @@
-from re import findall
from random import choice
from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment
diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py
index 80b6b52..8350839 100644
--- a/ATRI/plugins/saucenao/data_source.py
+++ b/ATRI/plugins/saucenao/data_source.py
@@ -1,28 +1,33 @@
from random import choice
-from ATRI.service import Service
+from ATRI.service import Service, ServiceTools
from ATRI.rule import is_in_service
from ATRI.exceptions import RequestError
from ATRI.utils import request
+from ATRI.config import SauceNAO as sa
+from ATRI.log import logger as log
-URL = "https://saucenao.com/search.php"
-__doc__ = """
-以图搜图,仅限二刺螈
-"""
+URL = "https://saucenao.com/search.php"
class SaouceNao(Service):
def __init__(
self,
- api_key: str = None,
+ api_key: str = str(),
output_type=2,
testmode=1,
dbmaski=32768,
db=5,
numres=5,
):
- Service.__init__(self, "以图搜图", __doc__, rule=is_in_service("以图搜图"))
+ Service.__init__(self, "以图搜图", "以图搜图,仅限二刺螈", rule=is_in_service("以图搜图"))
+
+ if not sa.key:
+ data = ServiceTools.load_service("以图搜图")
+ data["enabled"] = False
+ ServiceTools.save_service(data, "以图搜图")
+ log.warning("插件 SauceNao 所需的 key 未配置,将被全局禁用,后续填写请手动启用")
params = dict()
params["api_key"] = api_key
diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py
index cf38fd6..00eba07 100644
--- a/ATRI/plugins/setu/__init__.py
+++ b/ATRI/plugins/setu/__init__.py
@@ -9,7 +9,6 @@ from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message, MessageSegme
from nonebot.adapters.onebot.v11.helpers import extract_image_urls, Cooldown
from ATRI.config import BotSelfConfig
-from ATRI.utils.apscheduler import scheduler
from .data_source import Setu
@@ -171,27 +170,6 @@ async def _deal_setting(msg: str = ArgPlainText("catcher_set")):
await catcher_setting.finish(repo)
- "interval", name="涩批诱捕器", hours=1, misfire_grace_time=60, args=[Bot]
-)
-async def _scheduler_setu(bot):
- try:
- group_list = await bot.get_group_list()
- lucky_group = choice(group_list)
- group_id = lucky_group["group_id"]
- setu = await Setu().scheduler()
- if not setu:
- return
-
- msg_0 = await bot.send_group_msg(group_id=int(group_id), message=Message(setu))
- message_id = msg_0["message_id"]
- await asyncio.sleep(60)
- await bot.delete_msg(message_id=message_id)
-
- except Exception:
- pass
-
-
_ag_l = ["涩图来", "来点涩图", "来份涩图"]
_ag_patt = r"来[张点丶份](.*?)的[涩色🐍]图"
diff --git a/ATRI/plugins/setu/data_source.py b/ATRI/plugins/setu/data_source.py
index b7c5162..e7d769d 100644
--- a/ATRI/plugins/setu/data_source.py
+++ b/ATRI/plugins/setu/data_source.py
@@ -1,5 +1,4 @@
import asyncio
-from random import choice
from nonebot.adapters.onebot.v11 import Bot, MessageSegment
from ATRI.service import Service
@@ -78,30 +77,6 @@ class Setu(Service):
data = await detect_image(url, file_size)
return data
- @classmethod
- async def scheduler(cls) -> str:
- """
- 每隔指定时间随机抽取一个群发送涩图.
- 格式:
- 是{tag}哦~❤
- {setu}
- """
- res = await request.get(LOLICON_URL)
- data: dict = res.json()
- temp_data: dict = data.get("data", list())
- if not temp_data:
- return ""
-
- tag = choice(temp_data.get("tags", ["女孩子"]))
-
- url = temp_data[0]["urls"].get(
- "original",
- cls._use_proxy(DEFAULT_SETU),
- )
- setu = MessageSegment.image(url, timeout=114514)
- repo = f"是{tag}哦~❤\n{setu}"
- return repo
-
@staticmethod
async def async_recall(bot: Bot, event_id):
await asyncio.sleep(30)
diff --git a/ATRI/plugins/status/__init__.py b/ATRI/plugins/status/__init__.py
index 3e18e1d..05ece79 100644
--- a/ATRI/plugins/status/__init__.py
+++ b/ATRI/plugins/status/__init__.py
@@ -22,8 +22,8 @@ async def _status():
info_msg = "アトリは高性能ですから!"
[email protected]_job("interval", name="状态检查", minutes=10, misfire_grace_time=15)
-async def _status_checking():
[email protected]_job("interval", name="状态检查", minutes=10, misfire_grace_time=15) # type: ignore
+async def _check_runtime():
msg, stat = IsSurvive().get_status()
if not stat:
await status.finish(msg)
diff --git a/ATRI/plugins/status/data_source.py b/ATRI/plugins/status/data_source.py
index 74882de..3595c0c 100644
--- a/ATRI/plugins/status/data_source.py
+++ b/ATRI/plugins/status/data_source.py
@@ -8,12 +8,24 @@ from ATRI.rule import is_in_service
from ATRI.exceptions import GetStatusError
-__doc__ = "检查咱自身状态"
+_status_msg = """
+> Status Overview
+
+[CPU: {cpu}%]
+[Memory: {mem}%]
+[Disk usage: {disk}%]
+
+[Net sent: {inteSENT}MB]
+[Net recv: {inteRECV}MB]
+
+[Runtime: {up_time}]
+{msg}
+""".strip()
class IsSurvive(Service):
def __init__(self):
- Service.__init__(self, "状态", __doc__, rule=is_in_service("状态"))
+ Service.__init__(self, "状态", "检查自身状态", rule=is_in_service("状态"))
@staticmethod
def ping() -> str:
@@ -55,14 +67,14 @@ class IsSurvive(Service):
log.info("资源占用正常")
is_ok = True
- msg0 = (
- "Self status:\n"
- f"* CPU: {cpu}%\n"
- f"* MEM: {mem}%\n"
- f"* DISK: {disk}%\n"
- f"* netSENT: {inteSENT}MB\n"
- f"* netRECV: {inteRECV}MB\n"
- f"* Runtime: {up_time}\n"
- ) + msg
+ msg0 = _status_msg.format(
+ cpu=cpu,
+ mem=mem,
+ disk=disk,
+ inteSENT=inteSENT,
+ inteRECV=inteRECV,
+ up_time=up_time,
+ msg=msg,
+ )
return msg0, is_ok
diff --git a/ATRI/plugins/util/data_source.py b/ATRI/plugins/util/data_source.py
index 7b36ebe..4a73c26 100644
--- a/ATRI/plugins/util/data_source.py
+++ b/ATRI/plugins/util/data_source.py
@@ -8,14 +8,9 @@ from ATRI.service import Service
from ATRI.rule import is_in_service
-__doc__ = """
-非常实用(?)的工具们!
-"""
-
-
class Utils(Service):
def __init__(self):
- Service.__init__(self, "小工具", __doc__, rule=is_in_service("小工具"))
+ Service.__init__(self, "小工具", "非常实用(?)的工具们!", rule=is_in_service("小工具"))
@staticmethod
def roll_dice(par: str) -> str:
diff --git a/ATRI/service.py b/ATRI/service.py
index 9f680c8..c9509df 100644
--- a/ATRI/service.py
+++ b/ATRI/service.py
@@ -4,7 +4,7 @@ import json
from pathlib import Path
from types import ModuleType
from pydantic import BaseModel
-from typing import List, Set, Tuple, Type, Union, Optional, TYPE_CHECKING
+from typing import List, Set, Tuple, Type, Union, Optional
from nonebot.matcher import Matcher
from nonebot.permission import Permission
@@ -19,9 +19,6 @@ from nonebot.rule import Rule, command, keyword, regex
from ATRI.exceptions import ReadFileError, WriteError
-if TYPE_CHECKING:
- from nonebot.adapters import Bot, Event
-
SERVICE_DIR = Path(".") / "data" / "service"
SERVICES_DIR = SERVICE_DIR / "services"
@@ -69,7 +66,7 @@ class Service:
def __init__(
self,
service: str,
- docs: str = None,
+ docs: str,
only_admin: bool = False,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Permission] = None,
@@ -77,6 +74,7 @@ class Service:
temp: bool = False,
priority: int = 1,
state: Optional[T_State] = None,
+ main_cmd: str = str(),
):
self.service = service
self.docs = docs
@@ -87,13 +85,9 @@ class Service:
self.temp = temp
self.priority = priority
self.state = state
+ self.main_cmd = (main_cmd,)
- def _generate_service_config(self, service: str = None, docs: str = None) -> None:
- if not service:
- service = self.service
- if not docs:
- docs = self.docs or str()
-
+ def _generate_service_config(self, service: str, docs: str = str()) -> None:
path = SERVICES_DIR / f"{service}.json"
data = ServiceInfo(
service=service,
@@ -110,31 +104,28 @@ class Service:
except WriteError:
raise WriteError("Write service info failed!")
- def save_service(self, service_data: dict, service: str = None) -> None:
+ def save_service(self, service_data: dict, service: str) -> None:
if not service:
service = self.service
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
- self._generate_service_config()
+ self._generate_service_config(service, self.docs)
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps(service_data, indent=4))
- def load_service(self, service: str = None) -> dict:
- if not service:
- service = self.service
-
+ def load_service(self, service: str) -> dict:
path = SERVICES_DIR / f"{service}.json"
if not path.is_file():
- self._generate_service_config()
+ self._generate_service_config(service, self.docs)
try:
data = json.loads(path.read_bytes())
except ReadFileError:
with open(path, "w", encoding="utf-8") as w:
w.write(json.dumps({}))
- self._generate_service_config()
+ self._generate_service_config(service, self.docs)
data = json.loads(path.read_bytes())
return data
@@ -142,25 +133,25 @@ class Service:
data = self.load_service(self.service)
temp_data: dict = data["cmd_list"]
temp_data.update(cmds)
- self.save_service(data)
+ self.save_service(data, self.service)
def _load_cmds(self) -> dict:
path = SERVICES_DIR / f"{self.service}.json"
if not path.is_file():
- self._generate_service_config()
+ self._generate_service_config(self.service, self.docs)
data = json.loads(path.read_bytes())
return data["cmd_list"]
def on_message(
self,
- name: str = None,
+ name: str = str(),
docs: str = str(),
rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Union[Permission, T_PermissionChecker]] = None,
handlers: Optional[List[Union[T_Handler, Dependent]]] = None,
block: bool = True,
- priority: int = None,
+ priority: int = 1,
state: Optional[T_State] = None,
) -> Type[Matcher]:
if not rule:
@@ -169,8 +160,6 @@ class Service:
permission = self.permission
if not handlers:
handlers = self.handlers
- if not priority:
- priority = self.priority
if not state:
state = self.state
@@ -253,11 +242,13 @@ class Service:
if not aliases:
aliases = set()
+ if isinstance(cmd, tuple):
+ cmd = ".".join(map(str, cmd))
+
cmd_list[cmd] = CommandInfo(
type="command", docs=docs, aliases=list(aliases)
).dict()
self._save_cmds(cmd_list)
-
commands = set([cmd]) | (aliases or set())
return self.on_message(rule=command(*commands) & rule, block=True, **kwargs)
@@ -297,6 +288,15 @@ class Service:
return self.on_message(rule=regex(pattern, flags) & rule, **kwargs)
+ def cmd_as_group(self, cmd: str, docs: str, **kwargs) -> Type[Matcher]:
+ sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
+ _cmd = self.main_cmd + sub_cmd
+
+ if "aliases" in kwargs:
+ del kwargs["aliases"]
+
+ return self.on_command(_cmd, docs, **kwargs)
+
class ServiceTools(object):
@staticmethod
@@ -327,7 +327,7 @@ class ServiceTools(object):
return data
@classmethod
- def auth_service(cls, service, user_id: str = None, group_id: str = None) -> bool:
+ def auth_service(cls, service, user_id: str = str(), group_id: str = str()) -> bool:
data = cls.load_service(service)
auth_global = data.get("enabled", True)
diff --git a/ATRI/utils/__init__.py b/ATRI/utils/__init__.py
index 565de97..29c9f05 100644
--- a/ATRI/utils/__init__.py
+++ b/ATRI/utils/__init__.py
@@ -5,18 +5,15 @@ import yaml
import aiofiles
import time
from pathlib import Path
-from aiohttp import FormData
from datetime import datetime
from PIL import Image, ImageFile
from aiofiles.threadpool.text import AsyncTextIOWrapper
-from . import request
-
def timestamp2datetimestr(timestamp: int) -> str:
format = "%Y-%m-%d %H:%M:%S"
- timestamp = time.localtime(timestamp)
- dt = time.strftime(format, timestamp)
+ tt = time.localtime(timestamp)
+ dt = time.strftime(format, tt)
return dt
diff --git a/ATRI/utils/apscheduler.py b/ATRI/utils/apscheduler.py
index 186ff8e..e1959c1 100644
--- a/ATRI/utils/apscheduler.py
+++ b/ATRI/utils/apscheduler.py
@@ -1,34 +1,13 @@
-"""
-Fork from: https://github.com/nonebot/plugin-apscheduler
-"""
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from nonebot import get_driver
-from nonebot.log import logger, LoguruHandler
+from nonebot.log import LoguruHandler
-apscheduler_autostart: bool = True
-# apscheduler_config: dict = {"apscheduler.timezone": "Asia/Shanghai"}
-
-
-driver = get_driver()
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
-async def _start_scheduler():
- if not scheduler.running:
- # scheduler.configure(apscheduler_config)
- scheduler.start()
- logger.info("Scheduler Started.")
-
-
-if apscheduler_autostart:
- driver.on_startup(_start_scheduler)
-
aps_logger = logging.getLogger("apscheduler")
aps_logger.setLevel(logging.DEBUG)
aps_logger.handlers.clear()
aps_logger.addHandler(LoguruHandler())
-
-from apscheduler.triggers.date import DateTrigger
diff --git a/ATRI/utils/check_update.py b/ATRI/utils/check_update.py
new file mode 100644
index 0000000..a8575ab
--- /dev/null
+++ b/ATRI/utils/check_update.py
@@ -0,0 +1,54 @@
+from ATRI.exceptions import RequestError
+
+from . import request
+
+
+REPO_COMMITS_URL = "https://api.github.com/repos/Kyomotoi/ATRI/commits"
+REPO_RELEASE_URL = "https://api.github.com/repos/Kyomotoi/ATRI/releases"
+
+
+class CheckUpdate:
+ @staticmethod
+ async def _get_commits_info() -> dict:
+ req = await request.get(REPO_COMMITS_URL)
+ return req.json()
+
+ @staticmethod
+ async def _get_release_info() -> dict:
+ req = await request.get(REPO_RELEASE_URL)
+ return req.json()
+
+ @classmethod
+ async def show_latest_commit_info(cls) -> str:
+ try:
+ data = await cls._get_commits_info()
+ except RequestError:
+ raise RequestError("Getting commit info timeout...")
+
+ try:
+ commit_data: dict = data[0]
+ except Exception:
+ raise Exception("GitHub has been error!!!")
+
+ c_info = commit_data["commit"]
+ c_msg = c_info["message"]
+ c_sha = commit_data["sha"][0:5]
+ c_time = c_info["author"]["date"]
+
+ return f"Latest commit {c_msg} | sha: {c_sha} | time: {c_time}"
+
+ @classmethod
+ async def show_latest_version(cls) -> tuple:
+ try:
+ data = await cls._get_release_info()
+ except RequestError:
+ raise RequestError("Getting release list timeout...")
+
+ try:
+ release_data: dict = data[0]
+ except Exception:
+ raise Exception("GitHub has been error!!!")
+
+ l_v = release_data["tag_name"]
+ l_v_t = release_data["published_at"]
+ return l_v, l_v_t
diff --git a/README.md b/README.md
index d9a74a4..4e3b43c 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@
为QQ群中复现一个优秀的功能性机器人是本项目的目标
-## 声明(Attaction)
+## 声明(Warning)
**一切开发旨在学习,请勿用于非法用途**
@@ -55,6 +55,7 @@
- 以图搜番
- ATRI语(加密、解密,改自[`rcnb`](https://github.com/rcnbapp/RCNB.js))
- 简单骰子
+ - b站动态订阅
- 娱乐:
- 看不懂的笑话
@@ -69,7 +70,7 @@
- [ ] 网页控制台
- [ ] RSS订阅
- - [ ] B站动态订阅
+ - [x] B站动态订阅
- [ ] 冷重启
- [ ] 进裙验证(问题可自定义)
- [ ] 好感度系统(目前优先在[`go-ATRI`](https://github.com/Kyomotoi/go-ATRI)上实现)
diff --git a/test/test_plugin_chat.py b/test/test_plugin_chat.py
deleted file mode 100644
index e04f1d3..0000000
--- a/test/test_plugin_chat.py
+++ /dev/null
@@ -1,84 +0,0 @@
-import pytest
-from nonebug import App
-from nonebot.adapters.onebot.v11 import MessageSegment
-
-from .utils import make_fake_message, make_fake_event
-
-
-async def test_chat(app: App):
- from ATRI.plugins.chat import chat
-
- Message = make_fake_message()
-
- async with app.test_matcher(chat) as ctx:
- bot = ctx.create_bot()
-
- msg = Message("爱你")
- event = make_fake_event(_message=msg, _to_me=True)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True)
-
-
-async def test_my_name_is(app: App):
- from ATRI.plugins.chat import my_name_is
-
- Message = make_fake_message()
-
- async with app.test_matcher(my_name_is) as ctx:
- bot = ctx.create_bot()
-
- msg = Message("叫我")
- event = make_fake_event(_message=msg, _to_me=True)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True)
-
- msg = Message("欧尼酱")
- event = make_fake_event(_message=msg)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True)
-
-
-async def test_say(app: App):
- from ATRI.plugins.chat import say
-
- Message = make_fake_message()
-
- async with app.test_matcher(say) as ctx:
- bot = ctx.create_bot()
-
- msg = Message("说")
- event = make_fake_event(_message=msg, _to_me=True)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "想要咱复读啥呢...", True)
-
- msg = Message("nya~")
- event = make_fake_event(_message=msg)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "nya~", True)
-
- async with app.test_matcher(say) as ctx:
- bot = ctx.create_bot()
-
- msg = Message("说")
- event = make_fake_event(_message=msg, _to_me=True)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "想要咱复读啥呢...", True)
-
- msg = Message(
- MessageSegment.image(
- "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/noting/88674944_p0.png"
- )
- )
- event = make_fake_event(_message=msg, _to_me=True)()
-
- ctx.receive_event(bot, event)
- ctx.should_call_send(event, "不要...", True)
diff --git a/test/test_plugin_code_runner.py b/test/test_plugin_code_runner.py
index 584ebb0..ec9eb2d 100644
--- a/test/test_plugin_code_runner.py
+++ b/test/test_plugin_code_runner.py
@@ -17,12 +17,12 @@ async def test_code_runner(app: App):
event = make_fake_event(_message=msg)()
ctx.receive_event(bot, event)
- ctx.should_call_send(event, "请键入 /code help 以获取帮助~!", True)
+ ctx.should_call_send(event, "请键入 /code.help 以获取帮助~!", True)
async with app.test_matcher(code_runner) as ctx:
bot = ctx.create_bot()
- msg = Message("/code help")
+ msg = Message("/code.help")
event = make_fake_event(_message=msg)()
ctx.receive_event(bot, event)
@@ -34,14 +34,14 @@ async def test_code_runner(app: App):
For example:
/code python
print('hello world')
- """,
+ """.strip(),
True,
)
async with app.test_matcher(code_runner) as ctx:
bot = ctx.create_bot()
- msg = Message("/code list")
+ msg = Message("/code.list")
event = make_fake_event(_message=msg)()
ctx.receive_event(bot, event)
@@ -56,7 +56,7 @@ async def test_code_runner(app: App):
julia, kotlin, lua, perl,
php, python, ruby, rust,
scala, swift, typescript
- """,
+ """.strip(),
True,
)
diff --git a/test/test_plugin_help.py b/test/test_plugin_help.py
index 82a0467..3d975ac 100644
--- a/test/test_plugin_help.py
+++ b/test/test_plugin_help.py
@@ -6,11 +6,11 @@ from .utils import make_fake_message, make_fake_event
@pytest.mark.asyncio
async def test_main_help(app: App):
- from ATRI.plugins.help import main_help
+ from ATRI.plugins.help import menu
Message = make_fake_message()
- async with app.test_matcher(main_help) as ctx:
+ async with app.test_matcher(menu) as ctx:
bot = ctx.create_bot()
msg = Message("菜单")
@@ -25,7 +25,7 @@ async def test_main_help(app: App):
服务列表 -以查看所有可用服务
帮助 [服务] -以查看对应服务帮助
Tip: 均需要at触发。@bot 菜单 以打开此页面
- """,
+ """.strip(),
True,
)
@@ -34,7 +34,7 @@ async def test_main_help(app: App):
async def test_about_me(app: App):
from ATRI import __version__
from ATRI.config import BotSelfConfig
- from ATRI.plugins.help import about_me
+ from ATRI.plugins.help import about
temp_list = list()
for i in BotSelfConfig.nickname:
@@ -43,7 +43,7 @@ async def test_about_me(app: App):
Message = make_fake_message()
- async with app.test_matcher(about_me) as ctx:
+ async with app.test_matcher(about) as ctx:
bot = ctx.create_bot()
msg = Message("关于")
@@ -57,8 +57,9 @@ async def test_about_me(app: App):
可以称呼咱:{nickname}
咱的型号是:{__version__}
想进一步了解:
- https://github.com/Kyomotoi/ATRI
- """,
+ atri.kyomotoi.moe
+ 进不去: project-atri-docs.vercel.app
+ """.strip(),
True,
)
@@ -87,7 +88,10 @@ async def test_service_list(app: App):
]
)
table = tabulate(
- services, headers=["服务名称", "开启状态", "仅支持管理员"], tablefmt="plain", showindex=True
+ services,
+ headers=["服务名称", "开启状态(全局)", "仅支持管理员"],
+ tablefmt="plain",
+ showindex=True,
)
output = f"咱搭载了以下服务~\n{table}\n@bot 帮助 [服务] -以查看对应服务帮助"
@@ -139,7 +143,8 @@ async def test_service_info(app: App):
服务名:状态
说明:检查咱自身状态
可用命令:
- /ping、/status
+ /ping
+ /status
是否全局启用:True
Tip: @bot 帮助 [服务] [命令] 以查看对应命令详细信息
""",
diff --git a/test/test_plugin_kimo.py b/test/test_plugin_kimo.py
new file mode 100644
index 0000000..ad6ff0b
--- /dev/null
+++ b/test/test_plugin_kimo.py
@@ -0,0 +1,43 @@
+import pytest
+from nonebug import App
+from nonebot.adapters.onebot.v11 import MessageSegment
+
+from .utils import make_fake_message, make_fake_event
+
+
+async def test_chat(app: App):
+ from ATRI.plugins.kimo import kimo
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(kimo) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("爱你")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "是…是嘛(脸红)呐,其实咱也……", True)
+
+
+async def test_my_name_is(app: App):
+ from ATRI.plugins.kimo import my_name_is
+
+ Message = make_fake_message()
+
+ async with app.test_matcher(my_name_is) as ctx:
+ bot = ctx.create_bot()
+
+ msg = Message("叫我")
+ event = make_fake_event(_message=msg, _to_me=True)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "欧尼酱想让咱如何称呼呢!0w0", True)
+
+ msg = Message("欧尼酱")
+ event = make_fake_event(_message=msg)()
+
+ ctx.receive_event(bot, event)
+ ctx.should_call_send(event, "好~w 那咱以后就称呼你为欧尼酱!", True)