1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
import json
from pathlib import Path
from jieba import posseg
from random import choice, shuffle
from ATRI.service import Service
from ATRI.rule import to_bot, is_in_service
from ATRI.log import logger as log
from ATRI.utils import request
from ATRI.exceptions import ReadFileError, WriteFileError
# Remote JSON thesaurus used to seed and refresh the local word bank.
KIMO_URL = "https://fastly.jsdelivr.net/gh/Kyomotoi/AnimeThesaurus/data.json"

# Directory (relative to the working directory) that holds this plugin's
# JSON files; it is created at import time so later reads/writes can assume
# it exists.
CHAT_PATH = Path(".", "data", "plugins", "kimo")
CHAT_PATH.mkdir(parents=True, exist_ok=True)
class Kimo(Service):
    """Keyword-triggered chat service backed by JSON files under ``CHAT_PATH``.

    Two files are used:

    * ``kimo.json``  - the word bank mapping a keyword to a reply (either a
      string or a list of candidate strings), downloaded from ``KIMO_URL``.
    * ``users.json`` - a mapping of user id to the name the bot should use
      when addressing that user.
    """

    def __init__(self):
        Service.__init__(
            self, "kimo", "好像有点涩?", rule=to_bot() & is_in_service("kimo"), priority=5
        )

    @staticmethod
    def _read_json(path: Path):
        """Return the parsed JSON document stored at *path*."""
        return json.loads(path.read_bytes())

    @staticmethod
    def _write_json(path: Path, data) -> None:
        """Serialize *data* as indented JSON and write it to *path* (UTF-8).

        The document is serialized before the file is opened, so a
        serialization error does not leave a truncated file behind.
        """
        path.write_text(json.dumps(data, indent=4), encoding="utf-8")

    @staticmethod
    async def _request(url: str) -> dict:
        """Fetch *url* and return the response body decoded as JSON."""
        res = await request.get(url)
        return res.json()

    @classmethod
    async def _generate_data(cls) -> None:
        """Download the word bank to ``kimo.json`` if the file does not exist.

        Raises:
            WriteFileError: if the downloaded data cannot be written to disk.
        """
        path = CHAT_PATH / "kimo.json"
        if path.is_file():
            return

        log.warning("未检测到词库,生成中")
        data = await cls._request(KIMO_URL)
        try:
            cls._write_json(path, data)
        except Exception as err:
            # Chain the original error so the root cause stays in the traceback.
            raise WriteFileError("Writing kimo words failed!") from err
        log.info("生成完成")

    @classmethod
    async def _load_data(cls) -> dict:
        """Return the local word bank, generating it first when it is missing."""
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            await cls._generate_data()
        return cls._read_json(path)

    @classmethod
    async def update_data(cls) -> None:
        """Merge keywords from the remote word bank into ``kimo.json``.

        Only keywords that are absent locally are added; existing local
        entries are never overwritten.
        """
        log.info("更新闲聊词库ing...")
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            await cls._generate_data()

        remote = await cls._request(KIMO_URL)
        local = cls._read_json(path)
        for key, value in remote.items():
            local.setdefault(key, value)

        cls._write_json(path, local)
        log.info("kimo词库更新完成")

    @staticmethod
    def name_is(user_id: str, new_name: str) -> None:
        """Store *new_name* as the name to use for *user_id* in ``users.json``.

        Raises:
            ReadFileError: if the updated mapping cannot be written.
        """
        path = CHAT_PATH / "users.json"
        data = Kimo._read_json(path) if path.is_file() else {}
        data[user_id] = new_name
        try:
            Kimo._write_json(path, data)
        except Exception as err:
            # NOTE(review): this is a write failure, so WriteFileError would be
            # the more accurate type; ReadFileError is kept because callers may
            # already catch it.
            raise ReadFileError("Update user name failed!") from err

    @staticmethod
    def load_name(user_id: str) -> str:
        """Return the stored name for *user_id*, or ``"你"`` when none is set.

        An empty ``users.json`` is created as a side effect when the file
        does not exist yet.
        """
        path = CHAT_PATH / "users.json"
        if not path.is_file():
            Kimo._write_json(path, {})
            return "你"

        data = Kimo._read_json(path)
        try:
            return data[user_id]
        except (KeyError, TypeError):
            # Unknown user, or the file does not hold a mapping.
            return "你"

    @classmethod
    async def deal(cls, msg: str, user_id: str) -> str:
        """Build a reply to *msg*, addressed to the user *user_id*.

        The message is tokenized and the tokens are tried in random order
        against the word bank; if no token matches, the word-bank keywords
        are tried in random order as substrings of the message.  Every
        ``"你"`` in the chosen reply is replaced by the user's stored name.

        Returns:
            The reply text, or an empty string when nothing matches.
        """
        keywords = posseg.lcut(msg)
        shuffle(keywords)
        data = await cls._load_data()

        reply = ""
        for pair in keywords:
            word = pair.word
            # Collapse a token that starts with a doubled character to that
            # single character before the lookup.
            if len(word) >= 2 and word[0] == word[1]:
                word = word[0]
            if word in data:
                # The tokens are uniformly shuffled, so stopping at the first
                # hit picks among the matches with the same probability as
                # scanning them all and keeping the last one.
                reply = data[word]
                break

        if not reply:
            candidates = list(data)
            shuffle(candidates)
            for key in candidates:
                if key in msg:
                    reply = data[key]
                    break

        if isinstance(reply, list):
            # random.choice raises IndexError on an empty sequence.
            reply = choice(reply) if reply else ""

        user_name = cls.load_name(user_id)
        return reply.replace("你", user_name)
|