diff options
author | Kyomotoi <[email protected]> | 2022-03-27 15:28:55 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2022-03-27 15:45:36 +0800 |
commit | d4e5574755a88638f7d592532ad2a13c9b15c23e (patch) | |
tree | cef30c28398891e1a63bfbe1cb0eab3216c6f1fc | |
parent | 66100f025f6705d68cf40549b5c2057d8426871f (diff) | |
download | ATRI-d4e5574755a88638f7d592532ad2a13c9b15c23e.tar.gz ATRI-d4e5574755a88638f7d592532ad2a13c9b15c23e.tar.bz2 ATRI-d4e5574755a88638f7d592532ad2a13c9b15c23e.zip |
🔥🚨 删除部分代码 & 安抚 pylance
-rw-r--r-- | ATRI/plugins/bilibili_dynamic/__init__.py | 32 | ||||
-rw-r--r-- | ATRI/plugins/bilibili_dynamic/data_source.py | 32 |
2 files changed, 29 insertions, 35 deletions
diff --git a/ATRI/plugins/bilibili_dynamic/__init__.py b/ATRI/plugins/bilibili_dynamic/__init__.py index f1ac8d2..d69f81b 100644 --- a/ATRI/plugins/bilibili_dynamic/__init__.py +++ b/ATRI/plugins/bilibili_dynamic/__init__.py @@ -1,22 +1,24 @@ +import re +from tabulate import tabulate +from datetime import datetime, timedelta + import pytz from apscheduler.triggers.base import BaseTrigger from apscheduler.triggers.combining import AndTrigger from apscheduler.triggers.interval import IntervalTrigger -from ATRI.utils.apscheduler import scheduler -from ATRI.utils import timestamp2datetime - from nonebot.params import State from nonebot.adapters.onebot.v11 import MessageSegment, GroupMessageEvent, Message from nonebot.typing import T_State from nonebot import get_bot -from .data_source import BilibiliDynamicSubscriptor -import re -from tabulate import tabulate -from datetime import datetime, timedelta +from ATRI.utils.apscheduler import scheduler +from ATRI.utils import timestamp2datetime from ATRI.log import logger +from .data_source import BilibiliDynamicSubscriptor + + bilibili_dynamic = BilibiliDynamicSubscriptor().on_command( "/bilibili_dynamic", "b站动态订阅助手", aliases={"/bd", "b站动态"} ) @@ -39,7 +41,6 @@ def help() -> str: @bilibili_dynamic.handle() async def _menu(event: GroupMessageEvent, state: T_State = State()): args = str(event.get_plaintext()).strip().lower().split()[1:] - # print(args) if not args: await bilibili_dynamic.finish(help()) elif args and len(args) == 1: @@ -58,7 +59,6 @@ async def handle_subcommand(event: GroupMessageEvent, state: T_State = State()): if state["sub_command"] == "订阅列表": subscriptor = BilibiliDynamicSubscriptor() - # print(event.group_id) r = await subscriptor.get_subscriptions(query_map={"groupid": event.group_id}) subs = [] for s in r: @@ -80,8 +80,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()): if uid == "-1": await bilibili_dynamic.finish("已经成功退出订阅~") - # print(state) - # print(uid) if not re.match(r"^\d+$", uid): await bilibili_dynamic.reject("这似乎不是UID呢, 请重新输入:") uid = int(uid) @@ -107,7 +105,6 @@ async def handle_uid(event: GroupMessageEvent, state: T_State = State()): ) ) success = await subscriptor.add_subscription(uid, event.group_id) - print(success) success = success and ( await subscriptor.update_subscription_by_uid( uid=uid, @@ -141,9 +138,9 @@ tq = Queue() class BilibiliDynamicCheckEnabledTrigger(BaseTrigger): # 自定义trigger 保证服务开启 # 实现abstract方法 <get_next_fire_time> - def get_next_fire_time(self, previous_fire_time, now): + def get_next_fire_time(self, now): subscriptor = BilibiliDynamicSubscriptor() - config = subscriptor.load_service() + config = subscriptor.load_service("b站动态订阅") if config["enabled"] == False: return None else: @@ -156,8 +153,8 @@ class BilibiliDynamicCheckEnabledTrigger(BaseTrigger): @scheduler.scheduled_job( AndTrigger([IntervalTrigger(seconds=10), BilibiliDynamicCheckEnabledTrigger()]), name="b站动态检查", - max_instances=3, - misfire_grace_time=60, + max_instances=3, # type: ignore + misfire_grace_time=60, # type: ignore ) async def _check_dynamic(): from ATRI.database.models import Subscription @@ -171,7 +168,6 @@ async def _check_dynamic(): d: Subscription = tq.get() logger.info("准备查询UP【{up}】的动态 队列剩余{size}".format(up=d.nickname, size=tq.qsize())) ts = int(d.last_update.timestamp()) - # logger.info("上一次更新的时间戳{time}".format(time=ts)) info: dict = await subscriptor.get_recent_dynamic_by_uid(d.uid) res = [] if info: @@ -181,11 +177,9 @@ async def _check_dynamic(): if len(res) == 0: logger.warning("获取UP【{up}】的动态为空".format(up=d.nickname)) for i in res: - # logger.info("获取UP【{up}】的动态,时间{time},内容{content}".format(up=d.nickname, time=i["timestamp"],content=i["content"][:20])) i["name"] = d.nickname if ts < i["timestamp"]: text, pic_url = subscriptor.generate_output(pattern=i) - # print(text,pic_url) output = Message( [MessageSegment.text(text), MessageSegment.image(pic_url)] ) diff --git a/ATRI/plugins/bilibili_dynamic/data_source.py b/ATRI/plugins/bilibili_dynamic/data_source.py index 49ffce7..a419b45 100644 --- a/ATRI/plugins/bilibili_dynamic/data_source.py +++ b/ATRI/plugins/bilibili_dynamic/data_source.py @@ -11,13 +11,11 @@ import asyncio from typing import Any from operator import itemgetter -__doc__ = """b站订阅动态助手 -""" __session_pool = {} -def get_api(field: str): +def get_api(field: str) -> dict: """ 获取 API。 @@ -33,9 +31,11 @@ def get_api(field: str): if os.path.exists(path): with open(path, encoding="utf8") as f: return json.loads(f.read()) + else: + return dict() -API = get_api("user") +API: dict = get_api("user") def get_session(): @@ -57,12 +57,12 @@ def get_session(): async def bilibili_request( method: str, url: str, - params: dict = None, + params: dict = dict(), data: Any = None, no_csrf: bool = False, json_body: bool = False, **kwargs, -): +) -> dict: """ 向接口发送请求。 @@ -130,21 +130,21 @@ async def bilibili_request( # 检查响应头 Content-Length content_length = resp.headers.get("content-length") if content_length and int(content_length) == 0: - return None + return dict() # 检查响应头 Content-Type content_type = resp.headers.get("content-type") # 不是 application/json - if content_type.lower().index("application/json") == -1: + if content_type.lower().index("application/json") == -1: # type: ignore raise Exception("响应不是 application/json 类型") raw_data = await resp.text() - resp_data: dict + resp_data: dict = dict() if "callback" in params: # JSONP 请求 - resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) + resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) # type: ignore else: # JSON resp_data = json.loads(raw_data) @@ -181,9 +181,9 @@ class User: """ self.uid = uid - self.__self_info = None + self.__self_info = None # 暂时无用 - async def get_user_info(self): + async def get_user_info(self) -> dict: """ 获取用户信息(昵称,性别,生日,签名,头像 URL,空间横幅 URL 等) @@ -216,7 +216,7 @@ class User: "offset_dynamic_id": offset, "need_top": 1 if need_top else 0, } - data = await bilibili_request("GET", url=api["url"], params=params) + data: dict = await bilibili_request("GET", url=api["url"], params=params) # card 字段自动转换成 JSON。 if "cards" in data: for card in data["cards"]: @@ -227,7 +227,7 @@ class User: class BilibiliDynamicSubscriptor(Service): def __init__(self): - Service.__init__(self, "b站动态订阅", __doc__, rule=is_in_service("b站动态订阅")) + Service.__init__(self, "b站动态订阅", "b站订阅动态助手", rule=is_in_service("b站动态订阅")) async def add_subscription(self, uid: int, groupid: int) -> bool: async with DB() as db: @@ -261,7 +261,7 @@ class BilibiliDynamicSubscriptor(Service): async def get_upname_by_uid(self, uid: int) -> str: try: u = User(uid) - info = await u.get_user_info() + info: dict = await u.get_user_info() return info.get("name") except: return "" @@ -334,7 +334,7 @@ class BilibiliDynamicSubscriptor(Service): ret = sorted(ret, key=itemgetter("timestamp")) return ret - def generate_output(self, pattern: dict) -> (str, str): + def generate_output(self, pattern: dict) -> tuple: # 限制摘要的字数 abstractLimit = 40 text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【时间】{time}\n【内容摘要】{content}\n""".format( |