diff options
Diffstat (limited to 'ATRI')
-rw-r--r-- | ATRI/plugins/plugin_admin/__init__.py | 15 | ||||
-rw-r--r-- | ATRI/plugins/plugin_anime/__init__.py | 46 | ||||
-rw-r--r-- | ATRI/plugins/plugin_chat/__init__.py | 2 | ||||
-rw-r--r-- | ATRI/plugins/plugin_link/__init__.py | 26 | ||||
-rw-r--r-- | ATRI/plugins/plugin_pixiv/__init__.py | 32 | ||||
-rw-r--r-- | ATRI/plugins/plugin_test/__init__.py | 10 | ||||
-rw-r--r-- | ATRI/plugins/plugin_utils/__init__.py | 42 | ||||
-rw-r--r-- | ATRI/plugins/plugin_utils/data_source.py | 143 |
8 files changed, 257 insertions, 59 deletions
diff --git a/ATRI/plugins/plugin_admin/__init__.py b/ATRI/plugins/plugin_admin/__init__.py index 1dbbb76..a8bb61b 100644 --- a/ATRI/plugins/plugin_admin/__init__.py +++ b/ATRI/plugins/plugin_admin/__init__.py @@ -45,16 +45,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: data = json.load(f) if not func: - msg0 = "-==ATRI Switch Control System==-\n" - msg0 += "Usage: /switch on/off-{service}\n" - msg0 += "* For SUPERUSER:\n" - msg0 += " - Usage: /switch all-on/off-{service}\n" - msg0 += "Service:\n" - - for i in data.keys(): - msg0 += f" {i}\n" - - await switch.finish(msg0) + await switch.finish('请查看文档获取帮助(') funct = re.findall(r"[on|off]-(.*)", func) @@ -202,10 +193,10 @@ async def _(bot: Bot, event: Event, state: dict) -> None: err_list = [] for group in group_list: - asyncio.sleep(randint(1, 5)) + asyncio.sleep(randint(2, 10)) try: await bot.send_group_msg(group_id=group['group_id'], - message=msg) + message=msg) sc_list.append(group['group_id']) except: await bot.send(event, f"在尝试推送到群[{group['group_id']}]时失败了呢...") diff --git a/ATRI/plugins/plugin_anime/__init__.py b/ATRI/plugins/plugin_anime/__init__.py index 556b6db..fea720b 100644 --- a/ATRI/plugins/plugin_anime/__init__.py +++ b/ATRI/plugins/plugin_anime/__init__.py @@ -20,9 +20,9 @@ from apscheduler.triggers.date import DateTrigger from nonebot.rule import Rule from nonebot.log import logger -from nonebot.sched import scheduler from nonebot.typing import Bot, Event from nonebot.permission import SUPERUSER +from nonebot_plugin_apscheduler import scheduler from nonebot.plugin import on_message, on_command, on_regex from ATRI.utils.utils_times import countX @@ -141,7 +141,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: if len(img): pass else: - await SaucenaoSearch.reject("请发送一张目标图片,而非文字或其他非图片成分( -'`-; )") + await SaucenaoSearch.reject("请发送一张目标图片,而非文字或其他非图片成分(") await bot.send(event, "别急!正在搜索!") req = None @@ -152,11 +152,10 @@ async def _(bot: Bot, event: Event, state: dict) -> None: except: await AnimeSearch.finish(errorRepo("请求数据失败")) + d = {} data = json.loads(req.decode()) try: - d = {} - for i in range(len(data['docs'])): if data['docs'][i]['title_chinese'] in d: d[data['docs'][i] @@ -180,12 +179,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None: await AnimeSearch.finish(errorRepo("处理数据失败")) result = sorted( - d.items(), # type: ignore + d.items(), key=lambda x: x[1], reverse=True) t = 0 - msg0 = f'[CQ:at,qq={state["user"]}]\n根据所提供的图片按照相似度找到{len(d)}个结果:' # type: ignore + msg0 = f'[CQ:at,qq={state["user"]}]\n根据所提供的图片按照相似度找到{len(d)}个结果:' for i in result: t += 1 @@ -205,11 +204,15 @@ SP_temp_list = [] SP_list = [] -def check_sepi(user) -> bool: - if user in SP_list: - return True - else: - return False +def check_sepi() -> Rule: + """检查目标是否是涩批""" + async def _check_sepi(bot: Bot, event: Event, state: dict) -> bool: + if event.user_id in SP_list: + await bot.send(event, "你可少冲点吧!涩批!哼唧") + return False + else: + return True + return Rule(_check_sepi) def add_sepi(user: int) -> None: """将目标移入涩批名单""" @@ -224,7 +227,7 @@ def del_sepi(user: int) -> None: setu = on_regex( r"来[点丶张份副个幅][涩色瑟][图圖]|[涩色瑟][图圖]来|[涩色瑟][图圖][gkd|GKD|搞快点]|[gkd|GKD|搞快点][涩色瑟][图圖]", - rule=check_banlist() & check_switch(plugin_name_2, False)) + rule=check_banlist() & check_switch(plugin_name_2, False) & check_sepi()) @setu.handle() @@ -233,9 +236,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: user = event.user_id group = event.group_id res = randint(1, 5) - - if check_sepi(user): - await setu.finish("你可少冲点吧!涩批!哼唧") + print(1) if countX(SP_temp_list, user) == 5: add_sepi(user) # type: ignore @@ -317,22 +318,13 @@ async def _(bot: Bot, event: Event, state: dict) -> None: setuType = on_command("setu-type", permission=SUPERUSER) [email protected]() # type: ignore async def _(bot: Bot, event: Event, state: dict) -> None: global setu_type msg = str(event.message).strip() - if msg: - pass - else: - msg0 = "-==ATRI Setu Type Control System==-\n" - msg0 += "**Tips: For SUPERUSERS**\n" - msg0 += "┌Usage: setu-type {type}\n" - msg0 += "└Type:\n" - msg0 += " ├local\n" - msg0 += " └url" - - await setuType.finish(msg0) + if not msg: + await setuType.finish("请查看文档获取帮助(") if msg == "local": setu_type = 1 diff --git a/ATRI/plugins/plugin_chat/__init__.py b/ATRI/plugins/plugin_chat/__init__.py index 0474d46..5a323c3 100644 --- a/ATRI/plugins/plugin_chat/__init__.py +++ b/ATRI/plugins/plugin_chat/__init__.py @@ -23,9 +23,9 @@ from apscheduler.triggers.date import DateTrigger from nonebot.log import logger from nonebot.rule import to_me -from nonebot.sched import scheduler from nonebot.typing import Bot, Event from nonebot.permission import SUPERUSER +from nonebot_plugin_apscheduler import scheduler from nonebot.plugin import on_command, on_message, on_notice, on_request, on_regex from ATRI.utils.utils_times import countX diff --git a/ATRI/plugins/plugin_link/__init__.py b/ATRI/plugins/plugin_link/__init__.py new file mode 100644 index 0000000..029e2a8 --- /dev/null +++ b/ATRI/plugins/plugin_link/__init__.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +''' +@File : __init__.py +@Time : 2020/12/05 18:40:43 +@Author : Kyomotoi +@Contact : [email protected] +@Github : https://github.com/Kyomotoi +@License : Copyright © 2018-2020 Kyomotoi, All Rights Reserved. +''' +__author__ = 'kyomotoi' + +import nonebot +from nonebot.plugin import on_command +from nonebot.typing import Bot, Event +from nonebot.permission import SUPERUSER + + +bots = nonebot.get_bots() + +testGetBot = on_command('获取bot', permission=SUPERUSER) + +async def _(bot: Bot, event: Event, state: dict) -> None: + print(bots) diff --git a/ATRI/plugins/plugin_pixiv/__init__.py b/ATRI/plugins/plugin_pixiv/__init__.py index c7687ed..3bb2ae0 100644 --- a/ATRI/plugins/plugin_pixiv/__init__.py +++ b/ATRI/plugins/plugin_pixiv/__init__.py @@ -12,13 +12,13 @@ __author__ = 'kyomotoi' import re import json -from requests import exceptions from nonebot.plugin import on_command from nonebot.typing import Bot, Event from ATRI.utils.utils_error import errorRepo from ATRI.utils.utils_img import aio_download_pics +from ATRI.utils.utils_request import aio_get_bytes from ATRI.utils.utils_rule import check_banlist, check_switch plugin_name_0 = "pixiv-pic-search" @@ -56,8 +56,8 @@ async def _(bot: Bot, event: Event, state: dict) -> None: data = {} try: - data = json.loads(await aio_download_pics(URL)) - except exceptions: + data = json.loads(await aio_get_bytes(URL)) + except: await pixivSearchIMG.finish(errorRepo("请求数据失败")) IMG = data["response"][0]["image_urls"]["large"] @@ -72,7 +72,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: msg0 += f'Account Name: {data["response"][0]["user"]["account"]}\n' msg0 += f'Author Name: {data["response"][0]["user"]["name"]}\n' msg0 += f'Link: https://www.pixiv.net/users/{data["response"][0]["user"]["id"]}\n' - msg0 += IMG + msg0 += IMG.replace('https://', '') await pixivSearchIMG.finish(msg0) @@ -113,8 +113,8 @@ async def _(bot: Bot, event: Event, state: dict) -> None: data = {} try: - data = json.loads(await aio_download_pics(URL)) - except exceptions: + data = json.loads(await aio_get_bytes(URL)) + except: await pixivSearchAuthor.finish(errorRepo("请求网络失败")) d = {} @@ -138,7 +138,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None: msg += f"({t})\n" msg += f"Title: {i[1][1]}\n" msg += f"Pid: {i[1][0]}\n" - msg += f"{i[1][2]}" + msg += f"{i[1][2].replace('https://', '')}" await pixivSearchAuthor.finish(msg) @@ -152,22 +152,21 @@ pixivRank = on_command("p站排行榜", async def _(bot: Bot, event: Event, state: dict) -> None: user = str(event.user_id) - await bot.send(event, "正在获取P站每日排行榜前五作品") + await bot.send(event, "正在获取P站每日排行榜前三作品") URL = "https://api.imjad.cn/pixiv/v1/?type=rank" data = {} try: - data = json.loads(await aio_download_pics(URL)) - except exceptions: + data = json.loads(await aio_get_bytes(URL)) + except: await pixivRank.finish(errorRepo("网络请求失败")) d = {} - - for i in range(0, 5): + for i in range(0, 3): pid = data["response"][0]["works"][i]["work"]["id"] title = data["response"][0]["works"][i]["work"]["title"] - IMG = data["response"][i]["works"]["image_urls"]["large"] + IMG = data["response"][0]["works"][i]["work"]["image_urls"]["large"] IMG = IMG.replace("i.pximg.net", "i.pixiv.cat") d[i] = [f"{pid}", f"{title}", f"{IMG}"] @@ -181,8 +180,9 @@ async def _(bot: Bot, event: Event, state: dict) -> None: t += 1 msg += "\n————————————\n" msg += f"({t})\n" - msg += f"Title: {i[1][1]}" - msg += f"Pid: {i[1][0]}" - msg += f"{i[1][2]}" + msg += f"Title: {i[1][1]}\n" + msg += f"Pid: {i[1][0]}\n" + msg += f"{i[1][2].replace('https://', '')}" + print(msg) await pixivRank.finish(msg) diff --git a/ATRI/plugins/plugin_test/__init__.py b/ATRI/plugins/plugin_test/__init__.py index 72c7372..573cf8b 100644 --- a/ATRI/plugins/plugin_test/__init__.py +++ b/ATRI/plugins/plugin_test/__init__.py @@ -11,6 +11,7 @@ ''' __author__ = 'kyomotoi' +import inspect import os from pathlib import Path from random import sample @@ -40,3 +41,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None: group_list = await bot.get_group_list() group = sample(group_list, 1) print(group[0]['group_id'], type(group[0]['group_id'])) + + +testSendFormat = on_command('测试发送', permission=SUPERUSER) + + +async def _(bot: Bot, event: Event, state: dict) -> None: + msg = ("test0\n" "test1\n" "test2") + await bot.send(event, msg) diff --git a/ATRI/plugins/plugin_utils/__init__.py b/ATRI/plugins/plugin_utils/__init__.py index 11f6c8b..4181e30 100644 --- a/ATRI/plugins/plugin_utils/__init__.py +++ b/ATRI/plugins/plugin_utils/__init__.py @@ -20,7 +20,7 @@ from nonebot.typing import Bot, Event from ATRI.utils.utils_error import errorRepo from ATRI.utils.utils_rule import check_banlist, check_switch -from .data_source import Generate, Genshin, Roll +from .data_source import Generate, Genshin, Roll, RCNB plugin_name_0 = "one-key-adult" generateID = on_command("我要转大人,一天打25小时游戏", @@ -114,3 +114,43 @@ async def _(bot: Bot, event: Event, state: dict) -> None: else: await genshinInfo.finish('UID检查未通过,请确保此ID为9位数或者是否为国服ID~!') + + +rcnb = RCNB() + +rcnbEncode = on_command('RC一下', + aliases={'rc一下', '啊西一下', '阿西一下'}, + rule=check_banlist()) + + +async def _(bot: Bot, event: Event, state: dict) -> None: + msg = str(event.message).strip() + + if msg: + state['msg'] = msg + + [email protected]('msg', prompt='请告诉咱需要RC一下的字符~!') +async def _(bot: Bot, event: Event, state: dict) -> None: + msg = state['msg'] + await rcnbEncode.finish(rcnb._encode(msg)) + + +rcnbDecode = on_command('一下RC', + aliases={'一下rc', '一下啊西', '一下阿西'}, + rule=check_banlist()) + + +async def _(bot: Bot, event: Event, state: dict) -> None: + msg = str(event.message).strip() + + if msg: + state['msg'] = msg + + [email protected]('msg', prompt='请告诉咱需要一下RC的字符~!') +async def _(bot: Bot, event: Event, state: dict) -> None: + msg = state['msg'] + await rcnbDecode.finish(rcnb._decode(msg)) diff --git a/ATRI/plugins/plugin_utils/data_source.py b/ATRI/plugins/plugin_utils/data_source.py index da6c52c..116b977 100644 --- a/ATRI/plugins/plugin_utils/data_source.py +++ b/ATRI/plugins/plugin_utils/data_source.py @@ -18,9 +18,10 @@ import string import random import hashlib import requests +from math import floor from pathlib import Path from zipfile import PyZipFile -from typing import Tuple, Dict, List +from typing import Tuple, Dict, List, Union class Generate: @@ -191,4 +192,142 @@ class Roll: result = f"{par}=({proc})={result}" - return str(result)
\ No newline at end of file + return str(result) + + +class RCNB(): + """R C N B""" + cr = 'rRŔŕŖŗŘřƦȐȑȒȓɌɍ' + cc = 'cCĆćĈĉĊċČčƇƈÇȻȼ' + cn = 'nNŃńŅņŇňƝƞÑǸǹȠȵ' + cb = 'bBƀƁƃƄƅßÞþ' + + sr = len(cr) + sc = len(cc) + sn = len(cn) + sb = len(cb) + src = sr * sc + snb = sn * sb + scnb = sc * snb + + def _div(self, a: int, b: int) -> int: + return floor(a / b) + + def _encodeByte(self, i) -> Union[str, None]: + if i > 0xFF: + raise ValueError('ERROR! rc/nb overflow') + + if i > 0x7F: + i = i & 0x7F + return self.cn[self._div(i, self.sb) + int(self.cb[i % self.sb])] + + return self.cr[self._div(i, self.sc) + int(self.cc[i % self.sc])] + + def _encodeShort(self, i) -> str: + if i > 0xFFFF: + raise ValueError('ERROR! rcnb overflow') + + reverse = False + if i > 0x7FFF: + reverse = True + i = i & 0x7FFF + + char = [ + self._div(i, self.scnb), + self._div(i % self.scnb, self.snb), + self._div(i % self.snb, self.sb), i % self.sb + ] + char = [ + self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], + self.cb[char[3]] + ] + + if reverse: + return char[2] + char[3] + char[0] + char[1] + + return ''.join(char) + + def _decodeByte(self, c) -> int: + nb = False + idx = [self.cr.index(c[0]), self.cc.index(c[1])] + if idx[0] < 0 or idx[1] < 0: + idx = [self.cn.index(c[0]), self.cb.index(c[1])] + nb = True + raise ValueError('ERROR! rc/nb overflow') + + result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1] + if result > 0x7F: + raise ValueError('ERROR! rc/nb overflow') + + return result | 0x80 if nb else 0 + + def _decodeShort(self, c) -> int: + reverse = c[0] not in self.cr + if not reverse: + idx = [ + self.cr.index(c[0]), + self.cc.index(c[1]), + self.cn.index(c[2]), + self.cb.index(c[3]) + ] + else: + idx = [ + self.cr.index(c[2]), + self.cc.index(c[3]), + self.cn.index(c[0]), + self.cb.index(c[1]) + ] + + if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0: + raise ValueError('ERROR! not rcnb') + + result = idx[0] * self.scnb + idx[1] * self.snb + idx[ + 2] * self.sb + idx[3] + if result > 0x7FFF: + raise ValueError('ERROR! rcnb overflow') + + result |= 0x8000 if reverse else 0 + return result + + def _encodeBytes(self, b) -> str: + result = [] + for i in range(0, (len(b) >> 1)): + result.append(self._encodeShort((b[i * 2] << 8 | b[i * 2 + 1]))) + + if len(b) & 1 == 1: + result.append(self._encodeByte(b[-1])) + + return ''.join(result) + + def _encode(self, s: str, encoding: str = 'utf-8'): + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + return self._encodeBytes(s.encode(encoding)) + + def _decodeBytes(self, s: str): + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + if len(s) & 1: + raise ValueError('ERROR length') + + result = [] + for i in range(0, (len(s) >> 2)): + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8])) + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF + ])) + + if (len(s) & 2) == 2: + result.append(bytes([self._decodeByte(s[-2:])])) + + return b''.join(result) + + def _decode(self, s: str, encoding: str = 'utf-8') -> str: + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + try: + return self._decodeBytes(s).decode(encoding) + except UnicodeDecodeError: + raise ValueError('Decoding failed') |