summaryrefslogtreecommitdiff
path: root/ATRI/plugins/key-repo/data_source.py
blob: 3dd331d029d2827db7e6f7786035a6e51f11b0fa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import json
from pathlib import Path
from typing import Optional


# Directory that holds the key-repo JSON files. It is created eagerly at
# import time (including any missing parents) so later reads and writes can
# assume it exists.
KEYREPO_DIV = Path('.', 'ATRI', 'data', 'database', 'KeyRepo')
KEYREPO_DIV.mkdir(parents=True, exist_ok=True)


def load_key_data() -> dict:
    """Load the keyword -> replies mapping stored in ``data.json``.

    If the file is missing, unreadable, or does not hold valid JSON, it is
    (re)written with an empty JSON object and an empty dict is returned.

    Returns:
        The decoded JSON content, or ``{}`` after a reset.
    """
    path = KEYREPO_DIV / "data.json"
    try:
        return json.loads(path.read_bytes())
    except (OSError, ValueError):
        # OSError: the file is missing or unreadable.
        # ValueError: malformed JSON or undecodable bytes
        # (json.JSONDecodeError and UnicodeDecodeError are both subclasses).
        # Catching only these keeps the best-effort reset while no longer
        # swallowing KeyboardInterrupt, SystemExit or programming errors.
        empty: dict = {}
        path.write_text(json.dumps(empty))
        return empty


def load_key_temp_data() -> list:
    """Load the list stored in ``data.temp.json``.

    If the file is missing, unreadable, or does not hold valid JSON, it is
    (re)written with an empty JSON array and an empty list is returned.

    Returns:
        The decoded JSON content, or ``[]`` after a reset.
    """
    path = KEYREPO_DIV / "data.temp.json"
    try:
        return json.loads(path.read_bytes())
    except (OSError, ValueError):
        # OSError: the file is missing or unreadable.
        # ValueError: malformed JSON or undecodable bytes
        # (json.JSONDecodeError and UnicodeDecodeError are both subclasses).
        # Catching only these keeps the best-effort reset while no longer
        # swallowing KeyboardInterrupt, SystemExit or programming errors.
        empty: list = []
        path.write_text(json.dumps(empty))
        return empty


def load_key_history() -> list:
    """Load the list stored in ``data.history.json``.

    If the file is missing, unreadable, or does not hold valid JSON, it is
    (re)written with an empty JSON array and an empty list is returned.

    Returns:
        The decoded JSON content, or ``[]`` after a reset.
    """
    path = KEYREPO_DIV / "data.history.json"
    try:
        return json.loads(path.read_bytes())
    except (OSError, ValueError):
        # OSError: the file is missing or unreadable.
        # ValueError: malformed JSON or undecodable bytes
        # (json.JSONDecodeError and UnicodeDecodeError are both subclasses).
        # Catching only these keeps the best-effort reset while no longer
        # swallowing KeyboardInterrupt, SystemExit or programming errors.
        empty: list = []
        path.write_text(json.dumps(empty))
        return empty


def save_key_data(d: dict) -> None:
    """Overwrite ``data.json`` with the JSON encoding of ``d``.

    Args:
        d: JSON-serialisable mapping that replaces the file's content.

    Raises:
        TypeError, ValueError: Propagated from ``json.dumps`` when ``d``
            cannot be serialised; the existing file is left untouched.
    """
    # Serialise before touching the file: opening in 'w' mode truncates
    # immediately, so a failing json.dumps would otherwise leave an empty
    # store behind.
    payload = json.dumps(d)
    (KEYREPO_DIV / "data.json").write_text(payload)


def save_key_temp_data(d: list) -> None:
    """Overwrite ``data.temp.json`` with the JSON encoding of ``d``.

    Args:
        d: JSON-serialisable list that replaces the file's content.

    Raises:
        TypeError, ValueError: Propagated from ``json.dumps`` when ``d``
            cannot be serialised; the existing file is left untouched.
    """
    # Serialise before touching the file: opening in 'w' mode truncates
    # immediately, so a failing json.dumps would otherwise leave an empty
    # file behind.
    payload = json.dumps(d)
    (KEYREPO_DIV / "data.temp.json").write_text(payload)


