diff options
author | Yuki-Asuuna <[email protected]> | 2022-02-25 21:22:48 +0800 |
---|---|---|
committer | Yuki-Asuuna <[email protected]> | 2022-02-25 21:22:48 +0800 |
commit | 920173c8faaab5a65459ce176d36812bac6feb08 (patch) | |
tree | 5b6925acd2804af1b64b729d50174926420c2835 /ATRI/plugins/bilibili_dynamic/data_source.py | |
parent | 1b5e5de6bcb0b76d432bf0d9e1a6fb9bba510e52 (diff) | |
download | ATRI-920173c8faaab5a65459ce176d36812bac6feb08.tar.gz ATRI-920173c8faaab5a65459ce176d36812bac6feb08.tar.bz2 ATRI-920173c8faaab5a65459ce176d36812bac6feb08.zip |
feat: 添加b站动态订阅功能
Change-Id: I8b74e3a286901379b8337e33d1b581524cb80d97
Diffstat (limited to 'ATRI/plugins/bilibili_dynamic/data_source.py')
-rw-r--r-- | ATRI/plugins/bilibili_dynamic/data_source.py | 349 |
1 files changed, 349 insertions, 0 deletions
# Origin: ATRI/plugins/bilibili_dynamic/data_source.py (added as a new file by this commit).
from ATRI.service import Service
from ATRI.rule import is_in_service
from ATRI.database.db import DB
from ATRI.utils import timestamp2datetime

import json
import aiohttp
import os
import re
import asyncio
from typing import Any, Tuple

# Description text handed to Service.__init__ below; this is a runtime value,
# not a comment, so it is kept exactly as written.
__doc__ = """b站订阅动态助手
"""

# One aiohttp.ClientSession per event loop, keyed by the loop object.
__session_pool = {}


def get_api(field: str):
    """
    Load an API description file.

    Args:
        field (str): API category. It is lower-cased and used as the base name
            of a ``.json`` file located in the same directory as this module.

    Returns:
        The parsed JSON content, or None when the file does not exist.
    """
    path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), f"{field.lower()}.json")
    )
    if not os.path.exists(path):
        # Deliberately best-effort: a missing file must not break module import.
        return None
    with open(path, encoding="utf8") as f:
        return json.load(f)


API = get_api("user")


def get_session():
    """
    Return the aiohttp.ClientSession bound to the current event loop.

    A session is created lazily on first use and cached per loop. A cached
    session that has been closed in the meantime is replaced, because a closed
    session cannot issue requests any more.

    Returns:
        aiohttp.ClientSession
    """
    loop = asyncio.get_event_loop()
    session = __session_pool.get(loop, None)
    if session is None or session.closed:
        session = aiohttp.ClientSession(loop=loop)
        __session_pool[loop] = session

    return session


async def bilibili_request(
    method: str,
    url: str,
    params: dict = None,
    data: Any = None,
    no_csrf: bool = False,
    json_body: bool = False,
    **kwargs,
):
    """
    Send a request to an API endpoint and unwrap its payload.

    Args:
        method (str): HTTP method (case-insensitive).
        url (str): Request URL.
        params (dict, optional): Query parameters. The caller's dict is not modified.
        data (Any, optional): Request payload. A dict payload is copied before
            the CSRF fields are added.
        no_csrf (bool, optional): Do not add the (empty) CSRF fields automatically.
        json_body (bool, optional): Send the payload as a JSON body.
        **kwargs: Extra entries merged over the arguments passed to
            ``session.request``.

    Returns:
        None when the response declares an empty body; otherwise the ``data``
        field of the response, falling back to its ``result`` field.

    Raises:
        Exception: on an HTTP error status, a non-JSON response, an unparsable
            JSONP body, a missing ``code`` field, or a non-zero ``code``.
    """

    method = method.upper()

    # Referer and User-Agent headers are sent to get past anti-crawler checks.
    headers = {
        "Referer": "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0",
    }

    # Work on copies so the caller's dicts are never mutated.
    params = {} if params is None else dict(params)

    # Add the CSRF fields automatically for state-changing methods.
    if not no_csrf and method in ["POST", "DELETE", "PATCH"]:
        if data is None:
            data = {}
        elif isinstance(data, dict):
            data = dict(data)
        data["csrf"] = ""
        data["csrf_token"] = ""

    # JSONP
    if params.get("jsonp", "") == "jsonp":
        params["callback"] = "callback"

    config = {
        "method": method,
        "url": url,
        "params": params,
        "data": data,
        "headers": headers,
        "cookies": "",
    }

    config.update(kwargs)

    if json_body:
        config["headers"]["Content-Type"] = "application/json"
        config["data"] = json.dumps(config["data"])

    session = get_session()

    async with session.request(**config) as resp:

        # Check the status code.
        try:
            resp.raise_for_status()
        except aiohttp.ClientResponseError as e:
            raise Exception(e.message) from e

        # An explicit Content-Length of 0 means there is nothing to parse.
        content_length = resp.headers.get("content-length")
        if content_length and int(content_length) == 0:
            return None

        # The Content-Type header may be absent; treat that as "not JSON".
        # A substring test is used because str.index raises instead of
        # returning -1 when the substring is missing.
        content_type = resp.headers.get("content-type") or ""
        if "application/json" not in content_type.lower():
            raise Exception("响应不是 application/json 类型")

        raw_data = await resp.text()
        resp_data: dict

        if "callback" in params:
            # JSONP request: strip the callback wrapper around the JSON object.
            matched = re.match("^.*?({.*}).*$", raw_data, re.S)
            if matched is None:
                # Raised when the JSONP body contains no JSON object at all.
                raise Exception("JSONP 响应中未找到 JSON 数据")
            resp_data = json.loads(matched.group(1))
        else:
            # Plain JSON
            resp_data = json.loads(raw_data)

        # Check the API-level status code.
        code = resp_data.get("code", None)

        if code is None:
            raise Exception("API 返回数据未含 code 字段")

        if code != 0:
            # Prefer "msg", then "message", then a generic fallback text.
            msg = resp_data.get("msg", None)
            if msg is None:
                msg = resp_data.get("message", None)
            if msg is None:
                msg = "接口未返回错误信息"
            raise Exception(msg)

        real_data = resp_data.get("data", None)
        if real_data is None:
            real_data = resp_data.get("result", None)
        return real_data


