summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ATRI/plugins/applet/__init__.py12
-rw-r--r--ATRI/plugins/applet/data_source.py66
-rw-r--r--ATRI/utils/request.py3
3 files changed, 37 insertions, 44 deletions
diff --git a/ATRI/plugins/applet/__init__.py b/ATRI/plugins/applet/__init__.py
index d6e2ab6..8d89514 100644
--- a/ATRI/plugins/applet/__init__.py
+++ b/ATRI/plugins/applet/__init__.py
@@ -1,21 +1,21 @@
from nonebot.adapters.onebot.v11 import MessageEvent
from nonebot.adapters.onebot.v11.helpers import Cooldown
-from ATRI.log import logger as log
from .data_source import Applet
-bili_applet = Applet().on_message("小程序检测", "小程序爪巴", block=False)
+bili_applet = Applet().on_message("b站小程序检测", "B站小程序爪巴", block=False)
@bili_applet.handle([Cooldown(3)])
-async def _fk_bili(event: MessageEvent):
- msg = str(event.message)
+async def _(event: MessageEvent):
+ msg = str(event.get_message())
try:
- result, is_ok = await Applet().fk_bili(msg)
+ result, is_ok = await Applet().msg_builder(msg)
except Exception:
return
- log.debug(result, is_ok)
+
if not is_ok:
return
+
await bili_applet.finish(result)
diff --git a/ATRI/plugins/applet/data_source.py b/ATRI/plugins/applet/data_source.py
index fb7dc79..3fc1bc5 100644
--- a/ATRI/plugins/applet/data_source.py
+++ b/ATRI/plugins/applet/data_source.py
@@ -3,7 +3,6 @@ import re
from ATRI.service import Service
from ATRI.utils import request
from ATRI.rule import is_in_service
-from ATRI.exceptions import RequestError
URL = "https://api.kyomotoi.moe/api/bilibili/v3/video_info?aid="
@@ -16,9 +15,7 @@ s = [11, 10, 3, 8, 4, 6]
xor = 177451812
add = 8728348608
-__doc__ = """啥b腾讯小程序给👴爪巴
-目前只整了b站的
-"""
+__doc__ = "啥b腾讯小程序给👴爪巴\n目前只整了b站的"
class Applet(Service):
@@ -30,8 +27,7 @@ class Applet(Service):
r = 0
for i in range(6):
r += tr[x[s[i]]] * 58 ** i
- result = "av" + str((r - add) ^ xor)
- return result
+ return str((r - add) ^ xor)
@staticmethod
def _bv_enc(x) -> str:
@@ -41,43 +37,37 @@ class Applet(Service):
r[s[i]] = table[x // 58 ** i % 58]
return "".join(r)
+ @staticmethod
+ async def bili_request(url: str) -> str:
+ req = await request.get(url)
+ return req.headers.get("location")
+
+ @staticmethod
+ def bili_video_code_catcher(text: str) -> str:
+ pattern = re.compile(r"BV[0-9A-Za-z]{10}")
+ result = pattern.findall(text)
+ return result[0] if result else ""
+
@classmethod
- async def fk_bili(cls, text: str) -> tuple:
- msg = text.replace("\\", "")
- bv = False
- if "https://b23" in msg:
- pattern = r"https://b23\.tv/[a-zA-Z0-9]+"
- burl = re.findall(pattern, msg)
- u = burl[0]
-
- try:
- res = await request.get(u)
- except:
- return "Request failed!", False
-
- bv_pattern = r"video/BV[a-zA-Z0-9]+"
- try:
- tu = str(res.url)
- t_bv = re.findall(bv_pattern, tu)
- bv = t_bv[0].replace("video/", "")
- except:
- return "Deal bv code failed!", False
- av = cls._bv_dec(bv).replace("av", "")
+ async def msg_builder(cls, text: str) -> tuple:
+ bv = cls.bili_video_code_catcher(text)
+ if not bv:
+ pattern = r"https://b23.tv/[a-z0-9A-z]{6,7}"
+ burl = re.findall(pattern, text)
+ u = burl[0] if burl else str()
+ if not u:
+ return None, False
+
+ rep = await cls.bili_request(u)
+ bv = cls.bili_video_code_catcher(rep)
+ av = cls._bv_dec(bv)
else:
- pattern = r"[bB][vV][a-zA-Z0-9]+"
- try:
- bv = re.findall(pattern, msg)[0]
- except:
- return "Deal bv code failed!", False
- av = cls._bv_dec(bv).replace("av", "")
+ av = cls._bv_dec(bv)
url = URL + av
- try:
- res = await request.get(url)
- except RequestError:
- return "Request failed!", False
- res_data = res.json()
+ req = await request.get(url)
+ res_data = req.json()
data = res_data["data"]
result = (
diff --git a/ATRI/utils/request.py b/ATRI/utils/request.py
index da0cdd4..0b43576 100644
--- a/ATRI/utils/request.py
+++ b/ATRI/utils/request.py
@@ -1,5 +1,6 @@
import httpx
from ATRI.config import BotSelfConfig
+from ATRI.log import logger as log
if not BotSelfConfig.proxy:
@@ -9,10 +10,12 @@ else:
async def get(url: str, **kwargs):
+ log.debug(f"GET {url} by {proxy if proxy else 'No proxy'} | MORE: \n {kwargs}")
async with httpx.AsyncClient(proxies=proxy) as client: # type: ignore
return await client.get(url, **kwargs)
async def post(url: str, **kwargs):
+ log.debug(f"POST {url} by {proxy if proxy else 'No proxy'} | MORE: \n {kwargs}")
async with httpx.AsyncClient(proxies=proxy) as client: # type: ignore
return await client.post(url, **kwargs)