Root generates single-use codes in Infra with root password; Serviços purge requires code plus root password for PURGE_EXTRA_AUTH_DOMAINS.
196 lines
5.3 KiB
Python
196 lines
5.3 KiB
Python
"""Códigos de autorização purge — domínios protegidos (Spec 017 extensão)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import secrets
|
|
import sqlite3
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
from app import auth, vm112_domains
|
|
|
|
DEFAULT_TTL_HOURS = int(__import__("os").getenv("PURGE_AUTH_CODE_TTL_HOURS", "24"))
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return _now().isoformat()
|
|
|
|
|
|
def init_purge_auth_schema(conn: sqlite3.Connection) -> None:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS purge_auth_codes (
|
|
id TEXT PRIMARY KEY,
|
|
domain TEXT NOT NULL,
|
|
code_hash TEXT NOT NULL,
|
|
note TEXT,
|
|
created_by TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
expires_at TEXT NOT NULL,
|
|
used_at TEXT,
|
|
used_by TEXT
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_purge_auth_codes_domain_active
|
|
ON purge_auth_codes(domain, expires_at, used_at)
|
|
"""
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def normalize_code(code: str) -> str:
|
|
return code.upper().replace("-", "").replace(" ", "").strip()
|
|
|
|
|
|
def format_code_display(raw: str) -> str:
|
|
raw = normalize_code(raw)
|
|
if len(raw) == 8:
|
|
return f"{raw[:4]}-{raw[4:]}"
|
|
return raw
|
|
|
|
|
|
def generate_raw_code() -> str:
|
|
return secrets.token_hex(4).upper()
|
|
|
|
|
|
def requires_extra_auth(domain: str) -> bool:
|
|
return domain.lower().strip() in vm112_domains.PURGE_EXTRA_AUTH_DOMAINS
|
|
|
|
|
|
def create_code(
|
|
conn: sqlite3.Connection,
|
|
domain: str,
|
|
root_password: str,
|
|
created_by: str,
|
|
note: str = "",
|
|
ttl_hours: int | None = None,
|
|
) -> dict[str, Any]:
|
|
domain = domain.lower().strip()
|
|
if not vm112_domains.verify_root_password(conn, root_password):
|
|
raise ValueError("Senha Root incorrecta")
|
|
if domain in vm112_domains.PURGE_BLOCKLIST:
|
|
raise ValueError(f"Domínio {domain} está na blocklist absoluta — purge proibido")
|
|
if not requires_extra_auth(domain):
|
|
raise ValueError(f"Domínio {domain} não exige código extra de autorização")
|
|
|
|
ttl = ttl_hours if ttl_hours is not None else DEFAULT_TTL_HOURS
|
|
ttl = max(1, min(int(ttl), 168))
|
|
raw = generate_raw_code()
|
|
display = format_code_display(raw)
|
|
now = _now()
|
|
expires = now + timedelta(hours=ttl)
|
|
code_id = uuid.uuid4().hex[:16]
|
|
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO purge_auth_codes (
|
|
id, domain, code_hash, note, created_by, created_at, expires_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
code_id,
|
|
domain,
|
|
auth.hash_password(normalize_code(raw)),
|
|
(note or "").strip() or None,
|
|
created_by,
|
|
now.isoformat(),
|
|
expires.isoformat(),
|
|
),
|
|
)
|
|
conn.commit()
|
|
return {
|
|
"ok": True,
|
|
"id": code_id,
|
|
"domain": domain,
|
|
"code": display,
|
|
"expires_at": expires.isoformat(),
|
|
"ttl_hours": ttl,
|
|
"note": note or None,
|
|
}
|
|
|
|
|
|
def validate_and_consume(
|
|
conn: sqlite3.Connection,
|
|
domain: str,
|
|
code: str,
|
|
used_by: str | None = None,
|
|
) -> bool:
|
|
domain = domain.lower().strip()
|
|
if not requires_extra_auth(domain):
|
|
return True
|
|
normalized = normalize_code(code or "")
|
|
if len(normalized) < 6:
|
|
return False
|
|
|
|
now = _now_iso()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT id, code_hash FROM purge_auth_codes
|
|
WHERE domain = ? AND used_at IS NULL AND expires_at > ?
|
|
ORDER BY created_at DESC
|
|
""",
|
|
(domain, now),
|
|
).fetchall()
|
|
|
|
for row in rows:
|
|
if auth.verify_password(normalized, row["code_hash"]):
|
|
conn.execute(
|
|
"""
|
|
UPDATE purge_auth_codes
|
|
SET used_at = ?, used_by = ?
|
|
WHERE id = ? AND used_at IS NULL
|
|
""",
|
|
(now, used_by, row["id"]),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
return False
|
|
|
|
|
|
def list_codes(conn: sqlite3.Connection, active_only: bool = True, limit: int = 50) -> list[dict[str, Any]]:
|
|
limit = max(1, min(int(limit), 200))
|
|
now = _now_iso()
|
|
if active_only:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT id, domain, note, created_by, created_at, expires_at, used_at, used_by
|
|
FROM purge_auth_codes
|
|
WHERE used_at IS NULL AND expires_at > ?
|
|
ORDER BY created_at DESC
|
|
LIMIT ?
|
|
""",
|
|
(now, limit),
|
|
).fetchall()
|
|
else:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT id, domain, note, created_by, created_at, expires_at, used_at, used_by
|
|
FROM purge_auth_codes
|
|
ORDER BY created_at DESC
|
|
LIMIT ?
|
|
""",
|
|
(limit,),
|
|
).fetchall()
|
|
|
|
return [
|
|
{
|
|
"id": row["id"],
|
|
"domain": row["domain"],
|
|
"note": row["note"],
|
|
"created_by": row["created_by"],
|
|
"created_at": row["created_at"],
|
|
"expires_at": row["expires_at"],
|
|
"used_at": row["used_at"],
|
|
"used_by": row["used_by"],
|
|
"active": row["used_at"] is None and row["expires_at"] > now,
|
|
}
|
|
for row in rows
|
|
]
|