ligbox-ops-platform/projects/ops-desk/api/app/purge_auth_codes.py
Ligbox Spec Hub a39618afb8 Add purge authorization codes for protected domains (myvexx.com).
Root generates single-use codes in Infra with root password; Serviços purge requires code plus root password for PURGE_EXTRA_AUTH_DOMAINS.
2026-06-19 22:20:04 +00:00

196 lines
5.3 KiB
Python

"""Códigos de autorização purge — domínios protegidos (Spec 017 extensão)."""
from __future__ import annotations
import secrets
import sqlite3
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
from app import auth, vm112_domains
# Default lifetime, in hours, of a newly issued purge authorization code.
# Read once at import time from PURGE_AUTH_CODE_TTL_HOURS; falls back to 24.
DEFAULT_TTL_HOURS = int(
    __import__("os").environ.get("PURGE_AUTH_CODE_TTL_HOURS", "24")
)
def _now() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)
def _now_iso() -> str:
    """Return the current UTC time, as produced by ``_now``, in ISO 8601 form."""
    current = _now()
    return current.isoformat()
def init_purge_auth_schema(conn: sqlite3.Connection) -> None:
    """Create the ``purge_auth_codes`` table and its lookup index if missing.

    Both statements use ``IF NOT EXISTS``, so running this against a database
    that already has the objects leaves them as they are. The connection is
    committed before returning.

    Args:
        conn: Open SQLite connection to create the schema on.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS purge_auth_codes (
            id TEXT PRIMARY KEY,
            domain TEXT NOT NULL,
            code_hash TEXT NOT NULL,
            note TEXT,
            created_by TEXT NOT NULL,
            created_at TEXT NOT NULL,
            expires_at TEXT NOT NULL,
            used_at TEXT,
            used_by TEXT
        )
        """
    create_index_sql = """
        CREATE INDEX IF NOT EXISTS idx_purge_auth_codes_domain_active
        ON purge_auth_codes(domain, expires_at, used_at)
        """
    # Table first: the index refers to it.
    for statement in (create_table_sql, create_index_sql):
        conn.execute(statement)
    conn.commit()
def normalize_code(code: str) -> str:
    """Return *code* in the canonical form used for hashing and comparison.

    The input is upper-cased, every hyphen and space character is removed,
    and any whitespace still left at either end is stripped.
    """
    # Dropping separators must happen before strip(): stripping first would
    # leave whitespace that was shielded by a leading/trailing separator.
    separators = {ord("-"): None, ord(" "): None}
    return code.upper().translate(separators).strip()
def format_code_display(raw: str) -> str:
    """Return *raw* normalized and, when 8 characters long, shown as ``XXXX-XXXX``.

    A code of any other length is returned normalized but ungrouped.
    """
    canonical = normalize_code(raw)
    if len(canonical) != 8:
        return canonical
    head, tail = canonical[:4], canonical[4:]
    return f"{head}-{tail}"
def generate_raw_code() -> str:
    """Return a fresh random code of 8 upper-case hexadecimal characters."""
    random_bytes = secrets.token_bytes(4)
    return random_bytes.hex().upper()
def requires_extra_auth(domain: str) -> bool:
    """Tell whether *domain* is in ``vm112_domains.PURGE_EXTRA_AUTH_DOMAINS``.

    The domain is lower-cased and stripped of surrounding whitespace before
    the membership test.
    """
    candidate = domain.lower().strip()
    protected = vm112_domains.PURGE_EXTRA_AUTH_DOMAINS
    return candidate in protected
