go_scamalytics_py/ipchecker/ipchecker.py

159 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
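ipchecker.ipchecker

Provides CheckIP(ip, user_agents_list) -> dict.

Goal: request the page https://scamalytics.com/ip/{ip} (using a random or a
caller-supplied User-Agent), parse the "IP Fraud Risk API" JSON block shown on
that page, and return a dictionary containing at least:
    {"ip": "...", "score": "...", "risk": "...", ...}
If parsing fails, a result of the form {'ip': ip, 'error': '...'} is returned.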
"""
from __future__ import annotations
import requests
import random
import re
import json
from typing import List, Dict, Any, Optional
_DEFAULT_USER_AGENTS = [
    # A few common browser User-Agent strings to pick from at random; callers
    # (e.g. the CLI) may pass their own list instead.
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.3",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1.1 Safari/605.1.1",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]
# Timeout, in seconds, passed to every HTTP GET issued by _fetch_page.
_REQUEST_TIMEOUT = 15 # seconds
def _choose_user_agent(user_agents: Optional[List[str]]) -> str:
    """Return one User-Agent string picked at random.

    The choice is made from *user_agents* when it is a non-empty list;
    otherwise (``None`` or empty) it is made from the module's built-in
    ``_DEFAULT_USER_AGENTS``.
    """
    pool = user_agents if user_agents else _DEFAULT_USER_AGENTS
    return random.choice(pool)
def _fetch_page(ip: str, user_agent: str, session: Optional[requests.Session] = None) -> str:
    """Download the scamalytics.com page for *ip* and return its HTML text.

    Args:
        ip: Address to look up; interpolated into the URL path as-is.
        user_agent: Value sent in the ``User-Agent`` request header.
        session: Optional ``requests.Session`` to reuse. When omitted, a
            temporary session is created for this single request and closed
            before returning, so its connection pool is not leaked.

    Returns:
        The response body decoded as text.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-2xx status (via ``raise_for_status``).
    """
    url = f"https://scamalytics.com/ip/{ip}"
    headers = {
        "User-Agent": user_agent,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    }
    # Only close a session we created ourselves; a caller-supplied session
    # stays open so the caller can keep reusing it.
    owns_session = session is None
    s = requests.Session() if owns_session else session
    try:
        resp = s.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
        resp.raise_for_status()
        return resp.text
    finally:
        if owns_session:
            s.close()
def _sub_outside_strings(pattern: str, source: str) -> str:
    """Delete matches of *pattern* in *source* that lie outside double-quoted strings."""
    # Group 1 captures whole string literals so they are consumed and put back
    # unchanged; anything else that matches *pattern* is dropped.
    combined = r'("(?:\\.|[^"\\])*")|(?:' + pattern + r")"
    return re.sub(combined, lambda m: m.group(1) or "", source)


def _extract_json_block_from_text(text: str) -> Optional[str]:
    """Locate and lightly clean the JSON object that follows 'IP Fraud Risk API'.

    The search starts at the first occurrence of the marker text (or at the
    beginning of *text* when the marker is absent), takes the first ``{`` found
    from there, and extracts up to its matching ``}`` by counting braces. The
    scan is string-aware: braces inside double-quoted strings (including ones
    preceded by escaped quotes) do not affect the nesting depth, so multi-line
    JSON with values such as ``"a } b"`` is extracted whole.

    The extracted candidate is then cleaned so ``json.loads`` has a better
    chance of succeeding:

    * HTML tags are removed;
    * literal ``...`` placeholders are removed;
    * ``//`` line comments are removed, but only outside string literals, so
      URLs like ``"https://example.com"`` survive;
    * trailing commas before ``}`` or ``]`` are removed, again only outside
      string literals.

    Args:
        text: Full page text (HTML) to search.

    Returns:
        The cleaned JSON candidate string, or ``None`` when no ``{`` is found
        or the opening brace is never closed. The result is not guaranteed to
        be valid JSON.
    """
    # Locate the marker; without it, fall back to scanning from the start,
    # since some pages may carry the JSON fields but not the heading.
    marker_pos = text.find("IP Fraud Risk API")
    if marker_pos == -1:
        marker_pos = 0

    start = text.find("{", marker_pos)
    if start == -1:
        return None

    # Brace matching that skips over the contents of double-quoted strings.
    depth = 0
    in_string = False
    escaped = False
    end = None
    for i in range(start, len(text)):
        ch = text[i]
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                end = i + 1
                break
    if end is None:
        return None

    candidate = text[start:end]
    # Drop any HTML tags that ended up inside the block.
    candidate = re.sub(r"<[^>]+>", "", candidate)
    # The site's sample output sometimes shows "..." placeholders, which are
    # not valid JSON; remove them.
    candidate = candidate.replace("...", "")
    # Remove JavaScript-style line comments, leaving string contents intact.
    candidate = _sub_outside_strings(r"//[^\n]*", candidate)
    # Remove trailing commas such as {"a": 1,} or [1, 2,].
    candidate = _sub_outside_strings(r",\s*(?=[}\]])", candidate)
    return candidate
def _safe_json_loads(s: str) -> Optional[Dict[str, Any]]:
    """Parse *s* as a JSON object without raising.

    The text is first parsed as-is. If that fails and *s* contains single
    quotes, a second, more lenient attempt is made with every ``'`` replaced
    by ``"``.

    Args:
        s: JSON text to parse.

    Returns:
        The parsed ``dict``, or ``None`` when *s* cannot be parsed or when the
        parsed value is not a JSON object (e.g. a list, string or number), so
        callers can rely on the declared return type.
    """
    attempts = [s]
    if isinstance(s, str) and "'" in s:
        # Lenient retry: treat single-quoted pseudo-JSON as double-quoted.
        attempts.append(s.replace("'", "\""))

    for attempt in attempts:
        try:
            parsed = json.loads(attempt)
        except (ValueError, TypeError, RecursionError):
            # ValueError covers json.JSONDecodeError; TypeError covers non-text
            # input; RecursionError covers pathologically nested documents.
            continue
        return parsed if isinstance(parsed, dict) else None
    return None
def _scrape_basic_fields(text: str) -> Dict[str, Any]:
    """Regex-scrape the "ip", "score" and "risk" string values directly from *text*."""
    found: Dict[str, Any] = {}
    for key in ("ip", "score", "risk"):
        match = re.search(rf'"{key}"\s*:\s*"([^"]+)"', text)
        if match:
            found[key] = match.group(1)
    return found


def CheckIP(ip: str, user_agents_list: Optional[List[str]] = None, session: Optional[requests.Session] = None) -> Dict[str, Any]:
    """Look up a single IP on scamalytics.com and return the parsed result.

    Args:
        ip: The IP address to query.
        user_agents_list: Optional list of User-Agent strings; one is chosen
            at random. When empty or ``None`` a built-in default is used.
        session: Optional ``requests.Session`` to reuse for the request.

    Returns:
        A dict that always contains ``"ip"``.

        * On success: ``"score"`` and ``"risk"`` when the page's JSON block
          has them, any of ``"is_blacklisted_external"``, ``"operator"``,
          ``"hostname"``, ``"asn"`` that are present, and the whole parsed
          object under ``"_raw_parsed"``.
        * When the JSON block is missing or cannot be parsed, but
          ``"ip"``/``"score"``/``"risk"`` pairs can still be scraped from the
          page text by regex: just those scraped fields.
        * On failure: ``{"ip": ip, "error": ...}`` where the error is
          ``"http_error: ..."``, ``"no_json_block_found"``, or
          ``"json_parse_failed"`` (the latter with the first 200 characters of
          the block under ``"raw"`` for debugging).
    """
    ua = _choose_user_agent(user_agents_list)
    try:
        text = _fetch_page(ip, ua, session=session)
    except requests.RequestException as e:
        return {"ip": ip, "error": f"http_error: {str(e)}"}

    json_block = _extract_json_block_from_text(text)
    parsed = _safe_json_loads(json_block) if json_block else None

    if not isinstance(parsed, dict) or not parsed:
        # Degraded path: no usable JSON object. This also covers the case
        # where a block was found but is not the JSON we want (e.g. the first
        # "{" on the page belongs to a script or stylesheet), so try scraping
        # the fields directly before giving up.
        scraped = _scrape_basic_fields(text)
        if scraped:
            return {"ip": ip, **scraped}
        if not json_block:
            return {"ip": ip, "error": "no_json_block_found"}
        # Include the start of the raw block so the caller can debug it.
        return {"ip": ip, "error": "json_parse_failed", "raw": json_block[:200]}

    # Core fields: ip always, score/risk only when the page provided them.
    out: Dict[str, Any] = {"ip": parsed.get("ip", ip)}
    # Then copy the other commonly useful fields when present.
    for k in ("score", "risk", "is_blacklisted_external", "operator", "hostname", "asn"):
        if k in parsed:
            out[k] = parsed[k]
    # Remaining fields are not flattened into the result; the full parsed
    # object is exposed here for callers that need everything.
    out["_raw_parsed"] = parsed
    return out