diff options
Diffstat (limited to 'ATRI/plugins/setu/modules')
-rw-r--r-- | ATRI/plugins/setu/modules/data_source.py | 181 | ||||
-rw-r--r-- | ATRI/plugins/setu/modules/main_setu.py | 186 | ||||
-rw-r--r-- | ATRI/plugins/setu/modules/scheduler.py | 15 | ||||
-rw-r--r-- | ATRI/plugins/setu/modules/store.py | 136 |
4 files changed, 0 insertions, 518 deletions
diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py deleted file mode 100644 index bda7363..0000000 --- a/ATRI/plugins/setu/modules/data_source.py +++ /dev/null @@ -1,181 +0,0 @@ -import os -import json -import string -import aiosqlite -from aiosqlite.core import Connection -from pathlib import Path -from random import sample, choice -from aiohttp import ClientSession -from nonebot.adapters.cqhttp.message import MessageSegment, Message - -from ATRI.log import logger as log -from ATRI.config import NsfwCheck -from ATRI.exceptions import RequestError, WriteError -from ATRI.utils.request import get_bytes -from ATRI.utils.img import compress_image - - -TEMP_DIR: Path = Path(".") / "ATRI" / "data" / "temp" / "setu" -SETU_DIR = Path(".") / "ATRI" / "data" / "database" / "setu" -os.makedirs(TEMP_DIR, exist_ok=True) -os.makedirs(SETU_DIR, exist_ok=True) -NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url=" -SIZE_REDUCE: bool = True - - -class Hso: - @staticmethod - async def nsfw_check(url: str) -> float: - url = NSFW_URL + url - try: - data = json.loads(await get_bytes(url)) - except RequestError: - raise RequestError("Request failed!") - return round(data["score"], 4) - - @staticmethod - async def _comp_setu(url: str) -> str: - temp_id = "".join(sample(string.ascii_letters + string.digits, 8)) - file = TEMP_DIR / f"{temp_id}.png" - - try: - async with ClientSession() as session: - async with session.get(url) as r: - data = await r.read() - except RequestError: - raise RequestError("Request img failed!") - - try: - with open(file, "wb") as r: - r.write(data) - except WriteError: - raise WriteError("Writing img failed!") - - return compress_image(os.path.abspath(file)) - - @classmethod - async def setu(cls, data: dict) -> str: - pid = data["pid"] - title = data["title"] - if SIZE_REDUCE: - img = MessageSegment.image( - "file:///" + await cls._comp_setu(data["url"]), proxy=False - ) - else: - img = MessageSegment.image(data["url"], proxy=False) - - msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}" - return msg - - @classmethod - async def acc_setu(cls, d: list) -> str: - data: dict = choice(d) - - for i in data["tags"]: - if i["name"] == "R-18": - return "太涩了不方便发w" - - pid = data["id"] - title = data["title"] - try: - pic = data["meta_single_page"]["original_image_url"].replace( - "pximg.net", "pixiv.cat" - ) - except Exception: - pic = choice(data["meta_pages"])["original"]["image_urls"].replace( - "pximg.net", "pixiv.cat" - ) - if SIZE_REDUCE: - img = MessageSegment.image( - "file:///" + await cls._comp_setu(pic), proxy=False - ) - else: - img = MessageSegment.image(pic, proxy=False) - - msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}" - return msg - - -class SetuData: - SETU_DATA = SETU_DIR / "setu.db" - - @classmethod - async def _check_database(cls) -> bool: - if not cls.SETU_DATA.exists(): - log.warning(f"未发现数据库\n-> {cls.SETU_DATA}\n将开始创建") - async with aiosqlite.connect(cls.SETU_DATA) as db: - cur = await db.cursor() - await cur.execute( - """ - CREATE TABLE setu( - pid PID, title TITLE, tags TAGS, - user_id USER_ID, user_name USER_NAME, - user_account USER_ACCOUNT, url URL, - UNIQUE( - pid, title, tags, user_id, - user_name, user_account, url - ) - ); - """ - ) - await db.commit() - log.warning(f"...创建数据库\n-> {cls.SETU_DATA}\n完成!") - return True - return True - - @classmethod - async def add_data(cls, d: dict) -> None: - data = ( - d["pid"], - d["title"], - d["tags"], - d["user_id"], - d["user_name"], - d["user_account"], - d["url"], - ) - - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - await db.execute( - """ - INSERT INTO setu( - pid, title, tags, user_id, - user_name, user_account, url - ) VALUES( - ?, ?, ?, ?, ?, ?, ? - ); - """, - data, - ) - await db.commit() - - @classmethod - async def del_data(cls, pid: int) -> None: - if not isinstance(pid, int): # 防注入 - raise ValueError("Please provide int.") - - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};") - await db.commit() - - @classmethod - async def count(cls): - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - async with db.execute("SELECT * FROM setu") as cursor: - return len(await cursor.fetchall()) # type: ignore - - @classmethod - async def get_setu(cls): - check = await cls._check_database() - if check: - async with aiosqlite.connect(cls.SETU_DATA) as db: - async with db.execute( - "SELECT * FROM setu ORDER BY RANDOM() limit 1;" - ) as cursor: - return await cursor.fetchall() diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py deleted file mode 100644 index a82ef83..0000000 --- a/ATRI/plugins/setu/modules/main_setu.py +++ /dev/null @@ -1,186 +0,0 @@ -import re -import json -from random import choice, random - -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message - -from ATRI.service import Service as sv -from ATRI.rule import is_in_service -from ATRI.utils.request import get_bytes, post_bytes -from ATRI.utils.limit import is_too_exciting -from ATRI.config import Setu, BotSelfConfig -from ATRI.exceptions import RequestError - -from .data_source import Hso, SIZE_REDUCE, SetuData - - -LOLICON_URL: str = "https://api.lolicon.app/setu/" -PIXIV_URL: str = ( - "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word=" -) -R18_ENABLED: int = 0 -USE_LOCAL_DATA: bool = False -MIX_LOCAL_DATA: bool = False - - -setu = sv.on_regex( - r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]", rule=is_in_service("setu") -) - - -async def _setu(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 3, hours=1) - if not check: - return - - await bot.send(event, "别急,在找了!") - params = {"apikey": Setu.key, "r18": str(R18_ENABLED), "size1200": "true"} - try: - data = json.loads(await post_bytes(LOLICON_URL, params))["data"][0] - except RequestError: - raise RequestError("Request failed!") - - check = await Hso.nsfw_check(data["url"]) - score = "{:.2%}".format(check, 4) - - if not MIX_LOCAL_DATA: - if USE_LOCAL_DATA: - data = choice(await SetuData.get_setu()) # type: ignore - data = {"pid": data[0], "title": data[1], "url": data[6]} - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if check >= 0.9: - if random() <= 0.2: - repo = "我找到图了,但我发给主人了❤\n" f"涩值:{score}" - await bot.send(event, repo) - msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - if random() <= 0.5: - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - else: - data = choice(await SetuData.get_setu()) # type: ignore - data = {"pid": data[0], "title": data[1], "url": data[6]} - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.setu(data))) - - -key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service("setu")) - - -@key_setu.handle() -async def _key_setu(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 10, hours=1) - if not check: - await setu.finish("休息一下吧❤") - - await bot.send(event, "别急,在找了!") - msg = str(event.message).strip() - tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0] - URL = PIXIV_URL + tag - - try: - data = json.loads(await get_bytes(URL))["illusts"] - except RequestError: - raise RequestError("Request msg failed!") - - if random() <= 0.1: - await bot.send(event, "我找到图了,但我发给主人了❤") - msg = await Hso.acc_setu(data) + f"\n由用户({user})提供" - for sup in BotSelfConfig.superusers: - await bot.send_private_msg(user_id=sup, message=msg) - else: - await setu.finish(Message(await Hso.acc_setu(data))) - - -__doc__ = """ -涩图设置 -权限组:维护者 -用法: - 涩图设置 启用/禁用r18 - 涩图设置 启用/禁用压缩 - 涩图设置 启用/禁用本地涩图 - 涩图设置 启用/禁用混合本地涩图 -""" - -setu_config = sv.on_command(cmd="涩图设置", docs=__doc__, permission=SUPERUSER) - - -@setu_config.handle() -async def _setu_config(bot: Bot, event: MessageEvent) -> None: - global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA - msg = str(event.message).split(" ") - if msg[0] == "": - repo = "可用设置如下:\n启用/禁用r18\n启用/禁用压缩\n启用/禁用本地涩图\n启用/禁用混合本地涩图" - await setu_config.finish(repo) - elif msg[0] == "启用r18": - R18_ENABLED = 1 - await setu_config.finish("已启用r18") - elif msg[0] == "禁用r18": - R18_ENABLED = 0 - await setu_config.finish("已禁用r18") - elif msg[0] == "启用压缩": - SIZE_REDUCE = True - await setu_config.finish("已启用图片压缩") - elif msg[0] == "禁用压缩": - SIZE_REDUCE = False - await setu_config.finish("已禁用图片压缩") - elif msg[0] == "启用本地涩图": - USE_LOCAL_DATA = True - await setu_config.finish("已启用本地涩图") - elif msg[0] == "禁用本地涩图": - USE_LOCAL_DATA = False - await setu_config.finish("已禁用本地涩图") - elif msg[0] == "启用混合本地涩图": - MIX_LOCAL_DATA = True - await setu_config.finish("已启用混合本地涩图") - elif msg[0] == "禁用混合本地涩图": - MIX_LOCAL_DATA = False - await setu_config.finish("已禁用混合本地涩图") - else: - await setu_config.finish("阿!请检查拼写") - - -not_get_se = sv.on_command("不够涩") - - -@not_get_se.handle() -async def _not_se(bot: Bot, event: MessageEvent) -> None: - user = event.user_id - check = is_too_exciting(user, 1, 120) - if check: - msg = choice(["那你来发", "那你来发❤"]) - await not_get_se.finish(msg) diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py deleted file mode 100644 index e66c398..0000000 --- a/ATRI/plugins/setu/modules/scheduler.py +++ /dev/null @@ -1,15 +0,0 @@ -import shutil -from ATRI.log import logger as log -from ATRI.utils.apscheduler import scheduler - -from .data_source import TEMP_DIR - - [email protected]_job("interval", days=7, misfire_grace_time=10) -async def clear_temp(): - log.info("正在清除涩图缓存") - try: - shutil.rmtree(TEMP_DIR) - log.info("清除缓存成功!") - except Exception: - log.warn("清除图片缓存失败!") diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py deleted file mode 100644 index e278c77..0000000 --- a/ATRI/plugins/setu/modules/store.py +++ /dev/null @@ -1,136 +0,0 @@ -import json -from random import choice - -from nonebot.typing import T_State -from nonebot.permission import SUPERUSER -from nonebot.adapters.cqhttp import Bot, MessageEvent -from nonebot.adapters.cqhttp.message import Message, MessageSegment - -from ATRI.service import Service as sv -from ATRI.utils.request import get_bytes -from ATRI.exceptions import RequestError - -from .data_source import SetuData - - -API_URL: str = "https://api.kyomotoi.moe/api/pixiv/illust?id=" - - -__doc__ = """ -为本地添加涩图! -权限组:维护者 -用法: - 添加涩图 (pid) -补充: - pid: Pixiv 作品id -""" - - -add_setu = sv.on_command(cmd="添加涩图", docs=__doc__, permission=SUPERUSER) - - -@add_setu.args_parser # type: ignore -async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await add_setu.finish("好吧...") - if not msg: - await add_setu.reject("涩图(pid)速发!") - else: - state["setu_add"] = msg - - -@add_setu.handle() -async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["setu_add"] = msg - - -@add_setu.got("setu_add", prompt="涩图(pid)速发!") -async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - pid = state["setu_add"] - - URL = API_URL + pid - try: - data = json.loads(await get_bytes(URL))["illust"] - except RequestError: - raise RequestError("Request failed!") - - try: - pic = data["meta_single_page"]["original_image_url"].replace( - "pximg.net", "pixiv.cat" - ) - except Exception: - pic = choice(data["meta_pages"])["image_urls"]["original"].replace( - "pximg.net", "pixiv.cat" - ) - - d = { - "pid": pid, - "title": data["title"], - "tags": str(data["tags"]), - "user_id": data["user"]["id"], - "user_name": data["user"]["name"], - "user_account": data["user"]["account"], - "url": pic, - } - await SetuData.add_data(d) - - show_img = data["image_urls"]["medium"].replace("pximg.net", "pixiv.cat") - msg = ( - "好欸!是新涩图:\n" - f"Pid: {pid}\n" - f"Title: {data['title']}\n" - f"{MessageSegment.image(show_img)}" - ) - await add_setu.finish(Message(msg)) - - -__doc__ = """ -删除涩图! -权限组:维护者 -用法: - 删除涩图 (pid) -补充: - pid: Pixiv 作品id -""" - - -del_setu = sv.on_command(cmd="删除涩图", docs=__doc__, permission=SUPERUSER) - - -@del_setu.args_parser # type: ignore -async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - cancel = ["算了", "罢了"] - if msg in cancel: - await add_setu.finish("好吧...") - if not msg: - await add_setu.reject("涩图(pid)速发!") - else: - state["setu_del"] = msg - - -@del_setu.handle() -async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - msg = str(event.message).strip() - if msg: - state["setu_del"] = msg - - -@del_setu.got("setu_del", prompt="涩图(pid)速发!") -async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None: - pid = int(state["setu_del"]) - await SetuData.del_data(pid) - await del_setu.finish(f"涩图({pid})已删除...") - - -count_setu = sv.on_command(cmd="涩图总量", permission=SUPERUSER) - - -@count_setu.handle() -async def _count_setu(bot: Bot, event: MessageEvent) -> None: - msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!" - await count_setu.finish(msg) |