summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2022-06-14 21:25:04 +0800
committerKyomotoi <[email protected]>2022-06-14 21:25:04 +0800
commit34741109da2b45f24fbabc8d00c0194ab775411d (patch)
tree637d564b36204578cd149c89c0aa0d17ff1ff0c1
parent994ffbc8253bb10b684cc26ed5ec34eb0e7d61c1 (diff)
downloadATRI-34741109da2b45f24fbabc8d00c0194ab775411d.tar.gz
ATRI-34741109da2b45f24fbabc8d00c0194ab775411d.tar.bz2
ATRI-34741109da2b45f24fbabc8d00c0194ab775411d.zip
✨ 新增插件: Twitter动态订阅
-rw-r--r--ATRI/exceptions.py4
-rw-r--r--ATRI/plugins/twitter/__init__.py175
-rw-r--r--ATRI/plugins/twitter/api.py161
-rw-r--r--ATRI/plugins/twitter/data_source.py108
-rw-r--r--ATRI/plugins/twitter/db.py24
5 files changed, 472 insertions, 0 deletions
diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py
index ddf6872..63e27a1 100644
--- a/ATRI/exceptions.py
+++ b/ATRI/exceptions.py
@@ -90,6 +90,10 @@ class BilibiliDynamicError(BaseBotException):
prompt = "b站动态订阅错误"
+class TwitterDynamicError(BaseBotException):
+ prompt = "Twitter动态订阅错误"
+
+
@run_postprocessor
async def _track_error(exception: Optional[Exception], bot: Bot, event) -> None:
if not exception:
diff --git a/ATRI/plugins/twitter/__init__.py b/ATRI/plugins/twitter/__init__.py
new file mode 100644
index 0000000..76dc81e
--- /dev/null
+++ b/ATRI/plugins/twitter/__init__.py
@@ -0,0 +1,175 @@
+import pytz
+import asyncio
+from tabulate import tabulate
+from datetime import datetime, timedelta
+
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.triggers.combining import AndTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+
+from nonebot import get_bot
+from nonebot.matcher import Matcher
+from nonebot.permission import Permission
+from nonebot.params import CommandArg, ArgPlainText
+from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent
+
+from ATRI.log import logger as log
+from ATRI.utils.apscheduler import scheduler
+from ATRI.database import TwitterSubscription
+
+from .data_source import TwitterDynamicSubscriptor
+
+
+add_sub = TwitterDynamicSubscriptor().cmd_as_group("add", "添加推主订阅")
+
+
+@add_sub.handle()
+async def _td_add_sub(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("td_add_sub_name", args)
+
+
+@add_sub.got("td_add_sub_name", "推主名呢?速速")
+async def _td_deal_add_sub(
+ event: GroupMessageEvent, _name: str = ArgPlainText("td_add_sub_name")
+):
+ group_id = event.group_id
+ sub = TwitterDynamicSubscriptor()
+
+ t_name, t_screen_name = await sub.get_twitter_username(_name)
+ if not t_name or not t_screen_name:
+ await add_sub.finish(f"无法获取名为 {_name} 的推主的信息...操作失败了")
+
+ res = await sub.get_twitter_user_info(_name)
+ tid = res["id"]
+
+ query_result = await sub.get_sub_list(tid, group_id)
+ if len(query_result):
+ await add_sub.finish(f"该推主 {t_name}@{t_screen_name}\n已在本群订阅列表中啦!")
+
+ await sub.add_sub(tid, group_id)
+ await sub.update_sub(
+ tid,
+ {
+ "name": t_name,
+ "screen_name": t_screen_name,
+ "last_update": datetime.utcnow(),
+ },
+ )
+ await add_sub.finish(f"成功订阅名为 {t_name}@{t_screen_name} 推主的动态~!")
+
+
+del_sub = TwitterDynamicSubscriptor().cmd_as_group("del", "删除推主订阅")
+
+
+@del_sub.handle()
+async def _td_del_sub(matcher: Matcher, args: Message = CommandArg()):
+ msg = args.extract_plain_text()
+ if msg:
+ matcher.set_arg("td_del_sub_name", args)
+
+
+@del_sub.got("td_del_sub_name", "推主名呢?速速")
+async def _td_deal_del_sub(
+ event: GroupMessageEvent, _name: str = ArgPlainText("td_del_sub_name")
+):
+ group_id = event.group_id
+ sub = TwitterDynamicSubscriptor()
+
+ t_name, t_screen_name = await sub.get_twitter_username(_name)
+ if not t_name or not t_screen_name:
+ await add_sub.finish(f"无法获取名为 {_name} 的推主的信息...操作失败了")
+
+ res = await sub.get_twitter_user_info(_name)
+ tid = res["id"]
+
+ query_result = await sub.get_sub_list(tid=tid, group_id=group_id)
+ if not query_result:
+ await del_sub.finish(f"取消订阅失败...该推主 {t_name}@{t_screen_name} 不在本群订阅列表中")
+
+ await sub.del_sub(t_screen_name, group_id)
+ await del_sub.finish(f"成功取消该推主 {t_name}@{t_screen_name} 的订阅~")
+
+
+get_sub_list = TwitterDynamicSubscriptor().cmd_as_group(
+ "list", "获取本群推主订阅列表", permission=Permission()
+)
+
+
+@get_sub_list.handle()
+async def _get_sub_list(event: GroupMessageEvent):
+ group_id = event.group_id
+ sub = TwitterDynamicSubscriptor()
+
+ query_result = await sub.get_sub_list(group_id=group_id)
+ if not query_result:
+ await get_sub_list.finish("本群还未订阅任何推主呢...")
+
+ subs = list()
+ for i in query_result:
+ tm = i.last_update.replace(tzinfo=pytz.timezone("Asia/Shanghai"))
+ subs.append([i.name, i.tid, tm + timedelta(hours=8)])
+
+ output = "本群订阅的推主列表如下~\n" + tabulate(
+ subs, headers=["推主", "tid", "最后更新时间"], tablefmt="plain", showindex=True
+ )
+ await get_sub_list.finish(output)
+
+
+tq = asyncio.Queue()
+
+
+class TwitterDynamicChecker(BaseTrigger):
+ def get_next_fire_time(self, previous_fire_time, now):
+ sub = TwitterDynamicSubscriptor()
+ conf = sub.load_service("推特动态订阅")
+ if conf.get("enabled"):
+ return now
+
+
+ AndTrigger([IntervalTrigger(seconds=30), TwitterDynamicChecker()]),
+ name="推特动态更新检查",
+ max_instances=3, # type: ignore
+ misfire_grace_time=60, # type: ignore
+)
+async def _check_td():
+ sub = TwitterDynamicSubscriptor()
+ try:
+ all_dy = await sub.get_all_subs()
+ except Exception:
+ log.debug("推特订阅列表为空 跳过")
+ return
+
+ if tq.empty():
+ for i in all_dy:
+ await tq.put(i)
+ else:
+ m: TwitterSubscription = tq.get_nowait()
+ log.info(f"准备查询推主 {m.name}@{m.screen_name} 的动态,队列剩余 {tq.qsize()}")
+
+ ts = int(m.last_update.timestamp())
+ info: dict = await sub.get_twitter_user_info(m.screen_name)
+ if not info.get("status", list()):
+ log.warning(f"无法获取推主 {m.name}@{m.screen_name} 的动态")
+ return
+
+ tid = info["id"]
+
+ t_time = info["status"]["created_at"]
+ time_patt = "%a %b %d %H:%M:%S +0000 %Y"
+ ts_t = datetime.strptime(t_time, time_patt).timestamp()
+
+ if ts < ts_t:
+ data = {
+ "name": info["name"],
+ "content": info["status"]["text"],
+ "pic": info["status"]["media"]["media_url"],
+ "s_id": info["status"]["id"],
+ }
+ content = sub.gen_output(data)
+
+ bot = get_bot()
+ await bot.send_group_msg(group_id=m.group_id, message=content)
+ await sub.update_sub(tid, {"group_id": m.group_id, "last_update": ts_t})
diff --git a/ATRI/plugins/twitter/api.py b/ATRI/plugins/twitter/api.py
new file mode 100644
index 0000000..2fefbb2
--- /dev/null
+++ b/ATRI/plugins/twitter/api.py
@@ -0,0 +1,161 @@
+import re
+from time import sleep
+
+from ATRI import driver
+from ATRI.log import logger as log
+from ATRI.utils import request
+from ATRI.service import ServiceTools
+from ATRI.exceptions import RequestError
+from ATRI.utils.apscheduler import scheduler
+
+
+_GUEST_TOKEN: str = str()
+_COOKIE: str = str()
+
+
+class API:
+ _bearer_token = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"
+ _user_agent = "Mozilla/5.0 (Linux; Android 2.3.6) AppleWebKit/532.2 (KHTML, like Gecko) Chrome/53.0.866.0 Safari/532.2"
+
+ def _gen_headers(self) -> dict:
+ return {
+ "origin": "https://twitter.com",
+ "authorization": self._bearer_token,
+ "cookie": _COOKIE,
+ "x-guest-token": _GUEST_TOKEN,
+ "x-twitter-active-user": "yes",
+ "sec-fetch-dest": "document",
+ "sec-fetch-mode": "navigate",
+ "sec-fetch-user": "?1",
+ "sec-fetch-site": "same-site",
+ "upgrade-insecure-requests": "1",
+ "user-agent": self._user_agent,
+ "accept": "application/json, text/plain, */*",
+ "dnt": "1",
+ "accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ja;q=0.6",
+ "x-twitter-client-language": "zh-cn",
+ }
+
+ async def get_token(self):
+ global _GUEST_TOKEN
+
+ headers = self._gen_headers()
+ del headers["cookie"]
+ del headers["x-guest-token"]
+ headers["Host"] = "api.twitter.com"
+
+ url = "https://api.twitter.com/1.1/guest/activate.json"
+ try:
+ resp = await request.post(url, headers=headers)
+ except RequestError:
+ raise RequestError("Request failed!")
+
+ data: dict = resp.json()
+ _GUEST_TOKEN = data.get("guest_token", str())
+
+ async def get_cookie(self):
+ global _COOKIE
+
+ headers = self._gen_headers()
+ del headers["cookie"]
+ del headers["authorization"]
+
+ url = "https://twitter.com/explore"
+ try:
+ resp = await request.get(url, headers=headers)
+ except RequestError:
+ raise RequestError("Request failed!")
+
+ data = str(resp.headers)
+
+ guest_id = str()
+ personalization_id = str()
+ ct0 = str()
+ twitter_sess = str()
+
+ patt_g_id = r"guest_id=.+?; "
+ patt_ct0 = r"ct0=.+?; "
+ patt_per = r"personalization_id=.+?; "
+ patt_t_p = r"(_twitter_sess=.+?);"
+
+ for _ in data:
+ if re.findall(patt_g_id, data):
+ guest_id = re.findall(patt_g_id, data)[0]
+ if re.findall(patt_ct0, data):
+ ct0 = re.findall(patt_ct0, data)[0]
+ if re.findall(patt_per, data):
+ personalization_id = re.findall(patt_per, data)[0]
+ if re.findall(patt_t_p, data):
+ twitter_sess = re.findall(patt_t_p, data)[0]
+
+ _COOKIE = f"dnt=1; fm=0; csrf_same_site_set=1; csrf_same_site=1; gt={_GUEST_TOKEN}; {ct0}{guest_id}{personalization_id}{twitter_sess}"
+
+ async def _request(self, url: str, params: dict = dict()) -> dict:
+ headers = self._gen_headers()
+ try:
+ resp = await request.get(url, params=params, headers=headers)
+ except RequestError:
+ raise RequestError("Request failed!")
+ return resp.json()
+
+ async def search_user(self, name: str) -> dict:
+ """通过传入的值搜索可能的 Twitter 用户
+
+ Args:
+ name (str): 目标名称.
+
+ Returns:
+ dict: 可能的用户信息. 默认返回第一个.
+ """
+ url = "https://api.twitter.com/1.1/users/search.json"
+ params = {"q": name, "count": 1}
+ data = await self._request(url, params)
+ if not data:
+ return dict()
+ return data[0]
+
+ async def get_conversation(self, tweet_id: int) -> dict:
+ """通过传入的值获取推文信息(无法工作)
+
+ Args:
+ tweet_id (int): 推文id
+
+ Returns:
+ dict: 返回json格式的推文信息
+ """
+ url = f"https://twitter.com/i/api/2/timeline/conversation/{tweet_id}.json"
+ params = {
+ "simple_quoted_tweet": "true",
+ "tweet_mode": "extended",
+ "trim_user": "true",
+ }
+ data = await self._request(url, params)
+ return data
+
+
+async def _regot_token():
+ api = API()
+ await api.get_token()
+ # await api.get_cookie()
+
+
+async def _check_status():
+ headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
+ }
+ try:
+ await request.get("https://twitter.com/", headers=headers)
+ log.success("已成功连接: Twitter")
+ except RequestError:
+ data = ServiceTools.load_service("推特动态订阅")
+ data["enabled"] = False
+ ServiceTools.save_service(data, "推特动态订阅")
+ log.warning("无法连接至 Twitter,这将导致相关插件无法工作. 已自动禁用. 3s后继续...")
+ sleep(3)
+
+
+scheduler.add_job(_regot_token, "interval", name="刷新推特凭据", minutes=30, misfire_grace_time=10) # type: ignore
+
+
+driver().on_startup(_regot_token)
+driver().on_startup(_check_status)
diff --git a/ATRI/plugins/twitter/data_source.py b/ATRI/plugins/twitter/data_source.py
new file mode 100644
index 0000000..1695820
--- /dev/null
+++ b/ATRI/plugins/twitter/data_source.py
@@ -0,0 +1,108 @@
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.onebot.v11 import MessageSegment
+from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN
+
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.exceptions import TwitterDynamicError
+
+from .db import DB
+from .api import API
+
+
+_DYNAMIC_OUTPUT_FORMAT = """
+{t_nickname} 的推更新了!
+(限制 {limit_content} 字)
+{t_dy_content}
+{t_dy_media}
+链接: {t_dy_link}
+""".strip()
+
+
+class TwitterDynamicSubscriptor(Service):
+ def __init__(self):
+ Service.__init__(
+ self,
+ "推特动态订阅",
+ "推特动态订阅助手~",
+ rule=is_in_service("推特动态订阅"),
+ permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN,
+ main_cmd="/td",
+ )
+
+ async def add_sub(self, tid: int, group_id: int):
+ try:
+ async with DB() as db:
+ await db.add_sub(tid, group_id)
+ except TwitterDynamicError:
+ raise TwitterDynamicError("添加订阅失败")
+
+ async def update_sub(self, tid: int, update_map: dict):
+ try:
+ async with DB() as db:
+ await db.update_sub(tid, update_map)
+ except TwitterDynamicError:
+ raise TwitterDynamicError("更新订阅失败")
+
+ async def del_sub(self, screen_name: str, group_id: int):
+ try:
+ async with DB() as db:
+ await db.del_sub({"screen_name": screen_name, "group_id": group_id})
+ except TwitterDynamicError:
+ raise TwitterDynamicError("删除订阅失败")
+
+ async def get_sub_list(self, tid: int = int(), group_id: int = int()) -> list:
+ if not tid:
+ query_map = {"group_id": group_id}
+ else:
+ query_map = {"tid": tid, "group_id": group_id}
+
+ try:
+ async with DB() as db:
+ return await db.get_sub_list(query_map)
+ except TwitterDynamicError:
+ raise TwitterDynamicError("获取订阅列表失败")
+
+ async def get_all_subs(self) -> list:
+ try:
+ async with DB() as db:
+ return await db.get_all_subs()
+ except TwitterDynamicError:
+ raise TwitterDynamicError("获取全部订阅列表失败")
+
+ async def get_twitter_user_info(self, name: str) -> dict:
+ api = API()
+ resp = await api.search_user(name)
+ return resp
+
+ async def get_twitter_username(self, name: str) -> tuple:
+ data = await self.get_twitter_user_info(name)
+ _name = data.get("name", None)
+ screen_name = data.get("screen_name", None)
+ return _name, screen_name
+
+ async def gen_output(self, data: dict, limit_content: int = 100) -> str:
+ """生成动态信息
+
+ Args:
+ data (dict): dict形式的动态数据.
+ limit_content (int, optional): 内容字数限制. 默认 100.
+
+ Returns:
+ str: 动态信息
+ """
+ return _DYNAMIC_OUTPUT_FORMAT.format(
+ t_nickname=data["name"],
+ limit_content=limit_content,
+ t_dy_content=str(data["content"][:limit_content])
+ .replace("https://", str())
+ .replace("http://", str()),
+ t_dy_media=MessageSegment.image(data["pic"]) if data.get("pic") else str(),
+ t_dy_link="twitter.com/nihui/status/" + data["s_id"],
+ )
+
+
+# TODO
+# class TwitterHelper(Service):
+# def __init__(self):
+# Service.__init__(self, "推特助手", "推特小助手", rule=is_in_service("推特助手"))
diff --git a/ATRI/plugins/twitter/db.py b/ATRI/plugins/twitter/db.py
new file mode 100644
index 0000000..52bedd5
--- /dev/null
+++ b/ATRI/plugins/twitter/db.py
@@ -0,0 +1,24 @@
+from ATRI.database import TwitterSubscription
+
+
+class DB:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ async def add_sub(self, tid: int, group_id: int):
+ await TwitterSubscription.create(tid=tid, group_id=group_id)
+
+ async def update_sub(self, tid: int, update_map: dict):
+ await TwitterSubscription.filter(tid=tid).update(**update_map)
+
+ async def del_sub(self, query_map: dict):
+ await TwitterSubscription.filter(**query_map).delete()
+
+ async def get_sub_list(self, query_map: dict) -> list:
+ return await TwitterSubscription.filter(**query_map)
+
+ async def get_all_subs(self) -> list:
+ return await TwitterSubscription.all()