# cgit page residue: summary | refs | log | tree | commit | diff
# path: root/ATRI/plugins/bilibili_dynamic/data_source.py
# blob: d6294549daed2f198249f53d572b9be6c33ca2a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import json
from datetime import datetime, timedelta, timezone as tz
from operator import itemgetter

from ATRI.message import MessageBuilder
from ATRI.utils import TimeDealer
from ATRI.exceptions import BilibiliDynamicError

from .db import DB
from .api import API


# Message template for a "dynamic updated" notification. The {placeholders}
# (up_nickname, up_dy_type, limit_content, up_dy_content, up_dy_link) are
# filled in by BilibiliDynamicSubscriptor.gen_output via ``.format(...)``.
# NOTE(review): presumably MessageBuilder(...).done() yields a plain string,
# since the result is used with keyword-style ``.format`` — confirm in
# ATRI.message.
_OUTPUT_FORMAT = (
    MessageBuilder("{up_nickname}{up_dy_type}更新了!")
    .text("(限制 {limit_content} 字)")
    .text("{up_dy_content}")
    .text("链接: {up_dy_link}")
    .done()
)


class BilibiliDynamicSubscriptor:
    """Manage per-group subscriptions to Bilibili uploaders' dynamics.

    Subscription storage goes through the plugin's ``DB`` helper, remote
    requests go through ``API``, and the remaining methods turn raw dynamic
    payloads into normalized dicts and chat messages.
    """

    async def __add_sub(self, uid: int, group_id: int):
        """Store a new subscription of ``group_id`` to uploader ``uid``.

        Raises:
            BilibiliDynamicError: if the database operation fails.
        """
        try:
            async with DB() as db:
                await db.add_sub(uid, group_id)
        except Exception as err:
            # Chain the cause so the underlying DB failure stays visible.
            raise BilibiliDynamicError("添加订阅失败") from err

    async def update_sub(self, uid: int, group_id: int, update_map: dict):
        """Apply ``update_map`` to the subscription of ``group_id`` to ``uid``.

        Raises:
            BilibiliDynamicError: if the database operation fails.
        """
        try:
            async with DB() as db:
                await db.update_sub(uid, group_id, update_map)
        except Exception as err:
            raise BilibiliDynamicError("更新订阅失败") from err

    async def __del_sub(self, uid: int, group_id: int):
        """Delete the subscription of ``group_id`` to uploader ``uid``.

        Raises:
            BilibiliDynamicError: if the database operation fails.
        """
        try:
            async with DB() as db:
                await db.del_sub({"uid": uid, "group_id": group_id})
        except Exception as err:
            raise BilibiliDynamicError("删除订阅失败") from err

    async def get_sub_list(self, uid: int = 0, group_id: int = 0) -> list:
        """Query subscriptions for a group, optionally narrowed to one uploader.

        Args:
            uid: uploader id; when falsy (the default ``0``) the query filters
                on ``group_id`` only.
            group_id: group whose subscriptions are looked up.

        Returns:
            list: whatever ``DB.get_sub_list`` returns for the query.

        Raises:
            BilibiliDynamicError: if the database operation fails.
        """
        query_map = {"group_id": group_id}
        if uid:
            query_map = {"uid": uid, "group_id": group_id}

        try:
            async with DB() as db:
                return await db.get_sub_list(query_map)
        except Exception as err:
            raise BilibiliDynamicError("获取订阅列表失败") from err

    async def get_all_subs(self) -> list:
        """Return every stored subscription.

        Raises:
            BilibiliDynamicError: if the database operation fails.
        """
        try:
            async with DB() as db:
                return await db.get_all_subs()
        except Exception as err:
            raise BilibiliDynamicError("获取全部订阅列表失败") from err

    async def __get_up_nickname(self, uid: int) -> str:
        """Fetch the uploader's nickname, or ``""`` when it is unavailable.

        An empty string (rather than a placeholder name) is returned when the
        response carries no usable ``data``/``name`` so that ``add_sub`` can
        actually detect the failure and refuse the subscription.
        """
        api = API(uid)
        resp = await api.get_user_info()
        # ``data`` may be absent or explicitly null in the response.
        data = resp.get("data") or dict()
        return data.get("name") or str()

    async def get_up_recent_dynamic(self, uid: int) -> dict:
        """Fetch the uploader's recent dynamics and decode embedded JSON.

        Each entry of ``data["cards"]`` carries ``card`` and ``extend_json``
        as JSON-encoded strings; both are decoded in place.

        Returns:
            dict: the response's ``data`` payload, or an empty dict when the
            response has no (or a null) ``data``.
        """
        api = API(uid)
        resp = await api.get_user_dynamics()
        data = resp.get("data") or dict()
        if not data:
            return dict()

        # Tolerate a missing or null "cards" entry.
        for card in data.get("cards") or []:
            card["card"] = json.loads(card["card"])
            card["extend_json"] = json.loads(card["extend_json"])
        return data

    @staticmethod
    def _extract_content(dy_type: int, card: dict) -> tuple:
        """Map a dynamic's type code and decoded card to display fields.

        Args:
            dy_type: the numeric ``desc["type"]`` of the dynamic.
            card: the decoded ``card`` payload of the dynamic.

        Returns:
            tuple: ``(type_zh, content, pic)``; all three are empty strings
            for type codes that are not handled.
        """
        if dy_type == 1:  # repost
            return "转发动态", card["item"]["content"], str()

        if dy_type == 2:  # regular dynamic with pictures
            item = card["item"]
            pic = str()
            # Guard against a positive count paired with an empty/missing list.
            if item["pictures_count"] > 0 and item.get("pictures"):
                first = item["pictures"][0]
                # Entries are either plain URLs or dicts holding "img_src".
                pic = first if isinstance(first, str) else first["img_src"]
            return "普通动态(附图)", item["description"], pic

        if dy_type == 4:  # regular text-only dynamic, no picture
            return "普通动态(纯文字)", card["item"]["content"], str()

        if dy_type == 8:  # video
            content = card["dynamic"] + "\n视频标题: " + card["title"]
            return "视频动态", content, card["pic"]

        if dy_type == 64:  # article
            images = card["image_urls"]
            pic = images[0] if images else str()
            return "文章", card["title"] + card["summary"], pic

        return str(), str(), str()

    def extract_dyanmic(self, data: list) -> list:
        """Normalize raw dynamic cards into flat dicts, oldest first.

        Args:
            data: cards as found in ``get_up_recent_dynamic(...)["cards"]``,
                i.e. each with a ``desc`` dict and an already-decoded ``card``.

        Returns:
            list: one dict per card with the keys ``type``, ``uid``, ``view``,
            ``repost``, ``like``, ``dynamic_id``, ``timestamp``, ``time``,
            ``type_zh``, ``content`` and ``pic``, sorted by ``timestamp``
            ascending.
        """
        # Timestamps are rendered at a fixed UTC+8 offset.
        utc_plus_8 = tz(timedelta(hours=8))

        result = list()
        for entry in data:
            desc = entry["desc"]
            type_zh, content, pic = self._extract_content(
                desc["type"], entry["card"]
            )
            result.append(
                {
                    # fields common to every dynamic type
                    "type": desc["type"],
                    "uid": desc["uid"],
                    "view": desc["view"],
                    "repost": desc["repost"],
                    "like": desc["like"],
                    "dynamic_id": desc["dynamic_id"],
                    "timestamp": desc["timestamp"],
                    "time": TimeDealer(
                        float(desc["timestamp"]), utc_plus_8
                    ).to_datetime(),
                    # fields that depend on the dynamic type
                    "type_zh": type_zh,
                    "content": content,
                    "pic": pic,
                }
            )
        return sorted(result, key=itemgetter("timestamp"))

    def gen_output(self, data: dict, content_limit) -> str:
        """Render the notification message for one dynamic.

        Args:
            data (dict): a dynamic as produced by ``extract_dyanmic``; it must
                additionally contain a ``name`` key (the uploader nickname),
                which ``extract_dyanmic`` itself does not set.
            content_limit: maximum number of characters of the content to
                include; a falsy value disables truncation.

        Returns:
            str: the formatted message.
        """
        content = data["content"]
        if content_limit:
            content = content[:content_limit]

        return _OUTPUT_FORMAT.format(
            up_nickname=data["name"],
            up_dy_type=data["type_zh"],
            limit_content=content_limit,
            # Strip URL schemes from the quoted content.
            up_dy_content=str(content)
            .replace("https://", str())
            .replace("http://", str()),
            up_dy_link="https://t.bilibili.com/" + str(data["dynamic_id"]),
        )

    async def add_sub(self, uid: int, group_id: int) -> str:
        """Subscribe ``group_id`` to uploader ``uid``.

        Returns:
            str: a user-facing message describing the outcome (lookup failure,
            already subscribed, or success).

        Raises:
            BilibiliDynamicError: if a database operation fails.
        """
        up_nickname = await self.__get_up_nickname(uid)
        if not up_nickname:
            return f"无法获取id为 {uid} 的up主信息...操作失败了"

        query_result = await self.get_sub_list(uid, group_id)
        if query_result:
            return f"该up主 {up_nickname} 已在本群订阅列表中啦!"

        await self.__add_sub(uid, group_id)
        # Naive UTC timestamp, same value datetime.utcnow() produced, without
        # calling the deprecated API.
        now_utc = datetime.now(tz.utc).replace(tzinfo=None)
        await self.update_sub(
            uid,
            group_id,
            {"up_nickname": up_nickname, "last_update": now_utc},
        )
        return f"成功订阅名为 {up_nickname} up主的动态~!"

    async def del_sub(self, uid: int, group_id: int) -> str:
        """Unsubscribe ``group_id`` from uploader ``uid``.

        Returns:
            str: a user-facing message describing the outcome (not subscribed,
            or success).

        Raises:
            BilibiliDynamicError: if a database operation fails.
        """
        query_result = await self.get_sub_list(uid, group_id)
        if not query_result:
            return f"该uid: {uid} 未在本群订阅列表中啦!"

        await self.__del_sub(uid, group_id)
        return f"成功取消订阅uid为 {uid} up主的动态~!"