1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import re
from aiohttp import FormData
from random import choice
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, MessageEvent
from nonebot.adapters.cqhttp.message import Message, MessageSegment
from ATRI.service import Service
from ATRI.rule import is_in_service
from ATRI.utils import request, UbuntuPaste, Translate
from ATRI.utils.limit import FreqLimiter
from ATRI.exceptions import RequestError
URL = "https://trace.moe/api/search?url="
_anime_flmt = FreqLimiter(10)
_anime_flmt_notice = choice(["慢...慢一..点❤", "冷静1下", "歇会歇会~~"])
__doc__ = """
通过一张图片搜索你需要的番!据说里*也可以
"""
class Anime(Service):
def __init__(self):
Service.__init__(self, "以图搜番", __doc__, rule=is_in_service("以图搜番"))
@staticmethod
async def _request(url: str) -> dict:
aim = URL + url
try:
res = await request.get(aim)
except RequestError:
raise RequestError("Request failed!")
result = await res.json()
return result
@classmethod
async def search(cls, url: str) -> str:
data = await cls._request(url)
data = data["docs"]
d = dict()
for i in range(len(data)):
if data[i]["title_chinese"] in d.keys():
d[data[i]["title_chinese"]][0] += data[i]["similarity"]
else:
m = data[i]["at"] / 60
s = data[i]["at"] % 60
if not data[i]["episode"]:
n = 1
else:
n = data[i]["episode"]
d[Translate(data[i]["title_chinese"]).to_simple()] = [
data[i]["similarity"],
f"第{n}集",
f"{int(m)}分{int(s)}秒处",
]
result = sorted(d.items(), key=lambda x: x[1], reverse=True)
t = 0
msg0 = str()
for i in result:
t += 1
s = "%.2f%%" % (i[1][0] * 100)
msg0 = msg0 + (
"\n——————————\n"
f"({t}) Similarity: {s}\n"
f"Name: {i[0]}\n"
f"Time: {i[1][1]} {i[1][2]}"
)
if len(result) == 2:
return msg0
else:
data = FormData()
data.add_field("poster", "ATRI running log")
data.add_field("syntax", "text")
data.add_field("expiration", "day")
data.add_field("content", msg0)
repo = f"详细请移步此处~\n{await UbuntuPaste(data).paste()}"
return repo
anime_search = Anime().on_command("以图搜番", "发送一张图以搜索可能的番剧")
@anime_search.args_parser # type: ignore
async def _get_anime(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.message).strip()
quit_list = ["算了", "罢了", "不搜了", "取消"]
if msg in quit_list:
await anime_search.finish("好吧...")
if not msg:
await anime_search.reject("图呢?")
else:
state["anime"] = msg
@anime_search.handle()
async def _ready_sear(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
if not _anime_flmt.check(user_id):
await anime_search.finish(_anime_flmt_notice)
msg = str(event.message).strip()
if msg:
state["anime"] = msg
@anime_search.got("anime", "图呢?")
async def _deal_sear(bot: Bot, event: MessageEvent, state: T_State):
user_id = event.get_user_id()
msg = state["anime"]
img = re.findall(r"url=(.*?)]", msg)
if not img:
await anime_search.reject("请发送图片而不是其它东西!!")
a = await Anime().search(img[0])
result = f"> {MessageSegment.at(user_id)}\n" + a
_anime_flmt.start_cd(user_id)
await anime_search.finish(Message(result))
|