1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
from nonebot.permission import SUPERUSER
from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
from ATRI import driver
from ATRI.service import Service, ServiceTools
from ATRI.rule import is_in_service
from ATRI.log import logger as log
from ATRI.utils import request
from ATRI.utils.apscheduler import scheduler
from ATRI.exceptions import TwitterDynamicError
from .db import DB
from .api import API
_DYNAMIC_OUTPUT_FORMAT = """
{t_nickname} 的推更新了!
{t_dy_content}
""".strip()
class TwitterDynamicSubscriptor(Service):
def __init__(self):
Service.__init__(
self,
"推特动态订阅",
"推特动态订阅助手~",
rule=is_in_service("推特动态订阅"),
permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN,
main_cmd="/td",
)
async def add_sub(self, tid: int, group_id: int):
try:
async with DB() as db:
await db.add_sub(tid, group_id)
except Exception:
raise TwitterDynamicError("添加订阅失败")
async def update_sub(self, tid: int, group_id: int, update_map: dict):
try:
async with DB() as db:
await db.update_sub(tid, group_id, update_map)
except Exception:
raise TwitterDynamicError("更新订阅失败")
async def del_sub(self, screen_name: str, group_id: int):
try:
async with DB() as db:
await db.del_sub({"screen_name": screen_name, "group_id": group_id})
except Exception:
raise TwitterDynamicError("删除订阅失败")
async def get_sub_list(self, tid: int = int(), group_id: int = int()) -> list:
if not tid:
query_map = {"group_id": group_id}
else:
query_map = {"tid": tid, "group_id": group_id}
try:
async with DB() as db:
return await db.get_sub_list(query_map)
except Exception:
raise TwitterDynamicError("获取订阅列表失败")
async def get_all_subs(self) -> list:
try:
async with DB() as db:
return await db.get_all_subs()
except Exception:
raise TwitterDynamicError("获取全部订阅列表失败")
async def get_twitter_user_info(self, name: str) -> dict:
api = API()
resp = await api.search_user(name)
return resp
async def get_twitter_username(self, name: str) -> tuple:
data = await self.get_twitter_user_info(name)
_name = data.get("name", None)
screen_name = data.get("screen_name", None)
return _name, screen_name
def gen_output(self, data: dict, content_limit) -> str:
"""生成动态信息
Args:
data (dict): dict形式的动态数据.
limit_content (int, optional): 内容字数限制.
Returns:
str: 动态信息
"""
if not content_limit:
content = data["content"]
else:
content = data["content"][:content_limit]
return _DYNAMIC_OUTPUT_FORMAT.format(
t_nickname=data["name"],
t_dy_content=str(content)
.replace("https://", str())
.replace("http://", str()),
)
# TODO
# class TwitterHelper(Service):
# def __init__(self):
# Service.__init__(self, "推特助手", "推特小助手", rule=is_in_service("推特助手"))
async def _regot_in_need():
api = API()
await api.get_token()
# await api.get_cookie()
async def _check_status():
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
}
try:
await request.get("https://twitter.com/", headers=headers)
log.success("已成功连接: Twitter")
await _regot_in_need()
scheduler.add_job(_regot_in_need, "interval", name="刷新推特凭据", minutes=30, misfire_grace_time=10) # type: ignore
except Exception:
data = ServiceTools.load_service("推特动态订阅")
data["enabled"] = False
ServiceTools.save_service(data, "推特动态订阅")
log.warning("无法连接至 Twitter,这将导致相关插件无法工作. 已自动禁用.")
driver().on_startup(_check_status)
|