summaryrefslogtreecommitdiff
path: root/ATRI/plugins/anti-rubbish.py
diff options
context:
space:
mode:
✨ 更新插件,埋下bug
- 更新插件: - call-owner - code-runner - status - anime-search - tex(待修复) - 埋下bug: - service中limit作为机器人服务中的开关,目前写入文件亟待修复
Diffstat (limited to 'ATRI/plugins/anti-rubbish.py')
1 files changed, 166 insertions, 0 deletions
diff --git a/ATRI/plugins/anti-rubbish.py b/ATRI/plugins/anti-rubbish.py
new file mode 100644
index 0000000..b6ef12a
--- /dev/null
+++ b/ATRI/plugins/anti-rubbish.py
@@ -0,0 +1,166 @@
+import json
+from pathlib import Path
+from datetime import datetime
+from typing import Optional
+
+from nonebot.plugin import on_command, on_message
+from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
+
+from ATRI.log import logger
+from ATRI.rule import is_in_banlist
+from ATRI.config import nonebot_config
+from ATRI.utils.file import write_file
+from ATRI.utils.apscheduler import scheduler
+from ATRI.exceptions import WriteError
+
+
+RUBBISH_DIR = Path('.') / 'ATRI' / 'data' / 'database' / 'anti-rubbish'
+now_time = datetime.now().strftime('%Y%m%d-%H%M%S')
+
+
+# 365 x 24 不间断监听啥b发屎人
+# 日您🐎的,自己在厕所吃完自助餐还不够,还要分享给群友
+# 👴就搁这公示公示这些啥b
+# 每日0点如有发屎记录则全群通报
+anti_rubbish = on_message()
+
+@anti_rubbish.handle()
+async def _anti_rubbish(bot: Bot, event: GroupMessageEvent) -> None:
+ msg = str(event.message).strip()
+ user = int(event.user_id)
+ group = event.group_id
+ key_word: dict = NoobRubbish.get_keyword()
+ noob_data: dict = NoobRubbish.read_noobs(group)
+ noob_data.setdefault(user, {})
+
+ for key in key_word.keys():
+ if key in msg:
+ noob_data[user].setdefault(key, 1)
+ noob_data[user][key] += 1
+ await write_file(NoobRubbish._get_noob(group), json.dumps(noob_data))
+ logger.info(
+ f"GET 吃屎人 {user}[@群{group}] 第{noob_data[user][key]}次: {msg}")
+
+
+rubbish = on_command("/rubbish", rule=is_in_banlist())
+
+@rubbish.handle()
+async def _rubbish(bot: Bot, event: GroupMessageEvent) -> None:
+ cmd = str(event.message).split(" ")
+ user = int(event.user_id)
+ group = event.group_id
+
+ if cmd[0] == "list":
+ noob_data: dict = NoobRubbish.read_noobs(group)
+ noob_list = ""
+ for key in noob_data.keys():
+ noob_list += f"{key}\n"
+
+ if not noob_list:
+ await rubbish.finish("此群很干净呢~!")
+ else:
+ msg = (
+ f"截至{now_time}\n"
+ f"吃过厕所自助餐的有:\n"
+ ) + noob_list
+ await rubbish.finish(msg)
+
+ elif cmd[0] == "read":
+ try:
+ user = cmd[1]
+ except:
+ await rubbish.finish("格式/rubbish read qq")
+
+ noob_data: dict = NoobRubbish.read_noob(group, int(user))
+ if not noob_data:
+ await rubbish.finish("该群友很干净!")
+ else:
+ noob_keys = ""
+ for key in noob_data.keys():
+ noob_keys += f"{key}-{noob_data[key]}次\n"
+ msg = (
+ f"截至{now_time}\n"
+ f"此群友吃的屎的种类,以及次数:\n"
+ ) + noob_keys
+ await rubbish.finish(msg)
+
+ elif cmd[0] == "update":
+ if user not in nonebot_config["superusers"]:
+ await rubbish.finish("没权限呢...")
+
+ key_word = cmd[1]
+ data = NoobRubbish.get_keyword()
+ data[key_word] = now_time
+ await NoobRubbish.store_keyword(data)
+ await rubbish.finish(f"勉強しました!\n[{key_word}]")
+
+ elif cmd[0] == "del":
+ if user not in nonebot_config["superusers"]:
+ await rubbish.finish("没权限呢...")
+
+ key_word = cmd[1]
+ data = NoobRubbish.get_keyword()
+ del data[key_word]
+ await NoobRubbish.store_keyword(data)
+ await rubbish.finish(f"清除~![{key_word}]")
+
+ else:
+ await rubbish.finish("请检查格式~!详细请查阅文档")
+
+
+# @scheduler.scheduled_job(
+# "cron",
+# hour=0,
+# misfire_grace_time=120
+# )
+# async def _():
+# group = GroupMessageEvent.group_id
+# noob_data = NoobRubbish.read_noobs(group)
+
+
+class NoobRubbish:
+ @staticmethod
+ def _get_noob(group: Optional[int] = None) -> Path:
+ file_name = f"{now_time}.noob.json"
+ GROUP_DIR = RUBBISH_DIR / f"{group}"
+ path = GROUP_DIR / file_name
+
+ if not GROUP_DIR.exists():
+ GROUP_DIR.mkdir()
+ return path
+
+ @classmethod
+ def read_noobs(cls, group: int) -> dict:
+ try:
+ data = json.loads(cls._get_noob(group).read_bytes())
+ except:
+ data = {}
+ return data
+
+ @classmethod
+ def read_noob(cls, group: int, user: Optional[int]) -> dict:
+ try:
+ data = json.loads(cls._get_noob(group).read_bytes())
+ except:
+ data = {}
+ data.setdefault(user, {})
+ return data
+
+ @staticmethod
+ def get_keyword() -> dict:
+ file_name = "keyword.json"
+ path = RUBBISH_DIR / file_name
+ try:
+ data = json.loads(path.read_bytes())
+ except:
+ data = {}
+ return data
+
+ @staticmethod
+ async def store_keyword(data: dict) -> None:
+ file_name = "keyword.json"
+ path = RUBBISH_DIR / file_name
+ try:
+ await write_file(path, json.dumps(data))
+ except WriteError:
+ raise WriteError("Writing file failed!")