summaryrefslogtreecommitdiff
path: root/ATRI
diff options
context:
space:
mode:
Diffstat (limited to 'ATRI')
-rw-r--r--ATRI/plugins/geoip.py68
1 files changed, 45 insertions, 23 deletions
diff --git a/ATRI/plugins/geoip.py b/ATRI/plugins/geoip.py
index bbe8d60..cc822a9 100644
--- a/ATRI/plugins/geoip.py
+++ b/ATRI/plugins/geoip.py
@@ -1,33 +1,55 @@
+from geoip2.webservice import AsyncClient
+
from nonebot.params import ArgStr
-import geoip2.webservice
+
+from ATRI import conf, driver
+from ATRI.log import log
from ATRI.service import Service
-from ATRI import conf
+from ATRI.message import MessageBuilder
-geoip = Service("GEOIP查询").document("search ip in MaxMind GEOIP databases")
+LANG = "zh-CN"
-query_geoip = geoip.on_command("ip查询", "查询IP的地理位置", aliases={"IP查询", "查询IP"})
-LANG = "zh-CN"
+geoip = Service("IP查询").document("通过 geoip 查询 IP 信息")
+
+
+query_geoip = geoip.on_command("ip查询", "查询IP的地理位置", aliases={"IP查询", "查询IP"})
@query_geoip.got("ip_address", prompt="地址是?(支持ipv4/ipv6)")
async def _(ip_address: str = ArgStr()):
- with geoip2.webservice.Client(
- conf.GeoIP.account_id, conf.GeoIP.license_key, host="geolite.info"
- ) as client:
- await query_geoip.send("正在查询...请稍候")
- response = client.city(ip_address)
- country = response.country.names[LANG]
- city = response.city.names[LANG]
- org = response.traits.autonomous_system_organization
- network = str(response.traits.network)
- subdivision = ""
- if subs := response.subdivisions:
- subdivision = subs[0].names[LANG]
- await query_geoip.finish(
- f"IP: {ip_address}\n"
- f"{country}{subdivision}{city}\n"
- f"运营商{org}\n"
- f"网段{network}"
- )
+ await query_geoip.send("正在查询...请稍候")
+
+ try:
+ async with AsyncClient(
+ conf.GeoIP.account_id, conf.GeoIP.license_key, host="geolite.info"
+ ) as client:
+ resp = await client.city(ip_address)
+
+ country = resp.country.names[LANG]
+ city = resp.city.names[LANG]
+ org = resp.traits.autonomous_system_organization
+ network = resp.traits.network
+ subd = str()
+ if subs := resp.subdivisions:
+ subd = subs[0].names[LANG]
+
+ result = (
+ MessageBuilder(f"IP: {ip_address}")
+ .text(f"{country}{subd}{city}")
+ .text(f"运营商: {org}")
+ .text(f"网段: {network}")
+ )
+ except Exception:
+ result = "查询失败..."
+
+ await query_geoip.finish(result)
+
+
+def _check_need():
+ if not conf.GeoIP.account_id or not conf.GeoIP.license_key:
+ log.warning("插件 IP查询 所需的设置未配置, 将被全局禁用, 填写后请手动启用")
+
+
+driver().on_startup(_check_need)