"""Migration API routes — Spec 019.""" from __future__ import annotations from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field from app import auth from app.migration import gate, runner, store from app.permissions import can_manage_migration, can_read_migration router = APIRouter(prefix="/api/v1/migration", tags=["migration"]) class MailboxIn(BaseModel): email: str source_type: str = "imap" source_host: str | None = None source_user: str | None = None class CreateJobBody(BaseModel): domain: str = Field(..., min_length=3) tenant_id: int = 1 ticket_id: int | None = None source_server_label: str = "" dest_imap_host: str = "" notes: str = "" mailboxes: list[MailboxIn] = Field(default_factory=list) class ApproveGateBody(BaseModel): note: str = "" def _reader(user: auth.DeskUser = Depends(auth.get_current_user)) -> auth.DeskUser: if not can_read_migration(user.role): raise HTTPException(403, "permissão insuficiente") return user def _manager(user: auth.DeskUser = Depends(auth.get_current_user)) -> auth.DeskUser: if not can_manage_migration(user.role): raise HTTPException(403, "permissão insuficiente") return user @router.get("/jobs") def list_migration_jobs( domain: str = "", limit: int = Query(100, ge=1, le=500), user: auth.DeskUser = Depends(_reader), ): conn = auth.db() try: return store.list_jobs(conn, domain=domain.strip() or None, limit=limit) finally: conn.close() @router.post("/jobs") def create_migration_job(body: CreateJobBody, user: auth.DeskUser = Depends(_manager)): conn = auth.db() try: job = store.create_job( conn, domain=body.domain, tenant_id=body.tenant_id, ticket_id=body.ticket_id, source_server_label=body.source_server_label, dest_imap_host=body.dest_imap_host, notes=body.notes, mailboxes=[m.model_dump() for m in body.mailboxes], ) return job finally: conn.close() @router.get("/jobs/{job_id}") def get_migration_job(job_id: int, user: auth.DeskUser = Depends(_reader)): conn = auth.db() try: job = store.get_job(conn, job_id) finally: conn.close() if not job: raise HTTPException(404, "job não encontrado") return job @router.post("/jobs/{job_id}/preflight") def migration_preflight(job_id: int, user: auth.DeskUser = Depends(_manager)): conn = auth.db() try: return runner.run_preflight(conn, job_id, user.username) except ValueError as e: raise HTTPException(404, str(e)) from e finally: conn.close() @router.post("/jobs/{job_id}/sync") def migration_sync( job_id: int, run_type: str = Query("initial", pattern="^(initial|delta|final)$"), user: auth.DeskUser = Depends(_manager), ): conn = auth.db() try: return runner.run_sync(conn, job_id, user.username, run_type=run_type) except ValueError as e: raise HTTPException(404, str(e)) from e finally: conn.close() @router.get("/jobs/{job_id}/verify") def migration_verify(job_id: int, user: auth.DeskUser = Depends(_reader)): conn = auth.db() try: return runner.run_verify(conn, job_id, user.username) except ValueError as e: raise HTTPException(404, str(e)) from e finally: conn.close() @router.post("/jobs/{job_id}/approve-gate") def migration_approve_gate( job_id: int, body: ApproveGateBody, user: auth.DeskUser = Depends(_manager), ): conn = auth.db() try: job = store.get_job(conn, job_id) if not job: raise HTTPException(404, "job não encontrado") result = gate.approve_gate(conn, job_id, user.username) if body.note: store.update_job(conn, job_id, notes=(job.get("notes") or "") + f"\n[gate] {body.note}") return {"ok": True, **result, "job": store.get_job(conn, job_id)} finally: conn.close() @router.get("/gate") def migration_gate_lookup( domain: str = Query(..., min_length=3), user: auth.DeskUser | None = None, ): """VM112 consulta antes de DNS — auth opcional via query interna.""" conn = auth.db() try: return store.get_gate_for_domain(conn, domain) finally: conn.close()