ligbox-ops-platform/projects/ops-desk/api/app/mfa_recovery_routes.py
Ligbox Spec Hub 821675ab4a Reorganize monorepo into projects/wizard, ops-desk, finance
Specs stay at repo root (cross-VM). Move deploy and code into logical
projects with README per domain, updated manifest.yaml, and symlinks at
legacy paths for VM122 backward compatibility.
2026-06-19 18:55:03 +00:00

138 lines
4.9 KiB
Python

"""MFA recovery API — perdi acesso ao autenticador."""
from __future__ import annotations
from fastapi import APIRouter, HTTPException, Query, Request
from pydantic import BaseModel, Field
from app import auth, backup_codes, mail_notify, mfa_recovery_store
from app.totp_util import otpauth_uri
# Router for the MFA recovery endpoints; every route below is mounted under
# /api/v1/auth/mfa-recovery and tagged "mfa-recovery".
router = APIRouter(prefix="/api/v1/auth/mfa-recovery", tags=["mfa-recovery"])
class SendEmailRequest(BaseModel):
    """Request body for POST /send-email."""

    # MFA token supplied by the client; required, at least 10 characters.
    mfa_token: str = Field(..., min_length=10)
class VerifyEmailRequest(BaseModel):
    """Request body for POST /verify-email."""

    # MFA token supplied by the client; required, at least 10 characters.
    mfa_token: str = Field(..., min_length=10)
    # One-time code from the recovery e-mail; required, exactly 6 characters.
    email_otp: str = Field(..., min_length=6, max_length=6)
class CompleteRecoveryRequest(BaseModel):
    """Request body for POST /complete."""

    # Recovery token as returned by /verify-email; required, >= 10 characters.
    recovery_token: str = Field(..., min_length=10)
    # TOTP code from the authenticator; required, exactly 6 characters.
    totp_code: str = Field(..., min_length=6, max_length=6)
@router.post("/send-email")
def send_recovery_email(body: SendEmailRequest, request: Request):
    """Send a recovery one-time code by e-mail for a pending MFA login.

    The caller is identified by ``body.mfa_token``; the username is resolved
    with ``auth.peek_mfa_token``. The account row must exist, be active and
    require TOTP before a code is created and mailed.

    Returns:
        A dict with ``ok``, a user-facing ``message`` and ``email_hint``
        (the masked destination address).

    Raises:
        HTTPException: 401 when the MFA token does not resolve to a user,
            400 when the account is not eligible or the store raises
            ``ValueError``, 502 when the e-mail could not be sent.
    """
    # Rate limiting is keyed by the peer address; "unknown" when unavailable.
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    username = auth.peek_mfa_token(body.mfa_token)
    if not username:
        raise HTTPException(401, "sessão 2FA expirada — faça login novamente")

    account = auth._user_row(username)
    if not (account and account["active"] and auth.user_requires_totp(account)):
        raise HTTPException(400, "recuperação não disponível para esta conta")

    try:
        with auth.db() as conn:
            otp_code, destination = mfa_recovery_store.set_recovery_email_otp(
                conn, username
            )
    except ValueError as exc:
        # The store's ValueError text is surfaced to the client as-is.
        raise HTTPException(400, str(exc)) from exc

    delivered = mail_notify.send_otp_email(
        destination, otp_code, "recuperação de 2FA (perdi autenticador)"
    )
    if not delivered:
        raise HTTPException(502, "falha ao enviar e-mail — verifique Postfix")

    mail_notify.notify_mfa_recovery_started(username, destination)

    # Only the masked address is ever returned to the client.
    masked = mail_notify.mask_email(destination)
    response = {
        "ok": True,
        "message": f"Código enviado para {masked}",
        "email_hint": masked,
    }
    return response