def save_key_history_data(d: list) -> None:
    """Overwrite ``data.history.json`` with the JSON encoding of ``d``.

    Args:
        d: JSON-serialisable list that replaces the file's content.

    Raises:
        TypeError, ValueError: Propagated from ``json.dumps`` when ``d``
            cannot be serialised; the existing file is left untouched.
    """
    # Serialise before touching the file: opening in 'w' mode truncates
    # immediately, so a failing json.dumps would otherwise leave an empty
    # file behind.
    payload = json.dumps(d)
    (KEYREPO_DIV / "data.history.json").write_text(payload)


def add_key(key: str, repo: str) -> str:
    """Register ``repo`` as a reply for ``key`` and persist the store.

    Args:
        key: Keyword to attach the reply to.
        repo: Reply text to add.

    Returns:
        A status message: one text when the reply was already registered
        under ``key`` (nothing is saved), another when it was added.
    """
    store = load_key_data()
    # setdefault yields the existing reply list, or installs a fresh one.
    replies = store.setdefault(key, [])
    if repo in replies:
        return "该回复已存在~!"
    replies.append(repo)
    save_key_data(store)
    return "成功,又学到新知识了~!"


def add_key_temp(d: dict) -> None:
    """Append ``d`` to the temp list and log it in the history as not passed.

    Args:
        d: Entry to store; it is written to ``data.temp.json`` first and then
            handed to ``add_history`` with ``p=False``.
    """
    pending: list = load_key_temp_data()
    pending.append(d)
    save_key_temp_data(pending)
    # The history record is written only after the temp list has been saved.
    add_history(d, p=False)


def add_history(d: dict, p: bool = True) -> None:
    """Append a copy of ``d``, tagged with its pass flag, to the history file.

    Args:
        d: Entry to record. It is not modified; a shallow copy is stored.
        p: Value stored under the ``'pass'`` key of the recorded entry.
    """
    # Build a new dict instead of writing d['pass'] in place, so the caller's
    # object keeps comparing equal to the entry it stored elsewhere (for
    # example the one already saved in the temp list).
    record = {**d, 'pass': p}
    history: list = load_key_history()
    history.append(record)
    save_key_history_data(history)


def del_key(key: str, repo: str) -> str:
    """Delete one reply, or every reply, registered under a keyword.

    Args:
        key: Keyword whose replies are edited.
        repo: Reply to remove, or the sentinel ``'isALL'`` to drop the
            keyword together with all of its replies.

    Returns:
        A confirmation message describing what was deleted.

    Raises:
        KeyError: If ``key`` is not in the store (``'isALL'`` mode), or if
            ``repo`` is not registered under ``key``.
    """
    data = load_key_data()
    if repo == 'isALL':
        del data[key]
        msg = f"成功删除关键词[{key}]下所有回复..."
    else:
        replies = data.get(key, [])
        try:
            # Remove the reply itself; the previous code removed ``key``,
            # which is not normally an element of its own reply list.
            replies.remove(repo)
        except ValueError as err:
            # list.remove reports a missing item with ValueError, so that is
            # what must be translated into the documented KeyError.
            raise KeyError('Find repo error.') from err
        data[key] = replies
        msg = f"成功删除关键词[{key}]下回复:{repo}"
    save_key_data(data)
    return msg


def del_key_temp(d: dict) -> bool:
    """Remove entry ``d`` from the temp list and persist the result.

    Args:
        d: Entry to delete; matched by equality against the stored entries.

    Returns:
        ``True`` if the entry was found and removed, ``False`` otherwise.
    """
    data = load_key_temp_data()
    if d not in data:
        return False
    data.remove(d)
    # Write the shortened list back; without this the removal only affected
    # the in-memory copy and was lost on return.
    save_key_temp_data(data)
    return True