#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
'''
@File : __init__.py
@Time : 2020/11/07 14:24:57
@Author : Kyomotoi
@Contact : kyomotoiowo@gmail.com
@Github : https://github.com/Kyomotoi
@License : Copyright © 2018-2020 Kyomotoi, All Rights Reserved.
'''
__author__ = 'kyomotoi'
import os
import json
from pathlib import Path
from random import choice
from random import randint
from requests import exceptions
from nonebot.log import logger
from nonebot.rule import to_me
from nonebot.typing import Bot, Event
from nonebot.permission import SUPERUSER
from nonebot.plugin import on_command, on_message, on_notice, on_request
from ATRI.utils.utils_ban import ban
from ATRI.utils.utils_times import countX
from ATRI.utils.utils_yml import load_yaml
from ATRI.utils.utils_error import errorRepo
from ATRI.utils.utils_textcheck import Textcheck
from ATRI.utils.utils_history import saveMessage
from ATRI.utils.utils_request import request_api_text
from ATRI.utils.utils_rule import check_banlist, check_switch
# The bot settings live in the `bot` section of config.yml, looked up
# relative to the current working directory.
CONFIG_PATH = Path('.') / 'config.yml'
config = load_yaml(CONFIG_PATH)['bot']
# Superuser ids from the config; handlers below iterate this to send
# private notifications.
master = config['superusers']
# Record the chat history of every conversation the bot can see
MessageSave = on_message()


@MessageSave.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Persist an incoming message through ``saveMessage`` and log it.

    The group id is stringified first, so an event without a group shows up
    as the literal string ``"None"``; in that case the message is saved
    without a group argument.
    """
    user = str(event.user_id)
    group = str(event.group_id)
    message = str(event.message)
    message_id = str(event.id)

    if group == "None":
        saveMessage(message_id, message, user)
    else:
        saveMessage(message_id, message, user, group)

    # Deliberately NOT using logger.opt(colors=True): the line embeds raw
    # user text, and colour-markup parsing would try to interpret any
    # "<...>" sequence a user typed. The template itself has no markup.
    logger.info(
        f"GROUP[{group}]: USER({user}) > Message: ({message}) Saved successfully"
    )
# Call bot
callMe = on_message(rule=check_banlist())


@callMe.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Answer when the bot is addressed by name.

    A message containing the disliked nickname gets a protest; a message
    that is exactly one of the configured nicknames gets a greeting.
    """
    text = str(event.raw_event['raw_message']).strip()

    if "萝卜子" in text:
        await bot.send(event, "萝卜子是对咱的蔑称!!")
        return

    if text in config['nickname']:
        await callMe.finish("叫咱有啥事吗w")
# Poke
pokehah = on_command("戳一戳", rule=to_me() & check_banlist())


@pokehah.handle()
async def _poke(bot: Bot, event: Event, state: dict) -> None:
    """Reply with one randomly chosen canned reaction to being poked."""
    replies = [
        "你再戳!",
        "?再戳试试?",
        "别戳了别戳了再戳就坏了555",
        "我爪巴爪巴,球球别再戳了",
        "你戳你🐎呢?!",
        "那...那里...那里不能戳...绝对...",
        "(。´・ω・)ん?",
        "有事恁叫我,别天天一个劲戳戳戳!",
        "欸很烦欸!你戳🔨呢",
        "?",
    ]
    picked = choice(replies)
    await pokehah.finish(picked)
async def poke_(bot: Bot, event: Event, state: dict) -> bool:
    """Rule: return True only for a poke notice aimed at the bot itself.

    The raw event must have ``sub_type == 'poke'``, ``notice_type ==
    'notify'`` and a ``target_id`` equal to the bot's own id. Events that
    lack those fields, or whose ids cannot be compared as integers, are
    treated as "not a poke" instead of raising.
    """
    try:
        return (event.raw_event['sub_type'] == 'poke'
                and event.raw_event['target_id'] == int(event.self_id)
                and event.raw_event['notice_type'] == 'notify')
    except (KeyError, TypeError, ValueError):
        # Only the expected lookup / conversion failures are swallowed; a
        # bare ``except`` would also hide task cancellation and interrupts.
        return False
# Notice matcher for pokes: gated by the ban-list rule combined with the
# poke_ rule, created with block=True, and served by the same _poke handler
# that the "戳一戳" command uses.
poke = on_notice(rule=check_banlist() & poke_, block=True)
poke.handle()(_poke)
# Handle group join / leave notices
groupEvent = on_notice()


