summaryrefslogtreecommitdiff
path: root/ATRI/plugins/saucenao
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/saucenao')
-rw-r--r--ATRI/plugins/saucenao/__init__.py83
-rw-r--r--ATRI/plugins/saucenao/data_source.py70
2 files changed, 88 insertions, 65 deletions
diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py
index 6361ade..9e52d66 100644
--- a/ATRI/plugins/saucenao/__init__.py
+++ b/ATRI/plugins/saucenao/__init__.py
@@ -1,80 +1,51 @@
-import re
-import json
+from re import findall
from random import choice
+from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, MessageEvent
from nonebot.adapters.cqhttp.message import Message, MessageSegment
-from nonebot.typing import T_State
from ATRI.config import SauceNAO
-from ATRI.service import Service as sv
-from ATRI.rule import is_in_service
-from ATRI.exceptions import RequestError
-
-from .data_source import SauceNao
+from ATRI.utils.limit import FreqLimiter
+from .data_source import SaouceNao
-__doc__ = """
-以图搜图
-权限组:所有人
-用法:
- 以图搜图 (pic)
-"""
+_search_flmt = FreqLimiter(5)
+_search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
-saucenao = sv.on_command(cmd="以图搜图", docs=__doc__, rule=is_in_service("以图搜图"))
+saucenao = SaouceNao().on_command("以图搜图", "透过一张图搜索可能的来源")
@saucenao.args_parser # type: ignore
-async def _load_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message)
+async def _get_img(bot: Bot, event: MessageEvent, state: T_State):
+ msg = str(event.message).strip()
quit_list = ["算了", "罢了", "不搜了"]
if msg in quit_list:
await saucenao.finish("好吧...")
-
if not msg:
await saucenao.reject("图呢?")
else:
- state["pic_sau"] = msg
-
+ state["img"] = msg
@saucenao.handle()
-async def _sauce_nao(bot: Bot, event: MessageEvent, state: T_State) -> None:
+async def _ready_search(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = event.get_user_id()
+ if not _search_flmt.check(user_id):
+ await saucenao.finish(_search_flmt_notice)
+
msg = str(event.message).strip()
if msg:
- state["pic_sau"] = msg
+ state["img"] = msg
-
[email protected]("pic_sau", prompt="图呢?")
-async def _deal_saucenao(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = state["pic_sau"]
- img = re.findall(r"url=(.*?)]", msg)
[email protected]("img", "图呢?")
+async def _deal_search(bot: Bot, event: MessageEvent, state: T_State):
+ user_id = event.get_user_id()
+ msg = state["img"]
+ img = findall(r"url=(.*?)]", msg)
if not img:
- await saucenao.finish("请发送图片而不是其他东西!!")
-
- try:
- task = SauceNao(api_key=SauceNAO.key)
- data = json.loads(await task.search(img[0]))
- except RequestError:
- raise RequestError("Request failed!")
-
- res = data["results"]
- result = list()
- for i in range(0, 3):
- data = res[i]
-
- _result = dict()
- _result["similarity"] = data["header"]["similarity"]
- _result["index_name"] = data["header"]["index_name"]
- _result["url"] = choice(data["data"].get("ext_urls", ["None"]))
- result.append(_result)
-
- msg0 = f"> {MessageSegment.at(event.user_id)}"
- for i in result:
- msg0 = msg0 + (
- "\n——————————\n"
- f"Similarity: {i['similarity']}\n"
- f"Name: {i['index_name']}\n"
- f"URL: {i['url'].replace('https://', '')}"
- )
-
- await saucenao.finish(Message(msg0))
+ await saucenao.reject("请发送图片而不是其他东西!!")
+
+ a = SaouceNao(SauceNAO.key)
+ result = f"> {MessageSegment.at(user_id)}" + await a.search(img[0])
+ _search_flmt.start_cd(user_id)
+ await saucenao.finish(Message(result))
diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py
index efe8fe1..d948f91 100644
--- a/ATRI/plugins/saucenao/data_source.py
+++ b/ATRI/plugins/saucenao/data_source.py
@@ -1,13 +1,25 @@
-from ATRI.utils.request import post_bytes
+from random import choice
+from aiohttp import FormData
+
+from ATRI.service import Service
+from ATRI.rule import is_in_service
+from ATRI.exceptions import RequestError
+from ATRI.utils import request, UbuntuPaste
URL = "https://saucenao.com/search.php"
-class SauceNao:
- def __init__(
- self, api_key: str, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5
- ) -> None:
+__doc__ = """
+以图搜图,仅限二刺螈
+"""
+
+
+class SaouceNao(Service):
+
+ def __init__(self, api_key: str = None, output_type=2, testmode=1, dbmaski=32768, db=5, numres=5):
+ Service.__init__(self, "以图搜图", __doc__, rule=is_in_service("以图搜图"))
+
params = dict()
params["api_key"] = api_key
params["output_type"] = output_type
@@ -16,8 +28,48 @@ class SauceNao:
params["db"] = db
params["numres"] = numres
self.params = params
-
- async def search(self, url: str):
+
+ async def _request(self, url: str):
self.params["url"] = url
- res = await post_bytes(url=URL, params=self.params)
- return res
+
+ try:
+ res = await request.post(URL, params=self.params)
+ except RequestError:
+ raise RequestError("Request failed!")
+ data = await res.json()
+ return data
+
+ async def search(self, url: str) -> str:
+ data = await self._request(url)
+ res = data["results"]
+
+ result = list()
+ for i in range(len(res)):
+ data = res[i]
+
+ _result = dict()
+ _result["similarity"] = data["header"]["similarity"]
+ _result["index_name"] = data["header"]["index_name"]
+ _result["url"] = choice(data["data"].get("ext_urls", ["None"]))
+ result.append(_result)
+
+ msg0 = str()
+ for i in result:
+ msg0 += (
+ "\n——————————\n"
+ f"Similarity: {i['similarity']}\n"
+ f"Name: {i['index_name']}\n"
+ f"URL: {i['url'].replace('https://', '')}"
+ )
+
+ if len(res) <= 3:
+ return msg0
+ else:
+ data = FormData()
+ data.add_field("poster", "ATRI running log")
+ data.add_field("syntax", "text")
+ data.add_field("expiration", "day")
+ data.add_field("content", msg0)
+
+ repo = f"\n详细请移步此处~\n{await UbuntuPaste(data).paste()}"
+ return repo