1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
from ATRI.database import ThesaurusStoragor, ThesaurusAuditList
class DBForTS:
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def add_item(self, _id: str, group_id: int):
await ThesaurusStoragor.create(_id=_id, group_id=group_id)
async def update_item(self, _id: str, group_id: int, update_map: dict):
await ThesaurusStoragor.filter(_id=_id, group_id=group_id).update(**update_map)
async def del_item(self, query_map: dict):
await ThesaurusStoragor.filter(**query_map).delete()
async def get_item_list(self, query_map: dict) -> list:
return await ThesaurusStoragor.filter(**query_map)
async def get_all_items(self) -> list:
return await ThesaurusStoragor.all()
class DBForTAL:
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def add_item(self, _id: str, group_id: int):
await ThesaurusAuditList.create(_id=_id, group_id=group_id)
async def update_item(self, _id: str, group_id: int, update_map: dict):
await ThesaurusAuditList.filter(_id=_id, group_id=group_id).update(**update_map)
async def del_item(self, query_map: dict):
await ThesaurusAuditList.filter(**query_map).delete()
async def get_item_list(self, query_map: dict) -> list:
return await ThesaurusAuditList.filter(**query_map)
async def get_all_items(self) -> list:
return await ThesaurusAuditList.all()
|