summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/plugins/plugin_admin/__init__.py15
-rw-r--r--ATRI/plugins/plugin_anime/__init__.py46
-rw-r--r--ATRI/plugins/plugin_chat/__init__.py2
-rw-r--r--ATRI/plugins/plugin_link/__init__.py26
-rw-r--r--ATRI/plugins/plugin_pixiv/__init__.py32
-rw-r--r--ATRI/plugins/plugin_test/__init__.py10
-rw-r--r--ATRI/plugins/plugin_utils/__init__.py42
-rw-r--r--ATRI/plugins/plugin_utils/data_source.py143
8 files changed, 257 insertions, 59 deletions
diff --git a/ATRI/plugins/plugin_admin/__init__.py b/ATRI/plugins/plugin_admin/__init__.py
index 1dbbb76..a8bb61b 100644
--- a/ATRI/plugins/plugin_admin/__init__.py
+++ b/ATRI/plugins/plugin_admin/__init__.py
@@ -45,16 +45,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
data = json.load(f)
if not func:
- msg0 = "-==ATRI Switch Control System==-\n"
- msg0 += "Usage: /switch on/off-{service}\n"
- msg0 += "* For SUPERUSER:\n"
- msg0 += " - Usage: /switch all-on/off-{service}\n"
- msg0 += "Service:\n"
-
- for i in data.keys():
- msg0 += f" {i}\n"
-
- await switch.finish(msg0)
+ await switch.finish('请查看文档获取帮助(')
funct = re.findall(r"[on|off]-(.*)", func)
@@ -202,10 +193,10 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
err_list = []
for group in group_list:
- asyncio.sleep(randint(1, 5))
+ asyncio.sleep(randint(2, 10))
try:
await bot.send_group_msg(group_id=group['group_id'],
- message=msg)
+ message=msg)
sc_list.append(group['group_id'])
except:
await bot.send(event, f"在尝试推送到群[{group['group_id']}]时失败了呢...")
diff --git a/ATRI/plugins/plugin_anime/__init__.py b/ATRI/plugins/plugin_anime/__init__.py
index 556b6db..fea720b 100644
--- a/ATRI/plugins/plugin_anime/__init__.py
+++ b/ATRI/plugins/plugin_anime/__init__.py
@@ -20,9 +20,9 @@ from apscheduler.triggers.date import DateTrigger
from nonebot.rule import Rule
from nonebot.log import logger
-from nonebot.sched import scheduler
from nonebot.typing import Bot, Event
from nonebot.permission import SUPERUSER
+from nonebot_plugin_apscheduler import scheduler
from nonebot.plugin import on_message, on_command, on_regex
from ATRI.utils.utils_times import countX
@@ -141,7 +141,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
if len(img):
pass
else:
- await SaucenaoSearch.reject("请发送一张目标图片,而非文字或其他非图片成分( -'`-; )")
+ await SaucenaoSearch.reject("请发送一张目标图片,而非文字或其他非图片成分(")
await bot.send(event, "别急!正在搜索!")
req = None
@@ -152,11 +152,10 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
except:
await AnimeSearch.finish(errorRepo("请求数据失败"))
+ d = {}
data = json.loads(req.decode())
try:
- d = {}
-
for i in range(len(data['docs'])):
if data['docs'][i]['title_chinese'] in d:
d[data['docs'][i]
@@ -180,12 +179,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
await AnimeSearch.finish(errorRepo("处理数据失败"))
result = sorted(
- d.items(), # type: ignore
+ d.items(),
key=lambda x: x[1],
reverse=True)
t = 0
- msg0 = f'[CQ:at,qq={state["user"]}]\n根据所提供的图片按照相似度找到{len(d)}个结果:' # type: ignore
+ msg0 = f'[CQ:at,qq={state["user"]}]\n根据所提供的图片按照相似度找到{len(d)}个结果:'
for i in result:
t += 1
@@ -205,11 +204,15 @@ SP_temp_list = []
SP_list = []
-def check_sepi(user) -> bool:
- if user in SP_list:
- return True
- else:
- return False
+def check_sepi() -> Rule:
+ """检查目标是否是涩批"""
+ async def _check_sepi(bot: Bot, event: Event, state: dict) -> bool:
+ if event.user_id in SP_list:
+ await bot.send(event, "你可少冲点吧!涩批!哼唧")
+ return False
+ else:
+ return True
+ return Rule(_check_sepi)
def add_sepi(user: int) -> None:
"""将目标移入涩批名单"""
@@ -224,7 +227,7 @@ def del_sepi(user: int) -> None:
setu = on_regex(
r"来[点丶张份副个幅][涩色瑟][图圖]|[涩色瑟][图圖]来|[涩色瑟][图圖][gkd|GKD|搞快点]|[gkd|GKD|搞快点][涩色瑟][图圖]",
- rule=check_banlist() & check_switch(plugin_name_2, False))
+ rule=check_banlist() & check_switch(plugin_name_2, False) & check_sepi())
@setu.handle()
@@ -233,9 +236,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
user = event.user_id
group = event.group_id
res = randint(1, 5)
-
- if check_sepi(user):
- await setu.finish("你可少冲点吧!涩批!哼唧")
+ print(1)
if countX(SP_temp_list, user) == 5:
add_sepi(user) # type: ignore
@@ -317,22 +318,13 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
setuType = on_command("setu-type", permission=SUPERUSER)
[email protected]() # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
global setu_type
msg = str(event.message).strip()
- if msg:
- pass
- else:
- msg0 = "-==ATRI Setu Type Control System==-\n"
- msg0 += "**Tips: For SUPERUSERS**\n"
- msg0 += "┌Usage: setu-type {type}\n"
- msg0 += "└Type:\n"
- msg0 += " ├local\n"
- msg0 += " └url"
-
- await setuType.finish(msg0)
+ if not msg:
+ await setuType.finish("请查看文档获取帮助(")
if msg == "local":
setu_type = 1
diff --git a/ATRI/plugins/plugin_chat/__init__.py b/ATRI/plugins/plugin_chat/__init__.py
index 0474d46..5a323c3 100644
--- a/ATRI/plugins/plugin_chat/__init__.py
+++ b/ATRI/plugins/plugin_chat/__init__.py
@@ -23,9 +23,9 @@ from apscheduler.triggers.date import DateTrigger
from nonebot.log import logger
from nonebot.rule import to_me
-from nonebot.sched import scheduler
from nonebot.typing import Bot, Event
from nonebot.permission import SUPERUSER
+from nonebot_plugin_apscheduler import scheduler
from nonebot.plugin import on_command, on_message, on_notice, on_request, on_regex
from ATRI.utils.utils_times import countX
diff --git a/ATRI/plugins/plugin_link/__init__.py b/ATRI/plugins/plugin_link/__init__.py
new file mode 100644
index 0000000..029e2a8
--- /dev/null
+++ b/ATRI/plugins/plugin_link/__init__.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python3
+# -*- encoding: utf-8 -*-
+
+'''
+@File : __init__.py
+@Time : 2020/12/05 18:40:43
+@Author : Kyomotoi
+@Contact : [email protected]
+@Github : https://github.com/Kyomotoi
+@License : Copyright © 2018-2020 Kyomotoi, All Rights Reserved.
+'''
+__author__ = 'kyomotoi'
+
+import nonebot
+from nonebot.plugin import on_command
+from nonebot.typing import Bot, Event
+from nonebot.permission import SUPERUSER
+
+
+bots = nonebot.get_bots()
+
+testGetBot = on_command('获取bot', permission=SUPERUSER)
+
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ print(bots)
diff --git a/ATRI/plugins/plugin_pixiv/__init__.py b/ATRI/plugins/plugin_pixiv/__init__.py
index c7687ed..3bb2ae0 100644
--- a/ATRI/plugins/plugin_pixiv/__init__.py
+++ b/ATRI/plugins/plugin_pixiv/__init__.py
@@ -12,13 +12,13 @@ __author__ = 'kyomotoi'
import re
import json
-from requests import exceptions
from nonebot.plugin import on_command
from nonebot.typing import Bot, Event
from ATRI.utils.utils_error import errorRepo
from ATRI.utils.utils_img import aio_download_pics
+from ATRI.utils.utils_request import aio_get_bytes
from ATRI.utils.utils_rule import check_banlist, check_switch
plugin_name_0 = "pixiv-pic-search"
@@ -56,8 +56,8 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
data = {}
try:
- data = json.loads(await aio_download_pics(URL))
- except exceptions:
+ data = json.loads(await aio_get_bytes(URL))
+ except:
await pixivSearchIMG.finish(errorRepo("请求数据失败"))
IMG = data["response"][0]["image_urls"]["large"]
@@ -72,7 +72,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
msg0 += f'Account Name: {data["response"][0]["user"]["account"]}\n'
msg0 += f'Author Name: {data["response"][0]["user"]["name"]}\n'
msg0 += f'Link: https://www.pixiv.net/users/{data["response"][0]["user"]["id"]}\n'
- msg0 += IMG
+ msg0 += IMG.replace('https://', '')
await pixivSearchIMG.finish(msg0)
@@ -113,8 +113,8 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
data = {}
try:
- data = json.loads(await aio_download_pics(URL))
- except exceptions:
+ data = json.loads(await aio_get_bytes(URL))
+ except:
await pixivSearchAuthor.finish(errorRepo("请求网络失败"))
d = {}
@@ -138,7 +138,7 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
msg += f"({t})\n"
msg += f"Title: {i[1][1]}\n"
msg += f"Pid: {i[1][0]}\n"
- msg += f"{i[1][2]}"
+ msg += f"{i[1][2].replace('https://', '')}"
await pixivSearchAuthor.finish(msg)
@@ -152,22 +152,21 @@ pixivRank = on_command("p站排行榜",
async def _(bot: Bot, event: Event, state: dict) -> None:
user = str(event.user_id)
- await bot.send(event, "正在获取P站每日排行榜前五作品")
+ await bot.send(event, "正在获取P站每日排行榜前三作品")
URL = "https://api.imjad.cn/pixiv/v1/?type=rank"
data = {}
try:
- data = json.loads(await aio_download_pics(URL))
- except exceptions:
+ data = json.loads(await aio_get_bytes(URL))
+ except:
await pixivRank.finish(errorRepo("网络请求失败"))
d = {}
-
- for i in range(0, 5):
+ for i in range(0, 3):
pid = data["response"][0]["works"][i]["work"]["id"]
title = data["response"][0]["works"][i]["work"]["title"]
- IMG = data["response"][i]["works"]["image_urls"]["large"]
+ IMG = data["response"][0]["works"][i]["work"]["image_urls"]["large"]
IMG = IMG.replace("i.pximg.net", "i.pixiv.cat")
d[i] = [f"{pid}", f"{title}", f"{IMG}"]
@@ -181,8 +180,9 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
t += 1
msg += "\n————————————\n"
msg += f"({t})\n"
- msg += f"Title: {i[1][1]}"
- msg += f"Pid: {i[1][0]}"
- msg += f"{i[1][2]}"
+ msg += f"Title: {i[1][1]}\n"
+ msg += f"Pid: {i[1][0]}\n"
+ msg += f"{i[1][2].replace('https://', '')}"
+ print(msg)
await pixivRank.finish(msg)
diff --git a/ATRI/plugins/plugin_test/__init__.py b/ATRI/plugins/plugin_test/__init__.py
index 72c7372..573cf8b 100644
--- a/ATRI/plugins/plugin_test/__init__.py
+++ b/ATRI/plugins/plugin_test/__init__.py
@@ -11,6 +11,7 @@
'''
__author__ = 'kyomotoi'
+import inspect
import os
from pathlib import Path
from random import sample
@@ -40,3 +41,12 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
group_list = await bot.get_group_list()
group = sample(group_list, 1)
print(group[0]['group_id'], type(group[0]['group_id']))
+
+
+testSendFormat = on_command('测试发送', permission=SUPERUSER)
+
+
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ msg = ("test0\n" "test1\n" "test2")
+ await bot.send(event, msg)
diff --git a/ATRI/plugins/plugin_utils/__init__.py b/ATRI/plugins/plugin_utils/__init__.py
index 11f6c8b..4181e30 100644
--- a/ATRI/plugins/plugin_utils/__init__.py
+++ b/ATRI/plugins/plugin_utils/__init__.py
@@ -20,7 +20,7 @@ from nonebot.typing import Bot, Event
from ATRI.utils.utils_error import errorRepo
from ATRI.utils.utils_rule import check_banlist, check_switch
-from .data_source import Generate, Genshin, Roll
+from .data_source import Generate, Genshin, Roll, RCNB
plugin_name_0 = "one-key-adult"
generateID = on_command("我要转大人,一天打25小时游戏",
@@ -114,3 +114,43 @@ async def _(bot: Bot, event: Event, state: dict) -> None:
else:
await genshinInfo.finish('UID检查未通过,请确保此ID为9位数或者是否为国服ID~!')
+
+
+rcnb = RCNB()
+
+rcnbEncode = on_command('RC一下',
+ aliases={'rc一下', '啊西一下', '阿西一下'},
+ rule=check_banlist())
+
+
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ msg = str(event.message).strip()
+
+ if msg:
+ state['msg'] = msg
+
+
[email protected]('msg', prompt='请告诉咱需要RC一下的字符~!')
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ msg = state['msg']
+ await rcnbEncode.finish(rcnb._encode(msg))
+
+
+rcnbDecode = on_command('一下RC',
+ aliases={'一下rc', '一下啊西', '一下阿西'},
+ rule=check_banlist())
+
+
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ msg = str(event.message).strip()
+
+ if msg:
+ state['msg'] = msg
+
+
[email protected]('msg', prompt='请告诉咱需要一下RC的字符~!')
+async def _(bot: Bot, event: Event, state: dict) -> None:
+ msg = state['msg']
+ await rcnbDecode.finish(rcnb._decode(msg))
diff --git a/ATRI/plugins/plugin_utils/data_source.py b/ATRI/plugins/plugin_utils/data_source.py
index da6c52c..116b977 100644
--- a/ATRI/plugins/plugin_utils/data_source.py
+++ b/ATRI/plugins/plugin_utils/data_source.py
@@ -18,9 +18,10 @@ import string
import random
import hashlib
import requests
+from math import floor
from pathlib import Path
from zipfile import PyZipFile
-from typing import Tuple, Dict, List
+from typing import Tuple, Dict, List, Union
class Generate:
@@ -191,4 +192,142 @@ class Roll:
result = f"{par}=({proc})={result}"
- return str(result) \ No newline at end of file
+ return str(result)
+
+
+class RCNB():
+ """R C N B"""
+ cr = 'rRŔŕŖŗŘřƦȐȑȒȓɌɍ'
+ cc = 'cCĆćĈĉĊċČčƇƈÇȻȼ'
+ cn = 'nNŃńŅņŇňƝƞÑǸǹȠȵ'
+ cb = 'bBƀƁƃƄƅßÞþ'
+
+ sr = len(cr)
+ sc = len(cc)
+ sn = len(cn)
+ sb = len(cb)
+ src = sr * sc
+ snb = sn * sb
+ scnb = sc * snb
+
+ def _div(self, a: int, b: int) -> int:
+ return floor(a / b)
+
+ def _encodeByte(self, i) -> Union[str, None]:
+ if i > 0xFF:
+ raise ValueError('ERROR! rc/nb overflow')
+
+ if i > 0x7F:
+ i = i & 0x7F
+ return self.cn[self._div(i, self.sb) + int(self.cb[i % self.sb])]
+
+ return self.cr[self._div(i, self.sc) + int(self.cc[i % self.sc])]
+
+ def _encodeShort(self, i) -> str:
+ if i > 0xFFFF:
+ raise ValueError('ERROR! rcnb overflow')
+
+ reverse = False
+ if i > 0x7FFF:
+ reverse = True
+ i = i & 0x7FFF
+
+ char = [
+ self._div(i, self.scnb),
+ self._div(i % self.scnb, self.snb),
+ self._div(i % self.snb, self.sb), i % self.sb
+ ]
+ char = [
+ self.cr[char[0]], self.cc[char[1]], self.cn[char[2]],
+ self.cb[char[3]]
+ ]
+
+ if reverse:
+ return char[2] + char[3] + char[0] + char[1]
+
+ return ''.join(char)
+
+ def _decodeByte(self, c) -> int:
+ nb = False
+ idx = [self.cr.index(c[0]), self.cc.index(c[1])]
+ if idx[0] < 0 or idx[1] < 0:
+ idx = [self.cn.index(c[0]), self.cb.index(c[1])]
+ nb = True
+ raise ValueError('ERROR! rc/nb overflow')
+
+ result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1]
+ if result > 0x7F:
+ raise ValueError('ERROR! rc/nb overflow')
+
+ return result | 0x80 if nb else 0
+
+ def _decodeShort(self, c) -> int:
+ reverse = c[0] not in self.cr
+ if not reverse:
+ idx = [
+ self.cr.index(c[0]),
+ self.cc.index(c[1]),
+ self.cn.index(c[2]),
+ self.cb.index(c[3])
+ ]
+ else:
+ idx = [
+ self.cr.index(c[2]),
+ self.cc.index(c[3]),
+ self.cn.index(c[0]),
+ self.cb.index(c[1])
+ ]
+
+ if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0:
+ raise ValueError('ERROR! not rcnb')
+
+ result = idx[0] * self.scnb + idx[1] * self.snb + idx[
+ 2] * self.sb + idx[3]
+ if result > 0x7FFF:
+ raise ValueError('ERROR! rcnb overflow')
+
+ result |= 0x8000 if reverse else 0
+ return result
+
+ def _encodeBytes(self, b) -> str:
+ result = []
+ for i in range(0, (len(b) >> 1)):
+ result.append(self._encodeShort((b[i * 2] << 8 | b[i * 2 + 1])))
+
+ if len(b) & 1 == 1:
+ result.append(self._encodeByte(b[-1]))
+
+ return ''.join(result)
+
+ def _encode(self, s: str, encoding: str = 'utf-8'):
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ return self._encodeBytes(s.encode(encoding))
+
+ def _decodeBytes(self, s: str):
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ if len(s) & 1:
+ raise ValueError('ERROR length')
+
+ result = []
+ for i in range(0, (len(s) >> 2)):
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8]))
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF
+ ]))
+
+ if (len(s) & 2) == 2:
+ result.append(bytes([self._decodeByte(s[-2:])]))
+
+ return b''.join(result)
+
+ def _decode(self, s: str, encoding: str = 'utf-8') -> str:
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ try:
+ return self._decodeBytes(s).decode(encoding)
+ except UnicodeDecodeError:
+ raise ValueError('Decoding failed')