@groupEvent.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """React to members (or the bot itself) joining or leaving a group.

    * ``group_increase``: introduce the bot if it is the one who joined,
      otherwise welcome the new member with an @-mention.
    * ``group_decrease``: announce a member's departure; if the bot itself
      was removed, tell every superuser by private message.
    """
    notice_type = event.raw_event['notice_type']

    if notice_type == 'group_increase':
        if event.user_id == int(event.self_id):
            await groupEvent.finish("在下 ATRI,你可以叫我 亚托莉 或 アトリ !~w")
        else:
            await groupEvent.finish(
                f'好欸!事新人[CQ:at,qq={event.raw_event["user_id"]}]')

    elif notice_type == 'group_decrease':
        if event.user_id == int(event.self_id):
            for owner in master:
                await bot.send_private_msg(
                    user_id=owner,
                    message=f'呜呜呜,主人,咱被群[{event.group_id}]扔出来了...')
        else:
            await groupEvent.finish(f'[{event.user_id}] 离开了我们...')
# Handle friend requests / group invitations
selfEvent = on_request(rule=check_banlist())

# Approval switches, toggled at runtime by the `selfevent` command:
# 0 rejects the request, any other value approves it.
FRIEND_ADD = 0
GROUP_INVITE = 0


@selfEvent.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Answer a friend request or group invitation and notify superusers.

    The request is approved or rejected exactly once, according to
    ``FRIEND_ADD`` / ``GROUP_INVITE``; afterwards every superuser receives
    a private summary. Other request types are ignored.
    """
    raw = event.raw_event
    logger.debug("Request event received: {}", raw)

    flag = raw['flag']
    req_type = raw['request_type']

    if req_type == 'friend':
        approve = FRIEND_ADD != 0
        report = '主人,收到一条好友请求:\n'
        report += f"请求人:{raw['user_id']}\n"
        report += f"申请信息:{raw['comment']}\n"
        if approve:
            report += '由于主人已同意咱添加好友,已通过'
        else:
            report += '由于主人未允许咱添加好友,已回拒'

        # Answer once, outside the notification loop: the same flag must not
        # be submitted again for every superuser.
        await bot.set_friend_add_request(flag=flag, approve=approve)

        for sup in master:
            await bot.send_private_msg(user_id=sup, message=report)

    elif req_type == 'group' and raw['sub_type'] == 'invite':
        approve = GROUP_INVITE != 0
        report = '主人,收到一条群邀请:\n'
        report += f"邀请人:{raw['user_id']}\n"
        report += f"目标群:{raw['group_id']}\n"

        if approve:
            report += '由于主人已允许咱添加群聊,已同意'
            await bot.set_group_add_request(
                flag=flag,
                sub_type=raw['sub_type'],
                approve=True)
        else:
            report += '由于主人未允许咱添加群聊,已回拒'
            # The rejection text names one maintainer; use the first
            # configured superuser (empty if none is configured).
            contact = next(iter(master), '')
            await bot.set_group_add_request(
                flag=flag,
                sub_type=raw['sub_type'],
                approve=False,
                reason=f'ねね..ごんめね...\n主人不允许咱添加其他群聊...\n如需寻求帮助,请联系维护者:{contact}'
            )

        for sup in master:
            await bot.send_private_msg(user_id=sup, message=report)
# Toggle automatic approval of friend requests / group invitations
# (both are off by default)
controlSelfEvent = on_command('selfevent', permission=SUPERUSER)


@controlSelfEvent.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Switch ``FRIEND_ADD`` / ``GROUP_INVITE`` on or off.

    Accepted arguments are ``group-true``, ``group-false``, ``friend-true``
    and ``friend-false``. ``true`` sets the switch to 1 and ``false`` resets
    it to 0. Empty or unrecognised arguments get the usage text.
    """
    global FRIEND_ADD, GROUP_INVITE

    args = str(event.message).strip()

    usage = '-==ATRI INVITE Control System==-\n'
    usage += 'Tips:\n'
    usage += ' - For SUPERUSERS\n'
    usage += ' - Normal all false\n'
    usage += 'Usage:\n'
    usage += ' - selfevent group-true/false\n'
    usage += ' - selfevent friend-true/false\n'

    if 'true' in args:
        value = 1
    elif 'false' in args:
        value = 0
    else:
        # Covers the empty-argument case as well.
        await controlSelfEvent.finish(usage)
        return

    if 'group-' in args:
        GROUP_INVITE = value
    elif 'friend-' in args:
        FRIEND_ADD = value
    else:
        await controlSelfEvent.finish(usage)
        return

    await controlSelfEvent.finish('DONE!')
