From 7cfcecc533d8ce7a09e14fdab719a4cd3ff8eae3 Mon Sep 17 00:00:00 2001 From: Kyomotoi <0w0@imki.moe> Date: Sun, 4 Jun 2023 17:57:36 +0800 Subject: =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=E9=87=8D=E6=9E=84=E6=8F=92?= =?UTF-8?q?=E4=BB=B6:=20=E4=BB=A5=E5=9B=BE=E6=90=9C=E5=9B=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ATRI/plugins/saucenao/__init__.py | 28 +++++------ ATRI/plugins/saucenao/data_source.py | 96 ++++++++++++++++++------------------ ATRI/plugins/saucenao/models.py | 86 ++++++++++++++++++++++++++++++++ 3 files changed, 146 insertions(+), 64 deletions(-) create mode 100644 ATRI/plugins/saucenao/models.py diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index add9c26..410a126 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -1,40 +1,38 @@ from random import choice from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment -from nonebot.adapters.onebot.v11.helpers import extract_image_urls, Cooldown +from nonebot.adapters.onebot.v11.helpers import Cooldown, extract_image_urls from ATRI import conf from ATRI.log import log from ATRI.service import Service, ServiceTools -from .data_source import SauceNao + +from .data_source import SauceNAO plugin = Service("以图搜图").document("以图搜图,仅限二刺螈") -sau = SauceNao() _search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -saucenao = plugin.on_command("以图搜图", "透过一张图搜索可能的来源") +search = plugin.on_command("以图搜图", "透过一张图搜索可能的来源") -@saucenao.got("saucenao_img", "图呢?", [Cooldown(5, prompt=_search_flmt_notice)]) -async def _deal_search(event: MessageEvent): - # cache fix +@search.got("img", "图呢?", [Cooldown(5, prompt=_search_flmt_notice)]) +async def _do_search(event: MessageEvent): if not conf.SauceNAO.key: ServiceTools("以图搜图").service_controller(False) - log.warning("插件 SauceNao 所需的 key 未配置,将被全局禁用,后续填写请手动启用") + log.warning("插件 以图搜图 所需的 key (SauceNAO) 未配置,将被全局禁用,后续填写请手动启用") user_id = event.get_user_id() - img = extract_image_urls(event.message) + img = extract_image_urls(event.get_message()) if not img: - await saucenao.reject("请发送图片而不是其他东西!!") + await search.reject("请发送图片而不是其他东西!!") try: - a = SauceNao(conf.SauceNAO.key) - except Exception: - await saucenao.finish("失败了...") + result = await SauceNAO(conf.SauceNAO.key).search(img[0]) + except Exception as err: + await search.finish(f"搜索失败:{str(err)}") - result = f"> {MessageSegment.at(user_id)}" + await a.search(img[0]) # type: ignore - await saucenao.finish(Message(result)) + await search.finish(Message(f"> {MessageSegment.at(user_id)}\n" + result)) diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index 004d9de..e270db1 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -1,69 +1,67 @@ -from random import choice +import json +from typing import List -from ATRI.exceptions import RequestError from ATRI.utils import request +from ATRI.message import MessageBuilder +from .models import SauceNAORequest, SauceNAOResponse, SauceNAOResult -URL = "https://saucenao.com/search.php" +SAUCENAO_URL: str = "https://saucenao.com/search.php" -class SauceNao: +class SauceNAO: def __init__( self, - api_key: str = str(), - output_type=2, - testmode=1, - dbmaski=32768, - db=5, - numres=5, - ): - params = dict() - params["api_key"] = api_key - params["output_type"] = output_type - params["testmode"] = testmode - params["dbmaski"] = dbmaski - params["db"] = db - params["numres"] = numres - self.params = params + api_key: str, + output_type: int = 2, + testmode: int = 1, + dbmaski: int = 32768, + db: int = 5, + numres: int = 5, + ) -> None: + self.params = SauceNAORequest( + api_key=api_key, + output_type=output_type, + testmode=testmode, + dbmaski=dbmaski, + db=db, + numres=numres, + ) - async def _request(self, url: str): - self.params["url"] = url - try: - res = await request.get(URL, params=self.params) - except Exception: - raise RequestError("Request failed!") - data = res.json() - return data + async def _request(self, url: str) -> SauceNAOResponse: + self.params.url = url + resp = await request.get(SAUCENAO_URL, params=self.params.dict()) + return SauceNAOResponse.parse_obj(resp.json()) async def search(self, url: str) -> str: - data = await self._request(url) try: - res = data.get("results", "result") - except Exception: - return "没有相似的结果呢..." + data = await self._request(url) + except Exception as err: + raise Exception(f"处理 SauceNAO 数据失败:{str(err)}") - r = list() + r: List[SauceNAOResult] = list() for i in range(3): - data = res[i] - - sim = data["header"]["similarity"] + _data = data.results[i] + sim = _data.header.similarity if float(sim) >= 70: - _result = dict() - _result["similarity"] = sim - _result["index_name"] = data["header"]["index_name"] - _result["url"] = choice(data["data"].get("ext_urls", ["None"])) - r.append(_result) + r.append( + SauceNAOResult( + similarity=sim, + index_name=_data.header.index_name, + url=_data.data.ext_urls[0] if _data.data.ext_urls else "None", + ) + ) if not r: - return "没有相似的结果呢..." + return "SauceNAO 中没有相似的结果" - msg0 = str() + result = str() for i in r: - msg0 += ( - "\n——————————\n" - f"Similarity: {i['similarity']}\n" - f"Name: {i['index_name']}\n" - f"URL: {i['url'].replace('https://', '')}" + result += ( + MessageBuilder("\n——————————") + .text(f"相似度:{i.similarity}") + .text(f"名称:{i.index_name}") + .text(f"URL: {i.url.replace('https://', str()).replace('http://', str())}") + .done() ) - - return msg0 + return result diff --git a/ATRI/plugins/saucenao/models.py b/ATRI/plugins/saucenao/models.py new file mode 100644 index 0000000..bc345cb --- /dev/null +++ b/ATRI/plugins/saucenao/models.py @@ -0,0 +1,86 @@ +from typing import List, Optional, Union, Dict + +from pydantic import BaseModel + + +class SauceNAORequest(BaseModel): + api_key: str + url: str = str() + output_type: int + testmode: int + dbmaski: int + db: int + numres: int + + +class SauceNAOResponseIndexFields(BaseModel): + status: int + parent_id: int + id: int + results: Optional[int] = None + + +class SauceNAOResponseHeader(BaseModel): + user_id: str + account_type: str + short_limit: str + long_limit: str + long_remaining: int + short_remaining: int + status: int + results_requested: int + index: Dict[str, SauceNAOResponseIndexFields] + search_depth: str + minimum_similarity: float + query_image_display: str + query_image: str + results_returned: int + + +class SauceNAOResponseResultsHeader(BaseModel): + similarity: str + thumbnail: str + index_id: int + index_name: str + dupes: int + hidden: int + + +class SauceNAOResponseResultsData(BaseModel): + ext_urls: Optional[List[str]] = None + title: Optional[str] = None + pixiv_id: Optional[int] = None + member_name: Optional[str] = None + member_id: Optional[int] = None + published: Optional[str] = None + service: Optional[str] = None + service_name: Optional[str] = None + id: Optional[str] = None + user_id: Optional[str] = None + user_name: Optional[str] = None + yandere_id: Optional[int] = None + konachan_id: Optional[int] = None + creator: Optional[Union[str, List[str]]] = None + material: Optional[str] = None + characters: Optional[str] = None + source: Optional[str] = None + danbooru_id: Optional[int] = None + gelbooru_id: Optional[int] = None + eng_name: Optional[str] = None + jp_name: Optional[str] = None + + +class SauceNAOResponseResults(BaseModel): + header: SauceNAOResponseResultsHeader + data: SauceNAOResponseResultsData + + +class SauceNAOResponse(BaseModel): + header: SauceNAOResponseHeader + results: List[SauceNAOResponseResults] + + +class SauceNAOResult(BaseModel): + similarity: str + index_name: str + url: str -- cgit v1.2.3