@router.post("/verify-email")
def verify_recovery_email(body: VerifyEmailRequest, request: Request):
    """Validate the e-mailed code and open a recovery session.

    Resolves the username from ``body.mfa_token`` and hands the e-mail code
    to ``mfa_recovery_store.start_recovery_session``.

    Returns:
        A dict with ``ok``, the ``recovery_token`` and its ``expires_in``,
        the masked ``email`` and an ``otpauth_uri`` built from the session's
        pending TOTP secret.

    Raises:
        HTTPException: 401 when the MFA token does not resolve to a user,
            400 when the store raises ``ValueError``.
    """
    # Rate limiting is keyed by the peer address; "unknown" when unavailable.
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    username = auth.peek_mfa_token(body.mfa_token)
    if not username:
        raise HTTPException(401, "sessão 2FA expirada — faça login novamente")

    try:
        with auth.db() as conn:
            recovery = mfa_recovery_store.start_recovery_session(
                conn,
                username,
                body.email_otp,
                mfa_token=body.mfa_token,
            )
    except ValueError as exc:
        # The store's ValueError text is surfaced to the client as-is.
        raise HTTPException(400, str(exc)) from exc

    address = recovery["email"]
    pending_secret = recovery["totp_secret_pending"]

    response = {
        "ok": True,
        "recovery_token": recovery["recovery_token"],
        "expires_in": recovery["expires_in"],
        "email": mail_notify.mask_email(address),
        # The URI is built from the unmasked address and the pending secret.
        "otpauth_uri": otpauth_uri(address, pending_secret),
    }
    return response
@router.get("/setup")
def recovery_setup(recovery_token: str = Query(..., min_length=10)):
    """Return the authenticator setup data for an open recovery session.

    Looks the session up by ``recovery_token`` and rebuilds the otpauth URI
    from its pending TOTP secret.

    Returns:
        A dict with ``username`` and ``otpauth_uri``; the URI is ``None``
        when the session carries no pending secret.

    Raises:
        HTTPException: 401 when no session is found for the token.
    """
    # NOTE(review): unlike the sibling routes, this one applies no rate limit
    # and receives its token in the query string — confirm that is intended.
    with auth.db() as conn:
        recovery = mfa_recovery_store.get_recovery_session(conn, recovery_token)
    if not recovery:
        raise HTTPException(401, "sessão de recuperação inválida ou expirada")

    username = recovery["username"]
    account = auth._user_row(username)
    # Prefer the account's e-mail as the URI label; fall back to the username.
    # NOTE(review): relies on the row exposing .get(); other handlers only
    # index it with [] — confirm the row type supports both.
    label = account["email"] if account and account.get("email") else username

    pending_secret = recovery.get("totp_secret_pending") or ""
    uri = otpauth_uri(label, pending_secret) if pending_secret else None
    return {
        "username": username,
        "otpauth_uri": uri,
    }
@router.post("/complete")
def complete_recovery(body: CompleteRecoveryRequest, request: Request):
    """Finish MFA recovery, issue new backup codes and log the user in.

    Within one database context the recovery is completed through
    ``mfa_recovery_store.complete_recovery``, fresh backup codes are generated
    and stored, and the connection is committed. Afterwards any MFA token
    attached to the result is consumed, the account is re-read, and an access
    token is created.

    Returns:
        A dict with ``ok``, the bearer ``access_token`` and its
        ``expires_in``, the user's ``username``/``role``/``display_name``,
        the new ``backup_codes`` and a user-facing ``message``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while completing
            the recovery or storing the codes, 401 when the account is
            missing or inactive.
    """
    # Rate limiting is keyed by the peer address; "unknown" when unavailable.
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    try:
        with auth.db() as conn:
            outcome = mfa_recovery_store.complete_recovery(
                conn,
                body.recovery_token,
                body.totp_code,
            )
            fresh_codes = backup_codes.generate_backup_codes()
            backup_codes.store_backup_codes(conn, outcome["username"], fresh_codes)
            conn.commit()
    except ValueError as exc:
        # The ValueError text is surfaced to the client as-is.
        raise HTTPException(400, str(exc)) from exc

    # Consume the MFA token tied to this recovery, if the result carries one.
    pending_login = outcome.get("mfa_token")
    if pending_login:
        auth.consume_mfa_token(pending_login)

    # This check runs after the commit above, so an inactive account gets a
    # 401 here even though the recovery itself has already been persisted.
    account = auth._user_row(outcome["username"])
    if not (account and account["active"]):
        raise HTTPException(401, "usuário inativo")

    user = auth.DeskUser(
        username=account["username"],
        role=account["role"],
        display_name=account["display_name"],
        active=True,
    )
    auth.touch_last_login(user.username)
    access_token, ttl = auth.create_access_token(user)
    mail_notify.notify_mfa_recovery_completed(outcome["username"])

    response = {
        "ok": True,
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": ttl,
        "username": user.username,
        "role": user.role,
        "display_name": user.display_name,
        "backup_codes": fresh_codes,
        "message": "2FA reconfigurado. Guarde os novos códigos de backup.",
    }
    return response