summaryrefslogtreecommitdiff
path: root/ATRI/plugins/setu/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI/plugins/setu/modules')
-rw-r--r--ATRI/plugins/setu/modules/data_source.py181
-rw-r--r--ATRI/plugins/setu/modules/main_setu.py186
-rw-r--r--ATRI/plugins/setu/modules/scheduler.py15
-rw-r--r--ATRI/plugins/setu/modules/store.py136
4 files changed, 0 insertions, 518 deletions
diff --git a/ATRI/plugins/setu/modules/data_source.py b/ATRI/plugins/setu/modules/data_source.py
deleted file mode 100644
index bda7363..0000000
--- a/ATRI/plugins/setu/modules/data_source.py
+++ /dev/null
@@ -1,181 +0,0 @@
-import os
-import json
-import string
-import aiosqlite
-from aiosqlite.core import Connection
-from pathlib import Path
-from random import sample, choice
-from aiohttp import ClientSession
-from nonebot.adapters.cqhttp.message import MessageSegment, Message
-
-from ATRI.log import logger as log
-from ATRI.config import NsfwCheck
-from ATRI.exceptions import RequestError, WriteError
-from ATRI.utils.request import get_bytes
-from ATRI.utils.img import compress_image
-
-
-TEMP_DIR: Path = Path(".") / "ATRI" / "data" / "temp" / "setu"
-SETU_DIR = Path(".") / "ATRI" / "data" / "database" / "setu"
-os.makedirs(TEMP_DIR, exist_ok=True)
-os.makedirs(SETU_DIR, exist_ok=True)
-NSFW_URL = f"http://{NsfwCheck.host}:{NsfwCheck.port}/?url="
-SIZE_REDUCE: bool = True
-
-
-class Hso:
- @staticmethod
- async def nsfw_check(url: str) -> float:
- url = NSFW_URL + url
- try:
- data = json.loads(await get_bytes(url))
- except RequestError:
- raise RequestError("Request failed!")
- return round(data["score"], 4)
-
- @staticmethod
- async def _comp_setu(url: str) -> str:
- temp_id = "".join(sample(string.ascii_letters + string.digits, 8))
- file = TEMP_DIR / f"{temp_id}.png"
-
- try:
- async with ClientSession() as session:
- async with session.get(url) as r:
- data = await r.read()
- except RequestError:
- raise RequestError("Request img failed!")
-
- try:
- with open(file, "wb") as r:
- r.write(data)
- except WriteError:
- raise WriteError("Writing img failed!")
-
- return compress_image(os.path.abspath(file))
-
- @classmethod
- async def setu(cls, data: dict) -> str:
- pid = data["pid"]
- title = data["title"]
- if SIZE_REDUCE:
- img = MessageSegment.image(
- "file:///" + await cls._comp_setu(data["url"]), proxy=False
- )
- else:
- img = MessageSegment.image(data["url"], proxy=False)
-
- msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}"
- return msg
-
- @classmethod
- async def acc_setu(cls, d: list) -> str:
- data: dict = choice(d)
-
- for i in data["tags"]:
- if i["name"] == "R-18":
- return "太涩了不方便发w"
-
- pid = data["id"]
- title = data["title"]
- try:
- pic = data["meta_single_page"]["original_image_url"].replace(
- "pximg.net", "pixiv.cat"
- )
- except Exception:
- pic = choice(data["meta_pages"])["original"]["image_urls"].replace(
- "pximg.net", "pixiv.cat"
- )
- if SIZE_REDUCE:
- img = MessageSegment.image(
- "file:///" + await cls._comp_setu(pic), proxy=False
- )
- else:
- img = MessageSegment.image(pic, proxy=False)
-
- msg = f"Pid: {pid}\n" f"Title: {title}\n" f"{img}"
- return msg
-
-
-class SetuData:
- SETU_DATA = SETU_DIR / "setu.db"
-
- @classmethod
- async def _check_database(cls) -> bool:
- if not cls.SETU_DATA.exists():
- log.warning(f"未发现数据库\n-> {cls.SETU_DATA}\n将开始创建")
- async with aiosqlite.connect(cls.SETU_DATA) as db:
- cur = await db.cursor()
- await cur.execute(
- """
- CREATE TABLE setu(
- pid PID, title TITLE, tags TAGS,
- user_id USER_ID, user_name USER_NAME,
- user_account USER_ACCOUNT, url URL,
- UNIQUE(
- pid, title, tags, user_id,
- user_name, user_account, url
- )
- );
- """
- )
- await db.commit()
- log.warning(f"...创建数据库\n-> {cls.SETU_DATA}\n完成!")
- return True
- return True
-
- @classmethod
- async def add_data(cls, d: dict) -> None:
- data = (
- d["pid"],
- d["title"],
- d["tags"],
- d["user_id"],
- d["user_name"],
- d["user_account"],
- d["url"],
- )
-
- check = await cls._check_database()
- if check:
- async with aiosqlite.connect(cls.SETU_DATA) as db:
- await db.execute(
- """
- INSERT INTO setu(
- pid, title, tags, user_id,
- user_name, user_account, url
- ) VALUES(
- ?, ?, ?, ?, ?, ?, ?
- );
- """,
- data,
- )
- await db.commit()
-
- @classmethod
- async def del_data(cls, pid: int) -> None:
- if not isinstance(pid, int): # 防注入
- raise ValueError("Please provide int.")
-
- check = await cls._check_database()
- if check:
- async with aiosqlite.connect(cls.SETU_DATA) as db:
- await db.execute(f"DELETE FROM setu WHERE pid = {str(pid)};")
- await db.commit()
-
- @classmethod
- async def count(cls):
- check = await cls._check_database()
- if check:
- async with aiosqlite.connect(cls.SETU_DATA) as db:
- async with db.execute("SELECT * FROM setu") as cursor:
- return len(await cursor.fetchall()) # type: ignore
-
- @classmethod
- async def get_setu(cls):
- check = await cls._check_database()
- if check:
- async with aiosqlite.connect(cls.SETU_DATA) as db:
- async with db.execute(
- "SELECT * FROM setu ORDER BY RANDOM() limit 1;"
- ) as cursor:
- return await cursor.fetchall()
diff --git a/ATRI/plugins/setu/modules/main_setu.py b/ATRI/plugins/setu/modules/main_setu.py
deleted file mode 100644
index a82ef83..0000000
--- a/ATRI/plugins/setu/modules/main_setu.py
+++ /dev/null
@@ -1,186 +0,0 @@
-import re
-import json
-from random import choice, random
-
-from nonebot.permission import SUPERUSER
-from nonebot.adapters.cqhttp import Bot, MessageEvent
-from nonebot.adapters.cqhttp.message import Message
-
-from ATRI.service import Service as sv
-from ATRI.rule import is_in_service
-from ATRI.utils.request import get_bytes, post_bytes
-from ATRI.utils.limit import is_too_exciting
-from ATRI.config import Setu, BotSelfConfig
-from ATRI.exceptions import RequestError
-
-from .data_source import Hso, SIZE_REDUCE, SetuData
-
-
-LOLICON_URL: str = "https://api.lolicon.app/setu/"
-PIXIV_URL: str = (
- "https://api.kyomotoi.moe/api/pixiv/search?mode=exact_match_for_tags&word="
-)
-R18_ENABLED: int = 0
-USE_LOCAL_DATA: bool = False
-MIX_LOCAL_DATA: bool = False
-
-
-setu = sv.on_regex(
- r"来[张点][色涩]图|[涩色]图来|想要[涩色]图|[涩色]图[Tt][Ii][Mm][Ee]", rule=is_in_service("setu")
-)
-
-
-async def _setu(bot: Bot, event: MessageEvent) -> None:
- user = event.user_id
- check = is_too_exciting(user, 3, hours=1)
- if not check:
- return
-
- await bot.send(event, "别急,在找了!")
- params = {"apikey": Setu.key, "r18": str(R18_ENABLED), "size1200": "true"}
- try:
- data = json.loads(await post_bytes(LOLICON_URL, params))["data"][0]
- except RequestError:
- raise RequestError("Request failed!")
-
- check = await Hso.nsfw_check(data["url"])
- score = "{:.2%}".format(check, 4)
-
- if not MIX_LOCAL_DATA:
- if USE_LOCAL_DATA:
- data = choice(await SetuData.get_setu()) # type: ignore
- data = {"pid": data[0], "title": data[1], "url": data[6]}
- if random() <= 0.1:
- await bot.send(event, "我找到图了,但我发给主人了❤")
- msg = await Hso.setu(data) + f"\n由用户({user})提供"
- for sup in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=msg)
- else:
- await setu.finish(Message(await Hso.setu(data)))
- else:
- if check >= 0.9:
- if random() <= 0.2:
- repo = "我找到图了,但我发给主人了❤\n" f"涩值:{score}"
- await bot.send(event, repo)
- msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
- for sup in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=msg)
- else:
- await setu.finish(Message(await Hso.setu(data)))
- else:
- if random() <= 0.1:
- await bot.send(event, "我找到图了,但我发给主人了❤")
- msg = await Hso.setu(data) + f"\n由用户({user})提供,涩值:{score}"
- for sup in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=msg)
- else:
- await setu.finish(Message(await Hso.setu(data)))
- else:
- if random() <= 0.5:
- if random() <= 0.1:
- await bot.send(event, "我找到图了,但我发给主人了❤")
- msg = await Hso.setu(data) + f"\n由用户({user})提供"
- for sup in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=msg)
- else:
- await setu.finish(Message(await Hso.setu(data)))
- else:
- data = choice(await SetuData.get_setu()) # type: ignore
- data = {"pid": data[0], "title": data[1], "url": data[6]}
- if random() <= 0.1:
- await bot.send(event, "我找到图了,但我发给主人了❤")
- msg = await Hso.setu(data) + f"\n由用户({user})提供"
- for sup in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=msg)
- else:
- await setu.finish(Message(await Hso.setu(data)))
-
-
-key_setu = sv.on_regex(r"来[点张](.*?)的[涩色🐍]图", rule=is_in_service("setu"))
-
-
-@key_setu.handle()
-async def _key_setu(bot: Bot, event: MessageEvent) -> None:
- user = event.user_id
- check = is_too_exciting(user, 10, hours=1)
- if not check:
- await setu.finish("休息一下吧❤")
-
- await bot.send(event, "别急,在找了!")
- msg = str(event.message).strip()
- tag = re.findall(r"来[点张](.*?)的?[涩色🐍]图", msg)[0]
- URL = PIXIV_URL + tag
-
- try:
- data = json.loads(await get_bytes(URL))["illusts"]
- except RequestError:
- raise RequestError("Request msg failed!")
-
- if random() <= 0.1:
- await bot.send(event, "我找到图了,但我发给主人了❤")
- msg = await Hso.acc_setu(data) + f"\n由用户({user})提供"
- for sup in BotSelfConfig.superusers:
- await bot.send_private_msg(user_id=sup, message=msg)
- else:
- await setu.finish(Message(await Hso.acc_setu(data)))
-
-
-__doc__ = """
-涩图设置
-权限组:维护者
-用法:
- 涩图设置 启用/禁用r18
- 涩图设置 启用/禁用压缩
- 涩图设置 启用/禁用本地涩图
- 涩图设置 启用/禁用混合本地涩图
-"""
-
-setu_config = sv.on_command(cmd="涩图设置", docs=__doc__, permission=SUPERUSER)
-
-
-@setu_config.handle()
-async def _setu_config(bot: Bot, event: MessageEvent) -> None:
- global R18_ENABLED, SIZE_REDUCE, USE_LOCAL_DATA, MIX_LOCAL_DATA
- msg = str(event.message).split(" ")
- if msg[0] == "":
- repo = "可用设置如下:\n启用/禁用r18\n启用/禁用压缩\n启用/禁用本地涩图\n启用/禁用混合本地涩图"
- await setu_config.finish(repo)
- elif msg[0] == "启用r18":
- R18_ENABLED = 1
- await setu_config.finish("已启用r18")
- elif msg[0] == "禁用r18":
- R18_ENABLED = 0
- await setu_config.finish("已禁用r18")
- elif msg[0] == "启用压缩":
- SIZE_REDUCE = True
- await setu_config.finish("已启用图片压缩")
- elif msg[0] == "禁用压缩":
- SIZE_REDUCE = False
- await setu_config.finish("已禁用图片压缩")
- elif msg[0] == "启用本地涩图":
- USE_LOCAL_DATA = True
- await setu_config.finish("已启用本地涩图")
- elif msg[0] == "禁用本地涩图":
- USE_LOCAL_DATA = False
- await setu_config.finish("已禁用本地涩图")
- elif msg[0] == "启用混合本地涩图":
- MIX_LOCAL_DATA = True
- await setu_config.finish("已启用混合本地涩图")
- elif msg[0] == "禁用混合本地涩图":
- MIX_LOCAL_DATA = False
- await setu_config.finish("已禁用混合本地涩图")
- else:
- await setu_config.finish("阿!请检查拼写")
-
-
-not_get_se = sv.on_command("不够涩")
-
-
-@not_get_se.handle()
-async def _not_se(bot: Bot, event: MessageEvent) -> None:
- user = event.user_id
- check = is_too_exciting(user, 1, 120)
- if check:
- msg = choice(["那你来发", "那你来发❤"])
- await not_get_se.finish(msg)
diff --git a/ATRI/plugins/setu/modules/scheduler.py b/ATRI/plugins/setu/modules/scheduler.py
deleted file mode 100644
index e66c398..0000000
--- a/ATRI/plugins/setu/modules/scheduler.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import shutil
-from ATRI.log import logger as log
-from ATRI.utils.apscheduler import scheduler
-
-from .data_source import TEMP_DIR
-
-
[email protected]_job("interval", days=7, misfire_grace_time=10)
-async def clear_temp():
- log.info("正在清除涩图缓存")
- try:
- shutil.rmtree(TEMP_DIR)
- log.info("清除缓存成功!")
- except Exception:
- log.warn("清除图片缓存失败!")
diff --git a/ATRI/plugins/setu/modules/store.py b/ATRI/plugins/setu/modules/store.py
deleted file mode 100644
index e278c77..0000000
--- a/ATRI/plugins/setu/modules/store.py
+++ /dev/null
@@ -1,136 +0,0 @@
-import json
-from random import choice
-
-from nonebot.typing import T_State
-from nonebot.permission import SUPERUSER
-from nonebot.adapters.cqhttp import Bot, MessageEvent
-from nonebot.adapters.cqhttp.message import Message, MessageSegment
-
-from ATRI.service import Service as sv
-from ATRI.utils.request import get_bytes
-from ATRI.exceptions import RequestError
-
-from .data_source import SetuData
-
-
-API_URL: str = "https://api.kyomotoi.moe/api/pixiv/illust?id="
-
-
-__doc__ = """
-为本地添加涩图!
-权限组:维护者
-用法:
- 添加涩图 (pid)
-补充:
- pid: Pixiv 作品id
-"""
-
-
-add_setu = sv.on_command(cmd="添加涩图", docs=__doc__, permission=SUPERUSER)
-
-
-@add_setu.args_parser # type: ignore
-async def _load_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
- if msg in cancel:
- await add_setu.finish("好吧...")
- if not msg:
- await add_setu.reject("涩图(pid)速发!")
- else:
- state["setu_add"] = msg
-
-
-@add_setu.handle()
-async def _add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- if msg:
- state["setu_add"] = msg
-
-
-@add_setu.got("setu_add", prompt="涩图(pid)速发!")
-async def _deal_add_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- pid = state["setu_add"]
-
- URL = API_URL + pid
- try:
- data = json.loads(await get_bytes(URL))["illust"]
- except RequestError:
- raise RequestError("Request failed!")
-
- try:
- pic = data["meta_single_page"]["original_image_url"].replace(
- "pximg.net", "pixiv.cat"
- )
- except Exception:
- pic = choice(data["meta_pages"])["image_urls"]["original"].replace(
- "pximg.net", "pixiv.cat"
- )
-
- d = {
- "pid": pid,
- "title": data["title"],
- "tags": str(data["tags"]),
- "user_id": data["user"]["id"],
- "user_name": data["user"]["name"],
- "user_account": data["user"]["account"],
- "url": pic,
- }
- await SetuData.add_data(d)
-
- show_img = data["image_urls"]["medium"].replace("pximg.net", "pixiv.cat")
- msg = (
- "好欸!是新涩图:\n"
- f"Pid: {pid}\n"
- f"Title: {data['title']}\n"
- f"{MessageSegment.image(show_img)}"
- )
- await add_setu.finish(Message(msg))
-
-
-__doc__ = """
-删除涩图!
-权限组:维护者
-用法:
- 删除涩图 (pid)
-补充:
- pid: Pixiv 作品id
-"""
-
-
-del_setu = sv.on_command(cmd="删除涩图", docs=__doc__, permission=SUPERUSER)
-
-
-@del_setu.args_parser # type: ignore
-async def _load_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- cancel = ["算了", "罢了"]
- if msg in cancel:
- await add_setu.finish("好吧...")
- if not msg:
- await add_setu.reject("涩图(pid)速发!")
- else:
- state["setu_del"] = msg
-
-
-@del_setu.handle()
-async def _del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- msg = str(event.message).strip()
- if msg:
- state["setu_del"] = msg
-
-
-@del_setu.got("setu_del", prompt="涩图(pid)速发!")
-async def _deal_del_setu(bot: Bot, event: MessageEvent, state: T_State) -> None:
- pid = int(state["setu_del"])
- await SetuData.del_data(pid)
- await del_setu.finish(f"涩图({pid})已删除...")
-
-
-count_setu = sv.on_command(cmd="涩图总量", permission=SUPERUSER)
-
-
-@count_setu.handle()
-async def _count_setu(bot: Bot, event: MessageEvent) -> None:
- msg = f"咱本地搭载了 {await SetuData.count()} 张涩图!"
- await count_setu.finish(msg)