1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
import re
from ATRI.utils import request
from ATRI.exceptions import RequestError
_GUEST_TOKEN: str = str()
_COOKIE: str = str()
class API:
_bearer_token = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
_user_agent = "Mozilla/5.0 (Linux; Android 2.3.6) AppleWebKit/532.2 (KHTML, like Gecko) Chrome/53.0.866.0 Safari/532.2"
def _gen_headers(self) -> dict:
return {
"origin": "https://twitter.com",
"authorization": self._bearer_token,
"cookie": _COOKIE,
"x-guest-token": _GUEST_TOKEN,
"x-twitter-active-user": "yes",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-user": "?1",
"sec-fetch-site": "same-site",
"upgrade-insecure-requests": "1",
"user-agent": self._user_agent,
"accept": "application/json, text/plain, */*",
"dnt": "1",
"accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ja;q=0.6",
"x-twitter-client-language": "zh-cn",
}
async def get_token(self):
global _GUEST_TOKEN
headers = self._gen_headers()
del headers["cookie"]
del headers["x-guest-token"]
headers["Host"] = "api.twitter.com"
url = "https://api.twitter.com/1.1/guest/activate.json"
try:
resp = await request.post(url, headers=headers)
except Exception:
raise RequestError("Request failed!")
data: dict = resp.json()
_GUEST_TOKEN = data.get("guest_token", str())
async def get_cookie(self):
global _COOKIE
headers = self._gen_headers()
del headers["cookie"]
del headers["authorization"]
url = "https://twitter.com/explore"
try:
resp = await request.get(url, headers=headers)
except Exception:
raise RequestError("Request failed!")
data = str(resp.headers)
guest_id = str()
personalization_id = str()
ct0 = str()
twitter_sess = str()
patt_g_id = r"guest_id=.+?; "
patt_ct0 = r"ct0=.+?; "
patt_per = r"personalization_id=.+?; "
patt_t_p = r"(_twitter_sess=.+?);"
for _ in data:
if re.findall(patt_g_id, data):
guest_id = re.findall(patt_g_id, data)[0]
if re.findall(patt_ct0, data):
ct0 = re.findall(patt_ct0, data)[0]
if re.findall(patt_per, data):
personalization_id = re.findall(patt_per, data)[0]
if re.findall(patt_t_p, data):
twitter_sess = re.findall(patt_t_p, data)[0]
_COOKIE = f"dnt=1; fm=0; csrf_same_site_set=1; csrf_same_site=1; gt={_GUEST_TOKEN}; {ct0}{guest_id}{personalization_id}{twitter_sess}"
async def _request(self, url: str, params: dict = dict()) -> dict:
headers = self._gen_headers()
try:
resp = await request.get(url, params=params, headers=headers)
except Exception:
raise RequestError("Request failed!")
return resp.json()
async def search_user(self, name: str) -> dict:
"""通过传入的值搜索可能的 Twitter 用户
Args:
name (str): 目标名称.
Returns:
dict: 可能的用户信息. 默认返回第一个.
"""
url = "https://api.twitter.com/1.1/users/search.json"
params = {"q": name, "count": 1}
data = await self._request(url, params)
if not data:
return dict()
return data[0]
async def get_user_info(self, tid: int) -> dict:
"""通过提供的信息获取对应用户信息
Args:
tid (int): 用户ID
Returns:
dict: 用户信息.
"""
url = f"https://api.twitter.com/2/users/{tid}"
data = await self._request(url)
if not data:
return dict()
return data
async def get_conversation(self, tweet_id: int) -> dict:
"""通过传入的值获取推文信息
Args:
tweet_id (int): 推文id
Returns:
dict: 返回json格式的推文信息
"""
url = f"https://twitter.com/i/api/2/timeline/conversation/{tweet_id}.json"
params = {
"simple_quoted_tweet": True,
"tweet_mode": "extended",
"trim_user": True,
}
data = await self._request(url, params)
return data
|