summaryrefslogtreecommitdiff
path: root/ATRI/plugins/setu
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/setu')
-rw-r--r--ATRI/plugins/setu/__init__.py3
-rw-r--r--ATRI/plugins/setu/modules/data_source.py115
-rw-r--r--ATRI/plugins/setu/modules/main_setu.py105
-rw-r--r--ATRI/plugins/setu/modules/scheduler.py12
-rw-r--r--ATRI/plugins/setu/modules/store.py90
5 files changed, 152 insertions, 173 deletions
diff --git a/ATRI/plugins/setu/__init__.py b/ATRI/plugins/setu/__init__.py
index d9f1ffe..b316020 100644
--- a/ATRI/plugins/setu/__init__.py
+++ b/ATRI/plugins/setu/__init__.py
@@ -4,5 +4,4 @@ from pathlib import Path
_sub_plugins = set()
-_sub_plugins |= nonebot.load_plugins(
- str((Path(__file__).parent / 'modules').resolve()))
+_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "modules").resolve()))
diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py
index 72e2270..bda7363 100644
--- a/ATRI/plugins/setu/modules/data_source.py
+++ b/ATRI/plugins/setu/modules/data_source.py
@@ -15,8 +15,8 @@ from ATRI.utils.request import get_bytes
from ATRI.utils.img import compress_image
-TEMP_DIR: Path = Path('.') / 'ATRI' / 'data' / 'temp' / 'setu'
-SETU_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'setu'
+TEMP_DIR: Path = Path(".") / "ATRI" / "data" / "temp" / "setu"
+SETU_DIR = Path(".") / "ATRI" / "data" / "database" / "setu"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(SETU_DIR, exist_ok=True)
NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url="
@@ -30,83 +30,79 @@ class Hso:
try:
data = json.loads(await get_bytes(url))
except RequestError:
- raise RequestError('Request failed!')
- return round(data['score'], 4)
-
+ raise RequestError("Request failed!")
+ return round(data["score"], 4)
+
@staticmethod
async def _comp_setu(url: str) -> str:
- temp_id = ''.join(sample(string.ascii_letters + string.digits, 8))
- file = TEMP_DIR / f'{temp_id}.png'
-
+ temp_id = "".join(sample(string.ascii_letters + string.digits, 8))
+ file = TEMP_DIR / f"{temp_id}.png"
+
try:
async with ClientSession() as session:
async with session.get(url) as r:
data = await r.read()
except RequestError:
- raise RequestError('Request img failed!')
-
+ raise RequestError("Request img failed!")
+
try:
- with open(file, 'wb') as r:
+ with open(file, "wb") as r:
r.write(data)
except WriteError:
- raise WriteError('Writing img failed!')
-
+ raise WriteError("Writing img failed!")
+
return compress_image(os.path.abspath(file))
-
+
@classmethod
async def setu(cls, data: dict) -> str:
- pid = data['pid']
- title = data['title']
+ pid = data["pid"]
+ title = data["title"]
if SIZE_REDUCE:
img = MessageSegment.image(
- 'file:///' + await cls._comp_setu(data['url']), proxy=False)
+ "file:///" + await cls._comp_setu(data["url"]), proxy=False
+ )
else:
- img = MessageSegment.image(data['url'], proxy=False)
-
- msg = (
- f"Pid: {pid}\n"
- f"Title: {title}\n"
- f"{img}"
- )
+ img = MessageSegment.image(data["url"], proxy=False)
+
+ msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}"
return msg
-
+
@classmethod
async def acc_setu(cls, d: list) -> str:
data: dict = choice(d)
-
- for i in data['tags']:
- if i['name'] == "R-18":
+
+ for i in data["tags"]:
+ if i["name"] == "R-18":
return "太涩了不方便发w"
-
- pid = data['id']
- title = data['title']
+
+ pid = data["id"]
+ title = data["title"]
try:
- pic = data['meta_single_page']['original_image_url'] \
- .replace('pximg.net', 'pixiv.cat')
+ pic = data["meta_single_page"]["original_image_url"].replace(
+ "pximg.net", "pixiv.cat"
+ )
except Exception:
- pic = choice(data['meta_pages'])['original']['image_urls'] \
- .replace('pximg.net', 'pixiv.cat')
+ pic = choice(data["meta_pages"])["original"]["image_urls"].replace(
+ "pximg.net", "pixiv.cat"
+ )
if SIZE_REDUCE:
img = MessageSegment.image(
- 'file:///' + await cls._comp_setu(pic), proxy=False)
+ "file:///" + await cls._comp_setu(pic), proxy=False
+ )
else:
img = MessageSegment.image(pic, proxy=False)
-
- msg = (
- f"Pid: {pid}\n"
- f"Title: {title}\n"
- f"{img}"
- )
+
+ msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}"
return msg
class SetuData:
- SETU_DATA = SETU_DIR / 'setu.db'
-
+ SETU_DATA = SETU_DIR / "setu.db"
+
@classmethod
async def _check_database(cls) -> bool:
if not cls.SETU_DATA.exists():
- log.warning(f'未发现数据库\n-> {cls.SETU_DATA}\n将开始创建')
+ log.warning(f"未发现数据库\n-> {cls.SETU_DATA}\n将开始创建")
async with aiosqlite.connect(cls.SETU_DATA) as db:
cur = await db.cursor()
await cur.execute(
@@ -123,17 +119,22 @@ class SetuData:
"""
)
await db.commit()
- log.warning(f'...创建数据库\n-> {cls.SETU_DATA}\n完成!')
+ log.warning(f"...创建数据库\n-> {cls.SETU_DATA}\n完成!")
return True
return True
-
+
@classmethod
async def add_data(cls, d: dict) -> None:
data = (
- d['pid'], d['title'], d['tags'], d['user_id'],
- d['user_name'], d['user_account'], d['url']
+ d["pid"],
+ d["title"],
+ d["tags"],
+ d["user_id"],
+ d["user_name"],
+ d["user_account"],
+ d["url"],
)
-
+
check = await cls._check_database()
if check:
async with aiosqlite.connect(cls.SETU_DATA) as db:
@@ -146,15 +147,15 @@ class SetuData:
?, ?, ?, ?, ?, ?, ?
);
""",
- data
+ data,
)
await db.commit()
-
+
@classmethod
async def del_data(cls, pid: int) -> None:
if not isinstance(pid, int): # 防注入
- raise ValueError('Please provide int.')
-
+ raise ValueError("Please provide int.")
+
check = await cls._check_database()
if check:
async with aiosqlite.connect(cls.SETU_DATA) as db:
@@ -168,11 +169,13 @@ class SetuData:
async with aiosqlite.connect(cls.SETU_DATA) as db:
async with db.execute("SELECT * FROM setu") as cursor:
return len(await cursor.fetchall()) # type: ignore
-
+
@classmethod
async def get_setu(cls):
check = await cls._check_database()
if check:
async with aiosqlite.connect(cls.SETU_DATA) as db:
- async with db.execute("SELECT * FROM setu ORDER BY RANDOM() limit 1;") as cursor:
+ async with db.execute(
+ "SELECT * FROM setu ORDER BY RANDOM() limit 1;"
+ ) as cursor:
return await cursor.fetchall()
diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py
index 649cd6a..5412e90 100644
--- a/ATRI/plugins/setu/modules/main_setu.py
+++ b/ATRI/plugins/setu/modules/main_setu.py
@@ -17,14 +17,18 @@ from .data_source import Hso, SIZE_REDUCE, SetuData
LOLICON_URL: str = "https://api.lolicon.app/setu/"
-PIXIV_URL:str = "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word="
+PIXIV_URL: str = (
+ "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word="
+)
R18_ENABLED: int = 0
USE_LOCAL_DATA: bool = False
MIX_LOCAL_DATA: bool = False
-setu = sv.on_regex(r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]",
- rule=is_in_service('setu'))
+setu = sv.on_regex(
+ r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]", rule=is_in_service("setu")
+)
+
@setu.handle()
async def _setu(bot: Bot, event: MessageEvent) -> None:
@@ -32,31 +36,23 @@ async def _setu(bot: Bot, event: MessageEvent) -> None:
check = is_too_exciting(user, 3, hours=1)
if not check:
return
-
+
await bot.send(event, "别急,在找了!")
- params = {
- "apikey": Setu.key,
- "r18": str(R18_ENABLED),
- "size1200": "true"
- }
+ params = {"apikey": Setu.key, "r18": str(R18_ENABLED), "size1200": "true"}
try:
- data = json.loads(await post_bytes(LOLICON_URL, params))['data'][0]
+ data = json.loads(await post_bytes(LOLICON_URL, params))["data"][0]
except RequestError:
- raise RequestError('Request failed!')
-
- check = await Hso.nsfw_check(data['url'])
+ raise RequestError("Request failed!")
+
+ check = await Hso.nsfw_check(data["url"])
score = "{:.2%}".format(check, 4)
-
+
if not MIX_LOCAL_DATA:
if USE_LOCAL_DATA:
data = (await SetuData.get_setu())[0] # type: ignore
- data = {
- "pid": data[0],
- "title": data[1],
- "url": data[6]
- }
+ data = {"pid": data[0], "title": data[1], "url": data[6]}
if random() <= 0.1:
- await bot.send(event, '我找到图了,但我发给主人了❤')
+ await bot.send(event, "我找到图了,但我发给主人了❤")
msg = await Hso.setu(data) + f"\n由用户({user})提供"
for sup in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=sup, message=msg)
@@ -65,10 +61,7 @@ async def _setu(bot: Bot, event: MessageEvent) -> None:
else:
if check >= 0.9:
if random() <= 0.2:
- repo = (
- "我找到图了,但我发给主人了❤\n"
- f"涩值:{score}"
- )
+ repo = "我找到图了,但我发给主人了❤\n" f"涩值:{score}"
await bot.send(event, repo)
msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
for sup in BotSelfConfig.superusers:
@@ -77,7 +70,7 @@ async def _setu(bot: Bot, event: MessageEvent) -> None:
await setu.finish(Message(await Hso.setu(data)))
else:
if random() <= 0.1:
- await bot.send(event, '我找到图了,但我发给主人了❤')
+ await bot.send(event, "我找到图了,但我发给主人了❤")
msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
for sup in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=sup, message=msg)
@@ -86,7 +79,7 @@ async def _setu(bot: Bot, event: MessageEvent) -> None:
else:
if random() <= 0.5:
if random() <= 0.1:
- await bot.send(event, '我找到图了,但我发给主人了❤')
+ await bot.send(event, "我找到图了,但我发给主人了❤")
msg = await Hso.setu(data) + f"\n由用户({user})提供"
for sup in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=sup, message=msg)
@@ -94,13 +87,9 @@ async def _setu(bot: Bot, event: MessageEvent) -> None:
await setu.finish(Message(await Hso.setu(data)))
else:
data = (await SetuData.get_setu())[0] # type: ignore
- data = {
- "pid": data[0],
- "title": data[1],
- "url": data[6]
- }
+ data = {"pid": data[0], "title": data[1], "url": data[6]}
if random() <= 0.1:
- await bot.send(event, '我找到图了,但我发给主人了❤')
+ await bot.send(event, "我找到图了,但我发给主人了❤")
msg = await Hso.setu(data) + f"\n由用户({user})提供"
for sup in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=sup, message=msg)
@@ -108,27 +97,28 @@ async def _setu(bot: Bot, event: MessageEvent) -> None:
await setu.finish(Message(await Hso.setu(data)))
-key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service('setu'))
+key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service("setu"))
+
@key_setu.handle()
async def _key_setu(bot: Bot, event: MessageEvent) -> None:
user = event.user_id
check = is_too_exciting(user, 10, hours=1)
if not check:
- await setu.finish('休息一下吧❤')
+ await setu.finish("休息一下吧❤")
await bot.send(event, "别急,在找了!")
msg = str(event.message).strip()
tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0]
URL = PIXIV_URL + tag
-
+
try:
- data = json.loads(await get_bytes(URL))['illusts']
+ data = json.loads(await get_bytes(URL))["illusts"]
except RequestError:
- raise RequestError('Request msg failed!')
-
+ raise RequestError("Request msg failed!")
+
if random() <= 0.1:
- await bot.send(event, '我找到图了,但我发给主人了❤')
+ await bot.send(event, "我找到图了,但我发给主人了❤")
msg = await Hso.acc_setu(data) + f"\n由用户({user})提供"
for sup in BotSelfConfig.superusers:
await bot.send_private_msg(user_id=sup, message=msg)
@@ -136,58 +126,51 @@ async def _key_setu(bot: Bot, event: MessageEvent) -> None:
await setu.finish(Message(await Hso.acc_setu(data)))
-setu_config = sv.on_command(cmd='涩图设置', permission=SUPERUSER)
+setu_config = sv.on_command(cmd="涩图设置", permission=SUPERUSER)
+
@setu_config.handle()
async def _setu_config(bot: Bot, event: MessageEvent) -> None:
global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA
- msg = str(event.message).split(' ')
+ msg = str(event.message).split(" ")
if msg[0] == "":
- repo = (
- "可用设置如下:\n"
- "启用/禁用r18\n"
- "启用/禁用压缩\n"
- "启用/禁用本地涩图\n"
- "启用/禁用混合本地涩图"
- )
+ repo = "可用设置如下:\n" "启用/禁用r18\n" "启用/禁用压缩\n" "启用/禁用本地涩图\n" "启用/禁用混合本地涩图"
await setu_config.finish(repo)
elif msg[0] == "启用r18":
R18_ENABLED = 1
- await setu_config.finish('已启用r18')
+ await setu_config.finish("已启用r18")
elif msg[0] == "禁用r18":
R18_ENABLED = 0
- await setu_config.finish('已禁用r18')
+ await setu_config.finish("已禁用r18")
elif msg[0] == "启用压缩":
SIZE_REDUCE = True
- await setu_config.finish('已启用图片压缩')
+ await setu_config.finish("已启用图片压缩")
elif msg[0] == "禁用压缩":
SIZE_REDUCE = False
- await setu_config.finish('已禁用图片压缩')
+ await setu_config.finish("已禁用图片压缩")
elif msg[0] == "启用本地涩图":
USE_LOCAL_DATA = True
- await setu_config.finish('已启用本地涩图')
+ await setu_config.finish("已启用本地涩图")
elif msg[0] == "禁用本地涩图":
USE_LOCAL_DATA = False
- await setu_config.finish('已禁用本地涩图')
+ await setu_config.finish("已禁用本地涩图")
elif msg[0] == "启用混合本地涩图":
MIX_LOCAL_DATA = True
- await setu_config.finish('启用混合本地涩图')
+ await setu_config.finish("启用混合本地涩图")
elif msg[0] == "禁用混合本地涩图":
MIX_LOCAL_DATA = False
- await setu_config.finish('禁用混合本地涩图')
+ await setu_config.finish("禁用混合本地涩图")
else:
- await setu_config.finish('阿!请检查拼写')
+ await setu_config.finish("阿!请检查拼写")
not_get_se = sv.on_command("不够涩")
+
@not_get_se.handle()
async def _not_se(bot: Bot, event: MessageEvent) -> None:
user = event.user_id
check = is_too_exciting(user, 1, 120)
if check:
- msg = choice([
- "那你来发",
- "那你来发❤"
- ])
+ msg = choice(["那你来发", "那你来发❤"])
await not_get_se.finish(msg)
diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py
index 3030881..e66c398 100644
--- a/ATRI/plugins/setu/modules/scheduler.py
+++ b/ATRI/plugins/setu/modules/scheduler.py
@@ -5,15 +5,11 @@ from ATRI.utils.apscheduler import scheduler
from .data_source import TEMP_DIR
- 'interval',
- days=7,
- misfire_grace_time=10
-)
[email protected]_job("interval", days=7, misfire_grace_time=10)
async def clear_temp():
- log.info('正在清除涩图缓存')
+ log.info("正在清除涩图缓存")
try:
shutil.rmtree(TEMP_DIR)
- log.info('清除缓存成功!')
+ log.info("清除缓存成功!")
except Exception:
- log.warn('清除图片缓存失败!')
+ log.warn("清除图片缓存失败!")
diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py
index 6f383f4..e278c77 100644
--- a/ATRI/plugins/setu/modules/store.py
+++ b/ATRI/plugins/setu/modules/store.py
@@ -26,58 +26,59 @@ __doc__ = """
"""
-add_setu = sv.on_command(
- cmd="添加涩图",
- docs=__doc__,
- permission=SUPERUSER
-)
+add_setu = sv.on_command(cmd="添加涩图", docs=__doc__, permission=SUPERUSER)
+
@add_setu.args_parser # type: ignore
async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await add_setu.finish('好吧...')
+ await add_setu.finish("好吧...")
if not msg:
- await add_setu.reject('涩图(pid)速发!')
+ await add_setu.reject("涩图(pid)速发!")
else:
- state['setu_add'] = msg
+ state["setu_add"] = msg
+
@add_setu.handle()
async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['setu_add'] = msg
+ state["setu_add"] = msg
+
-@add_setu.got('setu_add', prompt='涩图(pid)速发!')
+@add_setu.got("setu_add", prompt="涩图(pid)速发!")
async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- pid = state['setu_add']
-
+ pid = state["setu_add"]
+
URL = API_URL + pid
try:
- data = json.loads(await get_bytes(URL))['illust']
+ data = json.loads(await get_bytes(URL))["illust"]
except RequestError:
- raise RequestError('Request failed!')
-
+ raise RequestError("Request failed!")
+
try:
- pic = data['meta_single_page']['original_image_url'] \
- .replace('pximg.net', 'pixiv.cat')
+ pic = data["meta_single_page"]["original_image_url"].replace(
+ "pximg.net", "pixiv.cat"
+ )
except Exception:
- pic = choice(data['meta_pages'])['image_urls']['original'] \
- .replace('pximg.net', 'pixiv.cat')
-
+ pic = choice(data["meta_pages"])["image_urls"]["original"].replace(
+ "pximg.net", "pixiv.cat"
+ )
+
d = {
"pid": pid,
- "title": data['title'],
- "tags": str(data['tags']),
- "user_id": data['user']['id'],
- "user_name": data['user']['name'],
- "user_account": data['user']['account'],
- "url": pic
+ "title": data["title"],
+ "tags": str(data["tags"]),
+ "user_id": data["user"]["id"],
+ "user_name": data["user"]["name"],
+ "user_account": data["user"]["account"],
+ "url": pic,
}
await SetuData.add_data(d)
-
- show_img = data['image_urls']['medium'].replace('pximg.net', 'pixiv.cat')
+
+ show_img = data["image_urls"]["medium"].replace("pximg.net", "pixiv.cat")
msg = (
"好欸!是新涩图:\n"
f"Pid: {pid}\n"
@@ -97,40 +98,37 @@ __doc__ = """
"""
-del_setu = sv.on_command(
- cmd="删除涩图",
- docs=__doc__,
- permission=SUPERUSER
-)
+del_setu = sv.on_command(cmd="删除涩图", docs=__doc__, permission=SUPERUSER)
+
@del_setu.args_parser # type: ignore
async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
- cancel = ['算了', '罢了']
+ cancel = ["算了", "罢了"]
if msg in cancel:
- await add_setu.finish('好吧...')
+ await add_setu.finish("好吧...")
if not msg:
- await add_setu.reject('涩图(pid)速发!')
+ await add_setu.reject("涩图(pid)速发!")
else:
- state['setu_del'] = msg
+ state["setu_del"] = msg
+
@del_setu.handle()
async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
msg = str(event.message).strip()
if msg:
- state['setu_del'] = msg
+ state["setu_del"] = msg
-@del_setu.got('setu_del', prompt='涩图(pid)速发!')
+
+@del_setu.got("setu_del", prompt="涩图(pid)速发!")
async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- pid = int(state['setu_del'])
+ pid = int(state["setu_del"])
await SetuData.del_data(pid)
- await del_setu.finish(f'涩图({pid})已删除...')
+ await del_setu.finish(f"涩图({pid})已删除...")
+
+count_setu = sv.on_command(cmd="涩图总量", permission=SUPERUSER)
-count_setu = sv.on_command(
- cmd="涩图总量",
- permission=SUPERUSER
-)
@count_setu.handle()
async def _count_setu(bot: Bot, event: MessageEvent) -> None: