1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import re
import json
from aiohttp import FormData
from nonebot.adapters.cqhttp import Bot, MessageEvent
from nonebot.adapters.cqhttp.message import Message
from nonebot.typing import T_State
from ATRI.service import Service as sv
from ATRI.rule import is_in_service
from ATRI.exceptions import RequestTimeOut
from ATRI.utils.request import get_bytes
from ATRI.utils.translate import to_simple_string
from ATRI.utils.ub_paste import paste
# trace.moe search endpoint; the image URL is appended as the `url` query value.
URL = "https://trace.moe/api/search?url="
# Help text handed to the service as `docs` (written in Chinese). Roughly:
# "Search anime by image / permission group: everyone / usage: 以图搜番 (pic)".
__doc__ = """
以图搜番
权限组:所有人
用法:
以图搜番 (pic)
"""
# Register the "以图搜番" command with the service layer; the handlers in this
# file attach themselves to the returned object through its decorators.
# NOTE(review): the rule presumably restricts the command to chats where this
# service is enabled -- confirm against ATRI.rule.is_in_service.
anime_search = sv.on_command(
cmd="以图搜番",
docs=__doc__,
rule=is_in_service('以图搜番')
)
@anime_search.args_parser # type: ignore
async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None:
    """Args parser for the command: inspect the incoming message text.

    * A message that exactly equals one of the cancel words ends the session
      with the reply '好吧...'.
    * An empty message is rejected with the prompt '图呢?'.
    * Any other text is stored unchanged in ``state['pic_anime']``.
    """
    text = str(event.message)

    # Cancel words: "never mind", "forget it", "stop searching", "cancel".
    if text in ('算了', '罢了', '不搜了', '取消'):
        await anime_search.finish('好吧...')

    if text:
        state['pic_anime'] = text
    else:
        await anime_search.reject('图呢?')
@anime_search.handle()
async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
    """Seed ``state['pic_anime']`` from the triggering message when it has content.

    The message text is stripped of surrounding whitespace first; if nothing
    remains, the state is left untouched.
    """
    text = str(event.message).strip()
    if not text:
        return
    state['pic_anime'] = text
# Up to this many distinct titles are answered inline; longer reports are
# uploaded through ``paste`` and only the link is sent.
# NOTE(review): the previous check was ``== 2``, which sent a single-title
# result to the pastebin as well; that looked unintentional.
_INLINE_RESULT_LIMIT = 2


def _rank_matches(docs: list) -> list:
    """Group search hits by simplified title and order them by similarity.

    Args:
        docs: The ``"docs"`` list of the search response. Each entry is read
            through the keys ``title_chinese``, ``similarity``, ``at`` and
            ``episode``.

    Returns:
        A list of ``(title, [similarity, episode_text, position_text])``
        pairs, highest accumulated similarity first. Repeat hits for a title
        add their similarity to the first hit's entry; the episode and
        position of that first hit are kept.
    """
    grouped: dict = {}
    for doc in docs:
        # Normalise once so the duplicate lookup and the stored key always
        # agree (the raw title used to be looked up against simplified keys).
        title = to_simple_string(doc["title_chinese"])
        if title in grouped:
            grouped[title][0] += doc["similarity"]
            continue

        minutes, seconds = divmod(doc["at"], 60)
        episode = doc["episode"] or 1  # a falsy episode is shown as 1
        grouped[title] = [
            doc["similarity"],
            f"第{episode}集",
            f"{int(minutes)}分{int(seconds)}秒处",
        ]

    # Rank on the similarity only, not on the whole [score, text, text] list.
    return sorted(grouped.items(), key=lambda item: item[1][0], reverse=True)


def _render_report(nickname, ranked: list) -> str:
    """Build the text report: a header line plus one section per ranked title."""
    sections = [f"> {nickname}"]
    for rank, (title, (score, episode, position)) in enumerate(ranked, start=1):
        percent = "%.2f%%" % (score * 100)
        sections.append(
            "——————————\n"
            f"({rank}) Similarity: {percent}\n"
            f"Name: {title}\n"
            f"Time: {episode} {position}"
        )
    return "\n".join(sections)


@anime_search.got('pic_anime', prompt='图呢?')
async def _deal_search(bot: Bot,
                       event: MessageEvent,
                       state: T_State) -> None:
    """Look up the image stored in ``state['pic_anime']`` and reply with matches.

    The first ``url=...]`` fragment found in the stored message is appended to
    ``URL`` and fetched. The response's ``docs`` are grouped per title and
    ranked, then either sent inline (few titles) or uploaded via ``paste``
    with only the resulting link sent back.

    Raises:
        RequestTimeOut: if fetching the search result times out.
        Exception: "Invalid data." when the response cannot be parsed or
            lacks an expected field (the original error is chained).
    """
    msg = state['pic_anime']
    img = re.findall(r"url=(.*?)]", msg)
    if not img:
        await anime_search.reject("请发送图片而不是其它东西!!")

    try:
        req = await get_bytes(URL + img[0])
    except RequestTimeOut as err:
        raise RequestTimeOut("Request failed!") from err

    # Parsing sits inside the guard too, so malformed JSON or a missing
    # "docs" key is reported the same way as a malformed entry.
    try:
        ranked = _rank_matches(json.loads(req)["docs"])
    except Exception as err:
        raise Exception(f"Invalid data.\n{err}") from err

    nickname = event.sender.nickname
    report = _render_report(nickname, ranked)

    if len(ranked) <= _INLINE_RESULT_LIMIT:
        await anime_search.finish(Message(report))
    else:
        form = FormData()
        form.add_field('poster', 'ATRI running log')
        form.add_field('syntax', 'text')
        form.add_field('expiration', 'day')
        form.add_field('content', report)
        link = await paste(form)
        await anime_search.finish(f"> {nickname}\n详细请移步此处~\n{link}")
|