summaryrefslogtreecommitdiff
path: root/ATRI/plugins/twitter/data_source.py
blob: d1dd8d5412ed6e066689a0fcf589d86ed06f78e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from datetime import datetime

from ATRI import driver
from ATRI.service import ServiceTools
from ATRI.log import log
from ATRI.utils import request
from ATRI.utils.apscheduler import scheduler
from ATRI.message import MessageBuilder
from ATRI.exceptions import TwitterDynamicError

from .db import DB
from .api import API


# Template for the "new tweet" notification. The {t_nickname} and
# {t_dy_content} placeholders are filled in by
# TwitterDynamicSubscriptor.gen_output via str.format.
# NOTE(review): presumably MessageBuilder(...).done() yields a str - confirm.
_DYNAMIC_OUTPUT_FORMAT = MessageBuilder("{t_nickname} 的推更新了!").text(
    "{t_dy_content}"
).done()


class TwitterDynamicSubscriptor:
    """Manage per-group subscriptions to Twitter users' updates.

    Persistence goes through the plugin's ``DB`` async context manager and
    remote lookups through ``API``. Any failure inside a database call is
    re-raised as ``TwitterDynamicError`` with the original exception chained
    as its cause.
    """

    async def __add_sub(self, tid: int, group_id: int):
        """Store a new subscription for ``tid`` in ``group_id``.

        Raises:
            TwitterDynamicError: if the database call fails.
        """
        try:
            async with DB() as db:
                await db.add_sub(tid, group_id)
        except Exception as err:
            raise TwitterDynamicError("添加订阅失败") from err

    async def update_sub(self, tid: int, group_id: int, update_map: dict):
        """Apply ``update_map`` to the subscription of ``tid`` in ``group_id``.

        Args:
            tid: Twitter user id of the subscription.
            group_id: Group that owns the subscription.
            update_map: Field/value pairs forwarded to ``DB.update_sub``.

        Raises:
            TwitterDynamicError: if the database call fails.
        """
        try:
            async with DB() as db:
                await db.update_sub(tid, group_id, update_map)
        except Exception as err:
            raise TwitterDynamicError("更新订阅失败") from err

    async def __del_sub(self, tid: int, group_id: int):
        """Remove the subscription of ``tid`` in ``group_id``.

        Raises:
            TwitterDynamicError: if the database call fails.
        """
        try:
            async with DB() as db:
                await db.del_sub({"tid": tid, "group_id": group_id})
        except Exception as err:
            raise TwitterDynamicError("删除订阅失败") from err

    async def get_sub_list(self, tid: int = 0, group_id: int = 0) -> list:
        """Return the subscriptions of a group, optionally for a single user.

        Args:
            tid: Twitter user id to filter on; a falsy value (the default)
                drops the ``tid`` filter and queries by group only.
            group_id: Group whose subscriptions are queried.

        Returns:
            Whatever ``DB.get_sub_list`` returns for the built query.

        Raises:
            TwitterDynamicError: if the database call fails.
        """
        if tid:
            query_map = {"tid": tid, "group_id": group_id}
        else:
            query_map = {"group_id": group_id}

        try:
            async with DB() as db:
                return await db.get_sub_list(query_map)
        except Exception as err:
            raise TwitterDynamicError("获取订阅列表失败") from err

    async def get_all_subs(self) -> list:
        """Return every stored subscription.

        Raises:
            TwitterDynamicError: if the database call fails.
        """
        try:
            async with DB() as db:
                return await db.get_all_subs()
        except Exception as err:
            raise TwitterDynamicError("获取全部订阅列表失败") from err

    async def get_twitter_user_info(self, name: str) -> dict:
        """Look up a Twitter user by ``name`` through ``API.search_user``.

        Returns:
            The response of ``API.search_user`` unchanged.
            NOTE(review): callers treat it as a dict with ``name``,
            ``screen_name`` and ``id`` keys - confirm against ``API``.
        """
        return await API().search_user(name)

    async def get_twitter_username(self, name: str) -> tuple:
        """Return ``(name, screen_name)`` for the user found under ``name``.

        Either element is ``None`` when the key is absent from the lookup
        result.
        """
        data = await self.get_twitter_user_info(name)
        return data.get("name"), data.get("screen_name")

    def gen_output(self, data: dict, content_limit: int = 0) -> str:
        """Render the notification text for one tweet.

        Args:
            data: Tweet data; the ``name`` and ``content`` keys are read.
            content_limit: Maximum number of leading items of
                ``data["content"]`` to keep. A falsy value (the default)
                keeps the whole content.

        Returns:
            ``_DYNAMIC_OUTPUT_FORMAT`` filled with the author name and the
            content, with every ``https://`` and ``http://`` removed from
            the content.
        """
        content = data["content"]
        if content_limit:
            content = content[:content_limit]

        # URL schemes are stripped from the content before it is sent.
        stripped = str(content).replace("https://", "").replace("http://", "")
        return _DYNAMIC_OUTPUT_FORMAT.format(
            t_nickname=data["name"],
            t_dy_content=stripped,
        )

    async def add_sub(self, name: str, group_id: int) -> str:
        """Subscribe ``group_id`` to the Twitter user found under ``name``.

        The user is looked up once; the same response supplies the display
        name, the screen name and the id.

        Returns:
            A user-facing message describing the outcome: lookup failure,
            already subscribed, or success.

        Raises:
            TwitterDynamicError: if a database call fails.
        """
        info = await self.get_twitter_user_info(name)
        t_name = info.get("name")
        t_screen_name = info.get("screen_name")
        tid = info.get("id")
        # An incomplete lookup (including a missing id) is reported to the
        # user instead of surfacing as a KeyError.
        if not t_name or not t_screen_name or tid is None:
            return f"无法获取名为 {name} 的推主的信息...操作失败了"

        if await self.get_sub_list(tid, group_id):
            return f"该推主 {t_name}@{t_screen_name}\n已在本群订阅列表中啦!"

        await self.__add_sub(tid, group_id)
        await self.update_sub(
            tid,
            group_id,
            {
                "name": t_name,
                "screen_name": t_screen_name,
                # Naive UTC timestamp, kept as-is so stored values stay
                # comparable with existing records.
                "last_update": datetime.utcnow(),
            },
        )
        return f"成功订阅名为 {t_name}@{t_screen_name} 推主的动态~!"

    async def del_sub(self, tid: int, group_id: int) -> str:
        """Unsubscribe ``group_id`` from the Twitter user ``tid``.

        Returns:
            A user-facing message: failure when the group has no such
            subscription, success otherwise.

        Raises:
            TwitterDynamicError: if a database call fails.
        """
        if not await self.get_sub_list(tid, group_id):
            return f"取消订阅失败...该tid: {tid} 不在本群订阅列表中"

        await self.__del_sub(tid, group_id)
        return f"成功取消tid为 {tid} 推主的订阅~"


