1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import json
from pathlib import Path
from jieba import posseg
from random import choice, shuffle
from ATRI.log import log
from ATRI.utils import request
from ATRI.exceptions import ReadFileError, WriteFileError
# Directory that holds this plugin's JSON files; it is created at import
# time (including missing parents) so later reads/writes can assume it exists.
CHAT_PATH = Path(".").joinpath("data", "plugins", "kimo")
CHAT_PATH.mkdir(exist_ok=True, parents=True)

# URL of the remote JSON document (AnimeThesaurus ``data.json``).
KIMO_URL = "https://jsd.imki.moe/gh/Kyomotoi/AnimeThesaurus/data.json"
class Kimo:
    """Keyword-triggered chat replies backed by JSON files under ``CHAT_PATH``.

    Two files are used:

    * ``kimo.json`` - maps a trigger keyword to its reply.  ``deal`` accepts
      either a single string or a list of candidate strings as the value.
    * ``users.json`` - maps a user id to the name that replaces "你" in
      replies.
    """

    @staticmethod
    async def _request(url: str) -> dict:
        """Fetch ``url`` via ``request.get`` and return its decoded JSON body."""
        res = await request.get(url)
        return res.json()

    @classmethod
    async def _generate_data(cls) -> None:
        """Download the word list into ``kimo.json`` if the file is missing.

        Does nothing when the file already exists.

        Raises:
            WriteFileError: if the downloaded data cannot be serialized or
                written to disk.
        """
        path = CHAT_PATH / "kimo.json"
        if path.is_file():
            return

        log.warning("插件 kimo 缺少资源, 装载中...")
        data = await cls._request(KIMO_URL)
        try:
            # Serialize before touching the file: a serialization error must
            # not leave an empty kimo.json behind, because an existing file
            # suppresses every later regeneration attempt.
            payload = json.dumps(data, indent=4)
            path.write_text(payload, encoding="utf-8")
        except Exception as err:
            raise WriteFileError("Writing kimo words failed!") from err
        log.success("插件 kimo 资源装载完成")

    @classmethod
    async def _load_data(cls) -> dict:
        """Return the parsed content of ``kimo.json``, downloading it first if needed."""
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            await cls._generate_data()

        with open(path, "r", encoding="utf-8") as r:
            return json.load(r)

    @classmethod
    async def update_data(cls) -> None:
        """Merge keywords from the remote word list into ``kimo.json``.

        Only keywords that are not yet present locally are added; entries
        already stored locally are never overwritten.
        """
        log.info("更新闲聊词库ing...")
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            await cls._generate_data()

        remote_data = await cls._request(KIMO_URL)
        data = json.loads(path.read_bytes())
        for keyword, reply in remote_data.items():
            # setdefault keeps the local value when the keyword already exists.
            data.setdefault(keyword, reply)

        # Serialize first so the existing file is only replaced by a complete payload.
        payload = json.dumps(data, indent=4)
        path.write_text(payload, encoding="utf-8")
        log.info("kimo词库更新完成")

    @staticmethod
    def name_is(user_id: str, new_name: str) -> None:
        """Store ``new_name`` as the name used for ``user_id`` in ``users.json``.

        The file is created when it does not exist yet.

        Raises:
            ReadFileError: if the updated mapping cannot be written.  (The
                exception type is kept as-is for callers that already catch
                it, even though the failing operation is a write.)
        """
        path = CHAT_PATH / "users.json"
        data = json.loads(path.read_bytes()) if path.is_file() else {}
        data[user_id] = new_name
        try:
            payload = json.dumps(data, indent=4)
            path.write_text(payload, encoding="utf-8")
        except Exception as err:
            raise ReadFileError("Update user name failed!") from err

    @staticmethod
    def load_name(user_id: str) -> str:
        """Return the stored name for ``user_id``, or "你" when none is stored.

        When ``users.json`` does not exist it is created holding an empty
        mapping before the default is returned.
        """
        default_name = "你"
        path = CHAT_PATH / "users.json"
        if not path.is_file():
            path.write_text(json.dumps({}), encoding="utf-8")
            return default_name

        data = json.loads(path.read_bytes())
        if not isinstance(data, dict):
            # Unexpected file content: fall back to the default name.
            return default_name
        return data.get(user_id, default_name)

    @classmethod
    async def deal(cls, msg: str, user_id: str) -> str:
        """Pick a reply for ``msg`` and address it to the given user.

        The message is segmented with ``posseg.lcut``; a segment that is a
        keyword of the word list selects the reply.  If that yields nothing,
        any keyword that occurs as a substring of ``msg`` is used instead.
        Every "你" in the chosen reply is replaced by the user's stored name.

        Returns:
            The reply text, or an empty string when nothing matched or the
            matched entry holds no reply.
        """
        keywords = posseg.lcut(msg)
        shuffle(keywords)
        data = await cls._load_data()

        repo = ""
        for pair in keywords:
            word = pair.word
            # A segment starting with a doubled character is looked up by
            # that single character.
            if len(word) >= 2 and word[0] == word[1]:
                word = word[0]
            if word in data:
                # The segments were shuffled above, so the first hit is
                # already a random pick among all matching segments.
                repo = data[word]
                break

        if not repo:
            # Fallback: plain substring search over all keywords, in random order.
            candidates = list(data)
            shuffle(candidates)
            for keyword in candidates:
                if keyword in msg:
                    repo = data[keyword]
                    break

        if not repo:
            # Nothing matched, or the matched entry is empty; an empty list
            # must not reach choice(), which raises IndexError on it.
            return ""

        reply = choice(repo) if isinstance(repo, list) else repo
        return reply.replace("你", cls.load_name(user_id))
|