# "Insult me" command
fxxkMe = on_command('口臭一下',
                    aliases={'口臭', '骂我'},
                    rule=to_me() & check_banlist())

# One entry is appended per request; countX is applied to this list to see
# how often a user has asked.
list_M = []


@fxxkMe.handle()  # type: ignore
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Fetch an insult from the remote API and send it to the caller.

    Extra remarks are sent when the caller's recorded request count is
    exactly 3 or 6. At 6 the request list is also de-duplicated.
    """
    global list_M

    user = str(event.user_id)
    # NOTE(review): assumes countX returns how many times `user` occurs in
    # the list - confirm against ATRI.utils.utils_times.
    times = countX(list_M, user)

    if times == 3:
        await bot.send(event,
                       "不是??你这么想被咱骂的嘛??被咱骂就这么舒服的吗?!该......你该不会是.....M吧!")
    elif times == 6:
        await bot.send(event, "给我适可而止阿!?")
        # Collapses every user's entries down to a single one.
        list_M = list(set(list_M))

    # Record every request. Previously nothing was appended once the count
    # hit 3, so the count never moved and the 6 branch was unreachable.
    list_M.append(user)

    URL = "https://nmsl.shadiao.app/api.php?level=min&lang=zh_cn"

    try:
        msg = request_api_text(URL)
    except exceptions.RequestException:
        # `exceptions` is the requests.exceptions module; an except clause
        # needs an exception class, so catch its common base class.
        await fxxkMe.finish(errorRepo("请求错误"))
        return

    await fxxkMe.finish(msg)
# Hitokoto
hitokoto = on_command('一言',
                      aliases={'抑郁一下', '网抑云'},
                      rule=to_me() & check_banlist())

# One entry is appended per request; countX is applied to this list to see
# how often a user has asked.
list_Y = []


@hitokoto.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Fetch a hitokoto sentence from the remote API and send it.

    Extra remarks are sent when the caller's recorded request count is
    exactly 3 or 6. At 6 the request list is also de-duplicated.
    """
    global list_Y

    user = str(event.user_id)
    # NOTE(review): assumes countX returns how many times `user` occurs in
    # the list - confirm against ATRI.utils.utils_times.
    times = countX(list_Y, user)

    if times == 3:
        await bot.send(event, "额......需要咱安慰一下嘛~?")
    elif times == 6:
        await bot.send(event, "如果心里感到难受就赶快去睡觉奥!别再憋自己了!")
        # Collapses every user's entries down to a single one.
        list_Y = list(set(list_Y))

    # Record every request. Previously nothing was appended once the count
    # hit 3, so the count never moved and the 6 branch was unreachable.
    list_Y.append(user)

    URL = "https://api.imjad.cn/hitokoto/?cat=a&charset=utf-8&length=50&encode=json&fun=sync&source="

    try:
        info = json.loads(request_api_text(URL))
        sentence = info["hitokoto"]
    except (exceptions.RequestException, ValueError, KeyError, TypeError):
        # Network failure, a non-JSON body (JSONDecodeError is a ValueError)
        # or a payload without the "hitokoto" field.
        await hitokoto.finish(errorRepo("请求错误"))
        return

    await hitokoto.finish(sentence)
# laughFunny = on_command('来句笑话', rule=check_banlist())
# @laughFunny.handle() #type: ignore
# async def _(bot: Bot, event: Event, state: dict) -> None:
# name = event.sender['nickname']
# result = []
# LAUGH_FILE = Path('.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'laugh.txt'
# with open(LAUGH_FILE, 'r', encoding='utf-8') as f:
# for line in f:
# result.append(line.strip('\n'))
# resu = choice(result)
# print(resu%name)
# Throw a drifting bottle
# Switch name passed to check_switch by the throw / pick-up commands.
plugin_name = 'drifting-bottle'
# JSON file holding all bottles, relative to the current working directory.
DRIFTING_BOTTLE_PATH = Path(
    '.') / 'ATRI' / 'plugins' / 'plugin_chat' / 'drifting_bottle.json'
driftingBottle = on_command('扔漂流瓶',
                            rule=to_me() & check_banlist()
                            & check_switch(plugin_name, True))


