summaryrefslogtreecommitdiff
path: root/ATRI/plugins/bilibili_dynamic/data_source.py
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-06-13 19:37:34 +0800
committerKyomotoi <[email protected]>2022-06-13 19:37:34 +0800
commit59c923bbc3e91ad5a3482ec81c9192297ebbebe8 (patch)
treed74da226542a8b6f54200bd6d55e8d459315c71d /ATRI/plugins/bilibili_dynamic/data_source.py
parent4a876e2e64a690dcf01c385e273052e1e22caacd (diff)
downloadATRI-59c923bbc3e91ad5a3482ec81c9192297ebbebe8.tar.gz
ATRI-59c923bbc3e91ad5a3482ec81c9192297ebbebe8.tar.bz2
ATRI-59c923bbc3e91ad5a3482ec81c9192297ebbebe8.zip
♻️ 重构插件: b站动态订阅
Diffstat (limited to 'ATRI/plugins/bilibili_dynamic/data_source.py')
-rw-r--r--ATRI/plugins/bilibili_dynamic/data_source.py373
1 files changed, 96 insertions, 277 deletions
diff --git a/ATRI/plugins/bilibili_dynamic/data_source.py b/ATRI/plugins/bilibili_dynamic/data_source.py
index 14143e6..5233692 100644
--- a/ATRI/plugins/bilibili_dynamic/data_source.py
+++ b/ATRI/plugins/bilibili_dynamic/data_source.py
@@ -1,286 +1,99 @@
-from ATRI.service import Service
-from ATRI.rule import is_in_service
-from ATRI.database.db import DB
-from ATRI.utils import timestamp2datetime
-
import json
-import aiohttp
-import os
-import re
-import asyncio
-from typing import Any
from operator import itemgetter
-__session_pool = {}
-
-
-def get_api(field: str) -> dict:
- """
- 获取 API。
-
- Args:
- field (str): API 所属分类,即 data/api 下的文件名(不含后缀名)
-
- Returns:
- dict, 该 API 的内容。
- """
- path = os.path.abspath(
- os.path.join(os.path.dirname(__file__), f"{field.lower()}.json")
- )
- if os.path.exists(path):
- with open(path, encoding="utf8") as f:
- return json.loads(f.read())
- else:
- return dict()
-
-
-API: dict = get_api("user")
-
-
-def get_session():
- """
- 获取当前模块的 aiohttp.ClientSession 对象,用于自定义请求
-
- Returns:
- aiohttp.ClientSession
- """
- loop = asyncio.get_event_loop()
- session = __session_pool.get(loop, None)
- if session is None:
- session = aiohttp.ClientSession(loop=loop)
- __session_pool[loop] = session
-
- return session
-
-
-async def bilibili_request(
- method: str,
- url: str,
- params: dict = dict(),
- data: Any = None,
- no_csrf: bool = False,
- json_body: bool = False,
- **kwargs,
-) -> dict:
- """
- 向接口发送请求。
-
- Args:
- method (str) : 请求方法。
- url (str) : 请求 URL。
- params (dict, optional) : 请求参数。
- data (Any, optional) : 请求载荷。
- no_csrf (bool, optional) : 不要自动添加 CSRF。
- json_body (bool, optional) 载荷是否为 JSON
-
- Returns:
- 接口未返回数据时,返回 None,否则返回该接口提供的 data 或 result 字段的数据。
- """
-
- method = method.upper()
-
- # 使用 Referer 和 UA 请求头以绕过反爬虫机制
- DEFAULT_HEADERS = {
- "Referer": "https://www.bilibili.com",
- "User-Agent": "Mozilla/5.0",
- }
- headers = DEFAULT_HEADERS
+from nonebot.adapters.onebot.v11 import MessageSegment
+from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
- if params is None:
- params = {}
-
- # 自动添加 csrf
- if not no_csrf and method in ["POST", "DELETE", "PATCH"]:
- if data is None:
- data = {}
- data["csrf"] = ""
- data["csrf_token"] = ""
-
- # jsonp
-
- if params.get("jsonp", "") == "jsonp":
- params["callback"] = "callback"
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.utils import timestamp2datetime
+from ATRI.exceptions import BilibiliDynamicError
- config = {
- "method": method,
- "url": url,
- "params": params,
- "data": data,
- "headers": headers,
- "cookies": "",
- }
+from .database import DB
+from .api import API
- config.update(kwargs)
- if json_body:
- config["headers"]["Content-Type"] = "application/json"
- config["data"] = json.dumps(config["data"])
+_OUTPUT_FORMAT = """
+{up_nickname} 的{up_dy_type}更新了!
+{up_dy_content}
+{up_dy_media}
+链接: {up_dy_link}
+""".strip()
- session = get_session()
- async with session.request(**config) as resp:
+class BilibiliDynamicSubscriptor(Service):
+ def __init__(self):
+ Service.__init__(
+ self,
+ "b站动态订阅",
+ "b站动态订阅助手~",
+ rule=is_in_service("b站动态订阅"),
+ permission=GROUP_OWNER | GROUP_ADMIN,
+ main_cmd="/bd",
+ )
- # 检查状态码
+ async def add_sub(self, uid: int, group_id: int):
try:
- resp.raise_for_status()
- except aiohttp.ClientResponseError as e:
- raise Exception(e.message)
-
- # 检查响应头 Content-Length
- content_length = resp.headers.get("content-length")
- if content_length and int(content_length) == 0:
- return dict()
+ async with DB() as db:
+ await db.add_sub(uid, group_id)
+ except BilibiliDynamicError:
+ raise BilibiliDynamicError("添加订阅失败")
- # 检查响应头 Content-Type
- content_type = resp.headers.get("content-type")
-
- # 不是 application/json
- if content_type.lower().index("application/json") == -1: # type: ignore
- raise Exception("响应不是 application/json 类型")
-
- raw_data = await resp.text()
- resp_data: dict = dict()
+ async def update_sub(self, uid: int, update_map: dict):
+ try:
+ async with DB() as db:
+ await db.update_sub(uid, update_map)
+ except BilibiliDynamicError:
+ BilibiliDynamicError("更新订阅失败")
- if "callback" in params:
- # JSONP 请求
- resp_data = json.loads(re.match("^.*?({.*}).*$", raw_data, re.S).group(1)) # type: ignore
+ async def del_sub(self, uid: int, group_id: int):
+ try:
+ async with DB() as db:
+ await db.del_sub({"uid": uid, "group_id": group_id})
+ except BilibiliDynamicError:
+ raise BilibiliDynamicError("删除订阅失败")
+
+ async def get_sub_list(self, uid: int = int(), group_id: int = int()) -> list:
+ if not uid:
+ query_map = {"group_id": group_id}
else:
- # JSON
- resp_data = json.loads(raw_data)
-
- # 检查 code
- code = resp_data.get("code", None)
-
- if code is None:
- raise Exception("API 返回数据未含 code 字段")
-
- if code != 0:
- msg = resp_data.get("msg", None)
- if msg is None:
- msg = resp_data.get("message", None)
- if msg is None:
- msg = "接口未返回错误信息"
- raise Exception(msg)
-
- real_data = resp_data.get("data", None)
- if real_data is None:
- real_data = resp_data.get("result", None)
- return real_data
-
+ query_map = {"uid": uid, "group_id": group_id}
-class User:
- """
- b站用户相关
- """
-
- def __init__(self, uid: int):
- """
- Args:
- uid (int) : 用户 UID
- """
- self.uid = uid
-
- self.__self_info = None # 暂时无用
-
- async def get_user_info(self) -> dict:
- """
- 获取用户信息(昵称,性别,生日,签名,头像 URL,空间横幅 URL 等)
-
- Returns:
- dict: 调用接口返回的内容。
- """
- api = API["info"]["info"]
- params = {"mid": self.uid}
- return await bilibili_request("GET", url=api["url"], params=params)
-
- async def get_dynamics(self, offset: int = 0, need_top: bool = False):
- """
- 获取用户动态。
-
- Args:
- offset (str, optional): 该值为第一次调用本方法时,数据中会有个 next_offset 字段,
- 指向下一动态列表第一条动态(类似单向链表)。
- 根据上一次获取结果中的 next_offset 字段值,
- 循环填充该值即可获取到全部动态。
- 0 为从头开始。
- Defaults to 0.
- need_top (bool, optional): 显示置顶动态. Defaults to False.
+ try:
+ async with DB() as db:
+ return await db.get_sub_list(query_map)
+ except BilibiliDynamicError:
+ raise BilibiliDynamicError("获取订阅列表失败")
- Returns:
- dict: 调用接口返回的内容。
- """
- api = API["info"]["dynamic"]
- params = {
- "host_uid": self.uid,
- "offset_dynamic_id": offset,
- "need_top": 1 if need_top else 0,
- }
- data: dict = await bilibili_request("GET", url=api["url"], params=params)
- # card 字段自动转换成 JSON。
+ async def get_all_subs(self) -> list:
+ try:
+ async with DB() as db:
+ return await db.get_all_subs()
+ except BilibiliDynamicError:
+ raise BilibiliDynamicError("获取全部订阅列表失败")
+
+ async def get_up_nickname(self, uid: int) -> str:
+ api = API(uid)
+ resp = await api.get_user_info()
+ data = resp.get("data", dict())
+ return data.get("name", "unknown")
+
+ async def get_up_recent_dynamic(self, uid: int) -> dict:
+ api = API(uid)
+ resp = await api.get_user_dynamics()
+ data = resp.get("data", dict())
if "cards" in data:
for card in data["cards"]:
card["card"] = json.loads(card["card"])
card["extend_json"] = json.loads(card["extend_json"])
return data
-
-class BilibiliDynamicSubscriptor(Service):
- def __init__(self):
- Service.__init__(self, "b站动态订阅", "b站订阅动态助手", rule=is_in_service("b站动态订阅"))
-
- async def add_subscription(self, uid: int, groupid: int) -> bool:
- async with DB() as db:
- res = await db.add_subscription(uid=uid, groupid=groupid)
- return res
-
- async def remove_subscription(self, uid: int, groupid: int) -> bool:
- async with DB() as db:
- res = await db.remove_subscription(
- query_map={"uid": uid, "groupid": groupid}
- )
- return res
-
- async def get_subscriptions(self, query_map: dict) -> list:
- async with DB() as db:
- res = await db.get_subscriptions(query_map=query_map)
- return res
-
- async def update_subscription_by_uid(self, uid: int, update_map: dict) -> bool:
- async with DB() as db:
- res = await db.update_subscriptions_by_uid(uid=uid, update_map=update_map)
- return res
-
- async def get_all_subscriptions(self) -> list:
- async with DB() as db:
- res = await db.get_all_subscriptions()
- return res
-
- # bilibili network function
-
- async def get_upname_by_uid(self, uid: int) -> str:
- try:
- u = User(uid)
- info: dict = await u.get_user_info()
- return info.get("name")
- except:
- return ""
-
- async def get_recent_dynamic_by_uid(self, uid: int) -> dict:
- try:
- u = User(uid)
- info = await u.get_dynamics()
- return info
- except:
- return {}
-
- def extract_dynamics_detail(self, dynamic_list: list) -> list:
- import time
-
- ret = []
- for d in dynamic_list:
+ def extract_dyanmic(self, data: list) -> list:
+ result = list()
+ for i in data:
pattern = {}
- desc = d["desc"]
- card = d["card"]
+ desc = i["desc"]
+ card = i["card"]
type = desc["type"]
# common 部分
@@ -292,11 +105,11 @@ class BilibiliDynamicSubscriptor(Service):
pattern["dynamic_id"] = desc["dynamic_id"]
pattern["timestamp"] = desc["timestamp"]
pattern["time"] = timestamp2datetime(desc["timestamp"])
- pattern["type_zh"] = ""
+ pattern["type_zh"] = str()
# alternative 部分
- pattern["content"] = ""
- pattern["pic"] = ""
+ pattern["content"] = str()
+ pattern["pic"] = str()
# 根据type区分 提取content
if type == 1: # 转发动态
@@ -329,19 +142,25 @@ class BilibiliDynamicSubscriptor(Service):
if len(card["image_urls"]) > 0:
pattern["pic"] = card["image_urls"][0]
- ret.append(pattern)
- ret = sorted(ret, key=itemgetter("timestamp"))
- return ret
+ result.append(pattern)
+ return sorted(result, key=itemgetter("timestamp"))
+
+ def gen_output(self, data: dict, limit_content: int = 100) -> str:
+ """生成动态信息
- def generate_output(self, pattern: dict) -> tuple:
- # 限制摘要的字数
- abstractLimit = 40
- text_part = """【UP名称】{name}\n【动态类型】{dynamic_type}\n【时间】{time}\n【内容摘要】{content}\n【链接】{url}\n""".format(
- name=pattern["name"],
- dynamic_type=pattern["type_zh"],
- time=pattern["time"],
- content=pattern["content"][:abstractLimit],
- url="https://t.bilibili.com/" + str(pattern["dynamic_id"]),
+ Args:
+ data (dict): dict形式的动态数据.
+ limit_content (int, optional): 内容字数限制. 默认 100.
+
+ Returns:
+ str: 动态信息
+ """
+ return _OUTPUT_FORMAT.format(
+ up_nickname=data["name"],
+ up_dy_type=data["type_zh"],
+ up_dy_content=str(data["content"][:limit_content] + "...")
+ .replace("https://", str())
+ .replace("http://", str()),
+ up_dy_media=MessageSegment.image(data["pic"]) if data.get("pic") else str(),
+ up_dy_link="https://t.bilibili.com/" + str(data["dynamic_id"]),
)
- pic_part = pattern["pic"]
- return text_part, pic_part