def create_code(
    conn: sqlite3.Connection,
    domain: str,
    root_password: str,
    created_by: str,
    note: str = "",
    ttl_hours: int | None = None,
) -> dict[str, Any]:
    """Issue a new purge authorization code for a protected domain.

    Only a hash of the code is stored; the readable code is handed back once,
    in the returned dict.

    Args:
        conn: Open SQLite connection holding the ``purge_auth_codes`` table.
        domain: Target domain; lower-cased and stripped before any check.
        root_password: Checked with ``vm112_domains.verify_root_password``.
        created_by: Identifier recorded as the issuer of the code.
        note: Optional free text. It is stripped, and a blank note is stored
            as NULL.
        ttl_hours: Lifetime in hours. ``None`` means ``DEFAULT_TTL_HOURS``;
            the value is clamped to the range 1..168.

    Returns:
        A dict with the keys ``ok``, ``id``, ``domain``, ``code`` (display
        form), ``expires_at`` (ISO 8601), ``ttl_hours`` (after clamping) and
        ``note`` (exactly the value that was stored).

    Raises:
        ValueError: If the root password is rejected, the domain is in
            ``vm112_domains.PURGE_BLOCKLIST``, or the domain does not require
            an extra authorization code.
    """
    domain = domain.lower().strip()
    if not vm112_domains.verify_root_password(conn, root_password):
        raise ValueError("Senha Root incorrecta")
    if domain in vm112_domains.PURGE_BLOCKLIST:
        raise ValueError(f"Domínio {domain} está na blocklist absoluta — purge proibido")
    if not requires_extra_auth(domain):
        raise ValueError(f"Domínio {domain} não exige código extra de autorização")

    ttl = ttl_hours if ttl_hours is not None else DEFAULT_TTL_HOURS
    ttl = max(1, min(int(ttl), 168))

    raw = generate_raw_code()
    display = format_code_display(raw)
    now = _now()
    expires = now + timedelta(hours=ttl)
    code_id = uuid.uuid4().hex[:16]

    # One cleaned value feeds both the INSERT and the response, so the caller
    # sees the note exactly as it was persisted (stripped, blank -> None).
    stored_note = (note or "").strip() or None

    conn.execute(
        """
        INSERT INTO purge_auth_codes (
            id, domain, code_hash, note, created_by, created_at, expires_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (
            code_id,
            domain,
            auth.hash_password(normalize_code(raw)),
            stored_note,
            created_by,
            now.isoformat(),
            expires.isoformat(),
        ),
    )
    conn.commit()
    return {
        "ok": True,
        "id": code_id,
        "domain": domain,
        "code": display,
        "expires_at": expires.isoformat(),
        "ttl_hours": ttl,
        "note": stored_note,
    }
def validate_and_consume(
    conn: sqlite3.Connection,
    domain: str,
    code: str,
    used_by: str | None = None,
) -> bool:
    """Check a purge authorization code and mark it as used on success.

    Domains for which ``requires_extra_auth`` is false pass immediately
    without any database access. Otherwise the code is normalized and compared
    against the hashes of every unused, unexpired code stored for the domain.

    Args:
        conn: Open SQLite connection holding the ``purge_auth_codes`` table.
            Rows are read by column name, so the connection's row factory
            must support that (for example ``sqlite3.Row``).
        domain: Target domain; lower-cased and stripped before use.
        code: Code as typed by the user; ``None`` or empty is treated as "".
        used_by: Identifier recorded on the consumed code.

    Returns:
        True when no code is required for the domain, or when a matching code
        was consumed by this call; False otherwise.
    """
    domain = domain.lower().strip()
    if not requires_extra_auth(domain):
        return True

    normalized = normalize_code(code or "")
    if len(normalized) < 6:
        return False

    now = _now_iso()
    candidates = conn.execute(
        """
        SELECT id, code_hash FROM purge_auth_codes
        WHERE domain = ? AND used_at IS NULL AND expires_at > ?
        ORDER BY created_at DESC
        """,
        (domain, now),
    ).fetchall()

    for row in candidates:
        if not auth.verify_password(normalized, row["code_hash"]):
            continue
        cursor = conn.execute(
            """
            UPDATE purge_auth_codes
            SET used_at = ?, used_by = ?
            WHERE id = ? AND used_at IS NULL
            """,
            (now, used_by, row["id"]),
        )
        # Commit even when nothing changed so no write transaction stays open.
        conn.commit()
        # The code counts as consumed only if THIS statement flipped the row.
        # Zero affected rows means another request used it after our SELECT,
        # and accepting it here would let a single-use code be used twice.
        if cursor.rowcount == 1:
            return True
    return False
def list_codes(conn: sqlite3.Connection, active_only: bool = True, limit: int = 50) -> list[dict[str, Any]]:
    """List stored purge authorization codes, newest first, without hashes.

    Args:
        conn: Open SQLite connection holding the ``purge_auth_codes`` table.
            Rows are read by column name.
        active_only: When true, return only codes that are unused and not
            yet expired; otherwise return codes in any state.
        limit: Maximum number of rows; clamped to the range 1..200.

    Returns:
        One dict per code with the stored columns plus an ``active`` flag
        (unused and ``expires_at`` later than the current time).
    """
    capped = max(1, min(int(limit), 200))
    now = _now_iso()

    if active_only:
        query = """
            SELECT id, domain, note, created_by, created_at, expires_at, used_at, used_by
            FROM purge_auth_codes
            WHERE used_at IS NULL AND expires_at > ?
            ORDER BY created_at DESC
            LIMIT ?
            """
        params: tuple[Any, ...] = (now, capped)
    else:
        query = """
            SELECT id, domain, note, created_by, created_at, expires_at, used_at, used_by
            FROM purge_auth_codes
            ORDER BY created_at DESC
            LIMIT ?
            """
        params = (capped,)

    columns = (
        "id",
        "domain",
        "note",
        "created_by",
        "created_at",
        "expires_at",
        "used_at",
        "used_by",
    )
    listing: list[dict[str, Any]] = []
    for row in conn.execute(query, params).fetchall():
        entry: dict[str, Any] = {column: row[column] for column in columns}
        # Timestamps are ISO strings, compared as text against "now".
        entry["active"] = row["used_at"] is None and row["expires_at"] > now
        listing.append(entry)
    return listing