# TODO
# class TwitterHelper(Service):
#     def __init__(self):
#         Service.__init__(self, "推特助手", "推特小助手", rule=is_in_service("推特助手"))


async def _regot_in_need():
    """Request a token through a freshly created ``API`` instance."""
    await API().get_token()
    # Cookie retrieval (api.get_cookie) is currently disabled.


async def _check_status():
    """Probe Twitter and either schedule credential refresh or disable the service.

    Sends a GET request to https://twitter.com/ with a browser User-Agent.
    On success it logs the connection, fetches credentials once and adds a
    30-minute interval job that repeats the fetch. If any of those steps
    raises, the "推特动态订阅" service is switched off and a warning is logged.
    """
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
    try:
        await request.get("https://twitter.com/", headers={"User-Agent": user_agent})
        log.success("已成功连接: Twitter")
        await _regot_in_need()
        scheduler.add_job(  # type: ignore
            _regot_in_need,
            "interval",
            name="刷新推特凭据",
            minutes=30,
            misfire_grace_time=10,
        )
    except Exception:
        # Best effort: any failure above is treated as "Twitter unreachable".
        ServiceTools("推特动态订阅").service_controller(False)
        log.warning("无法连接至 Twitter,这将导致相关插件无法工作. 已自动禁用.")


# Register the connectivity check with the driver's startup hook.
# NOTE(review): presumably this runs once when the bot starts - confirm.
driver().on_startup(_check_status)