path: root/ATRI/plugins/setu/modules
diff options
authorKyomotoi <[email protected]>2021-05-04 14:27:56 +0800
committerKyomotoi <[email protected]>2021-05-04 14:27:56 +0800
commitea7f48011c34fdaec7e91af7eb373c8174e439e6 (patch)
treee4f14eaacde37774d49e7f98ac1ff4635049d80a /ATRI/plugins/setu/modules
parent07a7e41f72cfa9dfd207a04445f4aa5b0b6fa3ce (diff)
✨🐛 更新
新增:老婆! 新增:涩图 修复:manage中出现的bug 优化:nsfw不再用float,换为int
Diffstat (limited to 'ATRI/plugins/setu/modules')
4 files changed, 528 insertions, 0 deletions
diff --git a/ATRI/plugins/setu/modules/ b/ATRI/plugins/setu/modules/
new file mode 100644
index 0000000..72e2270
--- /dev/null
+++ b/ATRI/plugins/setu/modules/
@@ -0,0 +1,178 @@
+import os
+import json
+import string
+import aiosqlite
+from aiosqlite.core import Connection
+from pathlib import Path
+from random import sample, choice
+from aiohttp import ClientSession
+from nonebot.adapters.cqhttp.message import MessageSegment, Message
+from ATRI.log import logger as log
+from ATRI.config import NsfwCheck
+from ATRI.exceptions import RequestError, WriteError
+from ATRI.utils.request import get_bytes
+from ATRI.utils.img import compress_image
+TEMP_DIR: Path = Path('.') / 'ATRI' / 'data' / 'temp' / 'setu'
+SETU_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'setu'
+os.makedirs(TEMP_DIR, exist_ok=True)
+os.makedirs(SETU_DIR, exist_ok=True)
+NSFW_URL = f"http://{}:{NsfwCheck.port}/?url="
+SIZE_REDUCE: bool = True
+class Hso:
+ @staticmethod
+ async def nsfw_check(url: str) -> float:
+ url = NSFW_URL + url
+ try:
+ data = json.loads(await get_bytes(url))
+ except RequestError:
+ raise RequestError('Request failed!')
+ return round(data['score'], 4)
+ @staticmethod
+ async def _comp_setu(url: str) -> str:
+ temp_id = ''.join(sample(string.ascii_letters + string.digits, 8))
+ file = TEMP_DIR / f'{temp_id}.png'
+ try:
+ async with ClientSession() as session:
+ async with session.get(url) as r:
+ data = await
+ except RequestError:
+ raise RequestError('Request img failed!')
+ try:
+ with open(file, 'wb') as r:
+ r.write(data)
+ except WriteError:
+ raise WriteError('Writing img failed!')
+ return compress_image(os.path.abspath(file))
+ @classmethod
+ async def setu(cls, data: dict) -> str:
+ pid = data['pid']
+ title = data['title']
+ img = MessageSegment.image(
+ 'file:///' + await cls._comp_setu(data['url']), proxy=False)
+ else:
+ img = MessageSegment.image(data['url'], proxy=False)
+ msg = (
+ f"Pid: {pid}\n"
+ f"Title: {title}\n"
+ f"{img}"
+ )
+ return msg
+ @classmethod
+ async def acc_setu(cls, d: list) -> str:
+ data: dict = choice(d)
+ for i in data['tags']:
+ if i['name'] == "R-18":
+ return "太涩了不方便发w"
+ pid = data['id']
+ title = data['title']
+ try:
+ pic = data['meta_single_page']['original_image_url'] \
+ .replace('', '')
+ except Exception:
+ pic = choice(data['meta_pages'])['original']['image_urls'] \
+ .replace('', '')
+ img = MessageSegment.image(
+ 'file:///' + await cls._comp_setu(pic), proxy=False)
+ else:
+ img = MessageSegment.image(pic, proxy=False)
+ msg = (
+ f"Pid: {pid}\n"
+ f"Title: {title}\n"
+ f"{img}"
+ )
+ return msg
+class SetuData:
+ SETU_DATA = SETU_DIR / 'setu.db'
+ @classmethod
+ async def _check_database(cls) -> bool:
+ if not cls.SETU_DATA.exists():
+ log.warning(f'未发现数据库\n-> {cls.SETU_DATA}\n将开始创建')
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ cur = await db.cursor()
+ await cur.execute(
+ """
+ pid PID, title TITLE, tags TAGS,
+ user_id USER_ID, user_name USER_NAME,
+ user_account USER_ACCOUNT, url URL,
+ pid, title, tags, user_id,
+ user_name, user_account, url
+ )
+ );
+ """
+ )
+ await db.commit()
+ log.warning(f'...创建数据库\n-> {cls.SETU_DATA}\n完成!')
+ return True
+ return True
+ @classmethod
+ async def add_data(cls, d: dict) -> None:
+ data = (
+ d['pid'], d['title'], d['tags'], d['user_id'],
+ d['user_name'], d['user_account'], d['url']
+ )
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ await db.execute(
+ """
+ pid, title, tags, user_id,
+ user_name, user_account, url
+ ?, ?, ?, ?, ?, ?, ?
+ );
+ """,
+ data
+ )
+ await db.commit()
+ @classmethod
+ async def del_data(cls, pid: int) -> None:
+ if not isinstance(pid, int): # 防注入
+ raise ValueError('Please provide int.')
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};")
+ await db.commit()
+ @classmethod
+ async def count(cls):
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ async with db.execute("SELECT * FROM setu") as cursor:
+ return len(await cursor.fetchall()) # type: ignore
+ @classmethod
+ async def get_setu(cls):
+ check = await cls._check_database()
+ if check:
+ async with aiosqlite.connect(cls.SETU_DATA) as db:
+ async with db.execute("SELECT * FROM setu ORDER BY RANDOM() limit 1;") as cursor:
+ return await cursor.fetchall()
diff --git a/ATRI/plugins/setu/modules/ b/ATRI/plugins/setu/modules/
new file mode 100644
index 0000000..649cd6a
--- /dev/null
+++ b/ATRI/plugins/setu/modules/
@@ -0,0 +1,193 @@
+import re
+import json
+from random import choice, random
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp.message import Message
+from ATRI.service import Service as sv
+from ATRI.rule import is_in_service
+from ATRI.utils.request import get_bytes, post_bytes
+from ATRI.utils.limit import is_too_exciting
+from ATRI.config import Setu, BotSelfConfig
+from ATRI.exceptions import RequestError
+from .data_source import Hso, SIZE_REDUCE, SetuData
+LOLICON_URL: str = ""
+PIXIV_URL:str = ""
+R18_ENABLED: int = 0
+USE_LOCAL_DATA: bool = False
+MIX_LOCAL_DATA: bool = False
+setu = sv.on_regex(r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]",
+ rule=is_in_service('setu'))
+async def _setu(bot: Bot, event: MessageEvent) -> None:
+ user = event.user_id
+ check = is_too_exciting(user, 3, hours=1)
+ if not check:
+ return
+ await bot.send(event, "别急,在找了!")
+ params = {
+ "apikey": Setu.key,
+ "r18": str(R18_ENABLED),
+ "size1200": "true"
+ }
+ try:
+ data = json.loads(await post_bytes(LOLICON_URL, params))['data'][0]
+ except RequestError:
+ raise RequestError('Request failed!')
+ check = await Hso.nsfw_check(data['url'])
+ score = "{:.2%}".format(check, 4)
+ if not MIX_LOCAL_DATA:
+ data = (await SetuData.get_setu())[0] # type: ignore
+ data = {
+ "pid": data[0],
+ "title": data[1],
+ "url": data[6]
+ }
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ if check >= 0.9:
+ if random() <= 0.2:
+ repo = (
+ "我找到图了,但我发给主人了❤\n"
+ f"涩值:{score}"
+ )
+ await bot.send(event, repo)
+ msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ if random() <= 0.5:
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+ else:
+ data = (await SetuData.get_setu())[0] # type: ignore
+ data = {
+ "pid": data[0],
+ "title": data[1],
+ "url": data[6]
+ }
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.setu(data)))
+key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service('setu'))
+async def _key_setu(bot: Bot, event: MessageEvent) -> None:
+ user = event.user_id
+ check = is_too_exciting(user, 10, hours=1)
+ if not check:
+ await setu.finish('休息一下吧❤')
+ await bot.send(event, "别急,在找了!")
+ msg = str(event.message).strip()
+ tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0]
+ URL = PIXIV_URL + tag
+ try:
+ data = json.loads(await get_bytes(URL))['illusts']
+ except RequestError:
+ raise RequestError('Request msg failed!')
+ if random() <= 0.1:
+ await bot.send(event, '我找到图了,但我发给主人了❤')
+ msg = await Hso.acc_setu(data) + f"\n由用户({user})提供"
+ for sup in BotSelfConfig.superusers:
+ await bot.send_private_msg(user_id=sup, message=msg)
+ else:
+ await setu.finish(Message(await Hso.acc_setu(data)))
+setu_config = sv.on_command(cmd='涩图设置', permission=SUPERUSER)
+async def _setu_config(bot: Bot, event: MessageEvent) -> None:
+ msg = str(event.message).split(' ')
+ if msg[0] == "":
+ repo = (
+ "可用设置如下:\n"
+ "启用/禁用r18\n"
+ "启用/禁用压缩\n"
+ "启用/禁用本地涩图\n"
+ "启用/禁用混合本地涩图"
+ )
+ await setu_config.finish(repo)
+ elif msg[0] == "启用r18":
+ R18_ENABLED = 1
+ await setu_config.finish('已启用r18')
+ elif msg[0] == "禁用r18":
+ R18_ENABLED = 0
+ await setu_config.finish('已禁用r18')
+ elif msg[0] == "启用压缩":
+ await setu_config.finish('已启用图片压缩')
+ elif msg[0] == "禁用压缩":
+ await setu_config.finish('已禁用图片压缩')
+ elif msg[0] == "启用本地涩图":
+ await setu_config.finish('已启用本地涩图')
+ elif msg[0] == "禁用本地涩图":
+ await setu_config.finish('已禁用本地涩图')
+ elif msg[0] == "启用混合本地涩图":
+ await setu_config.finish('启用混合本地涩图')
+ elif msg[0] == "禁用混合本地涩图":
+ await setu_config.finish('禁用混合本地涩图')
+ else:
+ await setu_config.finish('阿!请检查拼写')
+not_get_se = sv.on_command("不够涩")
+async def _not_se(bot: Bot, event: MessageEvent) -> None:
+ user = event.user_id
+ check = is_too_exciting(user, 1, 120)
+ if check:
+ msg = choice([
+ "那你来发",
+ "那你来发❤"
+ ])
+ await not_get_se.finish(msg)
diff --git a/ATRI/plugins/setu/modules/ b/ATRI/plugins/setu/modules/
new file mode 100644
index 0000000..3030881
--- /dev/null
+++ b/ATRI/plugins/setu/modules/
@@ -0,0 +1,19 @@
+import shutil
+from ATRI.log import logger as log
+from ATRI.utils.apscheduler import scheduler
+from .data_source import TEMP_DIR
+ 'interval',
+ days=7,
+ misfire_grace_time=10
+async def clear_temp():
+ try:
+ shutil.rmtree(TEMP_DIR)
+ except Exception:
+ log.warn('清除图片缓存失败!')
diff --git a/ATRI/plugins/setu/modules/ b/ATRI/plugins/setu/modules/
new file mode 100644
index 0000000..6f383f4
--- /dev/null
+++ b/ATRI/plugins/setu/modules/
@@ -0,0 +1,138 @@
+import json
+from random import choice
+from nonebot.typing import T_State
+from nonebot.permission import SUPERUSER
+from nonebot.adapters.cqhttp import Bot, MessageEvent
+from nonebot.adapters.cqhttp.message import Message, MessageSegment
+from ATRI.service import Service as sv
+from ATRI.utils.request import get_bytes
+from ATRI.exceptions import RequestError
+from .data_source import SetuData
+API_URL: str = ""
+__doc__ = """
+ 添加涩图 (pid)
+ pid: Pixiv 作品id
+add_setu = sv.on_command(
+ cmd="添加涩图",
+ docs=__doc__,
+ permission=SUPERUSER
+@add_setu.args_parser # type: ignore
+async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ cancel = ['算了', '罢了']
+ if msg in cancel:
+ await add_setu.finish('好吧...')
+ if not msg:
+ await add_setu.reject('涩图(pid)速发!')
+ else:
+ state['setu_add'] = msg
+async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['setu_add'] = msg
+'setu_add', prompt='涩图(pid)速发!')
+async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ pid = state['setu_add']
+ URL = API_URL + pid
+ try:
+ data = json.loads(await get_bytes(URL))['illust']
+ except RequestError:
+ raise RequestError('Request failed!')
+ try:
+ pic = data['meta_single_page']['original_image_url'] \
+ .replace('', '')
+ except Exception:
+ pic = choice(data['meta_pages'])['image_urls']['original'] \
+ .replace('', '')
+ d = {
+ "pid": pid,
+ "title": data['title'],
+ "tags": str(data['tags']),
+ "user_id": data['user']['id'],
+ "user_name": data['user']['name'],
+ "user_account": data['user']['account'],
+ "url": pic
+ }
+ await SetuData.add_data(d)
+ show_img = data['image_urls']['medium'].replace('', '')
+ msg = (
+ "好欸!是新涩图:\n"
+ f"Pid: {pid}\n"
+ f"Title: {data['title']}\n"
+ f"{MessageSegment.image(show_img)}"
+ )
+ await add_setu.finish(Message(msg))
+__doc__ = """
+ 删除涩图 (pid)
+ pid: Pixiv 作品id
+del_setu = sv.on_command(
+ cmd="删除涩图",
+ docs=__doc__,
+ permission=SUPERUSER
+@del_setu.args_parser # type: ignore
+async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ cancel = ['算了', '罢了']
+ if msg in cancel:
+ await add_setu.finish('好吧...')
+ if not msg:
+ await add_setu.reject('涩图(pid)速发!')
+ else:
+ state['setu_del'] = msg
+async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ msg = str(event.message).strip()
+ if msg:
+ state['setu_del'] = msg
+'setu_del', prompt='涩图(pid)速发!')
+async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
+ pid = int(state['setu_del'])
+ await SetuData.del_data(pid)
+ await del_setu.finish(f'涩图({pid})已删除...')
+count_setu = sv.on_command(
+ cmd="涩图总量",
+ permission=SUPERUSER
+async def _count_setu(bot: Bot, event: MessageEvent) -> None:
+ msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!"
+ await count_setu.finish(msg)