class User:
    """
    Bilibili user related requests.
    """

    def __init__(self, uid: int):
        """
        Args:
            uid (int): User UID.
        """
        self.uid = uid

        # Not read anywhere in this module.
        self.__self_info = None

    async def get_user_info(self):
        """
        Fetch the user's profile information.

        Returns:
            The payload returned by the API (see bilibili_request).
        """
        api = API["info"]["info"]
        params = {"mid": self.uid}
        return await bilibili_request("GET", url=api["url"], params=params)

    async def get_dynamics(self, offset: int = 0, need_top: bool = False):
        """
        Fetch the user's dynamics (feed posts).

        Args:
            offset (int, optional): Sent as ``offset_dynamic_id``. Per the
                original notes, each result carries a ``next_offset`` field
                pointing at the first entry of the next page; feeding it back
                here repeatedly walks the whole list. 0 starts from the
                beginning. Defaults to 0.
            need_top (bool, optional): Include the pinned dynamic. Defaults to False.

        Returns:
            The payload returned by the API, with each card's ``card`` and
            ``extend_json`` strings decoded from JSON. None when the API
            returned no payload.
        """
        api = API["info"]["dynamic"]
        params = {
            "host_uid": self.uid,
            "offset_dynamic_id": offset,
            "need_top": 1 if need_top else 0,
        }
        data = await bilibili_request("GET", url=api["url"], params=params)
        # bilibili_request may return None; only decode cards on a real payload.
        if isinstance(data, dict):
            for card in data.get("cards") or []:
                card["card"] = json.loads(card["card"])
                card["extend_json"] = json.loads(card["extend_json"])
        return data


