diff options
| author | Kyomotoi <kyomotoiowo@gmail.com> | 2022-06-14 21:25:04 +0800 | 
|---|---|---|
| committer | Kyomotoi <kyomotoiowo@gmail.com> | 2022-06-14 21:25:04 +0800 | 
| commit | 34741109da2b45f24fbabc8d00c0194ab775411d (patch) | |
| tree | 637d564b36204578cd149c89c0aa0d17ff1ff0c1 | |
| parent | 994ffbc8253bb10b684cc26ed5ec34eb0e7d61c1 (diff) | |
| download | ATRI-34741109da2b45f24fbabc8d00c0194ab775411d.tar.gz ATRI-34741109da2b45f24fbabc8d00c0194ab775411d.tar.bz2 ATRI-34741109da2b45f24fbabc8d00c0194ab775411d.zip | |
✨ 新增插件: Twitter动态订阅
| -rw-r--r-- | ATRI/exceptions.py | 4 | ||||
| -rw-r--r-- | ATRI/plugins/twitter/__init__.py | 175 | ||||
| -rw-r--r-- | ATRI/plugins/twitter/api.py | 161 | ||||
| -rw-r--r-- | ATRI/plugins/twitter/data_source.py | 108 | ||||
| -rw-r--r-- | ATRI/plugins/twitter/db.py | 24 | 
5 files changed, 472 insertions, 0 deletions
| diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index ddf6872..63e27a1 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -90,6 +90,10 @@ class BilibiliDynamicError(BaseBotException):      prompt = "b站动态订阅错误" +class TwitterDynamicError(BaseBotException): +    prompt = "Twitter动态订阅错误" + +  @run_postprocessor  async def _track_error(exception: Optional[Exception], bot: Bot, event) -> None:      if not exception: diff --git a/ATRI/plugins/twitter/__init__.py b/ATRI/plugins/twitter/__init__.py new file mode 100644 index 0000000..76dc81e --- /dev/null +++ b/ATRI/plugins/twitter/__init__.py @@ -0,0 +1,175 @@ +import pytz +import asyncio +from tabulate import tabulate +from datetime import datetime, timedelta + +from apscheduler.triggers.base import BaseTrigger +from apscheduler.triggers.combining import AndTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from nonebot import get_bot +from nonebot.matcher import Matcher +from nonebot.permission import Permission +from nonebot.params import CommandArg, ArgPlainText +from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent + +from ATRI.log import logger as log +from ATRI.utils.apscheduler import scheduler +from ATRI.database import TwitterSubscription + +from .data_source import TwitterDynamicSubscriptor + + +add_sub = TwitterDynamicSubscriptor().cmd_as_group("add", "添加推主订阅") + + +@add_sub.handle() +async def _td_add_sub(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("td_add_sub_name", args) + + +@add_sub.got("td_add_sub_name", "推主名呢?速速") +async def _td_deal_add_sub( +    event: GroupMessageEvent, _name: str = ArgPlainText("td_add_sub_name") +): +    group_id = event.group_id +    sub = TwitterDynamicSubscriptor() + +    t_name, t_screen_name = await sub.get_twitter_username(_name) +    if not t_name or not t_screen_name: +        await add_sub.finish(f"无法获取名为 {_name} 的推主的信息...操作失败了") + +    res = await sub.get_twitter_user_info(_name) +    tid = res["id"] + +    query_result = await sub.get_sub_list(tid, group_id) +    if len(query_result): +        await add_sub.finish(f"该推主 {t_name}@{t_screen_name}\n已在本群订阅列表中啦!") + +    await sub.add_sub(tid, group_id) +    await sub.update_sub( +        tid, +        { +            "name": t_name, +            "screen_name": t_screen_name, +            "last_update": datetime.utcnow(), +        }, +    ) +    await add_sub.finish(f"成功订阅名为 {t_name}@{t_screen_name} 推主的动态~!") + + +del_sub = TwitterDynamicSubscriptor().cmd_as_group("del", "删除推主订阅") + + +@del_sub.handle() +async def _td_del_sub(matcher: Matcher, args: Message = CommandArg()): +    msg = args.extract_plain_text() +    if msg: +        matcher.set_arg("td_del_sub_name", args) + + +@del_sub.got("td_del_sub_name", "推主名呢?速速") +async def _td_deal_del_sub( +    event: GroupMessageEvent, _name: str = ArgPlainText("td_del_sub_name") +): +    group_id = event.group_id +    sub = TwitterDynamicSubscriptor() + +    t_name, t_screen_name = await sub.get_twitter_username(_name) +    if not t_name or not t_screen_name: +        await add_sub.finish(f"无法获取名为 {_name} 的推主的信息...操作失败了") + +    res = await sub.get_twitter_user_info(_name) +    tid = res["id"] + +    query_result = await sub.get_sub_list(tid=tid, group_id=group_id) +    if not query_result: +        await del_sub.finish(f"取消订阅失败...该推主 {t_name}@{t_screen_name} 不在本群订阅列表中") + +    await sub.del_sub(t_screen_name, group_id) +    await del_sub.finish(f"成功取消该推主 {t_name}@{t_screen_name} 的订阅~") + + +get_sub_list = TwitterDynamicSubscriptor().cmd_as_group( +    "list", "获取本群推主订阅列表", permission=Permission() +) + + +@get_sub_list.handle() +async def _get_sub_list(event: GroupMessageEvent): +    group_id = event.group_id +    sub = TwitterDynamicSubscriptor() + +    query_result = await sub.get_sub_list(group_id=group_id) +    if not query_result: +        await get_sub_list.finish("本群还未订阅任何推主呢...") + +    subs = list() +    for i in query_result: +        tm = i.last_update.replace(tzinfo=pytz.timezone("Asia/Shanghai")) +        subs.append([i.name, i.tid, tm + timedelta(hours=8)]) + +    output = "本群订阅的推主列表如下~\n" + tabulate( +        subs, headers=["推主", "tid", "最后更新时间"], tablefmt="plain", showindex=True +    ) +    await get_sub_list.finish(output) + + +tq = asyncio.Queue() + + +class TwitterDynamicChecker(BaseTrigger): +    def get_next_fire_time(self, previous_fire_time, now): +        sub = TwitterDynamicSubscriptor() +        conf = sub.load_service("推特动态订阅") +        if conf.get("enabled"): +            return now + + +@scheduler.scheduled_job( +    AndTrigger([IntervalTrigger(seconds=30), TwitterDynamicChecker()]), +    name="推特动态更新检查", +    max_instances=3,  # type: ignore +    misfire_grace_time=60,  # type: ignore +) +async def _check_td(): +    sub = TwitterDynamicSubscriptor() +    try: +        all_dy = await sub.get_all_subs() +    except Exception: +        log.debug("推特订阅列表为空 跳过") +        return + +    if tq.empty(): +        for i in all_dy: +            await tq.put(i) +    else: +        m: TwitterSubscription = tq.get_nowait() +        log.info(f"准备查询推主 {m.name}@{m.screen_name} 的动态,队列剩余 {tq.qsize()}") + +        ts = int(m.last_update.timestamp()) +        info: dict = await sub.get_twitter_user_info(m.screen_name) +        if not info.get("status", list()): +            log.warning(f"无法获取推主 {m.name}@{m.screen_name} 的动态") +            return + +        tid = info["id"] + +        t_time = info["status"]["created_at"] +        time_patt = "%a %b %d %H:%M:%S +0000 %Y" +        ts_t = datetime.strptime(t_time, time_patt).timestamp() + +        if ts < ts_t: +            data = { +                "name": info["name"], +                "content": info["status"]["text"], +                "pic": info["status"]["media"]["media_url"], +                "s_id": info["status"]["id"], +            } +            content = sub.gen_output(data) + +            bot = get_bot() +            await bot.send_group_msg(group_id=m.group_id, message=content) +            await sub.update_sub(tid, {"group_id": m.group_id, "last_update": ts_t}) diff --git a/ATRI/plugins/twitter/api.py b/ATRI/plugins/twitter/api.py new file mode 100644 index 0000000..2fefbb2 --- /dev/null +++ b/ATRI/plugins/twitter/api.py @@ -0,0 +1,161 @@ +import re +from time import sleep + +from ATRI import driver +from ATRI.log import logger as log +from ATRI.utils import request +from ATRI.service import ServiceTools +from ATRI.exceptions import RequestError +from ATRI.utils.apscheduler import scheduler + + +_GUEST_TOKEN: str = str() +_COOKIE: str = str() + + +class API: +    _bearer_token = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA" +    _user_agent = "Mozilla/5.0 (Linux; Android 2.3.6) AppleWebKit/532.2 (KHTML, like Gecko) Chrome/53.0.866.0 Safari/532.2" + +    def _gen_headers(self) -> dict: +        return { +            "origin": "https://twitter.com", +            "authorization": self._bearer_token, +            "cookie": _COOKIE, +            "x-guest-token": _GUEST_TOKEN, +            "x-twitter-active-user": "yes", +            "sec-fetch-dest": "document", +            "sec-fetch-mode": "navigate", +            "sec-fetch-user": "?1", +            "sec-fetch-site": "same-site", +            "upgrade-insecure-requests": "1", +            "user-agent": self._user_agent, +            "accept": "application/json, text/plain, */*", +            "dnt": "1", +            "accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ja;q=0.6", +            "x-twitter-client-language": "zh-cn", +        } + +    async def get_token(self): +        global _GUEST_TOKEN + +        headers = self._gen_headers() +        del headers["cookie"] +        del headers["x-guest-token"] +        headers["Host"] = "api.twitter.com" + +        url = "https://api.twitter.com/1.1/guest/activate.json" +        try: +            resp = await request.post(url, headers=headers) +        except RequestError: +            raise RequestError("Request failed!") + +        data: dict = resp.json() +        _GUEST_TOKEN = data.get("guest_token", str()) + +    async def get_cookie(self): +        global _COOKIE + +        headers = self._gen_headers() +        del headers["cookie"] +        del headers["authorization"] + +        url = "https://twitter.com/explore" +        try: +            resp = await request.get(url, headers=headers) +        except RequestError: +            raise RequestError("Request failed!") + +        data = str(resp.headers) + +        guest_id = str() +        personalization_id = str() +        ct0 = str() +        twitter_sess = str() + +        patt_g_id = r"guest_id=.+?; " +        patt_ct0 = r"ct0=.+?; " +        patt_per = r"personalization_id=.+?; " +        patt_t_p = r"(_twitter_sess=.+?);" + +        for _ in data: +            if re.findall(patt_g_id, data): +                guest_id = re.findall(patt_g_id, data)[0] +            if re.findall(patt_ct0, data): +                ct0 = re.findall(patt_ct0, data)[0] +            if re.findall(patt_per, data): +                personalization_id = re.findall(patt_per, data)[0] +            if re.findall(patt_t_p, data): +                twitter_sess = re.findall(patt_t_p, data)[0] + +        _COOKIE = f"dnt=1; fm=0; csrf_same_site_set=1; csrf_same_site=1; gt={_GUEST_TOKEN}; {ct0}{guest_id}{personalization_id}{twitter_sess}" + +    async def _request(self, url: str, params: dict = dict()) -> dict: +        headers = self._gen_headers() +        try: +            resp = await request.get(url, params=params, headers=headers) +        except RequestError: +            raise RequestError("Request failed!") +        return resp.json() + +    async def search_user(self, name: str) -> dict: +        """通过传入的值搜索可能的 Twitter 用户 + +        Args: +            name (str): 目标名称. + +        Returns: +            dict: 可能的用户信息. 默认返回第一个. +        """ +        url = "https://api.twitter.com/1.1/users/search.json" +        params = {"q": name, "count": 1} +        data = await self._request(url, params) +        if not data: +            return dict() +        return data[0] + +    async def get_conversation(self, tweet_id: int) -> dict: +        """通过传入的值获取推文信息(无法工作) + +        Args: +            tweet_id (int): 推文id + +        Returns: +            dict: 返回json格式的推文信息 +        """ +        url = f"https://twitter.com/i/api/2/timeline/conversation/{tweet_id}.json" +        params = { +            "simple_quoted_tweet": "true", +            "tweet_mode": "extended", +            "trim_user": "true", +        } +        data = await self._request(url, params) +        return data + + +async def _regot_token(): +    api = API() +    await api.get_token() +    # await api.get_cookie() + + +async def _check_status(): +    headers = { +        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" +    } +    try: +        await request.get("https://twitter.com/", headers=headers) +        log.success("已成功连接: Twitter") +    except RequestError: +        data = ServiceTools.load_service("推特动态订阅") +        data["enabled"] = False +        ServiceTools.save_service(data, "推特动态订阅") +        log.warning("无法连接至 Twitter,这将导致相关插件无法工作. 已自动禁用. 3s后继续...") +        sleep(3) + + +scheduler.add_job(_regot_token, "interval", name="刷新推特凭据", minutes=30, misfire_grace_time=10)  # type: ignore + + +driver().on_startup(_regot_token) +driver().on_startup(_check_status) diff --git a/ATRI/plugins/twitter/data_source.py b/ATRI/plugins/twitter/data_source.py new file mode 100644 index 0000000..1695820 --- /dev/null +++ b/ATRI/plugins/twitter/data_source.py @@ -0,0 +1,108 @@ +from nonebot.permission import SUPERUSER +from nonebot.adapters.onebot.v11 import MessageSegment +from nonebot.adapters.onebot.v11 import GROUP_OWNER, GROUP_ADMIN + +from ATRI.service import Service +from ATRI.rule import is_in_service +from ATRI.exceptions import TwitterDynamicError + +from .db import DB +from .api import API + + +_DYNAMIC_OUTPUT_FORMAT = """ +{t_nickname} 的推更新了! +(限制 {limit_content} 字) +{t_dy_content} +{t_dy_media} +链接: {t_dy_link} +""".strip() + + +class TwitterDynamicSubscriptor(Service): +    def __init__(self): +        Service.__init__( +            self, +            "推特动态订阅", +            "推特动态订阅助手~", +            rule=is_in_service("推特动态订阅"), +            permission=SUPERUSER | GROUP_OWNER | GROUP_ADMIN, +            main_cmd="/td", +        ) + +    async def add_sub(self, tid: int, group_id: int): +        try: +            async with DB() as db: +                await db.add_sub(tid, group_id) +        except TwitterDynamicError: +            raise TwitterDynamicError("添加订阅失败") + +    async def update_sub(self, tid: int, update_map: dict): +        try: +            async with DB() as db: +                await db.update_sub(tid, update_map) +        except TwitterDynamicError: +            raise TwitterDynamicError("更新订阅失败") + +    async def del_sub(self, screen_name: str, group_id: int): +        try: +            async with DB() as db: +                await db.del_sub({"screen_name": screen_name, "group_id": group_id}) +        except TwitterDynamicError: +            raise TwitterDynamicError("删除订阅失败") + +    async def get_sub_list(self, tid: int = int(), group_id: int = int()) -> list: +        if not tid: +            query_map = {"group_id": group_id} +        else: +            query_map = {"tid": tid, "group_id": group_id} + +        try: +            async with DB() as db: +                return await db.get_sub_list(query_map) +        except TwitterDynamicError: +            raise TwitterDynamicError("获取订阅列表失败") + +    async def get_all_subs(self) -> list: +        try: +            async with DB() as db: +                return await db.get_all_subs() +        except TwitterDynamicError: +            raise TwitterDynamicError("获取全部订阅列表失败") + +    async def get_twitter_user_info(self, name: str) -> dict: +        api = API() +        resp = await api.search_user(name) +        return resp + +    async def get_twitter_username(self, name: str) -> tuple: +        data = await self.get_twitter_user_info(name) +        _name = data.get("name", None) +        screen_name = data.get("screen_name", None) +        return _name, screen_name + +    async def gen_output(self, data: dict, limit_content: int = 100) -> str: +        """生成动态信息 + +        Args: +            data (dict): dict形式的动态数据. +            limit_content (int, optional): 内容字数限制. 默认 100. + +        Returns: +            str: 动态信息 +        """ +        return _DYNAMIC_OUTPUT_FORMAT.format( +            t_nickname=data["name"], +            limit_content=limit_content, +            t_dy_content=str(data["content"][:limit_content]) +            .replace("https://", str()) +            .replace("http://", str()), +            t_dy_media=MessageSegment.image(data["pic"]) if data.get("pic") else str(), +            t_dy_link="twitter.com/nihui/status/" + data["s_id"], +        ) + + +# TODO +# class TwitterHelper(Service): +#     def __init__(self): +#         Service.__init__(self, "推特助手", "推特小助手", rule=is_in_service("推特助手")) diff --git a/ATRI/plugins/twitter/db.py b/ATRI/plugins/twitter/db.py new file mode 100644 index 0000000..52bedd5 --- /dev/null +++ b/ATRI/plugins/twitter/db.py @@ -0,0 +1,24 @@ +from ATRI.database import TwitterSubscription + + +class DB: +    async def __aenter__(self): +        return self + +    async def __aexit__(self, exc_type, exc_val, exc_tb): +        pass + +    async def add_sub(self, tid: int, group_id: int): +        await TwitterSubscription.create(tid=tid, group_id=group_id) + +    async def update_sub(self, tid: int, update_map: dict): +        await TwitterSubscription.filter(tid=tid).update(**update_map) + +    async def del_sub(self, query_map: dict): +        await TwitterSubscription.filter(**query_map).delete() + +    async def get_sub_list(self, query_map: dict) -> list: +        return await TwitterSubscription.filter(**query_map) + +    async def get_all_subs(self) -> list: +        return await TwitterSubscription.all() | 
