138 lines
4.9 KiB
Python
138 lines
4.9 KiB
Python
"""MFA recovery API — perdi acesso ao autenticador."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Request
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app import auth, backup_codes, mail_notify, mfa_recovery_store
|
|
from app.totp_util import otpauth_uri
|
|
|
|
router = APIRouter(prefix="/api/v1/auth/mfa-recovery", tags=["mfa-recovery"])
|
|
|
|
|
|
class SendEmailRequest(BaseModel):
    """Request body for ``POST /send-email``."""

    # Token the route hands to ``auth.peek_mfa_token`` to find the username.
    # Required; anything shorter than 10 characters is rejected by validation.
    mfa_token: str = Field(..., min_length=10)
|
|
|
|
|
|
class VerifyEmailRequest(BaseModel):
    """Request body for ``POST /verify-email``."""

    # Token the route hands to ``auth.peek_mfa_token`` to find the username.
    # Required; anything shorter than 10 characters is rejected by validation.
    mfa_token: str = Field(..., min_length=10)

    # One-time code the user received by e-mail; must be exactly 6 characters.
    email_otp: str = Field(..., min_length=6, max_length=6)
|
|
|
|
|
|
class CompleteRecoveryRequest(BaseModel):
    """Request body for ``POST /complete``."""

    # Recovery-session token, as returned by the verify-email step.
    # Required; anything shorter than 10 characters is rejected by validation.
    recovery_token: str = Field(..., min_length=10)

    # Code from the newly enrolled authenticator; must be exactly 6 characters.
    totp_code: str = Field(..., min_length=6, max_length=6)
|
|
|
|
|
|
@router.post("/send-email")
def send_recovery_email(body: SendEmailRequest, request: Request):
    """Send a one-time recovery code by e-mail to a user who lost the authenticator.

    The client IP is passed to ``auth.check_login_rate_limit``, the username
    is resolved from the MFA token, the account is checked for eligibility,
    the recovery store issues a code, and the code is mailed. A "recovery
    started" notification is emitted once the mail has been sent.

    Raises:
        HTTPException: 401 when the MFA token resolves to no username; 400
            when the account is missing, inactive, does not require TOTP, or
            the recovery store raises ``ValueError``; 502 when the e-mail
            could not be sent.

    Returns:
        A dict with ``ok``, ``message`` and ``email_hint``; only the masked
        address is exposed.
    """
    peer = request.client
    ip_address = peer.host if peer else "unknown"
    auth.check_login_rate_limit(ip_address)

    user_name = auth.peek_mfa_token(body.mfa_token)
    if not user_name:
        raise HTTPException(
            status_code=401,
            detail="sessão 2FA expirada — faça login novamente",
        )

    # Recovery is only offered to existing, active accounts that use TOTP.
    account = auth._user_row(user_name)
    if not (account and account["active"] and auth.user_requires_totp(account)):
        raise HTTPException(
            status_code=400,
            detail="recuperação não disponível para esta conta",
        )

    try:
        with auth.db() as conn:
            otp_code, recipient = mfa_recovery_store.set_recovery_email_otp(
                conn, user_name
            )
    except ValueError as err:
        # The store's message is surfaced to the client verbatim.
        raise HTTPException(status_code=400, detail=str(err)) from err

    delivered = mail_notify.send_otp_email(
        recipient,
        otp_code,
        "recuperação de 2FA (perdi autenticador)",
    )
    if not delivered:
        raise HTTPException(
            status_code=502,
            detail="falha ao enviar e-mail — verifique Postfix",
        )

    mail_notify.notify_mfa_recovery_started(user_name, recipient)

    # Never echo the full address back to the caller.
    hint = mail_notify.mask_email(recipient)
    return {
        "ok": True,
        "message": f"Código enviado para {hint}",
        "email_hint": hint,
    }
|
|
|
|
|
|
@router.post("/verify-email")
def verify_recovery_email(body: VerifyEmailRequest, request: Request):
    """Validate the e-mailed code and open a recovery session.

    The client IP is passed to ``auth.check_login_rate_limit``, the username
    is resolved from the MFA token, and the recovery store is asked to start
    a session for that user with the submitted code.

    Raises:
        HTTPException: 401 when the MFA token resolves to no username; 400
            when the recovery store raises ``ValueError``.

    Returns:
        A dict with ``ok``, the ``recovery_token`` and its ``expires_in``,
        the masked ``email``, and an ``otpauth_uri`` built from the session's
        pending TOTP secret.
    """
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    user_name = auth.peek_mfa_token(body.mfa_token)
    if not user_name:
        raise HTTPException(
            status_code=401,
            detail="sessão 2FA expirada — faça login novamente",
        )

    try:
        with auth.db() as conn:
            recovery = mfa_recovery_store.start_recovery_session(
                conn,
                user_name,
                body.email_otp,
                mfa_token=body.mfa_token,
            )
    except ValueError as err:
        # The store's message is surfaced to the client verbatim.
        raise HTTPException(status_code=400, detail=str(err)) from err

    address = recovery["email"]
    pending_secret = recovery["totp_secret_pending"]
    return {
        "ok": True,
        "recovery_token": recovery["recovery_token"],
        "expires_in": recovery["expires_in"],
        # The address is masked in the response but used in full as the URI label.
        "email": mail_notify.mask_email(address),
        "otpauth_uri": otpauth_uri(address, pending_secret),
    }
|
|
|
|
|
|
@router.get("/setup")
def recovery_setup(recovery_token: str = Query(..., min_length=10)):
    """Return the enrollment data for an open recovery session.

    Looks the session up by ``recovery_token`` and rebuilds the otpauth URI
    from its pending TOTP secret.

    Raises:
        HTTPException: 401 when the recovery store returns no session for
            the token.

    Returns:
        A dict with ``username`` and ``otpauth_uri``; the URI is ``None``
        when the session carries no pending secret.
    """
    with auth.db() as conn:
        recovery = mfa_recovery_store.get_recovery_session(conn, recovery_token)
    if not recovery:
        raise HTTPException(
            status_code=401,
            detail="sessão de recuperação inválida ou expirada",
        )

    # Label the URI with the user's e-mail when the user row has one,
    # otherwise fall back to the username stored on the session.
    account = auth._user_row(recovery["username"])
    if account and account.get("email"):
        label = account["email"]
    else:
        label = recovery["username"]

    pending_secret = recovery.get("totp_secret_pending") or ""
    uri = otpauth_uri(label, pending_secret) if pending_secret else None
    return {
        "username": recovery["username"],
        "otpauth_uri": uri,
    }
|
|
|
|
|
|
@router.post("/complete")
def complete_recovery(body: CompleteRecoveryRequest, request: Request):
    """Finish the recovery: confirm the new TOTP code, rotate backup codes, log in.

    The client IP is passed to ``auth.check_login_rate_limit``. Inside one
    database context the recovery store completes the session, fresh backup
    codes are generated and stored, and the connection is committed. After
    that the MFA token recorded on the result (if any) is consumed, an access
    token is created for the user, and a "recovery completed" notification
    is emitted.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while completing
            the recovery or storing the backup codes; 401 when the user row
            is missing or inactive.

    Returns:
        A dict with ``ok``, the bearer ``access_token`` and its
        ``expires_in``, the user's ``username``/``role``/``display_name``,
        the new ``backup_codes`` and a ``message``.
    """
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    try:
        with auth.db() as conn:
            outcome = mfa_recovery_store.complete_recovery(
                conn,
                body.recovery_token,
                body.totp_code,
            )
            new_codes = backup_codes.generate_backup_codes()
            backup_codes.store_backup_codes(conn, outcome["username"], new_codes)
            conn.commit()
    except ValueError as err:
        # The underlying message is surfaced to the client verbatim.
        raise HTTPException(status_code=400, detail=str(err)) from err

    # Consume the MFA token tied to this recovery, when the result carries one.
    spent_token = outcome.get("mfa_token")
    if spent_token:
        auth.consume_mfa_token(spent_token)

    # NOTE(review): this check runs after the commit and the token consumption
    # above, so an inactive user gets 401 only after 2FA was already
    # reconfigured — confirm this ordering is intended.
    account = auth._user_row(outcome["username"])
    if not (account and account["active"]):
        raise HTTPException(status_code=401, detail="usuário inativo")

    desk_user = auth.DeskUser(
        username=account["username"],
        role=account["role"],
        display_name=account["display_name"],
        active=True,
    )
    auth.touch_last_login(desk_user.username)
    access_token, ttl = auth.create_access_token(desk_user)
    mail_notify.notify_mfa_recovery_completed(outcome["username"])

    return {
        "ok": True,
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": ttl,
        "username": desk_user.username,
        "role": desk_user.role,
        "display_name": desk_user.display_name,
        "backup_codes": new_codes,
        "message": "2FA reconfigurado. Guarde os novos códigos de backup.",
    }
|