ligbox-ops-platform/app/cloudflare_dns.py
Ligbox Spec Hub 3a2c64834b Initial import: ligbox-ops-platform + specs + LAPTOP + obsidian merge (CT130)
Source: VM122 /opt + obsidian-infra + LAPTOP
Hub: CT130 spec-hub 10.10.10.130
2026-06-19 17:26:41 +00:00

181 lines
6 KiB
Python

"""Cloudflare DNS records for domain management (read-only)."""
from __future__ import annotations
import os
from typing import Any
import httpx
CF_API = "https://api.cloudflare.com/client/v4"
EMAIL_PURPOSES = frozenset({"mx", "spf", "dkim", "dmarc", "mail-host", "autodiscover", "mail-alias"})
def _tokens() -> list[str]:
raw = os.getenv("CLOUDFLARE_API_TOKENS") or os.getenv("CLOUDFLARE_API_TOKEN") or ""
return [t.strip() for t in raw.replace(";", ",").split(",") if t.strip()]
def _parent_candidates(domain: str) -> list[str]:
domain = domain.lower().strip().rstrip(".")
parts = domain.split(".")
if len(parts) < 2:
return [domain] if domain else []
return [".".join(parts[i:]) for i in range(len(parts) - 1)]
def _headers(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def _classify_record(name: str, rtype: str, content: str) -> str:
n = (name or "").lower().rstrip(".")
c = (content or "").lower()
if rtype == "MX":
return "mx"
if rtype == "TXT":
if "v=spf1" in c:
return "spf"
if "_domainkey" in n or "v=dkim1" in c:
return "dkim"
if "_dmarc" in n or "v=dmarc1" in c:
return "dmarc"
if n.startswith("mail.") and rtype in ("A", "AAAA", "CNAME"):
return "mail-host"
if "autodiscover" in n or "autoconfig" in n:
return "autodiscover"
if rtype == "CNAME" and ("mail" in n or "autodiscover" in n):
return "mail-alias"
return "other"
def _record_belongs(name: str, domain: str) -> bool:
rn = (name or "").lower().rstrip(".")
d = domain.lower().strip().rstrip(".")
return rn == d or rn.endswith(f".{d}")
def _normalize_record(raw: dict, domain: str) -> dict[str, Any]:
name = raw.get("name", "")
rtype = raw.get("type", "")
content = raw.get("content", "")
purpose = _classify_record(name, rtype, content)
return {
"id": raw.get("id"),
"type": rtype,
"name": name.rstrip("."),
"content": content,
"priority": raw.get("priority"),
"proxied": raw.get("proxied"),
"ttl": raw.get("ttl"),
"purpose": purpose,
"email_related": purpose in EMAIL_PURPOSES,
"modified_on": raw.get("modified_on"),
"created_on": raw.get("created_on"),
}
async def _find_zone(client: httpx.AsyncClient, token: str, domain: str) -> dict | None:
for candidate in _parent_candidates(domain):
res = await client.get(
f"{CF_API}/zones",
headers=_headers(token),
params={"name": candidate, "status": "active"},
)
if res.status_code != 200:
continue
data = res.json()
if not data.get("success"):
continue
zones = data.get("result") or []
if zones:
z = zones[0]
return {"id": z.get("id"), "name": z.get("name"), "status": z.get("status")}
return None
async def _list_zone_records(client: httpx.AsyncClient, token: str, zone_id: str) -> list[dict]:
records: list[dict] = []
page = 1
while page <= 10:
res = await client.get(
f"{CF_API}/zones/{zone_id}/dns_records",
headers=_headers(token),
params={"per_page": 100, "page": page},
)
if res.status_code != 200:
break
data = res.json()
if not data.get("success"):
break
batch = data.get("result") or []
records.extend(batch)
info = data.get("result_info") or {}
if page >= (info.get("total_pages") or 1):
break
page += 1
return records
async def fetch_domain_dns(domain: str, *, email_service: bool | None = None) -> dict[str, Any]:
domain = domain.lower().strip().rstrip(".")
tokens = _tokens()
if not tokens:
return {
"domain": domain,
"zone": None,
"email_service": bool(email_service),
"service_type": "email_server" if email_service else None,
"records": [],
"email_records": [],
"summary": {"total": 0, "email_related": 0},
"error": "CLOUDFLARE_API_TOKEN não configurado no servidor",
}
async with httpx.AsyncClient(timeout=20.0) as client:
zone = None
token_used = None
for token in tokens:
zone = await _find_zone(client, token, domain)
if zone:
token_used = token
break
if not zone or not token_used:
return {
"domain": domain,
"zone": None,
"email_service": bool(email_service),
"service_type": "email_server" if email_service else None,
"records": [],
"email_records": [],
"summary": {"total": 0, "email_related": 0},
"error": f"Zona Cloudflare não encontrada para {domain}",
}
raw_records = await _list_zone_records(client, token_used, zone["id"])
scoped = [_normalize_record(r, domain) for r in raw_records if _record_belongs(r.get("name", ""), domain)]
scoped.sort(key=lambda r: (0 if r["email_related"] else 1, r["type"], r["name"]))
email_records = [r for r in scoped if r["email_related"]]
is_email = email_service if email_service is not None else len(email_records) > 0
purposes: dict[str, int] = {}
for r in scoped:
purposes[r["purpose"]] = purposes.get(r["purpose"], 0) + 1
return {
"domain": domain,
"zone": zone,
"email_service": is_email,
"service_type": "email_server" if is_email else "other",
"records": scoped,
"email_records": email_records,
"summary": {
"total": len(scoped),
"email_related": len(email_records),
"by_purpose": purposes,
},
"error": None,
}