diff options
author | Kyomotoi <[email protected]> | 2022-06-13 19:37:34 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2022-06-13 19:37:34 +0800 |
commit | 59c923bbc3e91ad5a3482ec81c9192297ebbebe8 (patch) | |
tree | d74da226542a8b6f54200bd6d55e8d459315c71d /ATRI/plugins/bilibili_dynamic/data_source.py | |
parent | 4a876e2e64a690dcf01c385e273052e1e22caacd (diff) | |
download | ATRI-59c923bbc3e91ad5a3482ec81c9192297ebbebe8.tar.gz ATRI-59c923bbc3e91ad5a3482ec81c9192297ebbebe8.tar.bz2 ATRI-59c923bbc3e91ad5a3482ec81c9192297ebbebe8.zip |
♻️ 重构插件: b站动态订阅
Diffstat (limited to 'ATRI/plugins/bilibili_dynamic/data_source.py')
-rw-r--r-- | ATRI/plugins/bilibili_dynamic/data_source.py | 373 |
1 files changed, 96 insertions, 277 deletions
diff --git a/ATRI/plugins/bilibili_dynamic/data_source.py b/ATRI/plugins/bilibili_dynamic/data_source.py index 14143e6..5233692 100644 --- a/ATRI/plugins/bilibili_dynamic/data_source.py +++ b/ATRI/plugins/bilibili_dynamic/data_source.py @@ -1,286 +1,99 @@ -from ATRI.service import Service -from ATRI.rule import is_in_service -from ATRI.database.db import DB -from ATRI.utils import timestamp2datetime - import json -import aiohttp -import os -import re -import asyncio -from typing import Any from operator import itemgetter -__session_pool = {} - - -def get_api(field: str) -> dict: - """ - 获取 API。 - - Args: - field (str): API 所属分类,即 data/api 下的文件名(不含后缀名) - - Returns: - dict, 该 API 的内容。 - """ - path = os.path.abspath( - os.path.join(os.path.dirname(__file__), f"{field.lower()}.json") - ) - if os.path.exists(path): - with open(path, encoding="utf8") as f: - return json.loads(f.read()) - else: - return dict() - - -API: dict = get_api("user") - - -def get_session(): - """ - 获取当前模块的 aiohttp.ClientSession 对象,用于自定义请求 - - Returns: - aiohttp.ClientSession - """ - loop = asyncio.get_event_loop() - session = __session_pool.get(loop, None) - if session is None: - session = aiohttp.ClientSession(loop=loop) - __session_pool[loop] = session - - return session - - -async def bilibili_request( - method: str, - url: str, - params: dict = dict(), - data: Any = None, - no_csrf: bool = False, - json_body: bool = False, - **kwargs, -) -> dict: - """ - 向接口发送请求。 - - Args: - method (str) : 请求方法。 - url (str) : 请求 URL。 - params (dict, optional) : 请求参数。 - data (Any, optional) : 请求载荷。 - no_csrf (bool, optional) : 不要自动添加 CSRF。 - json_body (bool, optional) 载荷是否为 JSON - - Returns: - 接口未返回数据时,返回 None,否则返回该接口提供的 data 或 result 字段的数据。 - """ - - method = method.upper() - - # 使用 Referer 和 UA 请求头以绕过反爬虫机制 - DEFAULT_HEADERS = { - "Referer": "https://www.bilibili.com", - "User-Agent": "Mozilla/5.0", - } - headers = DEFAULT_HEADERS +from nonebot.adapters.onebot.v11 import MessageSegment +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN - if params is None: - params = {} - - # 自动添加 csrf - if not no_csrf and method in ["POST", "DELETE", "PATCH"]: - if data is None: - data = {} - data["csrf"] = "" - data["csrf_token"] = "" - - # jsonp - - if params.get("jsonp", "") == "jsonp": - params["callback"] = "callback" +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.utils import timestamp2datetime +from ATRI.exceptions import BilibiliDynamicError - config = { - "method": method, - "url": url, - "params": params, - "data": data, - "headers": headers, - "cookies": "", - } +from .database import DB +from .api import API - config.update(kwargs) - if json_body: - config["headers"]["Content-Type"] = "application/json" - config["data"] = json.dumps(config["data"]) +_OUTPUT_FORMAT = """ +{up_nickname} 的{up_dy_type}更新了! +{up_dy_content} +{up_dy_media} +链接: {up_dy_link} +""".strip() - session = get_session() - async with session.request(**config) as resp: +class BilibiliDynamicSubscriptor(Service): + def __init__(self): + Service.__init__( + self, + "b站动态订阅", + "b站动态订阅助手~", + rule=is_in_service("b站动态订阅"), + permission=GROUP_OWNER | GROUP_ADMIN, + main_cmd="/bd", + ) - # 检查状态码 + async def add_sub(self, uid: int, group_id: int): try: - resp.raise_for_status() - except aiohttp.ClientResponseError as e: - raise Exception(e.message) - - # 检查响应头 Content-Length - content_length = resp.headers.get("content-length") - if content_length and int(content_length) == 0: - return dict() + async with DB() as db: + await db.add_sub(uid, group_id) + except BilibiliDynamicError: + raise BilibiliDynamicError("添加订阅失败") - # 检查响应头 Content-Type - content_type = resp.headers.get("content-type") - - # 不是 application/json - if content_type.lower().index("application/json") == -1: # type: ignore - raise Exception("响应不是 application/json 类型") - - raw_data = await resp.text() - resp_data: dict = dict() + async def update_sub(self, uid: int, update_map: dict): + try: + async with DB() as db: + await db.update_sub(uid, update_map) + except BilibiliDynamicError: + BilibiliDynamicError("更新订阅失败") - if "callback" in params: - # JSONP 请求 - resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) # type: ignore + async def del_sub(self, uid: int, group_id: int): + try: + async with DB() as db: + await db.del_sub({"uid": uid, "group_id": group_id}) + except BilibiliDynamicError: + raise BilibiliDynamicError("删除订阅失败") + + async def get_sub_list(self, uid: int = int(), group_id: int = int()) -> list: + if not uid: + query_map = {"group_id": group_id} else: - # JSON - resp_data = json.loads(raw_data) - - # 检查 code - code = resp_data.get("code", None) - - if code is None: - raise Exception("API 返回数据未含 code 字段") - - if code != 0: - msg = resp_data.get("msg", None) - if msg is None: - msg = resp_data.get("message", None) - if msg is None: - msg = "接口未返回错误信息" - raise Exception(msg) - - real_data = resp_data.get("data", None) - if real_data is None: - real_data = resp_data.get("result", None) - return real_data - + query_map = {"uid": uid, "group_id": group_id} -class User: - """ - b站用户相关 - """ - - def __init__(self, uid: int): - """ - Args: - uid (int) : 用户 UID - """ - self.uid = uid - - self.__self_info = None # 暂时无用 - - async def get_user_info(self) -> dict: - """ - 获取用户信息(昵称,性别,生日,签名,头像 URL,空间横幅 URL 等) - - Returns: - dict: 调用接口返回的内容。 - """ - api = API["info"]["info"] - params = {"mid": self.uid} - return await bilibili_request("GET", url=api["url"], params=params) - - async def get_dynamics(self, offset: int = 0, need_top: bool = False): - """ - 获取用户动态。 - - Args: - offset (str, optional): 该值为第一次调用本方法时,数据中会有个 next_offset 字段, - 指向下一动态列表第一条动态(类似单向链表)。 - 根据上一次获取结果中的 next_offset 字段值, - 循环填充该值即可获取到全部动态。 - 0 为从头开始。 - Defaults to 0. - need_top (bool, optional): 显示置顶动态. Defaults to False. + try: + async with DB() as db: + return await db.get_sub_list(query_map) + except BilibiliDynamicError: + raise BilibiliDynamicError("获取订阅列表失败") - Returns: - dict: 调用接口返回的内容。 - """ - api = API["info"]["dynamic"] - params = { - "host_uid": self.uid, - "offset_dynamic_id": offset, - "need_top": 1 if need_top else 0, - } - data: dict = await bilibili_request("GET", url=api["url"], params=params) - # card 字段自动转换成 JSON。 + async def get_all_subs(self) -> list: + try: + async with DB() as db: + return await db.get_all_subs() + except BilibiliDynamicError: + raise BilibiliDynamicError("获取全部订阅列表失败") + + async def get_up_nickname(self, uid: int) -> str: + api = API(uid) + resp = await api.get_user_info() + data = resp.get("data", dict()) + return data.get("name", "unknown") + + async def get_up_recent_dynamic(self, uid: int) -> dict: + api = API(uid) + resp = await api.get_user_dynamics() + data = resp.get("data", dict()) if "cards" in data: for card in data["cards"]: card["card"] = json.loads(card["card"]) card["extend_json"] = json.loads(card["extend_json"]) return data - -class BilibiliDynamicSubscriptor(Service): - def __init__(self): - Service.__init__(self, "b站动态订阅", "b站订阅动态助手", rule=is_in_service("b站动态订阅")) - - async def add_subscription(self, uid: int, groupid: int) -> bool: - async with DB() as db: - res = await db.add_subscription(uid=uid, groupid=groupid) - return res - - async def remove_subscription(self, uid: int, groupid: int) -> bool: - async with DB() as db: - res = await db.remove_subscription( - query_map={"uid": uid, "groupid": groupid} - ) - return res - - async def get_subscriptions(self, query_map: dict) -> list: - async with DB() as db: - res = await db.get_subscriptions(query_map=query_map) - return res - - async def update_subscription_by_uid(self, uid: int, update_map: dict) -> bool: - async with DB() as db: - res = await db.update_subscriptions_by_uid(uid=uid, update_map=update_map) - return res - - async def get_all_subscriptions(self) -> list: - async with DB() as db: - res = await db.get_all_subscriptions() - return res - - # bilibili network function - - async def get_upname_by_uid(self, uid: int) -> str: - try: - u = User(uid) - info: dict = await u.get_user_info() - return info.get("name") - except: - return "" - - async def get_recent_dynamic_by_uid(self, uid: int) -> dict: - try: - u = User(uid) - info = await u.get_dynamics() - return info - except: - return {} - - def extract_dynamics_detail(self, dynamic_list: list) -> list: - import time - - ret = [] - for d in dynamic_list: + def extract_dyanmic(self, data: list) -> list: + result = list() + for i in data: pattern = {} - desc = d["desc"] - card = d["card"] + desc = i["desc"] + card = i["card"] type = desc["type"] # common 部分 @@ -292,11 +105,11 @@ class BilibiliDynamicSubscriptor(Service): pattern["dynamic_id"] = desc["dynamic_id"] pattern["timestamp"] = desc["timestamp"] pattern["time"] = timestamp2datetime(desc["timestamp"]) - pattern["type_zh"] = "" + pattern["type_zh"] = str() # alternative 部分 - pattern["content"] = "" - pattern["pic"] = "" + pattern["content"] = str() + pattern["pic"] = str() # 根据type区分 提取content if type == 1: # 转发动态 @@ -329,19 +142,25 @@ class BilibiliDynamicSubscriptor(Service): if len(card["image_urls"]) > 0: pattern["pic"] = card["image_urls"][0] - ret.append(pattern) - ret = sorted(ret, key=itemgetter("timestamp")) - return ret + result.append(pattern) + return sorted(result, key=itemgetter("timestamp")) + + def gen_output(self, data: dict, limit_content: int = 100) -> str: + """生成动态信息 - def generate_output(self, pattern: dict) -> tuple: - # 限制摘要的字数 - abstractLimit = 40 - text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【时间】{time}\n【内容摘要】{content}\n【链接】{url}\n""".format( - name=pattern["name"], - dynamic_type=pattern["type_zh"], - time=pattern["time"], - content=pattern["content"][:abstractLimit], - url="https://t.bilibili.com/" + str(pattern["dynamic_id"]), + Args: + data (dict): dict形式的动态数据. + limit_content (int, optional): 内容字数限制. 默认 100. + + Returns: + str: 动态信息 + """ + return _OUTPUT_FORMAT.format( + up_nickname=data["name"], + up_dy_type=data["type_zh"], + up_dy_content=str(data["content"][:limit_content] + "...") + .replace("https://", str()) + .replace("http://", str()), + up_dy_media=MessageSegment.image(data["pic"]) if data.get("pic") else str(), + up_dy_link="https://t.bilibili.com/" + str(data["dynamic_id"]), ) - pic_part = pattern["pic"] - return text_part, pic_part |