diff options
-rw-r--r-- | ATRI/exceptions.py | 4 | ||||
-rw-r--r-- | ATRI/plugins/twitter/__init__.py | 175 | ||||
-rw-r--r-- | ATRI/plugins/twitter/api.py | 161 | ||||
-rw-r--r-- | ATRI/plugins/twitter/data_source.py | 108 | ||||
-rw-r--r-- | ATRI/plugins/twitter/db.py | 24 |
5 files changed, 472 insertions, 0 deletions
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index ddf6872..63e27a1 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -90,6 +90,10 @@ class BilibiliDynamicError(BaseBotException): prompt = "b站动态订阅错误" +class TwitterDynamicError(BaseBotException): + prompt = "Twitter动态订阅错误" + + @run_postprocessor async def _track_error(exception: Optional[Exception], bot: Bot, event) -> None: if not exception: diff --git a/ATRI/plugins/twitter/__init__.py b/ATRI/plugins/twitter/__init__.py new file mode 100644 index 0000000..76dc81e --- /dev/null +++ b/ATRI/plugins/twitter/__init__.py @@ -0,0 +1,175 @@ +import pytz +import asyncio +from tabulate import tabulate +from datetime import datetime, timedelta + +from apscheduler.triggers.base import BaseTrigger +from apscheduler.triggers.combining import AndTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from nonebot import get_bot +from nonebot.matcher import Matcher +from nonebot.permission import Permission +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent + +from ATRI.log import logger as log +from ATRI.utils.apscheduler import scheduler +from ATRI.database import TwitterSubscription + +from .data_source import TwitterDynamicSubscriptor + + +add_sub = TwitterDynamicSubscriptor().cmd_as_group("add", "添加推主订阅") + + +@add_sub.handle() +async def _td_add_sub(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("td_add_sub_name", args) + + +@add_sub.got("td_add_sub_name", "推主名呢?速速") +async def _td_deal_add_sub( + event: GroupMessageEvent, _name: str = ArgPlainText("td_add_sub_name") +): + group_id = event.group_id + sub = TwitterDynamicSubscriptor() + + t_name, t_screen_name = await sub.get_twitter_username(_name) + if not t_name or not t_screen_name: + await add_sub.finish(f"无法获取名为 {_name} 的推主的信息...操作失败了") + + res = await sub.get_twitter_user_info(_name) + tid = res["id"] + + query_result = await sub.get_sub_list(tid, group_id) + if len(query_result): + await add_sub.finish(f"该推主 {t_name}@{t_screen_name}\n已在本群订阅列表中啦!") + + await sub.add_sub(tid, group_id) + await sub.update_sub( + tid, + { + "name": t_name, + "screen_name": t_screen_name, + "last_update": datetime.utcnow(), + }, + ) + await add_sub.finish(f"成功订阅名为 {t_name}@{t_screen_name} 推主的动态~!") + + +del_sub = TwitterDynamicSubscriptor().cmd_as_group("del", "删除推主订阅") + + +@del_sub.handle() +async def _td_del_sub(matcher: Matcher, args: Message = CommandArg()): + msg = args.extract_plain_text() + if msg: + matcher.set_arg("td_del_sub_name", args) + + +@del_sub.got("td_del_sub_name", "推主名呢?速速") +async def _td_deal_del_sub( + event: GroupMessageEvent, _name: str = ArgPlainText("td_del_sub_name") +): + group_id = event.group_id + sub = TwitterDynamicSubscriptor() + + t_name, t_screen_name = await sub.get_twitter_username(_name) + if not t_name or not t_screen_name: + await add_sub.finish(f"无法获取名为 {_name} 的推主的信息...操作失败了") + + res = await sub.get_twitter_user_info(_name) + tid = res["id"] + + query_result = await sub.get_sub_list(tid=tid, group_id=group_id) + if not query_result: + await del_sub.finish(f"取消订阅失败...该推主 {t_name}@{t_screen_name} 不在本群订阅列表中") + + await sub.del_sub(t_screen_name, group_id) + await del_sub.finish(f"成功取消该推主 {t_name}@{t_screen_name} 的订阅~") + + +get_sub_list = TwitterDynamicSubscriptor().cmd_as_group( + "list", "获取本群推主订阅列表", permission=Permission() +) + + +@get_sub_list.handle() +async def _get_sub_list(event: GroupMessageEvent): + group_id = event.group_id + sub = TwitterDynamicSubscriptor() + + query_result = await sub.get_sub_list(group_id=group_id) + if not query_result: + await get_sub_list.finish("本群还未订阅任何推主呢...") + + subs = list() + for i in query_result: + tm = i.last_update.replace(tzinfo=pytz.timezone("Asia/Shanghai")) + subs.append([i.name, i.tid, tm + timedelta(hours=8)]) + + output = "本群订阅的推主列表如下~\n" + tabulate( + subs, headers=["推主", "tid", "最后更新时间"], tablefmt="plain", showindex=True + ) + await get_sub_list.finish(output) + + +tq = asyncio.Queue() + + +class TwitterDynamicChecker(BaseTrigger): + def get_next_fire_time(self, previous_fire_time, now): + sub = TwitterDynamicSubscriptor() + conf = sub.load_service("推特动态订阅") + if conf.get("enabled"): + return now + + [email protected]_job( + AndTrigger([IntervalTrigger(seconds=30), TwitterDynamicChecker()]), + name="推特动态更新检查", + max_instances=3, # type: ignore + misfire_grace_time=60, # type: ignore +) +async def _check_td(): + sub = TwitterDynamicSubscriptor() + try: + all_dy = await sub.get_all_subs() + except Exception: + log.debug("推特订阅列表为空 跳过") + return + + if tq.empty(): + for i in all_dy: + await tq.put(i) + else: + m: TwitterSubscription = tq.get_nowait() + log.info(f"准备查询推主 {m.name}@{m.screen_name} 的动态,队列剩余 {tq.qsize()}") + + ts = int(m.last_update.timestamp()) + info: dict = await sub.get_twitter_user_info(m.screen_name) + if not info.get("status", list()): + log.warning(f"无法获取推主 {m.name}@{m.screen_name} 的动态") + return + + tid = info["id"] + + t_time = info["status"]["created_at"] + time_patt = "%a %b %d %H:%M:%S +0000 %Y" + ts_t = datetime.strptime(t_time, time_patt).timestamp() + + if ts < ts_t: + data = { + "name": info["name"], + "content": info["status"]["text"], + "pic": info["status"]["media"]["media_url"], + "s_id": info["status"]["id"], + } + content = sub.gen_output(data) + + bot = get_bot() + await bot.send_group_msg(group_id=m.group_id, message=content) + await sub.update_sub(tid, {"group_id": m.group_id, "last_update": ts_t}) diff --git a/ATRI/plugins/twitter/api.py b/ATRI/plugins/twitter/api.py new file mode 100644 index 0000000..2fefbb2 --- /dev/null +++ b/ATRI/plugins/twitter/api.py @@ -0,0 +1,161 @@ +import re +from time import sleep + +from ATRI import driver +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.service import ServiceTools +from ATRI.exceptions import RequestError +from ATRI.utils.apscheduler import scheduler + + +_GUEST_TOKEN: str = str() +_COOKIE: str = str() + + +class API: + _bearer_token = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" + _user_agent = "Mozilla/5.0 (Linux; Android 2.3.6) AppleWebKit/532.2 (KHTML, like Gecko) Chrome/53.0.866.0 Safari/532.2" + + def _gen_headers(self) -> dict: + return { + "origin": "https://twitter.com", + "authorization": self._bearer_token, + "cookie": _COOKIE, + "x-guest-token": _GUEST_TOKEN, + "x-twitter-active-user": "yes", + "sec-fetch-dest": "document", + "sec-fetch-mode": "navigate", + "sec-fetch-user": "?1", + "sec-fetch-site": "same-site", + "upgrade-insecure-requests": "1", + "user-agent": self._user_agent, + "accept": "application/json, text/plain, */*", + "dnt": "1", + "accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ja;q=0.6", + "x-twitter-client-language": "zh-cn", + } + + async def get_token(self): + global _GUEST_TOKEN + + headers = self._gen_headers() + del headers["cookie"] + del headers["x-guest-token"] + headers["Host"] = "api.twitter.com" + + url = "https://api.twitter.com/1.1/guest/activate.json" + try: + resp = await request.post(url, headers=headers) + except RequestError: + raise RequestError("Request failed!") + + data: dict = resp.json() + _GUEST_TOKEN = data.get("guest_token", str()) + + async def get_cookie(self): + global _COOKIE + + headers = self._gen_headers() + del headers["cookie"] + del headers["authorization"] + + url = "https://twitter.com/explore" + try: + resp = await request.get(url, headers=headers) + except RequestError: + raise RequestError("Request failed!") + + data = str(resp.headers) + + guest_id = str() + personalization_id = str() + ct0 = str() + twitter_sess = str() + + patt_g_id = r"guest_id=.+?; " + patt_ct0 = r"ct0=.+?; " + patt_per = r"personalization_id=.+?; " + patt_t_p = r"(_twitter_sess=.+?);" + + for _ in data: + if re.findall(patt_g_id, data): + guest_id = re.findall(patt_g_id, data)[0] + if re.findall(patt_ct0, data): + ct0 = re.findall(patt_ct0, data)[0] + if re.findall(patt_per, data): + personalization_id = re.findall(patt_per, data)[0] + if re.findall(patt_t_p, data): + twitter_sess = re.findall(patt_t_p, data)[0] + + _COOKIE = f"dnt=1; fm=0; csrf_same_site_set=1; csrf_same_site=1; gt={_GUEST_TOKEN}; {ct0}{guest_id}{personalization_id}{twitter_sess}" + + async def _request(self, url: str, params: dict = dict()) -> dict: + headers = self._gen_headers() + try: + resp = await request.get(url, params=params, headers=headers) + except RequestError: + raise RequestError("Request failed!") + return resp.json() + + async def search_user(self, name: str) -> dict: + """通过传入的值搜索可能的 Twitter 用户 + + Args: + name (str): 目标名称. + + Returns: + dict: 可能的用户信息. 默认返回第一个. + """ + url = "https://api.twitter.com/1.1/users/search.json" + params = {"q": name, "count": 1} + data = await self._request(url, params) + if not data: + return dict() + return data[0] + + async def get_conversation(self, tweet_id: int) -> dict: + """通过传入的值获取推文信息(无法工作) + + Args: + tweet_id (int): 推文id + + Returns: + dict: 返回json格式的推文信息 + """ + url = f"https://twitter.com/i/api/2/timeline/conversation/{tweet_id}.json" + params = { + "simple_quoted_tweet": "true", + "tweet_mode": "extended", + "trim_user": "true", + } + data = await self._request(url, params) + return data + + +async def _regot_token(): + api = API() + await api.get_token() + # await api.get_cookie() + + +async def _check_status(): + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" + } + try: + await request.get("https://twitter.com/", headers=headers) + log.success("已成功连接: Twitter") + except RequestError: + data = ServiceTools.load_service("推特动态订阅") + data["enabled"] = False + ServiceTools.save_service(data, "推特动态订阅") + log.warning("无法连接至 Twitter,这将导致相关插件无法工作. 已自动禁用. 3s后继续...") + sleep(3) + + +scheduler.add_job(_regot_token, "interval", name="刷新推特凭据", minutes=30, misfire_grace_time=10) # type: ignore + + +driver().on_startup(_regot_token) +driver().on_startup(_check_status) diff --git a/ATRI/plugins/twitter/data_source.py b/ATRI/plugins/twitter/data_source.py new file mode 100644 index 0000000..1695820 --- /dev/null +++ b/ATRI/plugins/twitter/data_source.py @@ -0,0 +1,108 @@ +from nonebot.permission import SUPERUSER +from nonebot.adapters.onebot.v11 import MessageSegment +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.exceptions import TwitterDynamicError + +from .db import DB +from .api import API + + +_DYNAMIC_OUTPUT_FORMAT = """ +{t_nickname} 的推更新了! +(限制 {limit_content} 字) +{t_dy_content} +{t_dy_media} +链接: {t_dy_link} +""".strip() + + +class TwitterDynamicSubscriptor(Service): + def __init__(self): + Service.__init__( + self, + "推特动态订阅", + "推特动态订阅助手~", + rule=is_in_service("推特动态订阅"), + permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN, + main_cmd="/td", + ) + + async def add_sub(self, tid: int, group_id: int): + try: + async with DB() as db: + await db.add_sub(tid, group_id) + except TwitterDynamicError: + raise TwitterDynamicError("添加订阅失败") + + async def update_sub(self, tid: int, update_map: dict): + try: + async with DB() as db: + await db.update_sub(tid, update_map) + except TwitterDynamicError: + raise TwitterDynamicError("更新订阅失败") + + async def del_sub(self, screen_name: str, group_id: int): + try: + async with DB() as db: + await db.del_sub({"screen_name": screen_name, "group_id": group_id}) + except TwitterDynamicError: + raise TwitterDynamicError("删除订阅失败") + + async def get_sub_list(self, tid: int = int(), group_id: int = int()) -> list: + if not tid: + query_map = {"group_id": group_id} + else: + query_map = {"tid": tid, "group_id": group_id} + + try: + async with DB() as db: + return await db.get_sub_list(query_map) + except TwitterDynamicError: + raise TwitterDynamicError("获取订阅列表失败") + + async def get_all_subs(self) -> list: + try: + async with DB() as db: + return await db.get_all_subs() + except TwitterDynamicError: + raise TwitterDynamicError("获取全部订阅列表失败") + + async def get_twitter_user_info(self, name: str) -> dict: + api = API() + resp = await api.search_user(name) + return resp + + async def get_twitter_username(self, name: str) -> tuple: + data = await self.get_twitter_user_info(name) + _name = data.get("name", None) + screen_name = data.get("screen_name", None) + return _name, screen_name + + async def gen_output(self, data: dict, limit_content: int = 100) -> str: + """生成动态信息 + + Args: + data (dict): dict形式的动态数据. + limit_content (int, optional): 内容字数限制. 默认 100. + + Returns: + str: 动态信息 + """ + return _DYNAMIC_OUTPUT_FORMAT.format( + t_nickname=data["name"], + limit_content=limit_content, + t_dy_content=str(data["content"][:limit_content]) + .replace("https://", str()) + .replace("http://", str()), + t_dy_media=MessageSegment.image(data["pic"]) if data.get("pic") else str(), + t_dy_link="twitter.com/nihui/status/" + data["s_id"], + ) + + +# TODO +# class TwitterHelper(Service): +# def __init__(self): +# Service.__init__(self, "推特助手", "推特小助手", rule=is_in_service("推特助手")) diff --git a/ATRI/plugins/twitter/db.py b/ATRI/plugins/twitter/db.py new file mode 100644 index 0000000..52bedd5 --- /dev/null +++ b/ATRI/plugins/twitter/db.py @@ -0,0 +1,24 @@ +from ATRI.database import TwitterSubscription + + +class DB: + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + async def add_sub(self, tid: int, group_id: int): + await TwitterSubscription.create(tid=tid, group_id=group_id) + + async def update_sub(self, tid: int, update_map: dict): + await TwitterSubscription.filter(tid=tid).update(**update_map) + + async def del_sub(self, query_map: dict): + await TwitterSubscription.filter(**query_map).delete() + + async def get_sub_list(self, query_map: dict) -> list: + return await TwitterSubscription.filter(**query_map) + + async def get_all_subs(self) -> list: + return await TwitterSubscription.all() |