summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-03-27 15:28:55 +0800
committerKyomotoi <[email protected]>2022-03-27 15:45:36 +0800
commitd4e5574755a88638f7d592532ad2a13c9b15c23e (patch)
treecef30c28398891e1a63bfbe1cb0eab3216c6f1fc
parent66100f025f6705d68cf40549b5c2057d8426871f (diff)
downloadATRI-d4e5574755a88638f7d592532ad2a13c9b15c23e.tar.gz
ATRI-d4e5574755a88638f7d592532ad2a13c9b15c23e.tar.bz2
ATRI-d4e5574755a88638f7d592532ad2a13c9b15c23e.zip
🔥🚨 删除部分代码 & 安抚 pylance
-rw-r--r--ATRI/plugins/bilibili_dynamic/__init__.py32
-rw-r--r--ATRI/plugins/bilibili_dynamic/data_source.py32
2 files changed, 29 insertions, 35 deletions
diff --git a/ATRI/plugins/bilibili_dynamic/__init__.py b/ATRI/plugins/bilibili_dynamic/__init__.py
index f1ac8d2..d69f81b 100644
--- a/ATRI/plugins/bilibili_dynamic/__init__.py
+++ b/ATRI/plugins/bilibili_dynamic/__init__.py
@@ -1,22 +1,24 @@
+import re
+from tabulate import tabulate
+from datetime import datetime, timedelta
+
import pytz
from apscheduler.triggers.base import BaseTrigger
from apscheduler.triggers.combining import AndTrigger
from apscheduler.triggers.interval import IntervalTrigger
-from ATRI.utils.apscheduler import scheduler
-from ATRI.utils import timestamp2datetime
-
from nonebot.params import State
from nonebot.adapters.onebot.v11 import MessageSegment, GroupMessageEvent, Message
from nonebot.typing import T_State
from nonebot import get_bot
-from .data_source import BilibiliDynamicSubscriptor
-import re
-from tabulate import tabulate
-from datetime import datetime, timedelta
+from ATRI.utils.apscheduler import scheduler
+from ATRI.utils import timestamp2datetime
from ATRI.log import logger
+from .data_source import BilibiliDynamicSubscriptor
+
+
bilibili_dynamic = BilibiliDynamicSubscriptor().on_command(
"/bilibili_dynamic", "b站动态订阅助手", aliases={"/bd", "b站动态"}
)
@@ -39,7 +41,6 @@ def help() -> str:
@bilibili_dynamic.handle()
async def _menu(event: GroupMessageEvent, state: T_State = State()):
args = str(event.get_plaintext()).strip().lower().split()[1:]
- # print(args)
if not args:
await bilibili_dynamic.finish(help())
elif args and len(args) == 1:
@@ -58,7 +59,6 @@ async def handle_subcommand(event: GroupMessageEvent, state: T_State = State()):
if state["sub_command"] == "订阅列表":
subscriptor = BilibiliDynamicSubscriptor()
- # print(event.group_id)
r = await subscriptor.get_subscriptions(query_map={"groupid": event.group_id})
subs = []
for s in r:
@@ -80,8 +80,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()):
if uid == "-1":
await bilibili_dynamic.finish("已经成功退出订阅~")
- # print(state)
- # print(uid)
if not re.match(r"^\d+$", uid):
await bilibili_dynamic.reject("这似乎不是UID呢, 请重新输入:")
uid = int(uid)
@@ -107,7 +105,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()):
)
)
success = await subscriptor.add_subscription(uid, event.group_id)
- print(success)
success = success and (
await subscriptor.update_subscription_by_uid(
uid=uid,
@@ -141,9 +138,9 @@ tq = Queue()
class BilibiliDynamicCheckEnabledTrigger(BaseTrigger):
# 自定义trigger 保证服务开启
# 实现abstract方法 <get_next_fire_time>
- def get_next_fire_time(self, previous_fire_time, now):
+ def get_next_fire_time(self, now):
subscriptor = BilibiliDynamicSubscriptor()
- config = subscriptor.load_service()
+ config = subscriptor.load_service("b站动态订阅")
if config["enabled"] == False:
return None
else:
@@ -156,8 +153,8 @@ class BilibiliDynamicCheckEnabledTrigger(BaseTrigger):
@scheduler.scheduled_job(
AndTrigger([IntervalTrigger(seconds=10), BilibiliDynamicCheckEnabledTrigger()]),
name="b站动态检查",
- max_instances=3,
- misfire_grace_time=60,
+ max_instances=3, # type: ignore
+ misfire_grace_time=60, # type: ignore
)
async def _check_dynamic():
from ATRI.database.models import Subscription
@@ -171,7 +168,6 @@ async def _check_dynamic():
d: Subscription = tq.get()
logger.info("准备查询UP【{up}】的动态 队列剩余{size}".format(up=d.nickname, size=tq.qsize()))
ts = int(d.last_update.timestamp())
- # logger.info("上一次更新的时间戳{time}".format(time=ts))
info: dict = await subscriptor.get_recent_dynamic_by_uid(d.uid)
res = []
if info:
@@ -181,11 +177,9 @@ async def _check_dynamic():
if len(res) == 0:
logger.warning("获取UP【{up}】的动态为空".format(up=d.nickname))
for i in res:
- # logger.info("获取UP【{up}】的动态,时间{time},内容{content}".format(up=d.nickname, time=i["timestamp"],content=i["content"][:20]))
i["name"] = d.nickname
if ts < i["timestamp"]:
text, pic_url = subscriptor.generate_output(pattern=i)
- # print(text,pic_url)
output = Message(
[MessageSegment.text(text), MessageSegment.image(pic_url)]
)
diff --git a/ATRI/plugins/bilibili_dynamic/data_source.py b/ATRI/plugins/bilibili_dynamic/data_source.py
index 49ffce7..a419b45 100644
--- a/ATRI/plugins/bilibili_dynamic/data_source.py
+++ b/ATRI/plugins/bilibili_dynamic/data_source.py
@@ -11,13 +11,11 @@ import asyncio
from typing import Any
from operator import itemgetter
-__doc__ = """b站订阅动态助手
-"""
__session_pool = {}
-def get_api(field: str):
+def get_api(field: str) -> dict:
"""
获取 API。
@@ -33,9 +31,11 @@ def get_api(field: str):
if os.path.exists(path):
with open(path, encoding="utf8") as f:
return json.loads(f.read())
+ else:
+ return dict()
-API = get_api("user")
+API: dict = get_api("user")
def get_session():
@@ -57,12 +57,12 @@ def get_session():
async def bilibili_request(
method: str,
url: str,
- params: dict = None,
+ params: dict = dict(),
data: Any = None,
no_csrf: bool = False,
json_body: bool = False,
**kwargs,
-):
+) -> dict:
"""
向接口发送请求。
@@ -130,21 +130,21 @@ async def bilibili_request(
# 检查响应头 Content-Length
content_length = resp.headers.get("content-length")
if content_length and int(content_length) == 0:
- return None
+ return dict()
# 检查响应头 Content-Type
content_type = resp.headers.get("content-type")
# 不是 application/json
- if content_type.lower().index("application/json") == -1:
+ if content_type.lower().index("application/json") == -1: # type: ignore
raise Exception("响应不是 application/json 类型")
raw_data = await resp.text()
- resp_data: dict
+ resp_data: dict = dict()
if "callback" in params:
# JSONP 请求
- resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1))
+ resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) # type: ignore
else:
# JSON
resp_data = json.loads(raw_data)
@@ -181,9 +181,9 @@ class User:
"""
self.uid = uid
- self.__self_info = None
+ self.__self_info = None # 暂时无用
- async def get_user_info(self):
+ async def get_user_info(self) -> dict:
"""
获取用户信息(昵称,性别,生日,签名,头像 URL,空间横幅 URL 等)
@@ -216,7 +216,7 @@ class User:
"offset_dynamic_id": offset,
"need_top": 1 if need_top else 0,
}
- data = await bilibili_request("GET", url=api["url"], params=params)
+ data: dict = await bilibili_request("GET", url=api["url"], params=params)
# card 字段自动转换成 JSON。
if "cards" in data:
for card in data["cards"]:
@@ -227,7 +227,7 @@ class User:
class BilibiliDynamicSubscriptor(Service):
def __init__(self):
- Service.__init__(self, "b站动态订阅", __doc__, rule=is_in_service("b站动态订阅"))
+ Service.__init__(self, "b站动态订阅", "b站订阅动态助手", rule=is_in_service("b站动态订阅"))
async def add_subscription(self, uid: int, groupid: int) -> bool:
async with DB() as db:
@@ -261,7 +261,7 @@ class BilibiliDynamicSubscriptor(Service):
async def get_upname_by_uid(self, uid: int) -> str:
try:
u = User(uid)
- info = await u.get_user_info()
+ info: dict = await u.get_user_info()
return info.get("name")
except:
return ""
@@ -334,7 +334,7 @@ class BilibiliDynamicSubscriptor(Service):
ret = sorted(ret, key=itemgetter("timestamp"))
return ret
- def generate_output(self, pattern: dict) -> (str, str):
+ def generate_output(self, pattern: dict) -> tuple:
# 限制摘要的字数
abstractLimit = 40
text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【时间】{time}\n【内容摘要】{content}\n""".format(