summaryrefslogtreecommitdiff
path: root/ATRI/plugins/rich
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/rich')
-rw-r--r--ATRI/plugins/rich/__init__.py76
-rw-r--r--ATRI/plugins/rich/data_source.py108
2 files changed, 112 insertions, 72 deletions
diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py
index 476eb88..87a2b02 100644
--- a/ATRI/plugins/rich/__init__.py
+++ b/ATRI/plugins/rich/__init__.py
@@ -1,70 +1,26 @@
-import re
-import json
-from random import choice
-from aiohttp.client import ClientSession
-
from nonebot.adapters.cqhttp import Bot, MessageEvent
-from nonebot.adapters.cqhttp.message import MessageSegment
-
-from ATRI.service import Service as sv
-from ATRI.utils.request import get_bytes
-from ATRI.utils.limit import is_too_exciting
-
-from .data_source import dec
+from ATRI.utils.limit import FreqLimiter
+from .data_source import Rich
-temp_list = []
-img_url = [
- "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/fkrich.png",
- "https://cdn.jsdelivr.net/gh/Kyomotoi/CDN@master/project/ATRI/xixi.jpg",
-]
+_rich_flmt = FreqLimiter(2)
-bilibili_rich = sv.on_message()
+bili_rich = Rich().on_message("小程序爪巴", block=False)
-@bilibili_rich.handle()
-async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None:
- global temp_list
+@bili_rich.handle()
+async def _fk_bili(bot: Bot, event: MessageEvent):
+ user_id = event.get_user_id()
+ if not _rich_flmt.check(user_id):
+ return
+
+ msg = str(event.message)
try:
- msg = str(event.raw_message).replace("\\", "")
- bv = False
-
- if "qqdocurl" not in msg:
- if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace("av", "")
- else:
- bv = re.findall(r"(BV\w+)", msg)
- av = str(dec(bv[0]))
- else:
- patt = r"(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+"
- bv_url = re.findall(patt, msg)
- bv_url = bv_url[3]
- async with ClientSession() as session:
- async with session.get(url=bv_url) as r:
- bv = re.findall(r"(BV\w+)", str(r.url))
- av = dec(bv[0])
-
- if not bv:
- if "av" in msg:
- av = re.findall(r"(av\d+)", msg)[0].replace("av", "")
- else:
- return
-
- user = event.user_id
- check = is_too_exciting(user, 1, 10)
- if not check:
- return
-
- URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}"
- data = json.loads(await get_bytes(URL))["data"]
- repo = (
- f"{data['bvid']} INFO:\n"
- f"Title: {data['title']}\n"
- f"Link: {data['short_link']}\n"
- "にまねげぴのTencent rich!"
- )
- await bot.send(event, MessageSegment.image(file=choice(img_url)))
- await bilibili_rich.finish(repo)
+ result, is_ok = await Rich().fk_bili(msg)
except BaseException:
return
+ if not is_ok:
+ return
+ _rich_flmt.start_cd(user_id)
+ await bili_rich.finish(result)
diff --git a/ATRI/plugins/rich/data_source.py b/ATRI/plugins/rich/data_source.py
index 59474ff..3277a58 100644
--- a/ATRI/plugins/rich/data_source.py
+++ b/ATRI/plugins/rich/data_source.py
@@ -1,5 +1,15 @@
+import re
+
+from ATRI.service import Service
+from ATRI.utils import request
+from ATRI.rule import is_in_service
+from ATRI.exceptions import RequestError
+
+
+URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid="
+
table = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF"
-tr = {}
+tr = dict()
for i in range(58):
tr[table[i]] = i
s = [11, 10, 3, 8, 4, 6]
@@ -7,16 +17,90 @@ xor = 177451812
add = 8728348608
-def dec(x) -> int:
- r = 0
- for i in range(6):
- r += tr[x[s[i]]] * 58 ** i
- return (r - add) ^ xor
+__doc__ = """
+啥b腾讯小程序给👴爪巴
+目前只整了b站的
+"""
-def enc(x) -> str:
- x = (x ^ xor) + add
- r = list("BV1 4 1 7 ")
- for i in range(6):
- r[s[i]] = table[x // 58 ** i % 58]
- return "".join(r)
+class Rich(Service):
+
+ def __init__(self):
+ Service.__init__(self, "小程序处理", __doc__, rule=is_in_service("小程序处理"))
+
+ @staticmethod
+ def _bv_dec(x) -> str:
+ r = 0
+ for i in range(6):
+ r += tr[x[s[i]]] * 58 ** i
+ result = "av" + str((r - add) ^ xor)
+ return result
+
+ @staticmethod
+ def _bv_enc(x) -> str:
+ x = (x ^ xor) + add
+ r = list("BV1 4 1 7 ")
+ for i in range(6):
+ r[s[i]] = table[x // 58 ** i % 58]
+ return "".join(r)
+
+ @classmethod
+ async def fk_bili(cls, text: str) -> tuple:
+ """
+ 为何本函数这么多 try,因为此函数被用于监听所有信息
+ 如果真出现错误,就会一直刷屏
+ """
+ msg = text.replace("\\", "")
+ bv = False
+
+ if "qqdocurl" not in msg:
+ if "av" in msg:
+ av = re.findall(r"(av\d+)", msg)
+ if not av:
+ return "Get value (av) failed!", False
+ av = av[0].replace("av", "")
+ else:
+ bv = re.findall(r"([Bb][Vv]\w+)", msg)
+ if not bv:
+ return "Get value (bv) failed!", False
+ av = str(cls._bv_dec(bv[0])).replace("av", "")
+ else:
+ pattern = r"(?:(?:https?):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+"
+ bv_url = re.findall(pattern, msg)
+ if not bv_url:
+ return "Get value (bv url) failed!", False
+ bv_url = bv_url[3]
+
+ try:
+ res = await request.get(bv_url)
+ except RequestError:
+ return "Request failed!", False
+ bv = re.findall(r"(BV\w+)", str(res.url))
+ if not bv:
+ return "Get value (bv) failed!", False
+ av = cls._bv_dec(bv[0])
+
+ if not bv:
+ if "av" in msg:
+ av = re.findall(r"(av\d+)", msg)
+ if not av:
+ return "Get value (av) failed!", False
+ av = av[0].replace("av", "")
+ else:
+ return "Not found av", False
+
+ url = URL + av
+ try:
+ res = await request.get(url)
+ except RequestError:
+ return "Request failed!", False
+ res_data = await res.json()
+ data = res_data["data"]
+
+ result = (
+ f"{data['bvid']} INFO:\n"
+ f"Title: {data['title']}\n"
+ f"Link: {data['short_link']}"
+ )
+ return result, True
+ \ No newline at end of file