diff options
-rw-r--r-- | ATRI/plugins/geoip.py | 68 |
1 files changed, 45 insertions, 23 deletions
diff --git a/ATRI/plugins/geoip.py b/ATRI/plugins/geoip.py index bbe8d60..cc822a9 100644 --- a/ATRI/plugins/geoip.py +++ b/ATRI/plugins/geoip.py @@ -1,33 +1,55 @@ +from geoip2.webservice import AsyncClient + from nonebot.params import ArgStr -import geoip2.webservice + +from ATRI import conf, driver +from ATRI.log import log from ATRI.service import Service -from ATRI import conf +from ATRI.message import MessageBuilder -geoip = Service("GEOIP查询").document("search ip in MaxMind GEOIP databases") +LANG = "zh-CN" -query_geoip = geoip.on_command("ip查询", "查询IP的地理位置", aliases={"IP查询", "查询IP"}) -LANG = "zh-CN" +geoip = Service("IP查询").document("通过 geoip 查询 IP 信息") + + +query_geoip = geoip.on_command("ip查询", "查询IP的地理位置", aliases={"IP查询", "查询IP"}) @query_geoip.got("ip_address", prompt="地址是?(支持ipv4/ipv6)") async def _(ip_address: str = ArgStr()): - with geoip2.webservice.Client( - conf.GeoIP.account_id, conf.GeoIP.license_key, host="geolite.info" - ) as client: - await query_geoip.send("正在查询...请稍候") - response = client.city(ip_address) - country = response.country.names[LANG] - city = response.city.names[LANG] - org = response.traits.autonomous_system_organization - network = str(response.traits.network) - subdivision = "" - if subs := response.subdivisions: - subdivision = subs[0].names[LANG] - await query_geoip.finish( - f"IP: {ip_address}\n" - f"{country}{subdivision}{city}\n" - f"运营商{org}\n" - f"网段{network}" - ) + await query_geoip.send("正在查询...请稍候") + + try: + async with AsyncClient( + conf.GeoIP.account_id, conf.GeoIP.license_key, host="geolite.info" + ) as client: + resp = await client.city(ip_address) + + country = resp.country.names[LANG] + city = resp.city.names[LANG] + org = resp.traits.autonomous_system_organization + network = resp.traits.network + subd = str() + if subs := resp.subdivisions: + subd = subs[0].names[LANG] + + result = ( + MessageBuilder(f"IP: {ip_address}") + .text(f"{country}{subd}{city}") + .text(f"运营商: {org}") + .text(f"网段: {network}") + ) + except Exception: + result = "查询失败..." + + await query_geoip.finish(result) + + +def _check_need(): + if not conf.GeoIP.account_id or not conf.GeoIP.license_key: + log.warning("插件 IP查询 所需的设置未配置, 将被全局禁用, 填写后请手动启用") + + +driver().on_startup(_check_need) |