summaryrefslogtreecommitdiff
path: root/ATRI/plugins/SauceNAO.py
blob: cb32345f63e1bd3cb632f926d54848a6a2a998a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import requests
import json
import re
from datetime import datetime
from random import choice
from nonebot import on_command, CommandSession
from aiohttp import ClientSession

import config # type: ignore
from ATRI.modules.funcControl import checkSwitch, checkNoob # type: ignore

API_KEY = config.SAUCENAO_KEY()
__plugin_name__ = "saucenao_search"

async def get_bytes(url, headers = None):
    async with ClientSession() as asyncSession:
        async with asyncSession.get(url, headers = headers) as response:
            a = await response.read()
    return a

def now_time():
    now_ = datetime.now()
    hour = now_.hour
    minute = now_.minute
    now = hour + minute / 60
    return now

class SauceNAO:

    def __init__(self, api_key, output_type=2, testmode=0, dbmask=None, dbmaski=32768, db=5, numres=1):
        api = 'https://saucenao.com/search.php'
        self.api = api
        params = dict()
        params['api_key'] = api_key
        params['output_type'] = output_type
        params['testmode'] = testmode
        params['dbmask'] = dbmask
        params['dbmaski'] = dbmaski
        params['db'] = db
        params['numres'] = numres
        self.params = params

    def search(self, url):
        self.params['url'] = url
        res = requests.get(url=self.api,params=self.params)
        return res.content

@on_command('SauceNAO', aliases = ['以图识图'], only_to_me = False)
async def SaucenaoSearch(session: CommandSession):
    user = session.event.user_id
    group = session.event.group_id
    msg = session.current_arg.strip()

    if checkNoob(user, group):
        if 0 <= now_time() < 5.5:
            await session.send(
                choice(
                    [
                        'zzzz......',
                        'zzzzzzzz......',
                        'zzz...好涩哦..zzz....',
                        '别...不要..zzz..那..zzz..',
                        '嘻嘻..zzz..呐~..zzzz..'
                    ]
                )
            )
        else:
            if checkSwitch(__plugin_name__):
                if not msg:
                    msg = session.get('message', prompt="请发送一张图片")

                await session.send("开始以图识图\n(如搜索时间过长或无反应则为图片格式有问题)")

                p = '\\[CQ\\:image\\,file\\=.*?\\,url\\=(.*?)\\]'

                img = re.findall(p, msg)

                task = SauceNAO(api_key=API_KEY)
                data = task.search(url=img)
                msg0 = ''
                try:
                    data = json.loads(data)['results'][0]
                    url = data['data']['ext_urls'][0]
                    title = data['data']['title']
                    pixiv_id = data['data']['pixiv_id']
                    member_name = data['data']['member_name']
                    member_id = data['data']['member_id']
                    similarity = data['header']['similarity']
                    mini_url = data['header']['thumbnail']
                    msg0 = f'SauceNAO结果:'
                    msg0 += f'[CQ:image,file={mini_url}]\n'
                    msg0 += f'相似度:{similarity}%\n'
                    msg0 += f'标题:{title}\n'
                    msg0 += f'插画ID:{pixiv_id}\n'
                    msg0 += f'画师:{member_name}\n'
                    msg0 += f'画师ID:{member_id}\n'
                    msg0 += f'直链:https://pixiv.cat/{pixiv_id}.jpg'
                except:
                    print('网络请求失败...')
                if msg0:
                    if float(similarity) > 70:
                        await session.send(msg0)
                    else:
                        await session.send("找不到相似的图呢...")
                else:
                        await session.send("搜索似乎失败了呢...")
            else:
                session.finish('该功能已关闭...')


@SaucenaoSearch.args_parser
async def _(session: CommandSession):
    if not session.is_first_run and session.current_arg.startswith('算了'):
        session.switch(session.current_arg[len('算了'):])