"""Cloudflare DNS records for domain management (read-only).""" from __future__ import annotations import os from typing import Any import httpx CF_API = "https://api.cloudflare.com/client/v4" EMAIL_PURPOSES = frozenset({"mx", "spf", "dkim", "dmarc", "mail-host", "autodiscover", "mail-alias"}) def _tokens() -> list[str]: raw = os.getenv("CLOUDFLARE_API_TOKENS") or os.getenv("CLOUDFLARE_API_TOKEN") or "" return [t.strip() for t in raw.replace(";", ",").split(",") if t.strip()] def _parent_candidates(domain: str) -> list[str]: domain = domain.lower().strip().rstrip(".") parts = domain.split(".") if len(parts) < 2: return [domain] if domain else [] return [".".join(parts[i:]) for i in range(len(parts) - 1)] def _headers(token: str) -> dict[str, str]: return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} def _classify_record(name: str, rtype: str, content: str) -> str: n = (name or "").lower().rstrip(".") c = (content or "").lower() if rtype == "MX": return "mx" if rtype == "TXT": if "v=spf1" in c: return "spf" if "_domainkey" in n or "v=dkim1" in c: return "dkim" if "_dmarc" in n or "v=dmarc1" in c: return "dmarc" if n.startswith("mail.") and rtype in ("A", "AAAA", "CNAME"): return "mail-host" if "autodiscover" in n or "autoconfig" in n: return "autodiscover" if rtype == "CNAME" and ("mail" in n or "autodiscover" in n): return "mail-alias" return "other" def _record_belongs(name: str, domain: str) -> bool: rn = (name or "").lower().rstrip(".") d = domain.lower().strip().rstrip(".") return rn == d or rn.endswith(f".{d}") def _normalize_record(raw: dict, domain: str) -> dict[str, Any]: name = raw.get("name", "") rtype = raw.get("type", "") content = raw.get("content", "") purpose = _classify_record(name, rtype, content) return { "id": raw.get("id"), "type": rtype, "name": name.rstrip("."), "content": content, "priority": raw.get("priority"), "proxied": raw.get("proxied"), "ttl": raw.get("ttl"), "purpose": purpose, "email_related": purpose in EMAIL_PURPOSES, "modified_on": raw.get("modified_on"), "created_on": raw.get("created_on"), } async def _find_zone(client: httpx.AsyncClient, token: str, domain: str) -> dict | None: for candidate in _parent_candidates(domain): res = await client.get( f"{CF_API}/zones", headers=_headers(token), params={"name": candidate, "status": "active"}, ) if res.status_code != 200: continue data = res.json() if not data.get("success"): continue zones = data.get("result") or [] if zones: z = zones[0] return {"id": z.get("id"), "name": z.get("name"), "status": z.get("status")} return None async def _list_zone_records(client: httpx.AsyncClient, token: str, zone_id: str) -> list[dict]: records: list[dict] = [] page = 1 while page <= 10: res = await client.get( f"{CF_API}/zones/{zone_id}/dns_records", headers=_headers(token), params={"per_page": 100, "page": page}, ) if res.status_code != 200: break data = res.json() if not data.get("success"): break batch = data.get("result") or [] records.extend(batch) info = data.get("result_info") or {} if page >= (info.get("total_pages") or 1): break page += 1 return records async def fetch_domain_dns(domain: str, *, email_service: bool | None = None) -> dict[str, Any]: domain = domain.lower().strip().rstrip(".") tokens = _tokens() if not tokens: return { "domain": domain, "zone": None, "email_service": bool(email_service), "service_type": "email_server" if email_service else None, "records": [], "email_records": [], "summary": {"total": 0, "email_related": 0}, "error": "CLOUDFLARE_API_TOKEN não configurado no servidor", } async with httpx.AsyncClient(timeout=20.0) as client: zone = None token_used = None for token in tokens: zone = await _find_zone(client, token, domain) if zone: token_used = token break if not zone or not token_used: return { "domain": domain, "zone": None, "email_service": bool(email_service), "service_type": "email_server" if email_service else None, "records": [], "email_records": [], "summary": {"total": 0, "email_related": 0}, "error": f"Zona Cloudflare não encontrada para {domain}", } raw_records = await _list_zone_records(client, token_used, zone["id"]) scoped = [_normalize_record(r, domain) for r in raw_records if _record_belongs(r.get("name", ""), domain)] scoped.sort(key=lambda r: (0 if r["email_related"] else 1, r["type"], r["name"])) email_records = [r for r in scoped if r["email_related"]] is_email = email_service if email_service is not None else len(email_records) > 0 purposes: dict[str, int] = {} for r in scoped: purposes[r["purpose"]] = purposes.get(r["purpose"], 0) + 1 return { "domain": domain, "zone": zone, "email_service": is_email, "service_type": "email_server" if is_email else "other", "records": scoped, "email_records": email_records, "summary": { "total": len(scoped), "email_related": len(email_records), "by_purpose": purposes, }, "error": None, }