@driftingBottle.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Take the bottle text from the command line when it was given inline."""
    args = str(event.message).strip()
    if args:
        state['args'] = args


@driftingBottle.got('args', prompt='请告诉咱瓶中内容~!')
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Store the bottle as ``[user, group, text]`` under a new numeric id.

    The id is one above the largest id already in the file, written as a
    string like the keys read back from JSON.
    """
    args = state['args']
    user = event.user_id
    group = event.group_id

    if DRIFTING_BOTTLE_PATH.is_file():
        with open(DRIFTING_BOTTLE_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
    else:
        data = {}

    # len(data) + 1 would reuse an existing id once a bottle has been
    # deleted, and the int key would duplicate the string key on dump.
    used_ids = (int(key) for key in data if str(key).isdigit())
    next_id = max(used_ids, default=0) + 1
    data[str(next_id)] = [user, group, args]

    with open(DRIFTING_BOTTLE_PATH, 'w', encoding='utf-8') as f:
        f.write(json.dumps(data))

    await driftingBottle.finish('漂流瓶已飘向远方...')
# Pick up a drifting bottle
getDriftingBottle = on_command('捞漂流瓶',
                               rule=to_me() & check_banlist()
                               & check_switch(plugin_name, True))


@getDriftingBottle.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Send the caller the content of one randomly chosen stored bottle.

    An empty bottle file is created when none exists yet, and the caller is
    told when there is nothing to pick up.
    """
    if not DRIFTING_BOTTLE_PATH.is_file():
        with open(DRIFTING_BOTTLE_PATH, 'w', encoding='utf-8') as f:
            f.write(json.dumps({}))

    with open(DRIFTING_BOTTLE_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if not data:
        await getDriftingBottle.finish('暂无漂流瓶可供打捞呢~(')
        return

    # Choose among the ids that actually exist: randint(1, len(data)) hits a
    # missing key as soon as a bottle has been deleted.
    bottle_id = choice(list(data))
    content = data[bottle_id][2]

    msg0 = f'[CQ:at,qq={event.user_id}]\n'
    msg0 += f'漂流瓶[{bottle_id}]内容如下:\n'
    msg0 += content

    await getDriftingBottle.finish(msg0)
# Remove drifting bottles
delDriftingBottle = on_command('清除漂流瓶',
                               rule=check_banlist(),
                               permission=SUPERUSER)


@delDriftingBottle.handle()
async def _(bot: Bot, event: Event, state: dict) -> None:
    """Delete one bottle (``del <num>``) or every bottle (``delall``).

    Empty or unrecognised arguments get the usage text. A missing or
    unknown bottle number is reported as a failed deletion.
    """
    # Split into words: indexing the raw string would compare single
    # characters against 'delall' / 'del' and never match.
    args = str(event.message).strip().split()

    usage = 'Drifting Bottle:\n'
    usage += '*For SUPERUSERS\n'
    usage += '- delall\n'
    usage += '- del [num]\n'
    usage += 'eg: 清除漂流瓶 del 123'

    if not args:
        await delDriftingBottle.finish(usage)
        return

    if not DRIFTING_BOTTLE_PATH.is_file():
        with open(DRIFTING_BOTTLE_PATH, 'w', encoding='utf-8') as f:
            f.write(json.dumps({}))
        await delDriftingBottle.finish('清除了个寂寞...')
        return

    with open(DRIFTING_BOTTLE_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)

    action = args[0]
    if action == 'delall':
        # Empty the store itself; removing the file and then writing the
        # loaded data back would restore every bottle.
        data = {}
        result = "ALL"
    elif action == 'del':
        try:
            result = args[1]
            del data[result]
        except (IndexError, KeyError):
            await delDriftingBottle.finish(errorRepo('清除失败了...'))
            return
    else:
        await delDriftingBottle.finish(usage)
        return

    with open(DRIFTING_BOTTLE_PATH, 'w', encoding='utf-8') as f:
        f.write(json.dumps(data))

    await delDriftingBottle.finish(
        f'完成啦!成功清除漂流瓶[{result}],目前还剩余[{len(data)}]个~')
# 舆情监听
#publicOpinion = on_message(rule=check_banlist(True))
#ban_temp_list = []
#@publicOpinion.handle()
#async def _(bot: Bot, event: Event, state: dict) -> None:
#global ban_temp_list
#msg = str(event.message)
#user = str(event.user_id)
# 检查是否满足条件
#if countX(ban_temp_list,
#user) == Textcheck().get_times(str(Textcheck().check(msg))):
#ban_temp_list = list(set(ban_temp_list))
#ban(user)
#if Textcheck().check(msg) == "False":
#return
#if Textcheck().check(msg):
#if user in master:
#await publicOpinion.finish("主人你给我注意点阿?!你这可是在死亡边缘试探呢!!")
#ban_temp_list.append(int(user))
#await publicOpinion.finish(Textcheck().check(msg))