import json import time import sqlite3 from nonebot import session import psutil from pathlib import Path from datetime import datetime import nonebot from nonebot import on_command, CommandSession import config # type: ignore bot = nonebot.get_bot() master = config.MASTER() file = Path('.') / 'ATRI' / 'plugins' / 'noobList' / 'noobList.json' file1 = Path('.') / 'ATRI' / 'plugins' / 'noobList' / 'noobGroup.json' @on_command('data_list', aliases = ['数据总量'], only_to_me = False) async def _(session: CommandSession): user = session.event.user_id group = session.event.group_id try: with open(file1, 'r') as f: data = json.load(f) except: data = {} try: with open(file, 'r') as f: data1 = json.load(f) except: data1 = {} if str(group) in data.keys(): pass else: if str(user) in data1.keys(): pass else: con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'sqlite' / 'setu' / 'normal.db') # setu-normal cur = con.cursor() cur.execute("select * from normal") data_normal = len(cur.fetchall()) con.close() con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'sqlite' / 'setu' / 'nearR18.db') # setu-nearR18 cur = con.cursor() cur.execute("select * from nearR18") data_nearR18 = len(cur.fetchall()) con.close() con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'sqlite' / 'setu' / 'r18.db') # setu-r18 cur = con.cursor() cur.execute("select * from r18") data_r18 = len(cur.fetchall()) con.close() con = sqlite3.connect(Path('.') / 'ATRI' / 'data' / 'sqlite' / 'cloudmusic' / 'cloudmusic.db') # cloudmusic cur = con.cursor() cur.execute("select * from cloudmusic") data_cloudmusic = len(cur.fetchall()) con.close() with open(Path('.') / 'ATRI' / 'plugins' / 'LearnRepo' / 'LearnRepo.json', 'r') as f: data = json.load(f) data_repo = len(data) await session.send( f"""目前螃蟹™数据库收录了: 涩图: normal: {data_normal} nearR18: {data_nearR18} r18:{data_r18} 网抑云语录:{data_cloudmusic} 词汇量:{data_repo}""" ) @on_command('look_noobList', aliases = ['查看黑名单'], only_to_me = False) async def _(session: CommandSession): start = time.perf_counter() try: with open(file, 'r') as f: data = json.load(f) except: data = {} try: with open(file1, 'r') as f: data0 = json.load(f) except: data0 = {} msg = f'被ATRI列入黑名单有以下列表:\n' try: msg1 = f'=====[用户]=====\n' msg += msg1 for i in data.keys(): msg0 = f'{i}\n' msg += msg0 except: end = time.perf_counter() msg0 = f"==============\nDone time: {round(end - start, 3)}s" msg += msg0 await session.send(msg) return try: msg1 = f'======[群]======\n' msg += msg1 for i in data0.keys(): msg0 = f'{i}\n' msg += msg0 except: end = time.perf_counter() msg0 = f"==============\nDone time: {round(end - start, 3)}s" msg += msg0 await session.send(msg) return end = time.perf_counter() msg0 = f"==============\nDone time: {round(end - start, 3)}s" msg += msg0 await session.send(msg) @on_command('look_power', aliases = ['查看权限组'], only_to_me = False) async def _(session: CommandSession): start = time.perf_counter() try: with open(Path('.') / 'ATRI' / 'plugins' / 'UploadSqlite' / 'sepi.json', 'r') as f: data = json.load(f) except: data = {} try: with open(Path('.') / 'ATRI' / 'plugins' / 'UploadSqlite' / 'cloudmusic.json', 'r') as f: data0 = json.load(f) except: data0 = {} msg = f'主人: {master}\n' try: msg1 = f'=====[涩批]=====\n' msg += msg1 for i in data.keys(): msg0 = f'{i}\n' msg += msg0 except: end = time.perf_counter() msg0 = f"==============\nDone time: {round(end - start, 3)}s" msg += msg0 await session.send(msg) return try: msg1 = f'====[网抑云]====\n' msg += msg1 for i in data0.keys(): msg0 = f'{i}\n' msg += msg0 except: end = time.perf_counter() msg0 = f"==============\nDone time: {round(end - start, 3)}s" msg += msg0 await session.send(msg) return end = time.perf_counter() msg0 = f"==============\nDone time: {round(end - start, 3)}s" msg += msg0 await session.send(msg) @on_command('check_status', patterns = [r"检查状态|检查运行|检查身体"]) async def _(session: CommandSession): user = session.event.user_id group = session.event.group_id try: with open(file1, 'r') as f: data = json.load(f) except: data = {} try: with open(file, 'r') as f: data1 = json.load(f) except: data1 = {} if str(group) in data.keys(): pass else: if str(user) in data1.keys(): pass else: cpu = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory().percent disk = psutil.disk_usage('/').percent inteSENT = psutil.net_io_counters().bytes_sent # type: ignore inteRECV = psutil.net_io_counters().bytes_recv # type: ignore status = 'アトリは、高性能ですから!' if cpu > 80: status = 'ATRI感觉头有点晕...' if memory > 80: status = 'ATRI感觉有点头晕并且有点累...' elif disk > 80: status = 'ATRI感觉身体要被塞满了...' await session.send(f"""ATRI Status: * cpu: {cpu}% * mem: {memory}% * disk: {disk}% * BytesSENT: {inteSENT} * BytesRECV: {inteRECV} {status}""".strip())