"""MFA recovery API — perdi acesso ao autenticador.""" from __future__ import annotations from fastapi import APIRouter, HTTPException, Query, Request from pydantic import BaseModel, Field from app import auth, backup_codes, mail_notify, mfa_recovery_store from app.totp_util import otpauth_uri router = APIRouter(prefix="/api/v1/auth/mfa-recovery", tags=["mfa-recovery"]) class SendEmailRequest(BaseModel): mfa_token: str = Field(min_length=10) class VerifyEmailRequest(BaseModel): mfa_token: str = Field(min_length=10) email_otp: str = Field(min_length=6, max_length=6) class CompleteRecoveryRequest(BaseModel): recovery_token: str = Field(min_length=10) totp_code: str = Field(min_length=6, max_length=6) @router.post("/send-email") def send_recovery_email(body: SendEmailRequest, request: Request): client_ip = request.client.host if request.client else "unknown" auth.check_login_rate_limit(client_ip) username = auth.peek_mfa_token(body.mfa_token) if not username: raise HTTPException(401, "sessão 2FA expirada — faça login novamente") row = auth._user_row(username) if not row or not row["active"] or not auth.user_requires_totp(row): raise HTTPException(400, "recuperação não disponível para esta conta") try: with auth.db() as conn: code, target = mfa_recovery_store.set_recovery_email_otp(conn, username) except ValueError as exc: raise HTTPException(400, str(exc)) from exc sent = mail_notify.send_otp_email(target, code, "recuperação de 2FA (perdi autenticador)") if not sent: raise HTTPException(502, "falha ao enviar e-mail — verifique Postfix") mail_notify.notify_mfa_recovery_started(username, target) masked = mail_notify.mask_email(target) return { "ok": True, "message": f"Código enviado para {masked}", "email_hint": masked, } @router.post("/verify-email") def verify_recovery_email(body: VerifyEmailRequest, request: Request): client_ip = request.client.host if request.client else "unknown" auth.check_login_rate_limit(client_ip) username = auth.peek_mfa_token(body.mfa_token) if not username: raise HTTPException(401, "sessão 2FA expirada — faça login novamente") try: with auth.db() as conn: session = mfa_recovery_store.start_recovery_session( conn, username, body.email_otp, mfa_token=body.mfa_token ) except ValueError as exc: raise HTTPException(400, str(exc)) from exc email = session["email"] secret = session["totp_secret_pending"] return { "ok": True, "recovery_token": session["recovery_token"], "expires_in": session["expires_in"], "email": mail_notify.mask_email(email), "otpauth_uri": otpauth_uri(email, secret), } @router.get("/setup") def recovery_setup(recovery_token: str = Query(..., min_length=10)): with auth.db() as conn: session = mfa_recovery_store.get_recovery_session(conn, recovery_token) if not session: raise HTTPException(401, "sessão de recuperação inválida ou expirada") email = session["username"] row = auth._user_row(session["username"]) if row and row.get("email"): email = row["email"] secret = session.get("totp_secret_pending") or "" return { "username": session["username"], "otpauth_uri": otpauth_uri(email, secret) if secret else None, } @router.post("/complete") def complete_recovery(body: CompleteRecoveryRequest, request: Request): client_ip = request.client.host if request.client else "unknown" auth.check_login_rate_limit(client_ip) try: with auth.db() as conn: result = mfa_recovery_store.complete_recovery( conn, body.recovery_token, body.totp_code ) codes = backup_codes.generate_backup_codes() backup_codes.store_backup_codes(conn, result["username"], codes) conn.commit() except ValueError as exc: raise HTTPException(400, str(exc)) from exc mfa_token = result.get("mfa_token") if mfa_token: auth.consume_mfa_token(mfa_token) row = auth._user_row(result["username"]) if not row or not row["active"]: raise HTTPException(401, "usuário inativo") user = auth.DeskUser( username=row["username"], role=row["role"], display_name=row["display_name"], active=True, ) auth.touch_last_login(user.username) token, expires_in = auth.create_access_token(user) mail_notify.notify_mfa_recovery_completed(result["username"]) return { "ok": True, "access_token": token, "token_type": "bearer", "expires_in": expires_in, "username": user.username, "role": user.role, "display_name": user.display_name, "backup_codes": codes, "message": "2FA reconfigurado. Guarde os novos códigos de backup.", }