class BilibiliDynamicSubscriptor(Service):
    """Service that stores dynamic subscriptions and formats fetched dynamics."""

    def __init__(self):
        Service.__init__(self, "b站动态订阅", __doc__, rule=is_in_service("b站动态订阅"))

    # subscription storage (thin wrappers around DB)

    async def add_subscription(self, uid: int, groupid: int) -> bool:
        """Store a subscription of group ``groupid`` to user ``uid``."""
        async with DB() as db:
            res = await db.add_subscription(uid=uid, groupid=groupid)
            return res

    async def remove_subscription(self, uid: int, groupid: int) -> bool:
        """Remove the subscription matching both ``uid`` and ``groupid``."""
        async with DB() as db:
            res = await db.remove_subscription(
                query_map={"uid": uid, "groupid": groupid}
            )
            return res

    async def get_subscriptions(self, query_map: dict) -> list:
        """Return the subscriptions selected by ``query_map``."""
        async with DB() as db:
            res = await db.get_subscriptions(query_map=query_map)
            return res

    async def update_subscription_by_uid(self, uid: int, update_map: dict) -> bool:
        """Apply ``update_map`` to the subscriptions of user ``uid``."""
        async with DB() as db:
            res = await db.update_subscriptions_by_uid(uid=uid, update_map=update_map)
            return res

    async def get_all_subscriptions(self) -> list:
        """Return every stored subscription."""
        async with DB() as db:
            res = await db.get_all_subscriptions()
            return res

    # bilibili network function

    async def get_upname_by_uid(self, uid: int) -> str:
        """
        Return the display name of user ``uid``.

        Best-effort: any request or parsing failure yields an empty string.
        """
        try:
            u = User(uid)
            info = await u.get_user_info()
            return info.get("name") or ""
        except Exception:
            # Catch Exception rather than everything, so task cancellation and
            # KeyboardInterrupt still propagate.
            return ""

    async def get_recent_dynamic_by_uid(self, uid: int) -> dict:
        """
        Return the first page of dynamics of user ``uid``.

        Best-effort: any failure, or an empty API payload, yields ``{}``.
        """
        try:
            u = User(uid)
            info = await u.get_dynamics()
            return info if info is not None else {}
        except Exception:
            # Same reasoning as get_upname_by_uid: do not swallow cancellation.
            return {}

    def extract_dynamics_detail(self, dynamic_list: list) -> list:
        """
        Flatten raw dynamic cards into simple summary dicts.

        Args:
            dynamic_list (list): Cards shaped like the ``cards`` entries produced
                by User.get_dynamics (each with a ``desc`` dict and a decoded
                ``card`` dict).

        Returns:
            list: One dict per card with the keys ``type``, ``uid``, ``view``,
            ``repost``, ``like``, ``dynamic_id``, ``timestamp``, ``time``,
            ``type_zh``, ``content`` and ``pic``. ``type_zh``, ``content`` and
            ``pic`` stay empty for dynamic types not handled below.
        """
        ret = []
        for d in dynamic_list:
            desc = d["desc"]
            card = d["card"]
            dynamic_type = desc["type"]

            pattern = {
                # Fields common to every dynamic type.
                "type": dynamic_type,
                "uid": desc["uid"],
                "view": desc["view"],
                "repost": desc["repost"],
                "like": desc["like"],
                "dynamic_id": desc["dynamic_id"],
                "timestamp": desc["timestamp"],
                "time": timestamp2datetime(desc["timestamp"]),
                "type_zh": "",
                # Type-dependent fields, filled in below.
                "content": "",
                "pic": "",
            }

            # Pick the content and picture according to the dynamic type.
            if dynamic_type == 1:  # repost
                pattern["type_zh"] = "转发动态"
                pattern["content"] = card["item"]["content"]
                pattern["pic"] = card["user"]["face"]

            elif dynamic_type == 2:  # regular dynamic with pictures
                pattern["type_zh"] = "普通动态(附图)"
                item = card["item"]
                pattern["content"] = item["description"]
                pictures = item["pictures"] if item["pictures_count"] > 0 else []
                if pictures:
                    # A picture entry is either a URL string or a dict with "img_src".
                    first = pictures[0]
                    pattern["pic"] = first if isinstance(first, str) else first["img_src"]

            elif dynamic_type == 4:  # regular dynamic, text only (no picture)
                pattern["type_zh"] = "普通动态(纯文字)"
                pattern["content"] = card["item"]["content"]

            elif dynamic_type == 8:  # video
                pattern["type_zh"] = "视频动态"
                pattern["content"] = card["dynamic"]
                pattern["pic"] = card["pic"]

            elif dynamic_type == 64:  # article
                pattern["type_zh"] = "文章"
                pattern["content"] = card["title"] + card["summary"]
                if len(card["image_urls"]) > 0:
                    pattern["pic"] = card["image_urls"][0]

            ret.append(pattern)

        return ret

    def generate_output(self, pattern: dict) -> Tuple[str, str]:
        """
        Render one summary dict as a message.

        Args:
            pattern (dict): A dict as produced by extract_dynamics_detail, plus a
                ``name`` key that the caller must add (it is not set there).

        Returns:
            Tuple[str, str]: ``(text_part, pic_part)`` - the formatted text and
            the picture URL (empty string when there is none).
        """
        text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【动态ID】{dynamic_id}\n【时间】{time}\n【UID】{uid}\n【当前阅读次数】{view}\n【当前转发次数】{repost}\n【当前点赞次数】{like}\n【内容摘要】{content}\n""".format(
            name=pattern["name"],
            dynamic_type=pattern["type_zh"],
            dynamic_id=pattern["dynamic_id"],
            time=pattern["time"],
            uid=pattern["uid"],
            view=pattern["view"],
            repost=pattern["repost"],
            like=pattern["like"],
            content=pattern["content"],
        )
        pic_part = pattern["pic"]
        return text_part, pic_part