1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
import json
from nonebot.params import ArgPlainText
from nonebot.adapters.onebot.v11 import PrivateMessageEvent, GroupMessageEvent
from ATRI.config import BotSelfConfig
from ATRI.message import MessageBuilder
from ATRI.exceptions import WriteFileError
from .data_source import Console, CONSOLE_DIR
from .models import AuthData
# Command matcher registered under the name "auth"; the second argument is
# the command's description text.
gen_console_key = Console().cmd_as_group(
    "auth",
    "获取进入网页后台的凭证",
)
@gen_console_key.got("is_pub_n", "咱的运行环境是否有公网(y/n)")
async def _(event: PrivateMessageEvent, is_pub_n: str = ArgPlainText("is_pub_n")):
    """Generate a new web-console token, persist it and send the login details.

    Args:
        event: Not used in the body. NOTE(review): the ``PrivateMessageEvent``
            annotation presumably makes the framework run this handler for
            private chats only -- confirm before removing it.
        is_pub_n: The user's plain-text answer to the "public network (y/n)"
            prompt. Only an answer equal to "y" (ignoring case and
            surrounding whitespace) selects the public host address.
    """
    data_path = CONSOLE_DIR / "data.json"

    # Normalise the answer so that "Y" or "y " is not silently treated as
    # "no public network", which would advertise the wrong host address.
    has_public_ip = is_pub_n.strip().lower() == "y"
    if has_public_ip:
        host = str(await Console().get_host_ip(True))
    else:
        host = str(await Console().get_host_ip(False))
        await gen_console_key.send("没有公网吗...嗯知道了")

    port = BotSelfConfig.port
    token = Console().get_random_str(20)

    # Load whatever is already stored so other top-level keys survive. A
    # missing, unreadable-as-JSON or non-object file is treated as empty
    # instead of aborting, so a damaged file cannot block issuing new tokens.
    try:
        data = json.loads(data_path.read_bytes())
    except (FileNotFoundError, ValueError):
        data = {}
    if not isinstance(data, dict):
        data = {}

    data["data"] = AuthData(token=token).dict()
    data_path.write_text(json.dumps(data), encoding="utf-8")

    msg = (
        MessageBuilder("控制台信息已生成!")
        .text(f"请访问: {host}:{port}")
        .text(f"Token: {token}")
        .text("该 token 有效时间为 15min")
    )
    await gen_console_key.finish(msg)
@gen_console_key.handle()
async def _(event: GroupMessageEvent):
    """Reply to a group-message invocation by asking for a private chat."""
    reply = "请私戳咱获取("
    await gen_console_key.finish(reply)
# Command matcher registered under the name "del"; the second argument is
# the command's description text.
del_console_key = Console().cmd_as_group(
    "del",
    "销毁进入网页后台的凭证",
)
@del_console_key.got("is_sure_d", "...你确定吗(y/n)")
async def _(is_sure: str = ArgPlainText("is_sure_d")):
    """Remove the stored web-console credential after user confirmation.

    Args:
        is_sure: The user's plain-text answer to the confirmation prompt.
            Only "y" (ignoring case and surrounding whitespace) proceeds.

    Raises:
        WriteFileError: When the credential file cannot be read, parsed or
            rewritten. The user is told where the file is before the error
            is raised, and the original exception is chained as the cause.
    """
    if is_sure.strip().lower() != "y":
        await del_console_key.finish("反悔了呢...")

    df = CONSOLE_DIR / "data.json"
    if not df.is_file():
        await del_console_key.finish("你还没向咱索取凭证呢.../con.auth 以获取")

    # Keep finish() calls out of the try block: the broad handler below
    # would otherwise intercept the exception finish() uses to end the
    # session (assuming it derives from Exception -- see NoneBot docs).
    try:
        data: dict = json.loads(df.read_bytes())
        has_credential = "data" in data
        if has_credential:
            del data["data"]
            with open(df, "w", encoding="utf-8") as w:
                w.write(json.dumps(data))
    except Exception as err:
        await del_console_key.send("销毁失败了...请至此处自行删除文件:\n" + str(df))
        raise WriteFileError(
            "Writing / Reading file: " + str(df) + " failed!"
        ) from err

    # The file outlives a successful delete, so a repeated request finds no
    # "data" key. That is "nothing to destroy", not an I/O failure.
    if not has_credential:
        await del_console_key.finish("你还没向咱索取凭证呢.../con.auth 以获取")

    await del_console_key.finish("销毁成功!如需再次获取: /con.auth")
from ATRI import driver as dr
from .data_source import init_resource
from .driver import init_driver
# Register the startup hooks in order: resources first, then the driver.
for _startup_hook in (init_resource, init_driver):
    dr().on_startup(_startup_hook)
|