summaryrefslogtreecommitdiff
path: root/ATRI/plugins/twitter/data_source.py
blob: 0ea7231a535b26625ac49bdd1279d9209f52d2a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from nonebot.permission import SUPERUSER
from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN

from ATRI.service import Service
from ATRI.rule import is_in_service
from ATRI.exceptions import TwitterDynamicError

from .db import DB
from .api import API


_DYNAMIC_OUTPUT_FORMAT = """
{t_nickname} 的推更新了!
(限制 {limit_content} 字)
{t_dy_content}
{t_dy_media}
链接: {t_dy_link}
""".strip()


class TwitterDynamicSubscriptor(Service):
    """Service that manages Twitter subscriptions and formats tweet updates.

    The subscription methods are thin wrappers around ``DB``: each opens a
    ``DB()`` context, forwards the call, and converts a
    ``TwitterDynamicError`` raised inside it into a ``TwitterDynamicError``
    carrying an operation-specific message (the original error is kept as
    the explicit cause).
    """

    def __init__(self):
        """Register the service with its name, help text, rule and permission."""
        Service.__init__(
            self,
            "推特动态订阅",
            "推特动态订阅助手~",
            rule=is_in_service("推特动态订阅"),
            permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN,
            main_cmd="/td",
        )

    async def add_sub(self, tid: int, group_id: int):
        """Add a subscription for ``tid`` in group ``group_id``.

        Raises:
            TwitterDynamicError: if the DB layer raises TwitterDynamicError.
        """
        try:
            async with DB() as db:
                await db.add_sub(tid, group_id)
        except TwitterDynamicError as err:
            # Chain explicitly so the underlying DB failure stays visible.
            raise TwitterDynamicError("添加订阅失败") from err

    async def update_sub(self, tid: int, group_id: int, update_map: dict):
        """Update the subscription ``(tid, group_id)`` with ``update_map``.

        Args:
            tid: Identifier passed through to ``DB.update_sub``.
            group_id: Group the subscription belongs to.
            update_map: Mapping forwarded unchanged to ``DB.update_sub``.

        Raises:
            TwitterDynamicError: if the DB layer raises TwitterDynamicError.
        """
        try:
            async with DB() as db:
                await db.update_sub(tid, group_id, update_map)
        except TwitterDynamicError as err:
            raise TwitterDynamicError("更新订阅失败") from err

    async def del_sub(self, screen_name: str, group_id: int):
        """Delete the subscription matching ``screen_name`` and ``group_id``.

        Raises:
            TwitterDynamicError: if the DB layer raises TwitterDynamicError.
        """
        try:
            async with DB() as db:
                await db.del_sub({"screen_name": screen_name, "group_id": group_id})
        except TwitterDynamicError as err:
            raise TwitterDynamicError("删除订阅失败") from err

    async def get_sub_list(self, tid: int = int(), group_id: int = int()) -> list:
        """Query subscriptions, filtered by group and optionally by ``tid``.

        Args:
            tid: When falsy (the default ``0``), the query filters on
                ``group_id`` only; otherwise on both ``tid`` and ``group_id``.
            group_id: Group to filter on; always part of the query.

        Returns:
            list: Whatever ``DB.get_sub_list`` returns for the query.

        Raises:
            TwitterDynamicError: if the DB layer raises TwitterDynamicError.
        """
        query_map = {"group_id": group_id}
        if tid:
            query_map = {"tid": tid, "group_id": group_id}

        try:
            async with DB() as db:
                return await db.get_sub_list(query_map)
        except TwitterDynamicError as err:
            raise TwitterDynamicError("获取订阅列表失败") from err

    async def get_all_subs(self) -> list:
        """Return the result of ``DB.get_all_subs``.

        Raises:
            TwitterDynamicError: if the DB layer raises TwitterDynamicError.
        """
        try:
            async with DB() as db:
                return await db.get_all_subs()
        except TwitterDynamicError as err:
            raise TwitterDynamicError("获取全部订阅列表失败") from err

    async def get_twitter_user_info(self, name: str) -> dict:
        """Look up ``name`` via ``API.search_user`` and return its response."""
        api = API()
        return await api.search_user(name)

    async def get_twitter_username(self, name: str) -> tuple:
        """Return ``(name, screen_name)`` from the user lookup for ``name``.

        Either element is ``None`` when the corresponding key is missing from
        the lookup result.
        """
        data = await self.get_twitter_user_info(name)
        return data.get("name"), data.get("screen_name")

    def gen_output(self, data: dict, limit_content: int = 100) -> str:
        """Build the tweet-update message text.

        Args:
            data (dict): Tweet data. Reads the keys ``name``, ``content`` and
                ``to_url`` (required) and ``pic`` (optional).
            limit_content (int, optional): Maximum number of characters of
                ``content`` to include. Defaults to 100.

        Returns:
            str: ``_DYNAMIC_OUTPUT_FORMAT`` filled in with the tweet data.
        """
        # URL schemes are removed from both the content and the link.
        # NOTE(review): presumably this keeps the chat platform from treating
        # them as links — confirm. The link previously kept "http://" while
        # the content did not; both are now stripped the same way.
        content = (
            str(data["content"][:limit_content])
            .replace("https://", str())
            .replace("http://", str())
        )
        link = (
            str(data["to_url"])
            .replace("https://", str())
            .replace("http://", str())
        )
        # The image segment (if any) is rendered to text by str.format.
        media = MessageSegment.image(data["pic"]) if data.get("pic") else str()

        return _DYNAMIC_OUTPUT_FORMAT.format(
            t_nickname=data["name"],
            limit_content=limit_content,
            t_dy_content=content,
            t_dy_media=media,
            t_dy_link=link,
        )


# TODO: implement the TwitterHelper service; draft skeleton below.
# class TwitterHelper(Service):
#     def __init__(self):
#         Service.__init__(self, "推特助手", "推特小助手", rule=is_in_service("推特助手"))