obsidian-vault/ligbox-ops-platform/app/collectors/dns.py
2026-06-19 17:26:42 +00:00

86 lines
2.4 KiB
Python

"""Public DNS checks via dig (read-only)."""
from __future__ import annotations

import re
import subprocess
from typing import Any
# One quoted <character-string> as dig prints it for TXT data. A backslash
# escapes the next character, so an escaped quote does not end the chunk.
_TXT_CHUNK = re.compile(r'"((?:[^"\\]|\\.)*)"')


def _dig(*args: str) -> list[str]:
    """Run ``dig +short`` with *args* and return its non-empty answer lines.

    Each line is whitespace-stripped.  TXT answers are unquoted, and an
    answer made of several quoted chunks (``"part1" "part2"``) is joined
    into one string without a separator, so callers see the logical record
    instead of a value with stray ``" "`` in the middle.

    Returns an empty list when dig cannot be started, exceeds the 8 second
    timeout, exits non-zero, or is handed/produces unusable text.  Lookup
    failures are therefore indistinguishable from "no records" here.

    NOTE(review): arguments reach dig verbatim, so a value beginning with
    ``-``, ``+`` or ``@`` would be read as an option or server rather than
    a query name -- callers should only pass validated names.
    """
    try:
        proc = subprocess.run(
            ["dig", "+short", *args],
            capture_output=True,
            text=True,
            timeout=8,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: dig missing or not executable.
        # SubprocessError: includes TimeoutExpired.
        # ValueError: NUL byte in an argument, or undecodable output.
        return []
    if proc.returncode != 0:
        return []

    answers: list[str] = []
    for raw in proc.stdout.splitlines():
        line = raw.strip()
        if not line:
            continue
        chunks = _TXT_CHUNK.findall(line) if line.startswith('"') else []
        # Fall back to plain quote-stripping for anything that is not a
        # well-formed sequence of quoted chunks (e.g. MX/A answers).
        answers.append("".join(chunks) if chunks else line.strip('"'))
    return answers
def _result(check_id: str, label: str, status: str, message: str, evidence: dict | None = None) -> dict[str, Any]:
    """Assemble the record describing the outcome of a single check.

    The returned mapping always carries the keys ``check_id``, ``label``,
    ``status``, ``message`` and ``evidence`` (in that order).  A missing or
    empty *evidence* is replaced by a fresh empty dict; a non-empty one is
    stored as given.
    """
    details = evidence if evidence else {}
    record: dict[str, Any] = dict(
        check_id=check_id,
        label=label,
        status=status,
        message=message,
    )
    record["evidence"] = details
    return record
def _mx_targets_domain(record: str, domain: str) -> bool:
    """Return True when MX *record* points at *domain* or one of its subdomains."""
    fields = record.split()
    apex = domain.rstrip(".")
    if not fields or not apex:
        return False
    # ``dig +short`` prints "<preference> <exchange>."; the exchange is last.
    exchange = fields[-1].rstrip(".").lower()
    # Compare whole labels: a substring test would also accept look-alikes
    # such as "mail.notexample.com." or "example.com.evil.net.".
    return exchange == apex or exchange.endswith(f".{apex}")


def _txt_tags(record: str) -> dict[str, str]:
    """Parse a ``tag=value; tag=value`` TXT record into a dict (first tag wins)."""
    tags: dict[str, str] = {}
    for part in record.split(";"):
        name, sep, value = part.partition("=")
        if sep:
            tags.setdefault(name.strip(), value.strip())
    return tags


def _is_dkim_key(record: str) -> bool:
    """Return True when *record* looks like a DKIM key record (has ``p=``)."""
    tags = _txt_tags(record)
    # ``v`` is optional in a key record, but when present it must be DKIM1.
    return "p" in tags and tags.get("v", "DKIM1") == "DKIM1"


def collect(domain: str, mail_public_ip: str | None = None) -> dict[str, dict[str, Any]]:
    """Run the public DNS mail checks for *domain*, keyed by check id.

    Checks (each value is a dict built by ``_result``):

    * ``dns_mx``    -- pass when an MX exchange equals *domain* or is one of
      its subdomains, else fail.
    * ``dns_spf``   -- pass when a TXT record at the domain has the version
      token ``v=spf1``, else fail.
    * ``dns_dkim``  -- pass when ``default._domainkey.<domain>`` holds a DKIM
      key record, else fail.
    * ``dns_dmarc`` -- pass when ``_dmarc.<domain>`` holds a ``v=DMARC1``
      record, else warn.

    Requiring an actual key/policy record (rather than any answer line)
    keeps wildcard TXT data or a CNAME target echoed by ``dig +short`` from
    producing a false pass.

    Args:
        domain: Domain to inspect; lower-cased and stripped before use.
        mail_public_ip: Optional address ``mail.<domain>`` is expected to
            resolve to.  When given and the A lookup does not contain it, a
            passing ``dns_mx`` is downgraded to ``warn`` and the A answers
            are appended to its message.

    Returns:
        Mapping of check id to result dict.
    """
    domain = domain.lower().strip()
    mail_host = f"mail.{domain}"
    results: dict[str, dict[str, Any]] = {}

    mx = _dig(domain, "MX")
    mx_ok = any(_mx_targets_domain(line, domain) for line in mx)
    results["dns_mx"] = _result(
        "dns_mx",
        "MX record",
        "pass" if mx_ok else "fail",
        f"MX: {', '.join(mx[:3]) or 'none'}",
        {"records": mx},
    )

    txt_root = _dig(domain, "TXT")
    # The version token must be exactly "v=spf1", ended by a space or the
    # end of the record; a bare prefix test would also accept "v=spf10".
    spf = [t for t in txt_root if t.lower().split(" ", 1)[0] == "v=spf1"]
    results["dns_spf"] = _result(
        "dns_spf",
        "SPF",
        "pass" if spf else "fail",
        spf[0][:120] if spf else "SPF TXT not found",
        {"records": spf},
    )

    dkim_name = f"default._domainkey.{domain}"
    dkim = _dig(dkim_name, "TXT")
    dkim_ok = any(_is_dkim_key(rec) for rec in dkim)
    results["dns_dkim"] = _result(
        "dns_dkim",
        "DKIM",
        "pass" if dkim_ok else "fail",
        "DKIM TXT present" if dkim_ok else f"{dkim_name} not found",
        # Evidence keeps the raw answers so an operator can see what came back.
        {"records": dkim[:2]},
    )

    dmarc_name = f"_dmarc.{domain}"
    dmarc = _dig(dmarc_name, "TXT")
    dmarc_policies = [rec for rec in dmarc if _txt_tags(rec).get("v") == "DMARC1"]
    results["dns_dmarc"] = _result(
        "dns_dmarc",
        "DMARC",
        "pass" if dmarc_policies else "warn",
        dmarc_policies[0][:120] if dmarc_policies else "DMARC TXT not found",
        {"records": dmarc},
    )

    # The A lookup can only downgrade a passing MX check, so skip the extra
    # dig call when its outcome could not change anything.
    if mail_public_ip and results["dns_mx"]["status"] == "pass":
        a_mail = _dig(mail_host, "A")
        if mail_public_ip not in a_mail:
            results["dns_mx"]["status"] = "warn"
            results["dns_mx"]["message"] += f" (A {mail_host}: {a_mail or 'none'})"
    return results