Specs stay at repo root (cross-VM). Move deploy and code into logical projects with README per domain, updated manifest.yaml, and symlinks at legacy paths for VM122 backward compatibility.
181 lines
6 KiB
Python
181 lines
6 KiB
Python
"""Cloudflare DNS records for domain management (read-only)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
CF_API = "https://api.cloudflare.com/client/v4"
|
|
|
|
EMAIL_PURPOSES = frozenset({"mx", "spf", "dkim", "dmarc", "mail-host", "autodiscover", "mail-alias"})
|
|
|
|
|
|
def _tokens() -> list[str]:
|
|
raw = os.getenv("CLOUDFLARE_API_TOKENS") or os.getenv("CLOUDFLARE_API_TOKEN") or ""
|
|
return [t.strip() for t in raw.replace(";", ",").split(",") if t.strip()]
|
|
|
|
|
|
def _parent_candidates(domain: str) -> list[str]:
|
|
domain = domain.lower().strip().rstrip(".")
|
|
parts = domain.split(".")
|
|
if len(parts) < 2:
|
|
return [domain] if domain else []
|
|
return [".".join(parts[i:]) for i in range(len(parts) - 1)]
|
|
|
|
|
|
def _headers(token: str) -> dict[str, str]:
|
|
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
|
|
|
|
def _classify_record(name: str, rtype: str, content: str) -> str:
|
|
n = (name or "").lower().rstrip(".")
|
|
c = (content or "").lower()
|
|
if rtype == "MX":
|
|
return "mx"
|
|
if rtype == "TXT":
|
|
if "v=spf1" in c:
|
|
return "spf"
|
|
if "_domainkey" in n or "v=dkim1" in c:
|
|
return "dkim"
|
|
if "_dmarc" in n or "v=dmarc1" in c:
|
|
return "dmarc"
|
|
if n.startswith("mail.") and rtype in ("A", "AAAA", "CNAME"):
|
|
return "mail-host"
|
|
if "autodiscover" in n or "autoconfig" in n:
|
|
return "autodiscover"
|
|
if rtype == "CNAME" and ("mail" in n or "autodiscover" in n):
|
|
return "mail-alias"
|
|
return "other"
|
|
|
|
|
|
def _record_belongs(name: str, domain: str) -> bool:
|
|
rn = (name or "").lower().rstrip(".")
|
|
d = domain.lower().strip().rstrip(".")
|
|
return rn == d or rn.endswith(f".{d}")
|
|
|
|
|
|
def _normalize_record(raw: dict, domain: str) -> dict[str, Any]:
|
|
name = raw.get("name", "")
|
|
rtype = raw.get("type", "")
|
|
content = raw.get("content", "")
|
|
purpose = _classify_record(name, rtype, content)
|
|
return {
|
|
"id": raw.get("id"),
|
|
"type": rtype,
|
|
"name": name.rstrip("."),
|
|
"content": content,
|
|
"priority": raw.get("priority"),
|
|
"proxied": raw.get("proxied"),
|
|
"ttl": raw.get("ttl"),
|
|
"purpose": purpose,
|
|
"email_related": purpose in EMAIL_PURPOSES,
|
|
"modified_on": raw.get("modified_on"),
|
|
"created_on": raw.get("created_on"),
|
|
}
|
|
|
|
|
|
async def _find_zone(client: httpx.AsyncClient, token: str, domain: str) -> dict | None:
|
|
for candidate in _parent_candidates(domain):
|
|
res = await client.get(
|
|
f"{CF_API}/zones",
|
|
headers=_headers(token),
|
|
params={"name": candidate, "status": "active"},
|
|
)
|
|
if res.status_code != 200:
|
|
continue
|
|
data = res.json()
|
|
if not data.get("success"):
|
|
continue
|
|
zones = data.get("result") or []
|
|
if zones:
|
|
z = zones[0]
|
|
return {"id": z.get("id"), "name": z.get("name"), "status": z.get("status")}
|
|
return None
|
|
|
|
|
|
async def _list_zone_records(client: httpx.AsyncClient, token: str, zone_id: str) -> list[dict]:
|
|
records: list[dict] = []
|
|
page = 1
|
|
while page <= 10:
|
|
res = await client.get(
|
|
f"{CF_API}/zones/{zone_id}/dns_records",
|
|
headers=_headers(token),
|
|
params={"per_page": 100, "page": page},
|
|
)
|
|
if res.status_code != 200:
|
|
break
|
|
data = res.json()
|
|
if not data.get("success"):
|
|
break
|
|
batch = data.get("result") or []
|
|
records.extend(batch)
|
|
info = data.get("result_info") or {}
|
|
if page >= (info.get("total_pages") or 1):
|
|
break
|
|
page += 1
|
|
return records
|
|
|
|
|
|
async def fetch_domain_dns(domain: str, *, email_service: bool | None = None) -> dict[str, Any]:
|
|
domain = domain.lower().strip().rstrip(".")
|
|
tokens = _tokens()
|
|
if not tokens:
|
|
return {
|
|
"domain": domain,
|
|
"zone": None,
|
|
"email_service": bool(email_service),
|
|
"service_type": "email_server" if email_service else None,
|
|
"records": [],
|
|
"email_records": [],
|
|
"summary": {"total": 0, "email_related": 0},
|
|
"error": "CLOUDFLARE_API_TOKEN não configurado no servidor",
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=20.0) as client:
|
|
zone = None
|
|
token_used = None
|
|
for token in tokens:
|
|
zone = await _find_zone(client, token, domain)
|
|
if zone:
|
|
token_used = token
|
|
break
|
|
|
|
if not zone or not token_used:
|
|
return {
|
|
"domain": domain,
|
|
"zone": None,
|
|
"email_service": bool(email_service),
|
|
"service_type": "email_server" if email_service else None,
|
|
"records": [],
|
|
"email_records": [],
|
|
"summary": {"total": 0, "email_related": 0},
|
|
"error": f"Zona Cloudflare não encontrada para {domain}",
|
|
}
|
|
|
|
raw_records = await _list_zone_records(client, token_used, zone["id"])
|
|
scoped = [_normalize_record(r, domain) for r in raw_records if _record_belongs(r.get("name", ""), domain)]
|
|
scoped.sort(key=lambda r: (0 if r["email_related"] else 1, r["type"], r["name"]))
|
|
|
|
email_records = [r for r in scoped if r["email_related"]]
|
|
is_email = email_service if email_service is not None else len(email_records) > 0
|
|
|
|
purposes: dict[str, int] = {}
|
|
for r in scoped:
|
|
purposes[r["purpose"]] = purposes.get(r["purpose"], 0) + 1
|
|
|
|
return {
|
|
"domain": domain,
|
|
"zone": zone,
|
|
"email_service": is_email,
|
|
"service_type": "email_server" if is_email else "other",
|
|
"records": scoped,
|
|
"email_records": email_records,
|
|
"summary": {
|
|
"total": len(scoped),
|
|
"email_related": len(email_records),
|
|
"by_purpose": purposes,
|
|
},
|
|
"error": None,
|
|
}
|