diff options
Diffstat (limited to 'ATRI/plugins')
| -rw-r--r-- | ATRI/plugins/saucenao/__init__.py | 28 | ||||
| -rw-r--r-- | ATRI/plugins/saucenao/data_source.py | 96 | ||||
| -rw-r--r-- | ATRI/plugins/saucenao/models.py | 86 | 
3 files changed, 146 insertions, 64 deletions
| diff --git a/ATRI/plugins/saucenao/__init__.py b/ATRI/plugins/saucenao/__init__.py index add9c26..410a126 100644 --- a/ATRI/plugins/saucenao/__init__.py +++ b/ATRI/plugins/saucenao/__init__.py @@ -1,40 +1,38 @@  from random import choice  from nonebot.adapters.onebot.v11 import MessageEvent, Message, MessageSegment -from nonebot.adapters.onebot.v11.helpers import extract_image_urls, Cooldown +from nonebot.adapters.onebot.v11.helpers import Cooldown, extract_image_urls  from ATRI import conf  from ATRI.log import log  from ATRI.service import Service, ServiceTools -from .data_source import SauceNao + +from .data_source import SauceNAO  plugin = Service("以图搜图").document("以图搜图,仅限二刺螈") -sau = SauceNao()  _search_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"]) -saucenao = plugin.on_command("以图搜图", "透过一张图搜索可能的来源") +search = plugin.on_command("以图搜图", "透过一张图搜索可能的来源") -@saucenao.got("saucenao_img", "图呢?", [Cooldown(5, prompt=_search_flmt_notice)]) -async def _deal_search(event: MessageEvent): -    # cache fix +@search.got("img", "图呢?", [Cooldown(5, prompt=_search_flmt_notice)]) +async def _do_search(event: MessageEvent):      if not conf.SauceNAO.key:          ServiceTools("以图搜图").service_controller(False) -        log.warning("插件 SauceNao 所需的 key 未配置,将被全局禁用,后续填写请手动启用") +        log.warning("插件 以图搜图 所需的 key (SauceNAO) 未配置,将被全局禁用,后续填写请手动启用")      user_id = event.get_user_id() -    img = extract_image_urls(event.message) +    img = extract_image_urls(event.get_message())      if not img: -        await saucenao.reject("请发送图片而不是其他东西!!") +        await search.reject("请发送图片而不是其他东西!!")      try: -        a = SauceNao(conf.SauceNAO.key) -    except Exception: -        await saucenao.finish("失败了...") +        result = await SauceNAO(conf.SauceNAO.key).search(img[0]) +    except Exception as err: +        await search.finish(f"搜索失败:{str(err)}") -    result = f"> {MessageSegment.at(user_id)}" + await a.search(img[0])  # type: ignore -    await saucenao.finish(Message(result)) +    await search.finish(Message(f"> {MessageSegment.at(user_id)}\n" + result)) diff --git a/ATRI/plugins/saucenao/data_source.py b/ATRI/plugins/saucenao/data_source.py index 004d9de..e270db1 100644 --- a/ATRI/plugins/saucenao/data_source.py +++ b/ATRI/plugins/saucenao/data_source.py @@ -1,69 +1,67 @@ -from random import choice +import json +from typing import List -from ATRI.exceptions import RequestError  from ATRI.utils import request +from ATRI.message import MessageBuilder +from .models import SauceNAORequest, SauceNAOResponse, SauceNAOResult -URL = "https://saucenao.com/search.php" +SAUCENAO_URL: str = "https://saucenao.com/search.php" -class SauceNao: +class SauceNAO:      def __init__(          self, -        api_key: str = str(), -        output_type=2, -        testmode=1, -        dbmaski=32768, -        db=5, -        numres=5, -    ): -        params = dict() -        params["api_key"] = api_key -        params["output_type"] = output_type -        params["testmode"] = testmode -        params["dbmaski"] = dbmaski -        params["db"] = db -        params["numres"] = numres -        self.params = params +        api_key: str, +        output_type: int = 2, +        testmode: int = 1, +        dbmaski: int = 32768, +        db: int = 5, +        numres: int = 5, +    ) -> None: +        self.params = SauceNAORequest( +            api_key=api_key, +            output_type=output_type, +            testmode=testmode, +            dbmaski=dbmaski, +            db=db, +            numres=numres, +        ) -    async def _request(self, url: str): -        self.params["url"] = url -        try: -            res = await request.get(URL, params=self.params) -        except Exception: -            raise RequestError("Request failed!") -        data = res.json() -        return data +    async def _request(self, url: str) -> SauceNAOResponse: +        self.params.url = url +        resp = await request.get(SAUCENAO_URL, params=self.params.dict()) +        return SauceNAOResponse.parse_obj(resp.json())      async def search(self, url: str) -> str: -        data = await self._request(url)          try: -            res = data.get("results", "result") -        except Exception: -            return "没有相似的结果呢..." +            data = await self._request(url) +        except Exception as err: +            raise Exception(f"处理 SauceNAO 数据失败:{str(err)}") -        r = list() +        r: List[SauceNAOResult] = list()          for i in range(3): -            data = res[i] - -            sim = data["header"]["similarity"] +            _data = data.results[i] +            sim = _data.header.similarity              if float(sim) >= 70: -                _result = dict() -                _result["similarity"] = sim -                _result["index_name"] = data["header"]["index_name"] -                _result["url"] = choice(data["data"].get("ext_urls", ["None"])) -                r.append(_result) +                r.append( +                    SauceNAOResult( +                        similarity=sim, +                        index_name=_data.header.index_name, +                        url=_data.data.ext_urls[0] if _data.data.ext_urls else "None", +                    ) +                )          if not r: -            return "没有相似的结果呢..." +            return "SauceNAO 中没有相似的结果" -        msg0 = str() +        result = str()          for i in r: -            msg0 += ( -                "\n——————————\n" -                f"Similarity: {i['similarity']}\n" -                f"Name: {i['index_name']}\n" -                f"URL: {i['url'].replace('https://', '')}" +            result += ( +                MessageBuilder("\n——————————") +                .text(f"相似度:{i.similarity}") +                .text(f"名称:{i.index_name}") +                .text(f"URL: {i.url.replace('https://', str()).replace('http://', str())}") +                .done()              ) - -        return msg0 +        return result diff --git a/ATRI/plugins/saucenao/models.py b/ATRI/plugins/saucenao/models.py new file mode 100644 index 0000000..bc345cb --- /dev/null +++ b/ATRI/plugins/saucenao/models.py @@ -0,0 +1,86 @@ +from typing import List, Optional, Union, Dict + +from pydantic import BaseModel + + +class SauceNAORequest(BaseModel): +    api_key: str +    url: str = str() +    output_type: int +    testmode: int +    dbmaski: int +    db: int +    numres: int + + +class SauceNAOResponseIndexFields(BaseModel): +    status: int +    parent_id: int +    id: int +    results: Optional[int] = None + + +class SauceNAOResponseHeader(BaseModel): +    user_id: str +    account_type: str +    short_limit: str +    long_limit: str +    long_remaining: int +    short_remaining: int +    status: int +    results_requested: int +    index: Dict[str, SauceNAOResponseIndexFields] +    search_depth: str +    minimum_similarity: float +    query_image_display: str +    query_image: str +    results_returned: int + + +class SauceNAOResponseResultsHeader(BaseModel): +    similarity: str +    thumbnail: str +    index_id: int +    index_name: str +    dupes: int +    hidden: int + + +class SauceNAOResponseResultsData(BaseModel): +    ext_urls: Optional[List[str]] = None +    title: Optional[str] = None +    pixiv_id: Optional[int] = None +    member_name: Optional[str] = None +    member_id: Optional[int] = None +    published: Optional[str] = None +    service: Optional[str] = None +    service_name: Optional[str] = None +    id: Optional[str] = None +    user_id: Optional[str] = None +    user_name: Optional[str] = None +    yandere_id: Optional[int] = None +    konachan_id: Optional[int] = None +    creator: Optional[Union[str, List[str]]] = None +    material: Optional[str] = None +    characters: Optional[str] = None +    source: Optional[str] = None +    danbooru_id: Optional[int] = None +    gelbooru_id: Optional[int] = None +    eng_name: Optional[str] = None +    jp_name: Optional[str] = None + + +class SauceNAOResponseResults(BaseModel): +    header: SauceNAOResponseResultsHeader +    data: SauceNAOResponseResultsData + + +class SauceNAOResponse(BaseModel): +    header: SauceNAOResponseHeader +    results: List[SauceNAOResponseResults] + + +class SauceNAOResult(BaseModel): +    similarity: str +    index_name: str +    url: str | 
