summaryrefslogtreecommitdiff
path: root/ATRI/plugins/utils/data_source.py
diff options
context:
space:
mode:
authorKyomotoi <[email protected]>2021-04-05 16:01:22 +0800
committerKyomotoi <[email protected]>2021-04-05 16:01:22 +0800
commit15dfd75ce84235b07b845fdfb42e269002b92c01 (patch)
treeb27a1aa34d3f9f0dfffd579b62ed2b011986577a /ATRI/plugins/utils/data_source.py
parent64a991e035e52e0a17e73d4e671a22ea9a7489da (diff)
downloadATRI-15dfd75ce84235b07b845fdfb42e269002b92c01.tar.gz
ATRI-15dfd75ce84235b07b845fdfb42e269002b92c01.tar.bz2
ATRI-15dfd75ce84235b07b845fdfb42e269002b92c01.zip
✨🐛⚡️ 一些修改
新增:转发信息伪造 新增:关键词回复/添加/删除(待更新) 新增:涩图检测(部署方式待更新) 新增:使用方法 新增:ATRI语加密/解密 新增:注入检测 新增:部分命令频率限制 移除:群垃圾检测 优化:提升了部分代码可读性 优化:对 Service 部分代码进行重构
Diffstat (limited to 'ATRI/plugins/utils/data_source.py')
-rw-r--r--ATRI/plugins/utils/data_source.py139
1 files changed, 139 insertions, 0 deletions
diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py
index f41a1d1..71da581 100644
--- a/ATRI/plugins/utils/data_source.py
+++ b/ATRI/plugins/utils/data_source.py
@@ -1,5 +1,7 @@
import re
import random
+from math import floor
+from typing import Union
def roll_dice(par: str) -> str:
@@ -32,3 +34,140 @@ def roll_dice(par: str) -> str:
result = f"{par}=({proc})={result}"
return result
+
+
+class Encrypt():
+ cr = 'ĀāĂ㥹ÀÁÂÃÄÅ'
+ cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ'
+ cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr'
+ cb = 'ĨĩĪīĬĭĮįİı'
+
+ sr = len(cr)
+ sc = len(cc)
+ sn = len(cn)
+ sb = len(cb)
+ src = sr * sc
+ snb = sn * sb
+ scnb = sc * snb
+
+ def _div(self, a: int, b: int) -> int:
+ return floor(a / b)
+
+ def _encodeByte(self, i) -> Union[str, None]:
+ if i > 0xFF:
+ raise ValueError('ERROR! rc/nb overflow')
+
+ if i > 0x7F:
+ i = i & 0x7F
+ return self.cn[self._div(i, self.sb) + int(self.cb[i % self.sb])]
+
+ return self.cr[self._div(i, self.sc) + int(self.cc[i % self.sc])]
+
+ def _encodeShort(self, i) -> str:
+ if i > 0xFFFF:
+ raise ValueError('ERROR! rcnb overflow')
+
+ reverse = False
+ if i > 0x7FFF:
+ reverse = True
+ i = i & 0x7FFF
+
+ char = [
+ self._div(i, self.scnb),
+ self._div(i % self.scnb, self.snb),
+ self._div(i % self.snb, self.sb), i % self.sb
+ ]
+ char = [
+ self.cr[char[0]], self.cc[char[1]], self.cn[char[2]],
+ self.cb[char[3]]
+ ]
+
+ if reverse:
+ return char[2] + char[3] + char[0] + char[1]
+
+ return ''.join(char)
+
+ def _decodeByte(self, c) -> int:
+ nb = False
+ idx = [self.cr.index(c[0]), self.cc.index(c[1])]
+ if idx[0] < 0 or idx[1] < 0:
+ idx = [self.cn.index(c[0]), self.cb.index(c[1])]
+ nb = True
+ raise ValueError('ERROR! rc/nb overflow')
+
+ result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1]
+ if result > 0x7F:
+ raise ValueError('ERROR! rc/nb overflow')
+
+ return result | 0x80 if nb else 0
+
+ def _decodeShort(self, c) -> int:
+ reverse = c[0] not in self.cr
+ if not reverse:
+ idx = [
+ self.cr.index(c[0]),
+ self.cc.index(c[1]),
+ self.cn.index(c[2]),
+ self.cb.index(c[3])
+ ]
+ else:
+ idx = [
+ self.cr.index(c[2]),
+ self.cc.index(c[3]),
+ self.cn.index(c[0]),
+ self.cb.index(c[1])
+ ]
+
+ if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0:
+ raise ValueError('ERROR! not rcnb')
+
+ result = idx[0] * self.scnb + idx[1] * self.snb + idx[
+ 2] * self.sb + idx[3]
+ if result > 0x7FFF:
+ raise ValueError('ERROR! rcnb overflow')
+
+ result |= 0x8000 if reverse else 0
+ return result
+
+ def _encodeBytes(self, b) -> str:
+ result = []
+ for i in range(0, (len(b) >> 1)):
+ result.append(self._encodeShort((b[i * 2] << 8 | b[i * 2 + 1])))
+
+ if len(b) & 1 == 1:
+ result.append(self._encodeByte(b[-1]))
+
+ return ''.join(result)
+
+ def encode(self, s: str, encoding: str = 'utf-8'):
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ return self._encodeBytes(s.encode(encoding))
+
+ def _decodeBytes(self, s: str):
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ if len(s) & 1:
+ raise ValueError('ERROR length')
+
+ result = []
+ for i in range(0, (len(s) >> 2)):
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8]))
+ result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF
+ ]))
+
+ if (len(s) & 2) == 2:
+ result.append(bytes([self._decodeByte(s[-2:])]))
+
+ return b''.join(result)
+
+ def decode(self, s: str, encoding: str = 'utf-8') -> str:
+ if not isinstance(s, str):
+ raise ValueError('Please enter str instead of other')
+
+ try:
+ return self._decodeBytes(s).decode(encoding)
+ except UnicodeDecodeError:
+ raise ValueError('Decoding failed')