1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
import re
import json
from aiohttp import FormData
from nonebot.adapters.cqhttp import Bot, MessageEvent
from nonebot.adapters.cqhttp.message import Message
from nonebot.typing import T_State
from ATRI.service import Service as sv
from ATRI.rule import is_in_service
from ATRI.exceptions import RequestError
from ATRI.utils.request import get_bytes
from ATRI.utils.translate import to_simple_string
from ATRI.utils.ub_paste import paste
URL = "https://trace.moe/api/search?url="
__doc__ = """
以图搜番
权限组:所有人
用法:
以图搜番 (pic)
"""
anime_search = sv.on_command(cmd="以图搜番", docs=__doc__, rule=is_in_service("以图搜番"))
@anime_search.args_parser # type: ignore
async def _load_anime(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message)
quit_list = ["算了", "罢了", "不搜了", "取消"]
if msg in quit_list:
await anime_search.finish("好吧...")
if not msg:
await anime_search.reject("图呢?")
else:
state["pic_anime"] = msg
@anime_search.handle()
async def _anime_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
state["pic_anime"] = msg
@anime_search.got("pic_anime", prompt="图呢?")
async def _deal_search(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = state["pic_anime"]
img = re.findall(r"url=(.*?)]", msg)
if not img:
await anime_search.reject("请发送图片而不是其它东西!!")
try:
req = await get_bytes(URL + img[0])
except RequestError:
raise RequestError("Request failed!")
data = json.loads(req)["docs"]
try:
d = dict()
for i in range(len(data)):
if data[i]["title_chinese"] in d.keys():
d[data[i]["title_chinese"]][0] += data[i]["similarity"]
else:
m = data[i]["at"] / 60
s = data[i]["at"] % 60
if not data[i]["episode"]:
n = 1
else:
n = data[i]["episode"]
d[to_simple_string(data[i]["title_chinese"])] = [
data[i]["similarity"],
f"第{n}集",
f"{int(m)}分{int(s)}秒处",
]
except Exception as err:
raise Exception(f"Invalid data.\n{err}")
result = sorted(d.items(), key=lambda x: x[1], reverse=True)
t = 0
msg0 = f"> {event.sender.nickname}"
for i in result:
t += 1
s = "%.2f%%" % (i[1][0] * 100)
msg0 = msg0 + (
"\n——————————\n"
f"({t}) Similarity: {s}\n"
f"Name: {i[0]}\n"
f"Time: {i[1][1]} {i[1][2]}"
)
if len(result) == 2:
await anime_search.finish(Message(msg0))
else:
data = FormData()
data.add_field("poster", "ATRI running log")
data.add_field("syntax", "text")
data.add_field("expiration", "day")
data.add_field("content", msg0)
repo = f"> {event.sender.nickname}\n"
repo = repo + f"详细请移步此处~\n{await paste(data)}"
await anime_search.finish(repo)
|