summaryrefslogtreecommitdiff
path: root/ATRI/plugins/kimo/data_source.py
blob: 26272e5bed629fd7d03b7709c1b54d87bcc111a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import json
from pathlib import Path
from jieba import posseg
from random import choice, shuffle

from ATRI.log import log
from ATRI.utils import request
from ATRI.exceptions import ReadFileError, WriteFileError


# Directory (relative to the working directory) where this plugin keeps its
# JSON files; it is created at import time so later reads/writes can assume
# that it exists.
CHAT_PATH = Path(".").joinpath("data", "plugins", "kimo")
CHAT_PATH.mkdir(exist_ok=True, parents=True)
# Remote location of the reply thesaurus that gets stored as kimo.json.
KIMO_URL = "https://jsd.imki.moe/gh/Kyomotoi/AnimeThesaurus/data.json"


class Kimo:
    """Keyword-triggered small-talk replies backed by JSON files in ``CHAT_PATH``.

    Two files are used:

    * ``kimo.json`` -- the reply thesaurus downloaded from ``KIMO_URL``. It is
      treated as a mapping from keyword to either a single reply or a list of
      candidate replies.
    * ``users.json`` -- a mapping from user id to the name that is substituted
      for "你" in replies.
    """

    @staticmethod
    async def _request(url: str) -> dict:
        """Fetch *url* via the project's ``request`` helper and return ``res.json()``."""
        res = await request.get(url)
        return res.json()

    @classmethod
    async def _generate_data(cls) -> None:
        """Download the thesaurus into ``kimo.json`` when the file is missing.

        Nothing is downloaded if the file already exists; the completion
        message is logged in both cases.

        Raises:
            WriteFileError: if the downloaded data cannot be written.
        """
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            log.warning("插件 kimo 缺少资源, 装载中...")
            data = await cls._request(KIMO_URL)
            try:
                with open(path, "w", encoding="utf-8") as w:
                    w.write(json.dumps(data, indent=4))
            except Exception as err:
                # Chain explicitly so the underlying failure stays attached
                # to the project-level error.
                raise WriteFileError("Writing kimo words failed!") from err

        log.success("插件 kimo 资源装载完成")

    @classmethod
    async def _load_data(cls) -> dict:
        """Return the parsed content of ``kimo.json``, downloading it first if needed."""
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            await cls._generate_data()

        with open(path, "r", encoding="utf-8") as r:
            return json.loads(r.read())

    @classmethod
    async def update_data(cls) -> None:
        """Merge keywords that are new upstream into the local ``kimo.json``.

        Only keys absent from the local file are added; entries that already
        exist locally are never overwritten.
        """
        log.info("更新闲聊词库ing...")
        path = CHAT_PATH / "kimo.json"
        if not path.is_file():
            await cls._generate_data()

        remote = await cls._request(KIMO_URL)
        local = json.loads(path.read_bytes())
        for keyword, replies in remote.items():
            # setdefault keeps the local value when the keyword already exists.
            local.setdefault(keyword, replies)

        with open(path, "w", encoding="utf-8") as w:
            w.write(json.dumps(local, indent=4))
        log.info("kimo词库更新完成")

    @staticmethod
    def name_is(user_id: str, new_name: str):
        """Store *new_name* as the name used for *user_id* in ``users.json``.

        A missing ``users.json`` is treated as an empty mapping and is created
        by the write below.

        Raises:
            ReadFileError: if the updated mapping cannot be written.
        """
        path = CHAT_PATH / "users.json"
        data = json.loads(path.read_bytes()) if path.is_file() else {}
        data[user_id] = new_name
        try:
            with open(path, "w", encoding="utf-8") as w:
                w.write(json.dumps(data, indent=4))
        except Exception as err:
            # NOTE: ReadFileError (not WriteFileError) is what this method has
            # always raised on a failed write; it is kept so existing callers'
            # exception handlers still match.
            raise ReadFileError("Update user name failed!") from err

    @staticmethod
    def load_name(user_id: str) -> str:
        """Return the stored name for *user_id*, or "你" when none is stored.

        If ``users.json`` does not exist yet it is created holding an empty
        mapping.
        """
        default_name = "你"
        path = CHAT_PATH / "users.json"
        if not path.is_file():
            with open(path, "w", encoding="utf-8") as w:
                w.write(json.dumps({}))
            return default_name

        data = json.loads(path.read_bytes())
        try:
            return data[user_id]
        except (LookupError, TypeError):
            # Unknown user, or the file does not hold a mapping.
            return default_name

    @classmethod
    async def deal(cls, msg: str, user_id: str) -> str:
        """Pick a reply for *msg* and address it to the user.

        The message is segmented with ``posseg.lcut`` and the segments are
        tried in random order as thesaurus keywords. If that yields nothing,
        the thesaurus keywords are tried in random order as substrings of the
        message. A list entry contributes one randomly chosen element. Every
        "你" in the chosen reply is replaced by the name stored for *user_id*.

        Returns:
            The reply text, or an empty string when nothing matched.
        """
        keywords = posseg.lcut(msg)
        shuffle(keywords)

        data = await cls._load_data()

        repo = str()
        for pair in keywords:
            word = pair.word
            # A segment that starts with a doubled character is looked up by
            # that single character.
            if len(word) >= 2 and word[0] == word[1]:
                word = word[0]
            if word in data:
                repo = data[word]
                # The segments are already shuffled, so the first hit is as
                # random as any other; no need to keep scanning.
                break

        if not repo:
            candidates = list(data)
            shuffle(candidates)
            for keyword in candidates:
                if keyword in msg:
                    repo = data[keyword]
                    break

        if isinstance(repo, list):
            # choice() raises IndexError on an empty list; treat an empty
            # entry as "no reply".
            reply = choice(repo) if repo else str()
        else:
            reply = repo
        user_name = cls.load_name(user_id)
        return reply.replace("你", user_name)