diff --git a/projects/ops-desk/api/app/auth.py b/projects/ops-desk/api/app/auth.py index 59dd105..2255214 100644 --- a/projects/ops-desk/api/app/auth.py +++ b/projects/ops-desk/api/app/auth.py @@ -13,7 +13,7 @@ from typing import Any from fastapi import Depends, Header, HTTPException, Request from jose import JWTError, jwt -from passlib.context import CryptContext +import bcrypt from app.totp_util import verify_code as verify_totp_code @@ -26,8 +26,6 @@ DESK_BOOTSTRAP_PASSWORD = os.getenv("DESK_BOOTSTRAP_PASSWORD", "805353") AUTH_LOGIN_RATE_LIMIT = int(os.getenv("AUTH_LOGIN_RATE_LIMIT", "5")) OPS_INTERNAL_TOKEN = os.getenv("OPS_INTERNAL_TOKEN", "") -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - _login_attempts: dict[str, list[float]] = {} _mfa_pending: dict[str, tuple[str, float]] = {} MFA_TOKEN_TTL_SEC = 300 @@ -54,17 +52,24 @@ class DeskUser: def db() -> sqlite3.Connection: DB_PATH.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(DB_PATH) + conn = sqlite3.connect(DB_PATH, timeout=30.0) conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") return conn def hash_password(password: str) -> str: - return pwd_context.hash(password) + return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: - return pwd_context.verify(plain, hashed) + if not plain or not hashed: + return False + try: + return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) + except (ValueError, TypeError): + return False def init_auth_schema(conn: sqlite3.Connection) -> None: diff --git a/projects/ops-desk/api/app/main.py b/projects/ops-desk/api/app/main.py index 9472085..f903e2b 100644 --- a/projects/ops-desk/api/app/main.py +++ b/projects/ops-desk/api/app/main.py @@ -133,8 +133,10 @@ TICKET_COLUMNS = "id,tenant_id,subject,status,payload,created_at,assigned_to,ass def db(): DB_PATH.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(DB_PATH) + conn = sqlite3.connect(DB_PATH, timeout=30.0) conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") return conn @@ -172,6 +174,11 @@ def init_db(): from app import billing_store migration_store.init_schema(conn) billing_store.init_schema(conn) + from app.vm112_purge_jobs import init_purge_jobs_schema + + init_purge_jobs_schema(conn) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") conn.commit() diff --git a/projects/ops-desk/api/app/vm112_domains_routes.py b/projects/ops-desk/api/app/vm112_domains_routes.py index dbc9ed0..18147f0 100644 --- a/projects/ops-desk/api/app/vm112_domains_routes.py +++ b/projects/ops-desk/api/app/vm112_domains_routes.py @@ -2,6 +2,8 @@ from __future__ import annotations +import sqlite3 + from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field @@ -9,7 +11,13 @@ from pydantic import BaseModel, Field from app import auth, vm112_domains from app.permissions import can_manage_vm112_domains from app.vm112_purge_stream import purge_sse_generator -from app.vm112_purge_jobs import get_job_public, list_jobs, recover_job, start_job +from app.vm112_purge_jobs import ( + PurgeJobConflictError, + get_job_public, + list_jobs, + recover_job, + start_job, +) router = APIRouter(prefix="/api/v1/vm112", tags=["vm112-domains"]) @@ -116,7 +124,17 @@ def start_purge_job( ): """Inicia purge em background; consultar GET /purge/jobs/{id} (recomendado via Traefik).""" domain = _validate_purge_request(domain, body) - job_id = start_job(domain, body.root_password, user.username) + try: + job_id = start_job(domain, body.root_password, user.username) + except PurgeJobConflictError as exc: + raise HTTPException(409, str(exc)) from exc + except sqlite3.OperationalError as exc: + if "locked" in str(exc).lower(): + raise HTTPException( + 503, + "Base de dados ocupada — aguarde o purge em curso ou tente novamente em alguns segundos", + ) from exc + raise HTTPException(500, f"Erro SQLite ao iniciar purge: {exc}") from exc return {"ok": True, "job_id": job_id, "domain": domain, "status": "running"} diff --git a/projects/ops-desk/api/app/vm112_purge_jobs.py b/projects/ops-desk/api/app/vm112_purge_jobs.py index 6d6a43f..6235fa6 100644 --- a/projects/ops-desk/api/app/vm112_purge_jobs.py +++ b/projects/ops-desk/api/app/vm112_purge_jobs.py @@ -3,7 +3,9 @@ from __future__ import annotations import json +import sqlite3 import threading +import time import traceback import uuid from datetime import datetime, timezone @@ -12,6 +14,11 @@ from typing import Any, Callable from app import auth, vm112_domains _lock = threading.Lock() +_schema_ready = False + + +class PurgeJobConflictError(Exception): + """Já existe purge queued/running para o domínio.""" def _now() -> str: @@ -36,17 +43,42 @@ def init_purge_jobs_schema(conn) -> None: ) """ ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS idx_vm112_purge_jobs_domain_status + ON vm112_purge_jobs(domain, status) + """ + ) conn.commit() def _ensure_schema() -> None: + global _schema_ready + if _schema_ready: + return conn = auth.db() try: init_purge_jobs_schema(conn) + _schema_ready = True finally: conn.close() +def _sqlite_retry(fn: Callable[[], Any], *, attempts: int = 8) -> Any: + last_err: Exception | None = None + for attempt in range(attempts): + try: + return fn() + except sqlite3.OperationalError as exc: + last_err = exc + if "locked" not in str(exc).lower() or attempt >= attempts - 1: + raise + time.sleep(min(0.05 * (2**attempt), 1.0)) + if last_err: + raise last_err + return None + + def _row_to_job(row) -> dict[str, Any]: return { "id": row["id"], @@ -66,64 +98,124 @@ def _row_to_job(row) -> dict[str, Any]: def _persist_job(job: dict[str, Any]) -> None: _ensure_schema() - conn = auth.db() - try: - job["updated_at"] = _now() - conn.execute( - """ - INSERT INTO vm112_purge_jobs ( - id, domain, status, timeline_json, elapsed_vm112, - desk_json, vm112_json, error, by_user, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - status = excluded.status, - timeline_json = excluded.timeline_json, - elapsed_vm112 = excluded.elapsed_vm112, - desk_json = excluded.desk_json, - vm112_json = excluded.vm112_json, - error = excluded.error, - by_user = excluded.by_user, - updated_at = excluded.updated_at - """, - ( - job["id"], - job["domain"], - job["status"], - json.dumps(job.get("timeline") or [], ensure_ascii=False), - int(job.get("elapsed_vm112") or 0), - json.dumps(job.get("desk") or {}, ensure_ascii=False), - json.dumps(job.get("vm112") or {}, ensure_ascii=False), - job.get("error"), - job.get("by"), - job.get("created_at") or _now(), - job["updated_at"], - ), - ) - conn.commit() - finally: - conn.close() + + def _write() -> None: + conn = auth.db() + try: + job["updated_at"] = _now() + conn.execute( + """ + INSERT INTO vm112_purge_jobs ( + id, domain, status, timeline_json, elapsed_vm112, + desk_json, vm112_json, error, by_user, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + status = excluded.status, + timeline_json = excluded.timeline_json, + elapsed_vm112 = excluded.elapsed_vm112, + desk_json = excluded.desk_json, + vm112_json = excluded.vm112_json, + error = excluded.error, + by_user = excluded.by_user, + updated_at = excluded.updated_at + """, + ( + job["id"], + job["domain"], + job["status"], + json.dumps(job.get("timeline") or [], ensure_ascii=False), + int(job.get("elapsed_vm112") or 0), + json.dumps(job.get("desk") or {}, ensure_ascii=False), + json.dumps(job.get("vm112") or {}, ensure_ascii=False), + job.get("error"), + job.get("by"), + job.get("created_at") or _now(), + job["updated_at"], + ), + ) + conn.commit() + finally: + conn.close() + + _sqlite_retry(_write) def _load_job(job_id: str) -> dict[str, Any] | None: _ensure_schema() - conn = auth.db() - try: - row = conn.execute( - "SELECT * FROM vm112_purge_jobs WHERE id = ?", (job_id,) - ).fetchone() - return _row_to_job(row) if row else None - finally: - conn.close() + + def _read() -> dict[str, Any] | None: + conn = auth.db() + try: + row = conn.execute( + "SELECT * FROM vm112_purge_jobs WHERE id = ?", (job_id,) + ).fetchone() + return _row_to_job(row) if row else None + finally: + conn.close() + + return _sqlite_retry(_read) + + +def _find_active_job_locked(conn: sqlite3.Connection, domain: str) -> sqlite3.Row | None: + return conn.execute( + """ + SELECT id, domain, status, created_at FROM vm112_purge_jobs + WHERE domain = ? AND status IN ('queued', 'running') + ORDER BY created_at DESC LIMIT 1 + """, + (domain.lower().strip(),), + ).fetchone() def _mutate_job(job_id: str, fn: Callable[[dict[str, Any]], None]) -> dict[str, Any] | None: with _lock: - job = _load_job(job_id) - if not job: - return None - fn(job) - _persist_job(job) - return dict(job) + def _mutate() -> dict[str, Any] | None: + conn = auth.db() + try: + row = conn.execute( + "SELECT * FROM vm112_purge_jobs WHERE id = ?", (job_id,) + ).fetchone() + if not row: + return None + job = _row_to_job(row) + fn(job) + job["updated_at"] = _now() + conn.execute( + """ + INSERT INTO vm112_purge_jobs ( + id, domain, status, timeline_json, elapsed_vm112, + desk_json, vm112_json, error, by_user, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + status = excluded.status, + timeline_json = excluded.timeline_json, + elapsed_vm112 = excluded.elapsed_vm112, + desk_json = excluded.desk_json, + vm112_json = excluded.vm112_json, + error = excluded.error, + by_user = excluded.by_user, + updated_at = excluded.updated_at + """, + ( + job["id"], + job["domain"], + job["status"], + json.dumps(job.get("timeline") or [], ensure_ascii=False), + int(job.get("elapsed_vm112") or 0), + json.dumps(job.get("desk") or {}, ensure_ascii=False), + json.dumps(job.get("vm112") or {}, ensure_ascii=False), + job.get("error"), + job.get("by"), + job.get("created_at") or _now(), + job["updated_at"], + ), + ) + conn.commit() + return dict(job) + finally: + conn.close() + + return _sqlite_retry(_mutate) def _upsert_step(job_id: str, step: dict[str, str]) -> None: @@ -143,12 +235,13 @@ def _set_job(job_id: str, **fields: Any) -> None: def create_job(domain: str, username: str) -> str: + domain = domain.lower().strip() job_id = uuid.uuid4().hex[:16] now = _now() job = { "id": job_id, "job_id": job_id, - "domain": domain.lower().strip(), + "domain": domain, "status": "queued", "timeline": [], "elapsed_vm112": 0, @@ -159,8 +252,43 @@ def create_job(domain: str, username: str) -> str: "created_at": now, "updated_at": now, } + + def _create() -> None: + conn = auth.db() + try: + active = _find_active_job_locked(conn, domain) + if active: + raise PurgeJobConflictError( + f"Purge já em curso para {domain} (job {active['id']}, status {active['status']})" + ) + conn.execute( + """ + INSERT INTO vm112_purge_jobs ( + id, domain, status, timeline_json, elapsed_vm112, + desk_json, vm112_json, error, by_user, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + job["id"], + job["domain"], + job["status"], + "[]", + 0, + "{}", + "{}", + None, + job["by"], + job["created_at"], + job["updated_at"], + ), + ) + conn.commit() + finally: + conn.close() + with _lock: - _persist_job(job) + _ensure_schema() + _sqlite_retry(_create) return job_id diff --git a/projects/ops-desk/frontend/assets/servicos.js b/projects/ops-desk/frontend/assets/servicos.js index ef5d527..1457f39 100644 --- a/projects/ops-desk/frontend/assets/servicos.js +++ b/projects/ops-desk/frontend/assets/servicos.js @@ -140,6 +140,16 @@ const DeskServices = (() => { let errText = typeof detail === 'string' ? detail : JSON.stringify(detail || `${res.status}`); if (res.status === 504) { errText = '504 Gateway Timeout — o purge pode demorar vários minutos. Verifique na VM112 se concluiu antes de repetir.'; + } else if (res.status === 409) { + errText = typeof detail === 'string' + ? detail + : 'Purge já em curso para este domínio — aguarde a conclusão no drawer lateral.'; + } else if (res.status === 503) { + errText = typeof detail === 'string' + ? detail + : 'Servidor ocupado — aguarde alguns segundos e tente novamente.'; + } else if (res.status === 500 && errText === '500') { + errText = 'Erro interno ao iniciar purge — verifique logs da API Desk (VM122).'; } throw new Error(errText); }