159 lines
5.9 KiB
Python
159 lines
5.9 KiB
Python
"""
|
||
ipchecker.ipchecker
|
||
|
||
提供 CheckIP(ip, user_agents_list) -> dict 功能。
|
||
目标:向 https://scamalytics.com/ip/{ip} 请求页面(随机或指定 User-Agent),
|
||
解析页面中显示的 "IP Fraud Risk API" JSON 块并返回一个字典,至少包含:
|
||
{"ip": "...", "score": "...", "risk": "...", ...}
|
||
如果解析失败,会返回 {'ip': ip, 'error': '...'} 格式的结果。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
import requests
|
||
import random
|
||
import re
|
||
import json
|
||
from typing import List, Dict, Any, Optional
|
||
|
||
_DEFAULT_USER_AGENTS = [
|
||
# 提供若干常见 UA 供随机选择;CLI 也允许用户传入自定义列表。
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.3",
|
||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1.1 Safari/605.1.1",
|
||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
]
|
||
|
||
_REQUEST_TIMEOUT = 15 # seconds
|
||
|
||
def _choose_user_agent(user_agents: Optional[List[str]]) -> str:
|
||
if user_agents:
|
||
return random.choice(user_agents)
|
||
return random.choice(_DEFAULT_USER_AGENTS)
|
||
|
||
def _fetch_page(ip: str, user_agent: str, session: Optional[requests.Session] = None) -> str:
|
||
url = f"https://scamalytics.com/ip/{ip}"
|
||
headers = {
|
||
"User-Agent": user_agent,
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||
}
|
||
s = session or requests.Session()
|
||
resp = s.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
|
||
resp.raise_for_status()
|
||
return resp.text
|
||
|
||
def _extract_json_block_from_text(text: str) -> Optional[str]:
|
||
"""
|
||
在页面文本中寻找 'IP Fraud Risk API' 后面紧跟的 JSON 对象块。
|
||
我们尽量用“找到第一个 '{' 并用括号配对提取完整 JSON” 的方式,
|
||
以便处理多行格式化 JSON。
|
||
返回 JSON 字符串(如果成功),否则 None。
|
||
"""
|
||
# 定位关键词
|
||
marker_pos = text.find("IP Fraud Risk API")
|
||
if marker_pos == -1:
|
||
# 备用:有些页面可能直接包含 `"ip":"...","score":"..."` 但缺关键词
|
||
marker_pos = 0
|
||
|
||
# 从 marker_pos 向后找第一个 '{'
|
||
start = text.find("{", marker_pos)
|
||
if start == -1:
|
||
return None
|
||
|
||
# 用括号计数器向后扫描直到匹配闭合
|
||
depth = 0
|
||
i = start
|
||
end = None
|
||
while i < len(text):
|
||
ch = text[i]
|
||
if ch == "{":
|
||
depth += 1
|
||
elif ch == "}":
|
||
depth -= 1
|
||
if depth == 0:
|
||
end = i + 1
|
||
break
|
||
i += 1
|
||
|
||
if end is None:
|
||
return None
|
||
|
||
candidate = text[start:end]
|
||
|
||
# 清理 HTML 实体 / 多余的省略符号(网站示例有时会显示 "...")
|
||
# 将 HTML 标签移除(如果误包含)
|
||
candidate = re.sub(r"<[^>]+>", "", candidate)
|
||
|
||
# 有些页面会在片段中显示 "..." 或省略信息,我们不能解析含有 "..." 的 JSON
|
||
# 将出现的三点替换为 null 或将其删除(如果存在)
|
||
candidate = candidate.replace("...", "")
|
||
|
||
# 进一步清理:去掉单行注释或尾随逗号(尝试修复小错误以便 json.loads 成功)
|
||
# 去掉 JavaScript 注释
|
||
candidate = re.sub(r"//.*?$", "", candidate, flags=re.MULTILINE)
|
||
# 删除尾随逗号 (e.g., {"a":1,})
|
||
candidate = re.sub(r",\s*}", "}", candidate)
|
||
candidate = re.sub(r",\s*]", "]", candidate)
|
||
|
||
return candidate
|
||
|
||
def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
|
||
try:
|
||
return json.loads(s)
|
||
except Exception:
|
||
# 如果解析失败,尝试使用更宽松的替换(例如将单引号换成双引号)
|
||
try:
|
||
s2 = s.replace("'", "\"")
|
||
return json.loads(s2)
|
||
except Exception:
|
||
return None
|
||
|
||
def CheckIP(ip: str, user_agents_list: Optional[List[str]] = None, session: Optional[requests.Session] = None) -> Dict[str, Any]:
|
||
"""
|
||
查询 scamalytics.com 对单个 IP 的页面并解析出结果。
|
||
返回一个 dict,至少包含 ip 字段;在成功时返回 "score" 和 "risk" 等(字符串)。
|
||
失败时返回 {'ip': ip, 'error': '...'}。
|
||
"""
|
||
ua = _choose_user_agent(user_agents_list)
|
||
try:
|
||
text = _fetch_page(ip, ua, session=session)
|
||
except requests.RequestException as e:
|
||
return {"ip": ip, "error": f"http_error: {str(e)}"}
|
||
|
||
json_block = _extract_json_block_from_text(text)
|
||
if not json_block:
|
||
# 作为降级尝试:在页面中直接用正则找到 "ip":"...", "score":"...", "risk":"..."
|
||
m_ip = re.search(r'"ip"\s*:\s*"([^"]+)"', text)
|
||
m_score = re.search(r'"score"\s*:\s*"([^"]+)"', text)
|
||
m_risk = re.search(r'"risk"\s*:\s*"([^"]+)"', text)
|
||
if m_ip or m_score or m_risk:
|
||
result = {"ip": ip}
|
||
if m_ip:
|
||
result["ip"] = m_ip.group(1)
|
||
if m_score:
|
||
result["score"] = m_score.group(1)
|
||
if m_risk:
|
||
result["risk"] = m_risk.group(1)
|
||
return result
|
||
return {"ip": ip, "error": "no_json_block_found"}
|
||
|
||
parsed = _safe_json_loads(json_block)
|
||
if not parsed:
|
||
# 返回原始 json_block 以便用户调试
|
||
return {"ip": ip, "error": "json_parse_failed", "raw": json_block[:200]}
|
||
|
||
# 保证返回至少 ip, score, risk 三个字段
|
||
out = {"ip": parsed.get("ip", ip)}
|
||
if "score" in parsed:
|
||
out["score"] = parsed.get("score")
|
||
if "risk" in parsed:
|
||
out["risk"] = parsed.get("risk")
|
||
# 复制其他常见字段(如果存在)
|
||
for k in ("is_blacklisted_external", "operator", "hostname", "asn"):
|
||
if k in parsed:
|
||
out[k] = parsed[k]
|
||
|
||
# 将剩余字段以 flat 的方式并入(可选)
|
||
# 出于简洁默认不全部并入;用户可以从 parsed 获取全部信息。
|
||
out["_raw_parsed"] = parsed # 如果用户需要全部信息可查看这个键
|
||
|
||
return out
|