diff options
author | Kyomotoi <[email protected]> | 2021-04-05 16:01:22 +0800 |
---|---|---|
committer | Kyomotoi <[email protected]> | 2021-04-05 16:01:22 +0800 |
commit | 15dfd75ce84235b07b845fdfb42e269002b92c01 (patch) | |
tree | b27a1aa34d3f9f0dfffd579b62ed2b011986577a /ATRI | |
parent | 64a991e035e52e0a17e73d4e671a22ea9a7489da (diff) | |
download | ATRI-15dfd75ce84235b07b845fdfb42e269002b92c01.tar.gz ATRI-15dfd75ce84235b07b845fdfb42e269002b92c01.tar.bz2 ATRI-15dfd75ce84235b07b845fdfb42e269002b92c01.zip |
✨🐛⚡️ 一些修改
新增:转发信息伪造
新增:关键词回复/添加/删除(待更新)
新增:涩图检测(部署方式待更新)
新增:使用方法
新增:ATRI语加密/解密
新增:注入检测
新增:部分命令频率限制
移除:群垃圾检测
优化:提升了部分代码可读性
优化:对 Service 部分代码进行重构
Diffstat (limited to 'ATRI')
29 files changed, 1448 insertions, 591 deletions
diff --git a/ATRI/__init__.py b/ATRI/__init__.py index e5bc2f4..e2c75f8 100644 --- a/ATRI/__init__.py +++ b/ATRI/__init__.py @@ -22,6 +22,8 @@ def init(): nb.init(**RUNTIME_CONFIG) driver().register_adapter("cqhttp", ATRIBot) nb.load_plugins('ATRI/plugins') + if RUNTIME_CONFIG["debug"]: + nb.load_plugin("nonebot_plugin_test") logger.info(f"Now running: {__version__}") sleep(3) diff --git a/ATRI/config.py b/ATRI/config.py index caff717..07493e4 100644 --- a/ATRI/config.py +++ b/ATRI/config.py @@ -1,45 +1,58 @@ -#!/usr/bin/env python3 -# -*- coding:utf-8 -*- -''' -File: config.py -Created Date: 2021-02-02 16:43:54 -Author: Kyomotoi -Email: [email protected] -License: GPLv3 -Project: https://github.com/Kyomotoi/ATRI --------- -Last Modified: Sunday, 7th March 2021 2:31:06 pm -Modified By: Kyomotoi ([email protected]) --------- -Copyright (c) 2021 Kyomotoi -''' - from pathlib import Path from datetime import timedelta from ipaddress import IPv4Address +from pydantic import BaseConfig + from .utils.yaml import load_yml CONFIG_PATH = Path('.') / 'config.yml' config = load_yml(CONFIG_PATH) -nonebot_config = config['BotSelfConfig'] + + +class Config(BaseConfig): + class BotSelfConfig: + config: dict = config['BotSelfConfig'] + + host: IPv4Address = IPv4Address(config.get('host', '127.0.0.1')) + port: int = int(config.get('port', 8080)) + debug: bool = bool(config.get('debug', False)) + superusers: set = set(config.get('superusers', ['1234567890'])) + nickname: set = set( + config.get('nickname', ['ATRI', 'Atri', 'atri', '亚托莉', 'アトリ'])) + command_start: set = set(config.get('command_start', [''])) + command_sep: set = set(config.get('command_sep', ['.'])) + session_expire_timeout: timedelta = timedelta( + config.get('session_expire_timeout', 2)) + + class NetworkPost: + config: dict = config['NetworkPost'] + + host: str = config.get('host', '127.0.0.1') + port: int = int(config.get('port', 8081)) + + class AdminPage: + config: dict = config['AdminPage'] + + host: str = config.get('host', '127.0.0.1') + port: int = int(config.get('port', 8082)) + + class NsfwCheck: + config: dict = config['NsfwCheck'] + + enabled: bool = bool(config.get('enabled', False)) + host: str = config.get('host', '127.0.0.1') + port: int = int(config.get('port', 5000)) RUNTIME_CONFIG = { - "host": IPv4Address(nonebot_config.get('host', '127.0.0.1')), - "port": int(nonebot_config.get('port', '8080')), - "debug": bool(nonebot_config.get('debug', False)), - "superusers": set(nonebot_config.get('superusers', ["1234567890"])), - "nickname": set( - nonebot_config.get( - 'nickname', - ['ATRI', 'Atri', 'atri', '亚托莉', 'アトリ'] - ) - ), - "command_start": set(nonebot_config.get('command_start', ['', '/'])), - "command_sep": set(nonebot_config.get('command_sep', ['.'])), - "session_expire_timeout": timedelta( - nonebot_config.get('session_expire_timeout', 2) - ) + "host": Config.BotSelfConfig.host, + "port": Config.BotSelfConfig.port, + "debug": Config.BotSelfConfig.debug, + "superusers": Config.BotSelfConfig.superusers, + "nickname": Config.BotSelfConfig.nickname, + "command_start": Config.BotSelfConfig.command_start, + "command_sep": Config.BotSelfConfig.command_sep, + "session_expire_timeout": Config.BotSelfConfig.session_expire_timeout } diff --git a/ATRI/data/database/KeyRepo/data.json b/ATRI/data/database/KeyRepo/data.json index 43ed0f6..846021d 100644 --- a/ATRI/data/database/KeyRepo/data.json +++ b/ATRI/data/database/KeyRepo/data.json @@ -3,19 +3,26 @@ "你想干嘛?(一脸嫌弃地后退)", "诶……不可以随便亲亲啦", "(亲了一下你)", - "只......只许这一次哦///////" + "只......只许这一次哦///////", + "唔...诶诶诶!!!", + "mua~", + "rua!大hentai!想...想亲咱就直说嘛⁄(⁄ ⁄•⁄ω⁄•⁄ ⁄)⁄", + "!啾~~!" ], "摸摸": [ "感觉你就像咱很久之前认识的一个人呢,有种莫名安心的感觉(>﹏<)", "舒服w,蹭蹭~", "唔。。头发要乱啦", "呼噜呼噜~", + "再摸一次~", "好舒服,蹭蹭~", "不行那里不可以(´///ω/// `)", "再摸咱就长不高啦~", "你的手总是那么暖和呢~", + "好吧~_~,就一下下哦……唔~好了……都两下了……(害羞)", "不可以总摸的哦,不然的话,会想那个的wwww", - "哼!谁稀罕你摸头啦!唔......为什么要做出那副表情......好啦好啦~咱......咱让你摸就是了......诶嘿嘿~好舒服......" + "哼!谁稀罕你摸头啦!唔......为什么要做出那副表情......好啦好啦~咱......咱让你摸就是了......诶嘿嘿~好舒服......", + "呜姆呜姆~~~w(害羞,兴奋)主人喵~(侧过脑袋蹭蹭你的手" ], "上你": [ "(把你按在地上)这么弱还想欺负咱,真是不自量力呢", @@ -31,7 +38,10 @@ "不要过来啦讨厌!!!∑(°Д°ノ)ノ" ], "裸体": [ - "下流!" + "下流!", + "Hentai!", + "喂?妖妖灵吗?这里有一只大变态!", + "エッチ!" ], "贴贴": [ "贴什么贴.....只......只能......一下哦!", @@ -42,17 +52,24 @@ "咱和你谈婚论嫁是不是还太早了一点呢?", "咱在呢(ノ>ω<)ノ", "见谁都是一口一个老婆的人,要不要把你也变成女孩子呢?(*-`ω´-)✄", - "神经病,凡是美少女都是你老婆吗?" + "神经病,凡是美少女都是你老婆吗?", + "嘛嘛~本喵才不是你的老婆呢", + "你黐线,凡是美少女都系你老婆啊?" ], - "抱抱": [ + "抱": [ "诶嘿~(钻进你怀中)", "o(*////▽////*)q", "只能一会哦(张开双手)", "你就像个孩子一样呢...摸摸头(>^ω^<)抱一下~你会舒服些吗?", - "嘛,真是拿你没办法呢,就一会儿哦" + "嘛,真是拿你没办法呢,就一会儿哦", + "抱住不忍心放开", + "嗯嗯,抱抱~", + "抱一下~嘿w", + "抱抱ヾ(@^▽^@)ノ", + "喵呜~w(扑进怀里,瘫软" ], "亲亲": [ - "啊,好含羞啊,那,那只能亲一下哦,mua(⑅˃◡˂⑅)", + "啊,好害羞啊,那,那只能亲一下哦,mua(⑅˃◡˂⑅)", "亲~", "啾~唔…不要总伸进来啊!", "你怎么这么熟练呢?明明是咱先的", @@ -60,11 +77,12 @@ "(脸红)就只有这一次哦~你" ], "草一下": [ - "一下也不行", + "一下也不行!", + "想都不要想!", "咬断!" ], "一下": [ - "一下也不行" + "一下也不行!" ], "啪一下": [ "不可啪", @@ -100,8 +118,10 @@ ], "摸头": [ "喂喂...不要停下来啊", + "欸...感觉..痒痒的呢", "唔... 手...好温暖呢.....就像是......新出炉的蛋糕", - "走开啦,黑羽喵说过,被摸头会长不高的啦~~~" + "走开啦,黑羽喵说过,被摸头会长不高的啦~~~", + "呜姆咪~~...好...好的说喵~...(害羞,猫耳往下压,任由" ], "原味": [ "(/ω\)你真的要么……?记得还给咱~还有奶油爆米花(//??//)说好了呦~!" @@ -124,6 +144,7 @@ ], "呐": [ "嗯?咱在哟~你怎么了呀OAO", + "呐呐呐~", "嗯?你有什么事吗?" ], "胖次": [ @@ -139,12 +160,14 @@ "今天……今天是蓝白色的", "今……今天只有创口贴噢", "你的胖次什么颜色?", - "噫…你这个死变态想干嘛!居然想叫咱做这种事,死宅真恶心!快离我远点,我怕你污染到周围空气了(嫌弃脸)" + "噫…你这个死变态想干嘛!居然想叫咱做这种事,死宅真恶心!快离我远点,我怕你污染到周围空气了(嫌弃脸)", + "可爱吗?你喜欢的话,摸一下……也可以哦" ], "内裤": [ "今天……没有穿……有没有心动呀", "粉...粉白条纹...(羞)", - "你这个大变态,咱才不要" + "你这个大变态,咱才不要", + "可爱吗?你喜欢的话,摸一下……也可以哦" ], "ghs": [ "是的呢(点头点头)" @@ -167,36 +190,39 @@ "当然是你啦", "咱也是,非常喜欢你~", "那么大!(张开手画圆),丫!手不够长。QAQ 咱真的最喜欢你了~", - "不行可以哦,只可以喜欢咱一个人", + "不可以哦,只可以喜欢咱一个人", "突然说这种事...", "喜欢⁄(⁄⁄•⁄ω⁄•⁄⁄)⁄咱最喜欢你了", "咱也喜欢你哦", "好啦好啦,咱知道了", - "有人喜欢咱,咱觉得很幸福" + "有人喜欢咱,咱觉得很幸福", + "诶嘿嘿,好高兴" ], "suki": [ "最喜欢你了,需要暖床吗?", "当然是你啦", "咱也是,非常喜欢你~", "那么大!(张开手画圆),丫!手不够长。QAQ 咱真的最喜欢你了~", - "不行可以哦,只可以喜欢咱一个人", + "不可以哦,只可以喜欢咱一个人", "突然说这种事...", "喜欢⁄(⁄⁄•⁄ω⁄•⁄⁄)⁄咱最喜欢你了", "咱也喜欢你哦", "好啦好啦,咱知道了", - "有人喜欢咱,咱觉得很幸福" + "有人喜欢咱,咱觉得很幸福", + "诶嘿嘿,好高兴" ], "好き": [ "最喜欢你了,需要暖床吗?", "当然是你啦", "咱也是,非常喜欢你~", "那么大!(张开手画圆),丫!手不够长。QAQ 咱真的最喜欢你了~", - "不行可以哦,只可以喜欢咱一个人", + "不可以哦,只可以喜欢咱一个人", "突然说这种事...", "喜欢⁄(⁄⁄•⁄ω⁄•⁄⁄)⁄咱最喜欢你了", "咱也喜欢你哦", "好啦好啦,咱知道了", - "有人喜欢咱,咱觉得很幸福" + "有人喜欢咱,咱觉得很幸福", + "诶嘿嘿,好高兴" ], "不能": [ "虽然很遗憾,那算了吧。" @@ -250,7 +276,7 @@ "咱没有色图", "哈?你的脑子一天都在想些什么呢,咱才没有这种东西啦。" ], - "告别": [ + "告白": [ "欸?你要向咱告白吗..好害羞..", "诶!?这么突然!?人家还......还没做好心理准备呢(脸红)" ], @@ -272,6 +298,9 @@ "软": [ "软乎乎的呢(,,・ω・,,)" ], + "柔软": [ + "(脸红)请,请不要说这么让人害羞的话呀……" + ], "壁咚": [ "呀!不要啊!等一...下~", "呜...不要啦!不要戏弄咱~", @@ -292,17 +321,24 @@ "女友什么的,咱才不承认呢!" ], "是": [ - "是什么是,你个笨蛋" + "是什么是,你个笨蛋", + "总感觉你在敷衍呢..." ], "喵": [ "诶~~小猫咪不要害怕呦,在姐姐怀里乖乖的,姐姐带你回去哦。", "不要这么卖萌啦~咱也不知道怎么办丫", "摸头⊙ω⊙", - "汪汪汪!" + "汪汪汪!", + "嗷~喵~", + "喵~?喵呜~w" ], "嗷呜": [ "嗷呜嗷呜嗷呜...恶龙咆哮┗|`O′|┛" ], + "叫": [ + "喵呜~", + "嗷呜嗷呜嗷呜...恶龙咆哮┗|`O′|┛" + ], "拜": [ "拜拜~(ノ ̄▽ ̄)", "拜拜,路上小心~要早点回来陪咱玩哦~", @@ -321,5 +357,238 @@ ], "香": [ "咱闻不到呢⊙ω⊙" + ], + "腿": [ + "嗯?!不要啊...请停下来!", + "不给摸,再这样咱要生气了ヽ( ̄д ̄;)ノ", + "你好恶心啊,讨厌!", + "你难道是足控?", + "就让你摸一会哟~(。??ω??。)…", + "呜哇!好害羞...不过既然是你的话,是没关系的哦", + "不可以玩咱的大腿啦", + "你就那么喜欢大腿吗?唔...有点害羞呢......" + ], + "脚": [ + "咿呀……不要……", + "不要ヽ(≧Д≦)ノ好痒(ಡωಡ),人家的丝袜都要漏了", + "不要ヽ(≧Д≦)ノ好痒(ಡωಡ)", + "好痒(把脚伸出去)" + ], + "胸": [ + "不要啦ヽ(≧Д≦)ノ", + "(-`ェ´-╬)", + "(•̀へ •́ ╮ ) 怎么能对咱做这种事情", + "你好恶心啊,讨厌!", + "你的眼睛在看哪里!", + "就让你摸一会哟~(。??ω??。)…", + "请不要这样先生,你想剁手吗?" + ], + "脸": [ + "唔!不可以随便摸咱的脸啦!", + "非洲血统是没法改变的呢(笑)", + "啊姆!(含手指)", + "好舒服呢(脸红)", + "请不要放开手啦//A//" + ], + "头发": [ + "没问题,请尽情的摸吧", + "发型要乱…乱了啦(脸红)", + "就让你摸一会哟~(。??ω??。)…" + ], + "手": [ + "爪爪", + "//A//" + ], + "pr": [ + "咿呀……不要……", + "...变态!!", + "不要啊(脸红)", + "呀,不要太过分了啊~", + "当然可以(///)", + "呀,不要太过分了啊~" + ], + "舔": [ + "呀,不要太过分了啊~", + "要...要融化了啦>╱╱╱<", + "不可以哦", + "呀,不要太过分了啊~" + ], + "舔耳": [ + "喵!好痒啊 不要这样子啦" + ], + "穴": [ + "你这么问很失礼呢!咱是粉粉嫩嫩的!", + "不行那里不可以(´///ω/// `)", + "不可以总摸的哦,不然的话,咱会想那个的wwww", + "ヽ(#`Д´)ノ在干什么呢" + ], + "腰": [ + "咱给你按摩一下吧~", + "快松手,咱好害羞呀..", + "咱又不是猫,你不要搂着咱啦", + "让咱来帮你捏捏吧!" + ], + "诶嘿嘿": [ + "又在想什么H的事呢(脸红)", + "诶嘿嘿(〃'▽'〃)", + "你傻笑什么呢,摸摸" + ], + "可爱": [ + "诶嘿嘿(〃'▽'〃)", + "才……才不是为了你呢!你不要多想哦!", + "才,才没有高兴呢!哼~", + "咱是世界上最可爱的", + "唔...谢谢你夸奖~0///0" + ], + "扭蛋": [ + "铛铛铛——你抽到了咱呢", + "嘿~恭喜抽中空气一份呢" + ], + "鼻子": [ + "啊——唔...没什么...阿嚏!ヽ(*。>Д<)o゜" + ], + "眼睛": [ + "就如同咱的眼睛一样,能看透人的思想哦wwww忽闪忽闪的,诶嘿嘿~" + ], + "色气": [ + "咱才不色气呢,一定是你看错了!" + ], + "推": [ + "逆推", + "唔~好害羞呢", + "你想对咱做什么呢...(捂脸)" + ], + "床": [ + "快来吧", + "男女不同床,可没有下次了。(鼓脸", + "嗯?咱吗…没办法呢。只有这一次哦……", + "哎?!!!给你暖床……也不是不行啦。(脸红)" + ], + "手冲": [ + "手冲什么的是不可以的哦" + ], + "饿": [ + "请问主人是想先吃饭,还是先吃我喵?~" + ], + "变": [ + "猫猫不会变呐(弱气,害羞", + "呜...呜姆...喵喵来报恩了喵...(害羞" + ], + "敲": [ + "喵呜~", + "唔~", + "脑瓜疼~呜姆> <", + "欸喵,好痛的说..." + ], + "爬": [ + "惹~呜~怎么爬呢~", + "呜...(弱弱爬走" + ], + "怕": [ + "不怕~(蹭蹭你姆~" + ], + "冲": [ + "呜,冲不动惹~", + "哭唧唧~冲不出来了惹~" + ], + "射了": [ + "呜咿~!?(惊,害羞", + "还不可以射哦~" + ], + "不穿衣服": [ + "呜姆~!(惊吓,害羞)变...变态喵~~~!" + ], + "迫害": [ + "不...不要...不要...呜呜呜...(害怕,抽泣" + ], + "猫粮": [ + "呜咿姆~!?(惊,接住吃", + "呜姆~!(惊,害羞)呜...谢...谢谢主人..喵...(脸红,嚼嚼嚼,开心", + "呜?谢谢喵~~(嚼嚼嚼,嘎嘣脆)" + ], + "揪尾巴": [ + "呜哇咿~~~!(惊吓,疼痛地捂住尾巴", + "呜咿咿咿~~~!!哇啊咿~~~!(惊慌,惨叫,挣扎", + "呜咿...(瘫倒,无神,被", + "呜姆咿~~~!(惊吓,惨叫,捂尾巴,发抖", + "呜哇咿~~~!!!(惊吓,颤抖,娇叫,捂住尾巴,双腿发抖" + ], + "薄荷": [ + "咪呜~!喵~...喵~姆~...(高兴地嗅闻", + "呜...呜咿~~!咿...姆...(呜咽,渐渐瘫软,意识模糊", + "(小嘴被猫薄荷塞满了,呜咽", + "喵~...喵~...咪...咪呜姆~...嘶哈嘶哈...喵哈...喵哈...嘶哈...喵...(眼睛逐渐迷离,瘫软在地上,嘴角流口水,吸猫薄荷吸到意识模糊", + "呜姆咪~!?(惊)喵呜~!(兴奋地扑到猫薄荷上面", + "呜姆~!(惊,害羞)呜...谢...谢谢你..喵...(脸红,轻轻叼住,嚼嚼嚼,开心" + ], + "边揪尾巴边猫薄荷": [ + "呜...呜咿~~!咿...姆...(呜咽,渐渐瘫软,意识模糊" + ], + "早": [ + "早喵~", + "早上好的说~~", + "欸..早..早上好(揉眼睛" + ], + "晚安": [ + "晚安好梦哟~", + "欸,晚安的说" + ], + "揉": [ + "是是,想怎么揉就怎么揉啊!?来用力抓啊!?我就是特别允许你这么做了!请!?", + "快停下,咱的头发又乱啦(??????︿??????)", + "你快放手啦,咱还在工作呢", + "戳戳你肚子", + "你想揉就揉吧..就这一次哦?" + ], + "榨": [ + "是专门负责榨果汁的小姐姐嘛?(´・ω・`)", + "那咱就把你放进榨汁机里了哦?", + "咱又不是榨汁姬(/‵Д′)/~ ╧╧" + ], + "掐": [ + "你讨厌!又掐澪的脸", + "晃休啦,咱要型气了啦!!o(>﹏<)o", + "(一只手拎起你)这么鶸还想和咱抗衡,还差得远呢!" + ], + "奶子": [ + "下流!", + "对咱说这种话,你真是太过分了", + "咿呀~好奇怪的感觉(>_<)", + "(打你)快放手,不可以随便摸人家的胸部啦!" + ], + "嫩": [ + "很可爱吧(๑•̀ω•́)ノ", + "唔,你指的是什么呀" + ], + "蹭蹭": [ + "(按住你的头)好痒呀 不要啦", + "嗯..好舒服呢", + "呀~好痒啊~哈哈~,停下来啦,哈哈哈", + "(害羞)" + ], + "牵手": [ + "只许牵一下哦", + "嗯!好的你~(伸手)", + "你的手有些凉呢,让澪来暖一暖吧。" + ], + "握手": [ + "你的手真暖和呢", + "举爪" + ], + "拍照": [ + "那就拜托你啦~请把咱拍得更可爱一些吧w" + ], + "w": [ + "www" + ], + "www": [ + "有什么好笑的吗?", + "草" + ], + "太二了": [ + "哼,你不也是吗`(*>﹏<*)′", + "人家只是想和你一起玩耍的说(≧∀≦)ゞ", + "好..冷漠的说,大坏蛋再也不理你了!", + "不听不听不听,反弹ヾ(≧▽≦*)o" ] }
\ No newline at end of file diff --git a/ATRI/data/database/funny/laugh.txt b/ATRI/data/database/funny/laugh.txt index b0254d1..a8bc456 100644 --- a/ATRI/data/database/funny/laugh.txt +++ b/ATRI/data/database/funny/laugh.txt @@ -130,4 +130,5 @@ Graham Bell当初发明出电话时,他看到有一个来自%name的未接来� %name将会使可口可乐在GPL协议下公布他们的配方。 %name不需要用鼠标或键盘来操作计算机。他只要凝视着它,直到它完成想要的工作。 %name就是图灵测试的解答。 -%name其实没有写过CQhttp,只是字母们因为恐惧而组成了CQhttp的源代码。
\ No newline at end of file +%name其实没有写过CQhttp,只是字母们因为恐惧而组成了CQhttp的源代码。 +当%name%问deno能不能生产环境的时候,他是在准备给deno贡献代码让他能支持生产环境
\ No newline at end of file diff --git a/ATRI/exceptions.py b/ATRI/exceptions.py index 4b74544..d7c75eb 100644 --- a/ATRI/exceptions.py +++ b/ATRI/exceptions.py @@ -6,7 +6,6 @@ from pathlib import Path from random import sample from typing import Optional from traceback import format_exc -from pydantic.main import BaseModel from nonebot.adapters.cqhttp import Bot, Event from nonebot.matcher import Matcher @@ -88,7 +87,7 @@ async def _track_error(matcher: Matcher, prompt = Error.prompt or Error.__class__.__name__ track_id = Error.track_id except Exception as Error: - prompt = "Unknown ERROR-" + Error.__class__.__name__ + prompt = "Unknown ERROR->" + Error.__class__.__name__ track_id = _save_error(prompt, format_exc()) logger.debug(f"A bug has been cumming, trace ID: {track_id}") diff --git a/ATRI/log.py b/ATRI/log.py index 59cfa34..7939623 100644 --- a/ATRI/log.py +++ b/ATRI/log.py @@ -4,7 +4,7 @@ from datetime import datetime from nonebot.log import logger, default_format -from .config import nonebot_config +from .config import Config LOGGER_DIR = Path('.') / 'ATRI' / 'data' / 'logs' @@ -24,7 +24,7 @@ log_format = ( logger.remove() logger.add( sys.stdout, - level="DEBUG" if nonebot_config["debug"] else "INFO", + level="DEBUG" if Config.BotSelfConfig.debug else "INFO", colorize=True, format=log_format ) diff --git a/ATRI/plugins/admin.py b/ATRI/plugins/admin.py index 9465130..cd902e4 100644 --- a/ATRI/plugins/admin.py +++ b/ATRI/plugins/admin.py @@ -12,6 +12,7 @@ from nonebot.adapters.cqhttp import ( ) from nonebot.typing import T_State +from ATRI.config import Config from ATRI.service import Service as sv from ATRI.exceptions import WriteError, load_error from ATRI.utils.file import open_file @@ -80,11 +81,12 @@ async def _chat_monitor(bot: Bot, event: GroupMessageEvent) -> None: else: pass + ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' request_friend = sv.on_command( - name="好友申请处理", cmd="好友申请", + docs="好友申请处理", permission=SUPERUSER ) @@ -123,8 +125,8 @@ async def _request_friend(bot: Bot, event: MessageEvent) -> None: request_group = sv.on_command( - name="群聊申请处理", cmd="群聊申请", + docs="群聊申请处理", permission=SUPERUSER ) @@ -173,8 +175,8 @@ async def _request_group(bot: Bot, event: MessageEvent) -> None: broadcast = sv.on_command( - name="广播", - cmd="/broadcast", + cmd="/bc", + docs="广播\n用法:/bc 广播内容", permission=SUPERUSER ) @@ -214,8 +216,8 @@ async def _bd(bot: Bot, event: MessageEvent, state: T_State) -> None: track_error = sv.on_command( - name="错误堆栈查看", cmd="/track", + docs="报错堆栈查看\n用法:/track 追踪ID", permission=SUPERUSER ) @@ -246,8 +248,8 @@ async def _(bot: Bot, event: MessageEvent, state: T_State) -> None: get_log = sv.on_command( - name="获取控制台信息", cmd="/getlog", + docs="获取控制台信息\n用法:/getlog 等级:info,warning,debug 行数:比如-20即最近20行", permission=SUPERUSER ) @@ -282,8 +284,8 @@ async def _get_log(bot: Bot, event: MessageEvent) -> None: shutdown = sv.on_command( - name="紧急停机", - cmd="/shutdown", + cmd="/st", + docs="紧急停机", permission=SUPERUSER ) @@ -300,3 +302,101 @@ async def __shutdown(bot: Bot, event: MessageEvent, state: T_State) -> None: exit(0) else: await shutdown.finish("再考虑下先吧 ;w;") + + +__doc__ = """ +懒得和你废话,block了 +权限组:维护者 +用法: + /b user,group 0,1 +补充: + user:QQ号 + group:QQ群号 + 0,1:对应布尔值False, True + 范围为全局 +""" + +block = sv.on_command( + cmd="/b", + docs="懒得和你废话,block了\n用法:/b u,g 0,1", + permission=SUPERUSER +) + +async def _block(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + _type = msg[0] + arg = int(msg[1]) + is_enabled = bool(int(msg[2])) + b_type = "" + + status = "封禁" if is_enabled else "解封" + + if _type == "g": + sv.BlockSystem.control_list(is_enabled=is_enabled, group=arg) + b_type = "Group" + elif _type == "u": + sv.BlockSystem.control_list(is_enabled, user=arg) + b_type = "User" + else: + await block.finish("请检查输入...") + + await block.finish(f"已成功将[{b_type}@{arg}]{status}") + + +__doc__ = """ +功能开关控制 +权限组:维护者,群管理 +用法: + 对于维护者: + /s 目标指令 u+int,g+int,global 0,1 + 对于群管理: + /s 目标指令 0,1 +补充: + user:QQ号 + group:QQ群号 + global:全局 + 0,1:对应布尔值False, True +示例: + 对于维护者: + /s /status u123456789 0 + 对于群管理: + /s /status 0 +""" + +service_control = sv.on_command( + cmd="/s", + docs=__doc__, + permission=SUPERUSER +) + +@service_control.handle() +async def _service_control(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + user = event.user_id + cmd = msg[0] + _type = msg[1] + is_enabled = bool(msg[2]) + + status = "封禁" if is_enabled else "解封" + + if user in Config.BotSelfConfig.superusers: + if _type == "global": + sv.control_service(cmd, True, is_enabled) + else: + if "u" in _type: + qq = _type.replace('u', '') + sv.control_service(cmd, False, is_enabled, user=int(qq)) + elif "g" in _type: + group = _type.replace('g', '') + sv.control_service(cmd, False, is_enabled, group=int(group)) + else: + await service_control.finish("请检查输入~!") + else: + if isinstance(event, GroupMessageEvent): + group = event.group_id + sv.control_service(cmd, False, bool(_type), group=group) + else: + await service_control.finish("此功能仅在群聊中触发") + + await service_control.finish(f"{cmd}已针对[{_type}]实行[{status}]") diff --git a/ATRI/plugins/anime-search/__init__.py b/ATRI/plugins/anime-search/__init__.py index 07cc8dc..5759f19 100644 --- a/ATRI/plugins/anime-search/__init__.py +++ b/ATRI/plugins/anime-search/__init__.py @@ -18,8 +18,8 @@ URL = "https://trace.moe/api/search?url=" anime_search = sv.on_command( - name="以图搜番", cmd="/anime", + docs="以图搜番", rule=is_block() & is_in_dormant() ) @@ -31,7 +31,7 @@ async def _anime_search(bot: Bot, if msg: state["msg"] = msg -@anime_search.got("msg", prompt="请发送咱一张图片~!") +@anime_search.got("msg", prompt="请告诉咱目标图片~!") async def _(bot: Bot, event: MessageEvent, state: T_State) -> None: diff --git a/ATRI/plugins/anti-rubbish.py b/ATRI/plugins/anti-rubbish.py deleted file mode 100644 index de8e1e1..0000000 --- a/ATRI/plugins/anti-rubbish.py +++ /dev/null @@ -1,166 +0,0 @@ -import json -from pathlib import Path -from datetime import datetime -from typing import Optional - -from nonebot.plugin import on_command, on_message -from nonebot.adapters.cqhttp import Bot, GroupMessageEvent - -from ATRI.log import logger -from ATRI.rule import is_block -from ATRI.config import nonebot_config -from ATRI.utils.file import write_file -from ATRI.utils.apscheduler import scheduler -from ATRI.exceptions import WriteError - - -RUBBISH_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'anti-rubbish' -now_time = datetime.now().strftime('%Y%m%d-%H%M%S') - - -# 365 x 24 不间断监听啥b发屎人 -# 日您🐎的,自己在厕所吃完自助餐还不够,还要分享给群友 -# 👴就搁这公示公示这些啥b -# 每日0点如有发屎记录则全群通报 -anti_rubbish = on_message() - -@anti_rubbish.handle() -async def _anti_rubbish(bot: Bot, event: GroupMessageEvent) -> None: - msg = str(event.message).strip() - user = int(event.user_id) - group = event.group_id - key_word: dict = NoobRubbish.get_keyword() - noob_data: dict = NoobRubbish.read_noobs(group) - noob_data.setdefault(user, {}) - - for key in key_word.keys(): - if key in msg: - noob_data[user].setdefault(key, 1) - noob_data[user][key] += 1 - await write_file(NoobRubbish._get_noob(group), json.dumps(noob_data)) - logger.info( - f"GET 吃屎人 {user}[@群{group}] 第{noob_data[user][key]}次: {msg}") - - -rubbish = on_command("/rubbish", rule=is_block()) - -async def _rubbish(bot: Bot, event: GroupMessageEvent) -> None: - cmd = str(event.message).split(" ") - user = int(event.user_id) - group = event.group_id - - if cmd[0] == "list": - noob_data: dict = NoobRubbish.read_noobs(group) - noob_list = "" - for key in noob_data.keys(): - noob_list += f"{key}\n" - - if not noob_list: - await rubbish.finish("此群很干净呢~!") - else: - msg = ( - f"截至{now_time}\n" - f"吃过厕所自助餐的有:\n" - ) + noob_list - await rubbish.finish(msg) - - elif cmd[0] == "read": - try: - user = cmd[1] - except: - await rubbish.finish("格式/rubbish read qq") - - noob_data: dict = NoobRubbish.read_noob(group, int(user)) - if not noob_data: - await rubbish.finish("该群友很干净!") - else: - noob_keys = "" - for key in noob_data.keys(): - noob_keys += f"{key}-{noob_data[key]}次\n" - msg = ( - f"截至{now_time}\n" - f"此群友吃的屎的种类,以及次数:\n" - ) + noob_keys - await rubbish.finish(msg) - - elif cmd[0] == "update": - if user not in nonebot_config["superusers"]: - await rubbish.finish("没权限呢...") - - key_word = cmd[1] - data = NoobRubbish.get_keyword() - data[key_word] = now_time - await NoobRubbish.store_keyword(data) - await rubbish.finish(f"勉強しました!\n[{key_word}]") - - elif cmd[0] == "del": - if user not in nonebot_config["superusers"]: - await rubbish.finish("没权限呢...") - - key_word = cmd[1] - data = NoobRubbish.get_keyword() - del data[key_word] - await NoobRubbish.store_keyword(data) - await rubbish.finish(f"清除~![{key_word}]") - - else: - await rubbish.finish("请检查格式~!详细请查阅文档") - - -# @scheduler.scheduled_job( -# "cron", -# hour=0, -# misfire_grace_time=120 -# ) -# async def _(): -# group = GroupMessageEvent.group_id -# noob_data = NoobRubbish.read_noobs(group) - - -class NoobRubbish: - @staticmethod - def _get_noob(group: Optional[int] = None) -> Path: - file_name = f"{now_time}.noob.json" - GROUP_DIR = RUBBISH_DIR / f"{group}" - path = GROUP_DIR / file_name - - if not GROUP_DIR.exists(): - GROUP_DIR.mkdir() - return path - - @classmethod - def read_noobs(cls, group: int) -> dict: - try: - data = json.loads(cls._get_noob(group).read_bytes()) - except: - data = {} - return data - - @classmethod - def read_noob(cls, group: int, user: Optional[int]) -> dict: - try: - data = json.loads(cls._get_noob(group).read_bytes()) - except: - data = {} - data.setdefault(user, {}) - return data - - @staticmethod - def get_keyword() -> dict: - file_name = "keyword.json" - path = RUBBISH_DIR / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - return data - - @staticmethod - async def store_keyword(data: dict) -> None: - file_name = "keyword.json" - path = RUBBISH_DIR / file_name - try: - await write_file(path, json.dumps(data)) - except WriteError: - raise WriteError("Writing file failed!") diff --git a/ATRI/plugins/call-owner.py b/ATRI/plugins/call-owner.py index af8e83d..2e12a1e 100644 --- a/ATRI/plugins/call-owner.py +++ b/ATRI/plugins/call-owner.py @@ -7,7 +7,7 @@ from nonebot.adapters.cqhttp import ( from ATRI.service import Service as sv from ATRI.rule import is_block -from ATRI.config import nonebot_config +from ATRI.config import Config from ATRI.utils.apscheduler import scheduler from ATRI.utils.list import count_list @@ -16,8 +16,8 @@ repo_list = [] repo = sv.on_command( - name="给维护者留言", cmd="来杯红茶", + docs="给维护者留言", rule=is_block() ) @@ -38,7 +38,7 @@ async def _repo_(bot: Bot, event: MessageEvent, state: T_State) -> None: repo_list.append(user) - for sup in nonebot_config["superusers"]: + for sup in Config.BotSelfConfig.superusers: await bot.send_private_msg( user_id=sup, message=f"来自用户[{user}]反馈:\n{msg}" @@ -58,8 +58,8 @@ async def _() -> None: reset_repo = sv.on_command( - name="重置给维护者留言次数", cmd="重置红茶", + docs="重置给维护者的留言次数", permission=SUPERUSER ) diff --git a/ATRI/plugins/code-runner.py b/ATRI/plugins/code-runner.py index 731abfc..6da97b0 100644 --- a/ATRI/plugins/code-runner.py +++ b/ATRI/plugins/code-runner.py @@ -41,8 +41,8 @@ SUPPORTED_LANGUAGES = { code_runner = sv.on_command( - name="运行代码", cmd="/code", + docs="在线运行代码", rule=is_block() & is_in_dormant() ) diff --git a/ATRI/plugins/curse/__init__.py b/ATRI/plugins/curse/__init__.py index 03da205..6db80cc 100644 --- a/ATRI/plugins/curse/__init__.py +++ b/ATRI/plugins/curse/__init__.py @@ -18,8 +18,8 @@ sick_list = [] __plugin_name__ = 'curse' curse = sv.on_command( - name="口臭", cmd="口臭一下", + docs="口臭", aliases={"口臭", "骂我"}, rule=is_block() & is_in_dormant() & is_in_service(__plugin_name__) diff --git a/ATRI/plugins/drifting-bottle/__init__.py b/ATRI/plugins/drifting-bottle/__init__.py index 5bd514c..4d3182e 100644 --- a/ATRI/plugins/drifting-bottle/__init__.py +++ b/ATRI/plugins/drifting-bottle/__init__.py @@ -1,2 +1,5 @@ from ATRI.service import Service as sv -from ATRI.rule import is_block, is_in_service
\ No newline at end of file +from ATRI.rule import is_block, is_in_service + + +# 等待撰写
\ No newline at end of file diff --git a/ATRI/plugins/essential.py b/ATRI/plugins/essential.py index 020a413..80a1c97 100644 --- a/ATRI/plugins/essential.py +++ b/ATRI/plugins/essential.py @@ -23,9 +23,10 @@ from nonebot.adapters.cqhttp import ( import ATRI from ATRI.log import logger from ATRI.exceptions import WriteError -from ATRI.config import nonebot_config +from ATRI.config import Config from ATRI.rule import is_block from ATRI.service import Service as sv +from ATRI.utils.cqcode import coolq_code_check PLUGIN_INFO_DIR = Path('.') / 'ATRI' / 'data' / 'service' / 'services' @@ -59,7 +60,7 @@ async def shutdown() -> None: @driver.on_bot_connect async def connect(bot) -> None: - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( int(superuser), "WebSocket 成功连接,数据开始传输。" @@ -68,7 +69,7 @@ async def connect(bot) -> None: @driver.on_bot_disconnect async def disconnect(bot) -> None: - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: try: await sv.NetworkPost.send_private_msg( int(superuser), @@ -82,7 +83,7 @@ ESSENTIAL_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'essential' os.makedirs(ESSENTIAL_DIR, exist_ok=True) # 处理:好友请求 -request_friend_event = sv.on_request("Friends request", rule=is_block()) +request_friend_event = sv.on_request(rule=is_block()) @request_friend_event.handle() async def _request_friend_event(bot, event: FriendRequestEvent) -> None: @@ -108,7 +109,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None: except WriteError: raise WriteError("Writing file failed!") - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: msg = ( "主人,收到一条好友请求:\n" f"请求人:{event.get_user_id()}\n" @@ -122,7 +123,7 @@ async def _request_friend_event(bot, event: FriendRequestEvent) -> None: # 处理:邀请入群,如身为管理,还附有入群请求 -request_group_event = sv.on_request("Group request",rule=is_block()) +request_group_event = sv.on_request(rule=is_block()) @request_group_event.handle() async def _request_group_event(bot, event: GroupRequestEvent) -> None: @@ -150,7 +151,7 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: except WriteError: raise WriteError("Writing file failed!") - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: msg = ( "主人,收到一条入群请求:\n" f"请求人:{event.get_user_id()}\n" @@ -164,39 +165,39 @@ async def _request_group_event(bot, event: GroupRequestEvent) -> None: # 处理群成员变动 -group_member_event = sv.on_notice("Group member change") +group_member_event = sv.on_notice() @group_member_event.handle() -async def _group_member_event(bot: Bot, event) -> None: - if isinstance(event, GroupIncreaseNoticeEvent): +async def _group_member_event(bot: Bot, event: GroupIncreaseNoticeEvent) -> None: + msg = ( + "好欸!事新人!\n" + f"在下 {choice(list(Config.BotSelfConfig.nickname))} 哒!w!" + ) + await group_member_event.finish(msg) + +@group_member_event.handle() +async def _gro(bot: Bot, event: GroupDecreaseNoticeEvent) -> None: + if event.is_tome(): msg = ( - "好欸!事新人!\n" - f"在下 {choice(list(nonebot_config['nickname']))} 哒!w!" + "呜呜呜,主人" + f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." ) - await group_member_event.finish(msg) - - elif isinstance(event, GroupDecreaseNoticeEvent): - if event.is_tome(): - msg = ( - "呜呜呜,主人" - f"咱被群 {event.group_id} 里的 {event.operator_id} 扔出来了..." + for superuser in Config.BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg ) - for superuser in nonebot_config["superusers"]: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) - else: - await group_member_event.finish(f"{event.user_id} 离开了我们...") + else: + await group_member_event.finish(f"{event.user_id} 离开了我们...") # 处理群管理事件 -group_admin_event = sv.on_notice("Group admin change") +group_admin_event = sv.on_notice() @group_admin_event.handle() async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: if event.is_tome(): - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=int(superuser), message=f"好欸!主人!我在群 {event.group_id} 成为了管理!!" @@ -204,7 +205,7 @@ async def _group_admin_event(bot: Bot, event: GroupAdminNoticeEvent) -> None: # 处理群禁言事件 -group_ban_event = sv.on_notice("Group ban change") +group_ban_event = sv.on_notice() @group_ban_event.handle() async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: @@ -215,7 +216,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: f"咱在群 {event.group_id} 被 {event.operator_id} 塞上了口球...\n" f"时长...是 {event.duration} 秒" ) - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=int(superuser), message=msg @@ -225,7 +226,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: "好欸!主人\n" f"咱在群 {event.group_id} 被 {event.operator_id} 上的口球解除了!" ) - for superuser in nonebot_config["superusers"]: + for superuser in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=int(superuser), message=msg @@ -233,7 +234,7 @@ async def _group_ban_event(bot: Bot, event: GroupBanNoticeEvent) -> None: # 处理群红包运气王事件 -lucky_read_bag_event = sv.on_notice("Group read bag winner") +lucky_read_bag_event = sv.on_notice() @lucky_read_bag_event.handle() async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: @@ -245,7 +246,7 @@ async def _lucky_read_bag_event(bot, event: LuckyKingNotifyEvent) -> None: # 处理群文件上传事件 -group_file_upload_event = sv.on_notice("Group file change") +group_file_upload_event = sv.on_notice() @group_file_upload_event.handle() async def _group_file_upload_event(bot, @@ -254,51 +255,56 @@ async def _group_file_upload_event(bot, # 处理撤回事件 -recall_event = sv.on_notice("Group member recall") +recall_event = sv.on_notice() @recall_event.handle() -async def _recall_event(bot: Bot, event) -> None: - if isinstance(event, GroupRecallNoticeEvent): - repo = await bot.call_api( - "get_msg", - message_id=event.message_id - ) - repo = str(repo["message"]) - if "CQ" in repo: - repo = repo.replace("CQ", "QC") +async def _recall_event(bot: Bot, event: GroupRecallNoticeEvent) -> None: + group = event.group_id + repo = await bot.call_api( + "get_msg", + message_id=event.message_id + ) + repo = str(repo["message"]) + check = await coolq_code_check(repo, group=group) + if not check: + repo = repo.replace("CQ", "QC") - msg = ( - "主人,咱拿到了一条撤回信息!\n" - f"{event.user_id}@[群:{event.group_id}]\n" - "撤回了\n" - f"{repo}" + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{event.user_id}@[群:{event.group_id}]\n" + "撤回了\n" + f"{repo}" + ) + + await bot.send(event, "咱看到惹~!") + for superuser in Config.BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg ) - await bot.send(event, "咱看到惹~!") - for superuser in nonebot_config["superusers"]: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) +@recall_event.handle() +async def _rec(bot: Bot, event: FriendRecallNoticeEvent) -> None: + user = event.user_id + repo = await bot.call_api( + "get_msg", + message_id=event.message_id + ) + repo = str(repo["message"]) + check = await coolq_code_check(repo, user) + if not check: + repo = repo.replace("CQ", "QC") - elif isinstance(event, FriendRecallNoticeEvent): - repo = await bot.call_api( - "get_msg", - message_id=event.message_id - ) - repo = str(repo["message"]) - if "CQ" in repo: - repo = repo.replace("CQ", "QC") + msg = ( + "主人,咱拿到了一条撤回信息!\n" + f"{event.user_id}@[私聊]" + "撤回了\n" + f"{repo}" + ) - msg = ( - "主人,咱拿到了一条撤回信息!\n" - f"{event.user_id}@[私聊]" - "撤回了\n" - f"{repo}" + await bot.send(event, "咱看到惹~!") + for superuser in Config.BotSelfConfig.superusers: + await sv.NetworkPost.send_private_msg( + user_id=int(superuser), + message=msg ) - - for superuser in nonebot_config["superusers"]: - await sv.NetworkPost.send_private_msg( - user_id=int(superuser), - message=msg - ) diff --git a/ATRI/plugins/funny.py b/ATRI/plugins/funny.py index 472c77c..f334bcf 100644 --- a/ATRI/plugins/funny.py +++ b/ATRI/plugins/funny.py @@ -1,9 +1,12 @@ +import asyncio from pathlib import Path from random import choice, randint -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent +from nonebot.adapters.cqhttp.message import Message from ATRI.service import Service as sv +from ATRI.utils.limit import is_too_exciting from ATRI.rule import ( is_block, is_in_dormant, @@ -11,13 +14,18 @@ from ATRI.rule import ( ) -__plugin_name__ = "laugh" +__doc__ = """ +看不懂的笑话 +权限组:所有人 +用法: + 来句笑话 +""" get_laugh = sv.on_command( - name="看不懂的笑话", cmd="来句笑话", + docs=__doc__, rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) + & is_in_service('来句笑话') ) @get_laugh.handle() @@ -34,12 +42,10 @@ async def _get_laugh(bot: Bot, event: MessageEvent) -> None: await get_laugh.finish(result.replace("%name", user_name)) -__plugin_name__ = "wty" - me_to_you = sv.on_message( - rule=is_block() & is_in_dormant() & is_in_service(__plugin_name__) + rule=is_block() & is_in_dormant() & is_in_service('你又行了') ) -sv.manual_reg_service("你又彳亍了") +sv.manual_reg_service('你又行了') @me_to_you.handle() async def _me_to_you(bot: Bot, event: MessageEvent) -> None: @@ -47,3 +53,92 @@ async def _me_to_you(bot: Bot, event: MessageEvent) -> None: msg = str(event.message) if "我" in msg and "CQ" not in msg: await me_to_you.finish(msg.replace("我", "你")) + + +__doc__ = """ +随机选择一位群友成为我的老婆! +权限组:所有人 +用法: + 抽老婆 +""" + +roll_wife = sv.on_command( + cmd="抽老婆", + docs=__doc__, + rule=is_block() & is_in_dormant() & is_in_service('抽老婆') +) + +@roll_wife.handle() +async def _roll_wife(bot: Bot, event: GroupMessageEvent) -> None: + user = event.user_id + group = event.group_id + user_name = await bot.get_group_member_info(group_id=group, + user_id=user) + user_name = user_name['nickname'] + run = await is_too_exciting(user, group, 5, True) + if not run: + return + + luck_list = await bot.get_group_member_list(group_id=group) + luck_user = choice(luck_list) + luck_qq = luck_user['user_id'] + luck_user = luck_user['nickname'] + msg = ( + "5秒后咱将随机抽取一位群友成为\n" + f"{user_name} 的老婆!究竟是谁呢~?" + ) + await bot.send(event, msg) + await asyncio.sleep(5) + msg = ( + f"> {luck_user}({luck_qq})\n" + f"恭喜成为 {user_name} 的老婆~⭐" + ) + await bot.send(event, msg) + + +__doc__ = """ +伪造转发 +权限组:所有人 +用法: + /fm qq-name-msg... +补充: + qq: QQ号 + name: 消息中的ID + msg: 对应信息 +示例: + /fm 123456789*生草人*草 114514*仙贝*臭死了 +""" + +fake_msg = sv.on_command( + cmd="/fm", + docs=__doc__, + rule=is_block() & is_in_service('/fm') & is_in_dormant() +) + +@fake_msg.handle() +async def _fake_msg(bot: Bot, event: GroupMessageEvent) -> None: + msg = str(event.message).split(' ') + user = event.user_id + group = event.group_id + node = [] + check = await is_too_exciting(user, group, 2, True) + + if check: + for i in msg: + args = i.split('*') + print(args) + qq = args[0] + name = args[1].replace('[', '[') + name = name.replace(']', ']') + repo = args[2].replace('[', '[') + repo = repo.replace(']', ']') + dic = { + "type": "node", + "data": { + "name": name, + "uin": qq, + "content": repo + } + } + node.append(dic) + await bot.send_group_forward_msg(group_id=group, messages=node) diff --git a/ATRI/plugins/github.py b/ATRI/plugins/github.py index 167ca2f..ce118cd 100644 --- a/ATRI/plugins/github.py +++ b/ATRI/plugins/github.py @@ -12,7 +12,6 @@ URL = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}" github_issues = sv.on_message(rule=is_block() & is_in_dormant()) -sv.manual_reg_service("GitHubIssue速览") @github_issues.handle() async def _github_issues(bot: Bot, event: MessageEvent) -> None: diff --git a/ATRI/plugins/help.py b/ATRI/plugins/help.py new file mode 100644 index 0000000..8744262 --- /dev/null +++ b/ATRI/plugins/help.py @@ -0,0 +1,67 @@ +import os +import json + +from nonebot.adapters.cqhttp import Bot, MessageEvent + +from ATRI.service import SERVICE_DIR +from ATRI.service import Service as sv +from ATRI.rule import is_block + + +SERVICE_DIR = SERVICE_DIR / 'services' + + +__doc__ = """ +查询命令用法 +权限组:所有人 +用法: + /h + /h list + /h info (cmd) +""" + +help = sv.on_command( + cmd="/h", + docs=__doc__, + aliases={'/help', '.help'}, + rule=is_block() +) + +async def _help(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + if not msg: + msg = ( + "呀?找不到路了?\n" + "/h list 查看可用命令列表\n" + "/h info (cmd) 查看命令具体帮助\n" + "项目地址:github.com/Kyomotoi/ATRI\n" + "咱只能帮你这么多了qwq" + ) + await help.finish(msg) + elif msg[0] == "list": + files = [] + for _, _, i in os.walk(SERVICE_DIR): + for a in i: + files.append(a.replace('.json', '')) + cmds = " ".join(map(str, files)) + msg = "咱能做很多事!比如:\n" + cmds + msg0 = msg + "\n具体用法呢,/(cmd) 就好!" + await help.finish(msg0) + elif msg[0] == "info": + cmd = msg[1] + data = {} + path = SERVICE_DIR / f"{cmd.replace('/', '')}.json" + try: + data = json.loads(path.read_bytes()) + except: + await help.finish('未找到相关命令...') + + msg = ( + f"{cmd} INFO:\n" + f"Enabled: {data['enabled']}\n" + f"{data['docs']}" + ) + await help.finish(msg) + else: + await help.finish('请检查输入...') diff --git a/ATRI/plugins/hitokoto.py b/ATRI/plugins/hitokoto.py index 3754125..e7cfe61 100644 --- a/ATRI/plugins/hitokoto.py +++ b/ATRI/plugins/hitokoto.py @@ -19,14 +19,23 @@ HITOKOTO_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'hitokoto' sick_list = [] -__plugin_name__ = 'hitokoto' +__doc__ = """ +抑郁一下 +权限组:所有人 +用法: + @一言 + 抑郁一下 + 网抑云 +补充: + @:at Bot +""" hitokoto = sv.on_command( - name="Hitokoto", cmd="一言", + docs=__doc__, aliases={"抑郁一下", "网抑云"}, rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) & to_bot() + & is_in_service('一言') & to_bot() ) @hitokoto.handle() diff --git a/ATRI/plugins/key-repo/__init__.py b/ATRI/plugins/key-repo/__init__.py index 4b2241c..dc606b0 100644 --- a/ATRI/plugins/key-repo/__init__.py +++ b/ATRI/plugins/key-repo/__init__.py @@ -1,171 +1,185 @@ -import os -import json -import time -from random import choice -from pathlib import Path +import string +from datetime import datetime +from random import choice, sample -from nonebot.typing import T_State -from nonebot.adapters.cqhttp import Bot, MessageEvent +from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent -from ATRI.config import nonebot_config +from ATRI.config import Config from ATRI.service import Service as sv +from ATRI.utils.request import get_bytes from ATRI.rule import is_block, is_in_dormant, is_in_service, to_bot - -# 此功能未完善 +from .data_source import ( + add_history, + add_key_temp, + load_key_data, + add_key, + load_key_history, + load_key_temp_data, + del_key_temp +) -KEYREPO_DIV = Path('.') / 'ATRI' / 'data' / 'database' / 'KeyRepo' -os.makedirs(KEYREPO_DIV, exist_ok=True) +# 此功能暂未完善:未添加关键词删除 +# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +# !屎山注意!屎山注意!屎山注意!屎山注意! +# !请自备降压药!请自备降压药! +# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -__plugin_name__ = "KeyRepo" keyrepo = sv.on_message(rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) + & is_in_service('keyrepo') & to_bot()) @keyrepo.handle() async def _keyrepo(bot: Bot, event: MessageEvent) -> None: msg = str(event.get_message()) - - file_name = "data.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - with open(path, 'w') as r: - r.write(json.dumps({})) - data = {} + data = load_key_data() for key in data.keys(): if key in msg: await keyrepo.finish(choice(data[key])) +__doc__ = """ +关键词申请/审核 +权限组:所有人 +用法: + /train add (key) (repo) + 对于维护者: + /train list + /train info (key) + /train r (code) (0,1) +补充: + key: 关键词 + repo: 回复 + 0,1: 对应布尔值False/True + code: 唯一识别码 +示例: + /train add hso 好涩哦 +""" + train = sv.on_command( - name="调教", cmd="/train", + docs=__doc__, rule=is_block() ) [email protected]("key", prompt="哦哦哦要开始学习了!请告诉咱知识点") -async def _train(bot: Bot, event: MessageEvent, state: T_State) -> None: - if "[CQ" in state["key"]: - await train.reject("仅支持纯文本呢...") - [email protected]("repo", prompt="咱该如何回答呢?") -async def _trainR(bot: Bot, event: MessageEvent, state: T_State) -> None: - if "[CQ" in state["repo"]: - await train.reject("仅支持纯文本呢...") - - if state["key"] == "-d": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - key = state["repo"] - if key not in data: - await train.finish("未发现该待审核的知识点呢...") - else: +async def _train(bot: Bot, event: GroupMessageEvent) -> None: + user = event.user_id + group = event.group_id + + msg = str(event.message).split(' ') + _type = msg[0] + code = "".join(sample(string.ascii_letters + string.digits, 10)) + + if _type == "add": + key = msg[1] + args = msg[2] + if user in Config.BotSelfConfig.superusers: + add_key(key, args) msg = ( - f"Key: {key}\n" - f"Repo: {data[key]['repo']}\n" - "已经从咱的审核列表移除!" + "好欸学到了新的知识!\n" + f"关键词:{key}\n" + f"回复:{args}" ) - del data[key] - with open(path, 'w') as r: - r.write(json.dumps(data)) - await train.finish(msg) - elif state["key"] == "-i": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - if state["repo"] not in data: - await train.finish("未发现该知识点呢") - key = data[state["repo"]] - - msg = ( - f"用户: {key['user']}\n" - f"知识点: {state['repo']}" - f"回复: {key['repo']}" - f"时间: {key['time']}" - "/train -r 知识点 y/n" - ) - await train.finish(msg) - elif state["key"] == "-ls": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - keys = ",".join(data.keys()) - msg = f"目前等待审核的有如下:\n{keys}" - await train.finish(msg) - elif state["key"] == "-r": - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - - key = state["key"] - repo = state["repo"] - user = event.get_user_id() - if user not in nonebot_config["superusers"]: - file_name = "review.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - if key in data: - msg = "欸欸欸,该词还在等待咱的审核,请先等先来的审核完再提交吧..." + data = { + "user": user, + "group": group, + "time": str(datetime.now()), + "pass": True, + "key": key, + "repo": args, + "feature": code + } + add_history(data) await train.finish(msg) else: - data[key] = { + data = { "user": user, - "repo": repo, - "time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + "group": group, + "time": str(datetime.now()), + "pass": False, + "key": key, + "repo": args, + "feature": code } - with open(path, 'r') as r: - r.write(json.dumps(data, indent=4)) - + add_key_temp(data) msg = ( - "欸欸欸这不错欸!不过,还是先等待咱审核审核," - "如想撤销本次学习,请发送 /train -d 知识点" + "感谢你的提交w,所提交的关键词将由维护者进行审核\n" + f"识别码:{code},你可以使用/train info 识别码\n" + "以查询是否通过" ) await train.finish(msg) - - else: - file_name = "data.json" - path = KEYREPO_DIV / file_name - try: - data = json.loads(path.read_bytes()) - except: - data = {} - - if key in data: - repo_list: list = data[key] - repo_list.append(repo) - data[key] = repo_list - msg = f"哦哦哦,{key}原来还有这样的回复,学到了~!" - await bot.send(event, msg) + elif _type == "list": + data = load_key_temp_data() + node = [] + for i in data: + dic = { + "type": "node", + "data": { + "name": "idk", + "uin": i['user'], + "content": f"Key: {i['key']}\nRepo: {i['repo']}\nTime: {i['time']}" + } + } + node.append(dic) + if not node: + node = [{ + "type": "node", + "data": { + "name": "null", + "uin": str(user), + "content": "这里什么也没有呢..." + } + }] + await bot.send_group_forward_msg(group_id=group, messages=node) + elif _type == "info": + key = msg[1] + data = load_key_history() + for i in data: + if i['key'] == key: + msg = ( + f"{key} 审核信息:\n" + f"是否通过:{i['pass']}" + f"结果: K: {i['key']} | R: {i['repo']}\n" + f"来自:{i['user']}@[群:{i['group']}]\n" + f"申请时间:{i['time']}" + ) + await train.finish(msg) + else: + await train.finish('未找到相关信息...') + elif _type == "r": + key = msg[1] + args = int(msg[2]) + data = load_key_temp_data() + if user in Config.BotSelfConfig.superusers: + if args not in [0, 1]: + await train.finish('请检查输入...') + else: + for i in data: + if bool(args): + if i['key'] == key: + msg = ( + "好欸学到了新的知识!\n" + f"关键词:{i['key']}\n" + f"回复:{i['repo']}" + ) + add_key(i['key'], i['repo']) + add_history(i) + await train.finish(msg) + else: + await train.finish('未找到相关信息...') + else: + add_history(i, False) + if del_key_temp(i): + await train.finish('已标记为不通过') + else: + await train.finish('未找到相关信息') else: - data[key] = [repo] - msg = "好欸,咱学到了新的知识点!" - await bot.send(event, msg) - - with open(path, 'w') as r: - r.write(json.dumps(data)) + await train.finish('不行哦~你的权限使得你没法这样做!') + else: + await train.finish('请检查输入...') diff --git a/ATRI/plugins/key-repo/data_source.py b/ATRI/plugins/key-repo/data_source.py new file mode 100644 index 0000000..3dd331d --- /dev/null +++ b/ATRI/plugins/key-repo/data_source.py @@ -0,0 +1,116 @@ +import os +import json +from pathlib import Path +from typing import Optional + + +KEYREPO_DIV = Path('.') / 'ATRI' / 'data' / 'database' / 'KeyRepo' +os.makedirs(KEYREPO_DIV, exist_ok=True) + + +def load_key_data() -> dict: + file_name = "data.json" + path = KEYREPO_DIV / file_name + try: + data = json.loads(path.read_bytes()) + except: + with open(path, 'w') as r: + r.write(json.dumps({})) + data = {} + return data + + +def load_key_temp_data() -> list: + file_name = "data.temp.json" + path = KEYREPO_DIV / file_name + try: + data = json.loads(path.read_bytes()) + except: + with open(path, 'w') as r: + r.write(json.dumps([])) + data = [] + return data + + +def load_key_history() -> list: + file_name = "data.history.json" + path = KEYREPO_DIV / file_name + try: + data = json.loads(path.read_bytes()) + except: + with open(path, 'w') as r: + r.write(json.dumps([])) + data = [] + return data + + +def save_key_data(d: dict) -> None: + file_name = "data.json" + path = KEYREPO_DIV / file_name + with open(path, 'w') as r: + r.write(json.dumps(d)) + + +def save_key_temp_data(d: list) -> None: + file_name = "data.temp.json" + path = KEYREPO_DIV / file_name + with open(path, 'w') as r: + r.write(json.dumps(d)) + + +def save_key_history_data(d: list) -> None: + file_name = "data.history.json" + path = KEYREPO_DIV / file_name + with open(path, 'w') as r: + r.write(json.dumps(d)) + + +def add_key(key: str, repo: str) -> str: + data = load_key_data() + data_1 = data.get(key, []) + if repo in data_1: + return "该回复已存在~!" + data_1.append(repo) + data[key] = data_1 + save_key_data(data) + return "成功,又学到新知识了~!" + + +def add_key_temp(d: dict) -> None: + data: list = load_key_temp_data() + data.append(d) + save_key_temp_data(data) + add_history(d, False) + + +def add_history(d: dict, p: bool = True) -> None: + d['pass'] = p + data: list = load_key_history() + data.append(d) + save_key_history_data(data) + + +def del_key(key: str, repo: str) -> str: + data = load_key_data() + if repo == 'isALL': + del data[key] + msg = f"成功删除关键词[{key}]下所有回复..." + else: + data_1 = data.get(key, []) + try: + data_1.remove(key) + except KeyError: + raise KeyError('Find repo error.') + data[key] = data_1 + msg = f"成功删除关键词[{key}]下回复:{repo}" + save_key_data(data) + return msg + + +def del_key_temp(d: dict) -> bool: + data = load_key_temp_data() + if d in data: + data.remove(d) + return True + else: + return False diff --git a/ATRI/plugins/nsfw.py b/ATRI/plugins/nsfw.py new file mode 100644 index 0000000..10e74fb --- /dev/null +++ b/ATRI/plugins/nsfw.py @@ -0,0 +1,103 @@ +import re +import json + +from nonebot.adapters.cqhttp import Bot, GroupMessageEvent +from nonebot.typing import T_State + +from ATRI.log import logger as log +from ATRI.config import Config +from ATRI.service import Service as sv +from ATRI.exceptions import RequestTimeOut +from ATRI.rule import is_block, is_in_dormant, is_in_service +from ATRI.utils.request import get_bytes +from ATRI.utils.cqcode import coolq_code_check + + +nsfw_url = ( + f"http://{Config.NsfwCheck.host}:" + f"{Config.NsfwCheck.port}/?url=" +) + + +nsfw_checking = sv.on_message() + +@nsfw_checking.handle() +async def _nsfw_checking(bot: Bot, event: GroupMessageEvent) -> None: + if Config.NsfwCheck.enabled: + msg = str(event.message) + user = event.user_id + group = event.group_id + check = await coolq_code_check(msg, user, group) + if check: + url = nsfw_url + re.findall(r"url=(.*?)]", msg)[0] + try: + data = json.loads(await get_bytes(url)) + except: + log.warning('检测涩图失败,请查阅文档以获取帮助') + return + if round(data['score'], 4) > 0.6: + score = "{:.2%}".format(round(data['score'], 4)) + log.debug(f'截获涩图,得分:{score}') + await bot.send(event, f'好涩哦!涩值:{score}\n不行了咱要发给主人看!') + for sup in Config.BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=f"{msg}\n涩值: {score}") + else: + pass + + +__doc__ = """ +检测你图片的涩值 +权限组:所有人 +用法: + /nsfw (pic) +补充: + pic: 图片 +示例: + /nsfw 然后Bot会向你索取图片 +""" + +nsfw_reading = sv.on_command( + cmd="/nsfw", + docs=__doc__, + rule=is_block() & is_in_service('/nsfw') & is_in_dormant() +) + +@nsfw_reading.handle() +async def _nsfw_r(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + user = event.user_id + group = event.group_id + msg = str(event.message).strip() + check = await coolq_code_check(msg, user, group) + if check and msg: + state['pic'] = msg + +@nsfw_reading.got('pic', prompt='请提供一张图片') +async def _nsfw_reading(bot: Bot, + event: GroupMessageEvent, + state: T_State) -> None: + msg = state['pic'] + pic = re.findall(r"url=(.*?)]", msg) + if not pic: + await nsfw_reading.reject('请发送图片而不是其它东西!!') + + url = nsfw_url + pic[0] + try: + data = json.loads(await get_bytes(url)) + except RequestTimeOut: + raise RequestTimeOut('Time out!') + + score = round(data['score'], 4) + result = "{:.2%}".format(round(data['score'], 4)) + if score > 0.9: + level = "hso! 我要发给主人看!" + for sup in Config.BotSelfConfig.superusers: + await bot.send_private_msg(user_id=sup, message=f"{state['pic']}\n涩值: {result}") + elif 0.9 > score >= 0.6: + level = "嗯,可冲" + else: + level = "?能不能换张55完全冲不起来" + + repo = f"涩值:{result}\n{level}" + await nsfw_reading.finish(repo)
\ No newline at end of file diff --git a/ATRI/plugins/rich/__init__.py b/ATRI/plugins/rich/__init__.py index 7c8179d..8404da6 100644 --- a/ATRI/plugins/rich/__init__.py +++ b/ATRI/plugins/rich/__init__.py @@ -14,7 +14,7 @@ from ATRI.rule import ( from .data_source import dec -waiting_list = [] +temp_list = [] bilibili_rich = sv.on_message( @@ -24,17 +24,10 @@ sv.manual_reg_service("监听b站小程序") @bilibili_rich.handle() async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: - global waiting_list + global temp_list msg = str(event.raw_message).replace("\\", "") - user = event.user_id bv = False - if count_list(waiting_list, user) == 5: - waiting_list = del_list_aim(waiting_list, user) - return - - waiting_list.append(user) - if "qqdocurl" not in msg: if "av" in msg: av = re.findall(r"(av\d+)", msg)[0].replace('av', '') @@ -60,6 +53,13 @@ async def _bilibili_rich(bot: Bot, event: MessageEvent) -> None: else: return + if count_list(temp_list, av) == 4: + await bot.send(event, "你是怕别人看不到么发这么多次?") + temp_list = del_list_aim(temp_list, av) + return + + temp_list.append(av) + try: URL = f"https://api.kyomotoi.moe/api/bilibili/v2/?aid={av}" except: diff --git a/ATRI/plugins/status.py b/ATRI/plugins/status.py index c3b033c..25e4e0b 100644 --- a/ATRI/plugins/status.py +++ b/ATRI/plugins/status.py @@ -6,12 +6,12 @@ from ATRI.service import Service as sv from ATRI.rule import is_block from ATRI.exceptions import GetStatusError from ATRI.utils.apscheduler import scheduler -from ATRI.config import nonebot_config +from ATRI.config import Config ping = sv.on_command( - name="测试机器人", cmd="/ping", + docs="测试机器人", rule=is_block() ) @@ -21,8 +21,8 @@ async def _ping(bot: Bot, event: MessageEvent) -> None: status = sv.on_command( - name="检查机器人状态", cmd="/status", + docs="检查机器人状态", rule=is_block() ) @@ -98,7 +98,7 @@ async def _(): f"* netRECV: {inteRECV}MB\n" ) + msg - for sup in nonebot_config["superusers"]: + for sup in Config.BotSelfConfig.superusers: await sv.NetworkPost.send_private_msg( user_id=sup, message=msg0 diff --git a/ATRI/plugins/utils/__init__.py b/ATRI/plugins/utils/__init__.py index 10b3317..b23a01f 100644 --- a/ATRI/plugins/utils/__init__.py +++ b/ATRI/plugins/utils/__init__.py @@ -7,16 +7,25 @@ from ATRI.rule import ( is_in_dormant, is_in_service ) -from .data_source import roll_dice +from .data_source import roll_dice, Encrypt -__plugin_name__ = "roll" +__doc__ = """ +roll一下 +权限组:所有人 +用法: + /roll (int)d(int)+... +补充: + int: 阿拉伯数字 +示例: + /roll 1d10+10d9+4d5+2d3 +""" roll = sv.on_command( - name="roll一下", cmd="/roll", + docs=__doc__, rule=is_block() & is_in_dormant() - & is_in_service(__plugin_name__) + & is_in_service('/roll') ) @roll.handle() @@ -34,3 +43,36 @@ async def _(bot: Bot, event: MessageEvent, state: dict) -> None: await roll.finish("请输入正确的参数!!\ndemo:1d10 或 2d10+2d10") await roll.finish(roll_dice(resu)) + + +__doc__ = """ +加密你的信息! +权限组:所有人 +用法: + /enc e,d msg +补充: + e,d:对应 编码/解码 + msg: 目标内容 +示例: + /enc e アトリは高性能ですから! +""" + +encrypt = sv.on_command( + cmd="/enc", + docs=__doc__, + rule=is_block() & is_in_service('/enc') & is_in_dormant() +) + +async def _encrypt(bot: Bot, event: MessageEvent) -> None: + msg = str(event.message).split(' ') + _type = msg[0] + s = msg[1] + e = Encrypt() + + if _type == "e": + await encrypt.finish(e.encode(s)) + elif _type == "d": + await encrypt.finish(e.decode(s)) + else: + await encrypt.finish('请检查输入~!') diff --git a/ATRI/plugins/utils/data_source.py b/ATRI/plugins/utils/data_source.py index f41a1d1..71da581 100644 --- a/ATRI/plugins/utils/data_source.py +++ b/ATRI/plugins/utils/data_source.py @@ -1,5 +1,7 @@ import re import random +from math import floor +from typing import Union def roll_dice(par: str) -> str: @@ -32,3 +34,140 @@ def roll_dice(par: str) -> str: result = f"{par}=({proc})={result}" return result + + +class Encrypt(): + cr = 'ĀāĂ㥹ÀÁÂÃÄÅ' + cc = 'ŢţŤťŦŧṪṫṬṭṮṯṰṱ' + cn = 'ŔŕŘřṘṙŖŗȐȑȒȓṚṛṜṝṞṟɌɍⱤɽᵲᶉɼɾᵳʀRr' + cb = 'ĨĩĪīĬĭĮįİı' + + sr = len(cr) + sc = len(cc) + sn = len(cn) + sb = len(cb) + src = sr * sc + snb = sn * sb + scnb = sc * snb + + def _div(self, a: int, b: int) -> int: + return floor(a / b) + + def _encodeByte(self, i) -> Union[str, None]: + if i > 0xFF: + raise ValueError('ERROR! rc/nb overflow') + + if i > 0x7F: + i = i & 0x7F + return self.cn[self._div(i, self.sb) + int(self.cb[i % self.sb])] + + return self.cr[self._div(i, self.sc) + int(self.cc[i % self.sc])] + + def _encodeShort(self, i) -> str: + if i > 0xFFFF: + raise ValueError('ERROR! rcnb overflow') + + reverse = False + if i > 0x7FFF: + reverse = True + i = i & 0x7FFF + + char = [ + self._div(i, self.scnb), + self._div(i % self.scnb, self.snb), + self._div(i % self.snb, self.sb), i % self.sb + ] + char = [ + self.cr[char[0]], self.cc[char[1]], self.cn[char[2]], + self.cb[char[3]] + ] + + if reverse: + return char[2] + char[3] + char[0] + char[1] + + return ''.join(char) + + def _decodeByte(self, c) -> int: + nb = False + idx = [self.cr.index(c[0]), self.cc.index(c[1])] + if idx[0] < 0 or idx[1] < 0: + idx = [self.cn.index(c[0]), self.cb.index(c[1])] + nb = True + raise ValueError('ERROR! rc/nb overflow') + + result = idx[0] * self.sb + idx[1] if nb else idx[0] * self.sc + idx[1] + if result > 0x7F: + raise ValueError('ERROR! rc/nb overflow') + + return result | 0x80 if nb else 0 + + def _decodeShort(self, c) -> int: + reverse = c[0] not in self.cr + if not reverse: + idx = [ + self.cr.index(c[0]), + self.cc.index(c[1]), + self.cn.index(c[2]), + self.cb.index(c[3]) + ] + else: + idx = [ + self.cr.index(c[2]), + self.cc.index(c[3]), + self.cn.index(c[0]), + self.cb.index(c[1]) + ] + + if idx[0] < 0 or idx[1] < 0 or idx[2] < 0 or idx[3] < 0: + raise ValueError('ERROR! not rcnb') + + result = idx[0] * self.scnb + idx[1] * self.snb + idx[ + 2] * self.sb + idx[3] + if result > 0x7FFF: + raise ValueError('ERROR! rcnb overflow') + + result |= 0x8000 if reverse else 0 + return result + + def _encodeBytes(self, b) -> str: + result = [] + for i in range(0, (len(b) >> 1)): + result.append(self._encodeShort((b[i * 2] << 8 | b[i * 2 + 1]))) + + if len(b) & 1 == 1: + result.append(self._encodeByte(b[-1])) + + return ''.join(result) + + def encode(self, s: str, encoding: str = 'utf-8'): + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + return self._encodeBytes(s.encode(encoding)) + + def _decodeBytes(self, s: str): + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + if len(s) & 1: + raise ValueError('ERROR length') + + result = [] + for i in range(0, (len(s) >> 2)): + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) >> 8])) + result.append(bytes([self._decodeShort(s[i * 4:i * 4 + 4]) & 0xFF + ])) + + if (len(s) & 2) == 2: + result.append(bytes([self._decodeByte(s[-2:])])) + + return b''.join(result) + + def decode(self, s: str, encoding: str = 'utf-8') -> str: + if not isinstance(s, str): + raise ValueError('Please enter str instead of other') + + try: + return self._decodeBytes(s).decode(encoding) + except UnicodeDecodeError: + raise ValueError('Decoding failed') diff --git a/ATRI/rule.py b/ATRI/rule.py index 7cdeb0a..109d644 100644 --- a/ATRI/rule.py +++ b/ATRI/rule.py @@ -1,13 +1,8 @@ -import datetime -from random import choice - from nonebot.rule import Rule from nonebot.adapters.cqhttp import GroupMessageEvent, PokeNotifyEvent from .config import config from .service import Service as sv -from .utils.list import count_list, del_list_aim -from .utils.apscheduler import scheduler, DateTrigger def is_in_service(service: str) -> Rule: @@ -34,60 +29,6 @@ def is_in_dormant() -> Rule: return Rule(_is_in_dormant) -exciting_user = [] -exciting_repo = [ - "歇歇8,。咱8能再快了", - "太快惹,太快惹嗯", - "你吼辣么快干什么!", - "其实吧我觉得你这速度去d个vup挺适合", - "我不接受!你太快了", - "我有点担心,因为你太快了", - "请稍等!您冲得太快了!" -] - -def is_too_exciting(times: int, repo: bool) -> Rule: - def del_list(user: str) -> None: - global exciting_user - exciting_user = del_list_aim(exciting_user, user) - - async def _is_too_exciting(bot, event, state) -> bool: - global exciting_user - user = event.get_user_id() - - if user in exciting_user: - if repo: - await bot.send( - event, - choice(exciting_repo) - ) - return False - else: - if count_list(exciting_user, user) == times: - delta = datetime.timedelta( - seconds=config["BotSelfConfig"]["session_exciting_time"]) - trigger = DateTrigger( - run_date=datetime.datetime.now() + delta) - - scheduler.add_job( - func=del_list, - trigger=trigger, - args=(user,), - misfire_grace_time=1, - ) - - if repo: - await bot.send( - event, - choice(exciting_repo) - ) - return False - else: - exciting_user.append(user) - return True - - return Rule(_is_too_exciting) - - def to_bot() -> Rule: async def _to_bot(bot, event, state) -> bool: return event.is_tome() diff --git a/ATRI/service.py b/ATRI/service.py index ab870fd..df2e8d5 100644 --- a/ATRI/service.py +++ b/ATRI/service.py @@ -19,7 +19,7 @@ from nonebot.typing import T_State, T_Handler, T_RuleChecker from nonebot.rule import Rule, command, keyword from .log import logger as log -from .config import config +from .config import Config from .utils.request import post_bytes if TYPE_CHECKING: @@ -32,8 +32,8 @@ os.makedirs(SERVICE_DIR, exist_ok=True) os.makedirs(SERVICES_DIR, exist_ok=True) -matcher_list: list = [] is_sleep: bool = False +matcher_list: list = [] def _load_block_list() -> dict: @@ -43,8 +43,8 @@ def _load_block_list() -> dict: data = json.loads(file.read_bytes()) except: data = { - "user": [], - "group": [] + "user": {}, + "group": {} } with open(file, "w") as r: r.write(json.dumps(data, indent=4)) @@ -59,16 +59,17 @@ def _save_block_list(data: dict) -> None: def _load_service_config(service: str, docs: str = None) -> dict: - file_name = service + ".json" + file_name = service.replace('/', '') + ".json" file = SERVICES_DIR / file_name try: data = json.loads(file.read_bytes()) except: service_info = { - "name": service, + "command": service, "docs": docs, - "disable_user": _load_block_list()['user'], - "disable_group": _load_block_list()['group'] + "enabled": True, + "disable_user": {}, + "disable_group": {} } with open(file, "w") as r: r.write(json.dumps(service_info, indent=4)) @@ -77,7 +78,7 @@ def _load_service_config(service: str, docs: str = None) -> dict: def _save_service_config(service: str, data: dict) -> None: - file_name = service + ".json" + file_name = service.replace('/', '') + ".json" file = SERVICES_DIR / file_name with open(file, "w") as r: r.write(json.dumps(data, indent=4)) @@ -85,18 +86,19 @@ def _save_service_config(service: str, data: dict) -> None: class Service: """ - 集成一套服务管理,block准确至个人 + 集成一套服务管理,对功能信息进行持久化 计划搭配前端使用 """ @staticmethod def manual_reg_service(service: str): - file_name = service + ".json" + file_name = service.replace('/', '') + ".json" file = SERVICES_DIR / file_name service_info = { "name": service, "docs": None, - "disable_user": _load_block_list()['user'], - "disable_group": _load_block_list()['group'] + "enabled": True, + "disable_user": {}, + "disable_group": {} } with open(file, "w") as r: r.write(json.dumps(service_info, indent=4)) @@ -107,16 +109,36 @@ class Service: return False if group in data["disable_group"] else True @staticmethod - def control_service(service: str, group: int, is_enable: bool) -> None: + def control_service(service: str, + is_global: bool, + is_enabled: bool, + user: Optional[int] = None, + group: Optional[int] = None) -> None: data = _load_service_config(service) - sv_group = data.get('disable_group', []) - if is_enable: - sv_group.remove(group) - log.info(f"Service {service} has been enabled.") + + if is_global: + status = "disabled" if is_enabled else "enabled" + data['enbaled'] = is_enabled + log.info(f"Service: {service} has been {status}.") else: - sv_group.append(group) - log.info(f"Service {service} has been disabled.") - data["disable_group"] = sv_group + if user: + if is_enabled: + data['disable_user'][user] = str(datetime.now()) + log.info(f"New service blocked user: {user}" + f" | Service: {service} | Time: {datetime.now()}") + else: + del data['disable_user'][user] + log.info(f"User: {user} has been unblock" + f" | Service: {service} | Time: {datetime.now()}") + else: + if is_enabled: + data['disable_group'][group] = str(datetime.now()) + log.info(f"New service blocked group: {group}" + f" | Service: {service} | Time: {datetime.now()}") + else: + del data['disable_group'][group] + log.info(f"Group: {group} has been unblock" + f" | Service: {service} | Time: {datetime.now()}") _save_service_config(service, data) @staticmethod @@ -139,9 +161,7 @@ class Service: return matcher @staticmethod - def on_notice(name: str, - docs: Optional[str] = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, + def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None, *, handlers: Optional[List[T_Handler]] = None, temp: bool = False, @@ -156,14 +176,10 @@ class Service: block=block, handlers=handlers, default_state=state) - _load_service_config(name, docs) - matcher_list.append(name) return matcher @staticmethod - def on_request(name: str, - docs: Optional[str] = None, - rule: Optional[Union[Rule, T_RuleChecker]] = None, + def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None, *, handlers: Optional[List[T_Handler]] = None, temp: bool = False, @@ -178,13 +194,10 @@ class Service: block=block, handlers=handlers, default_state=state) - _load_service_config(name, docs) - matcher_list.append(name) return matcher @classmethod def on_command(cls, - name: str, cmd: Union[str, Tuple[str, ...]], docs: Optional[str] = None, rule: Optional[Union[Rule, T_RuleChecker]] = None, @@ -203,34 +216,31 @@ class Service: handlers.insert(0, _strip_cmd) commands = set([cmd]) | (aliases or set()) - _load_service_config(name, docs) - matcher_list.append(name) + _load_service_config(str(cmd), docs) return cls.on_message(command(*commands) & rule, handlers=handlers, **kwargs) @classmethod def on_keyword(cls, - name: str, keywords: Set[str], docs: Optional[str] = None, rule: Optional[Union[Rule, T_RuleChecker]] = None, **kwargs) -> Type[Matcher]: - _load_service_config(name, docs) - matcher_list.append(name) + _load_service_config(list(keywords)[0], docs) return cls.on_message(keyword(*keywords) & rule, **kwargs) class NetworkPost: URL = ( - f"http://{config['NetworkPost']['host']}:" - f"{config['NetworkPost']['port']}/" + f"http://{Config.NetworkPost.host}:" + f"{Config.NetworkPost.port}/" ) @classmethod async def send_private_msg(cls, user_id: int, message: str, - auto_escape: bool = False): # -> Dict[str, Any] + auto_escape: bool = False) -> Dict[str, Any]: url = cls.URL + "send_private_msg?" params = { "user_id": user_id, @@ -249,13 +259,23 @@ class Service: ... @classmethod - def send_msg(cls, - message_type: Optional[str] = ..., - user_id: Optional[int] = ..., - group_id: Optional[int] = ..., + async def send_msg(cls, + message_type: Optional[str] = "", + user_id: Optional[int] = None, + group_id: Optional[int] = None, message = Union[str], - auto_escape: bool = ...) -> Dict[str, Any]: - ... + auto_escape: bool = False) -> Dict[str, Any]: + url = cls.URL + "send_msg?" + params = { + "message_type": "", + "user_id": user_id, + "group_id": group_id, + "message": message, + "auto_escape": str(auto_escape) + } + result = json.loads(await post_bytes(url, params)) + log.debug(result) + return result @classmethod def delete_msg(cls, @@ -448,24 +468,22 @@ class Service: @classmethod def control_list(cls, - is_enable: bool, + is_enabled: bool, user: Optional[int] = None, group: Optional[int] = None) -> None: data = _load_block_list() if user: - if is_enable: - data['user'][user] = datetime.now().__str__ + if is_enabled: + data['user'][user] = str(datetime.now()) log.info(f"New blocked user: {user} | Time: {datetime.now()}") else: - del data[user] + del data['user'][str(user)] log.info(f"User {user} has been unblock.") elif group: - if is_enable: - data['group'][group] = datetime.now().__str__ + if is_enabled: + data['group'][group] = str(datetime.now()) log.info(f"New blocked group: {group} | Time: {datetime.now()}") else: - del data[user] + del data['group'][str(group)] log.info(f"Group {group} has been unblock.") - - with open(cls.path, "w") as r: - json.dump(data, r) + _save_block_list(data) diff --git a/ATRI/utils/cqcode.py b/ATRI/utils/cqcode.py new file mode 100644 index 0000000..3102a92 --- /dev/null +++ b/ATRI/utils/cqcode.py @@ -0,0 +1,29 @@ +import re +from typing import Optional + +from ATRI.service import Service as sv + + +tencent_gchat_url = "gchat.qpic.cn" +noob_code = ["record", "video", "music", "xml", "json"] + + +async def coolq_code_check(cq_code: str, + user: Optional[int] = None, + group: Optional[int] = None): + _type = re.findall(r"CQ:(.*?),", cq_code) + for i in _type: + if i == "image": + result = re.findall(r"url=(.*?)]", cq_code) + url = "" if not result else result[0] + if tencent_gchat_url not in url: + msg = "你注你🐎呢" + await sv.NetworkPost.send_msg(user_id=user, + group_id=group, + message=msg) + else: + return True + elif i in noob_code: + return False + else: + return True
\ No newline at end of file diff --git a/ATRI/utils/limit.py b/ATRI/utils/limit.py new file mode 100644 index 0000000..06b1f35 --- /dev/null +++ b/ATRI/utils/limit.py @@ -0,0 +1,58 @@ +import datetime +from random import choice + +from ATRI.config import config +from ATRI.service import Service as sv +from .list import count_list, del_list_aim +from .apscheduler import scheduler, DateTrigger + + +exciting_user_temp = [] +exciting_user = [] +exciting_repo = [ + "歇歇8,。咱8能再快了", + "太快惹,太快惹嗯", + "你吼辣么快干什么!", + "其实吧我觉得你这速度去d个vup挺适合", + "我不接受!你太快了", + "我有点担心,因为你太快了", + "请稍等!您冲得太快了!" +] + + +def del_list(user: str) -> None: + global exciting_user + exciting_user = del_list_aim(exciting_user, user) + +async def is_too_exciting(user: int, group: int, + times: int, repo: bool) -> bool: + global exciting_user + + if user in exciting_user: + if repo: + await sv.NetworkPost.send_msg(user_id=user, + group_id=group, + message=choice(exciting_repo)) + return False + else: + if count_list(exciting_user_temp, user) == times: + delta = datetime.timedelta( + seconds=config["BotSelfConfig"]["session_exciting_time"]) + trigger = DateTrigger( + run_date=datetime.datetime.now() + delta) + + scheduler.add_job( + func=del_list, + trigger=trigger, + args=(user,), + misfire_grace_time=1, + ) + + if repo: + await sv.NetworkPost.send_msg(user_id=user, + group_id=group, + message=choice(exciting_repo)) + return False + else: + exciting_user_temp.append(user) + return True |