Specs stay at repo root (cross-VM). Move deploy and code into logical projects with README per domain, updated manifest.yaml, and symlinks at legacy paths for VM122 backward compatibility.
138 lines
4.9 KiB
Python
138 lines
4.9 KiB
Python
"""MFA recovery API — perdi acesso ao autenticador."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Request
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app import auth, backup_codes, mail_notify, mfa_recovery_store
|
|
from app.totp_util import otpauth_uri
|
|
|
|
router = APIRouter(prefix="/api/v1/auth/mfa-recovery", tags=["mfa-recovery"])
|
|
|
|
|
|
class SendEmailRequest(BaseModel):
    """Request body for ``POST /send-email``."""

    # Required; validated to be at least 10 characters long. The endpoint
    # hands it to ``auth.peek_mfa_token`` to look up the username.
    mfa_token: str = Field(..., min_length=10)
|
|
|
|
|
|
class VerifyEmailRequest(BaseModel):
    """Request body for ``POST /verify-email``."""

    # Required; validated to be at least 10 characters long.
    mfa_token: str = Field(..., min_length=10)
    # Required; validated to be exactly 6 characters long (no digit check here).
    email_otp: str = Field(..., min_length=6, max_length=6)
|
|
|
|
|
|
class CompleteRecoveryRequest(BaseModel):
    """Request body for ``POST /complete``."""

    # Required; validated to be at least 10 characters long.
    recovery_token: str = Field(..., min_length=10)
    # Required; validated to be exactly 6 characters long (no digit check here).
    totp_code: str = Field(..., min_length=6, max_length=6)
|
|
|
|
|
|
@router.post("/send-email")
def send_recovery_email(body: SendEmailRequest, request: Request):
    """Issue a recovery OTP for the MFA token's user and email it.

    Steps, in order: pass the client host (or ``"unknown"``) to
    ``auth.check_login_rate_limit``, resolve the username from the MFA token,
    check the account is eligible, store a fresh OTP, send it, then emit the
    "recovery started" notification.

    Returns:
        A dict with ``ok``, a user-facing ``message`` and ``email_hint``
        (the masked destination address).

    Raises:
        HTTPException: 401 when the MFA token does not resolve to a user;
            400 when the account is missing, inactive, does not require TOTP,
            or the store raises ``ValueError``; 502 when sending fails.
    """
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    username = auth.peek_mfa_token(body.mfa_token)
    if not username:
        raise HTTPException(401, "sessão 2FA expirada — faça login novamente")

    account = auth._user_row(username)
    if not (account and account["active"] and auth.user_requires_totp(account)):
        raise HTTPException(400, "recuperação não disponível para esta conta")

    try:
        with auth.db() as conn:
            # Unpacked inside the try on purpose: a malformed return value
            # surfaces as the same 400 as any other store ValueError.
            otp, recipient = mfa_recovery_store.set_recovery_email_otp(
                conn, username
            )
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc

    delivered = mail_notify.send_otp_email(
        recipient, otp, "recuperação de 2FA (perdi autenticador)"
    )
    if not delivered:
        raise HTTPException(502, "falha ao enviar e-mail — verifique Postfix")

    mail_notify.notify_mfa_recovery_started(username, recipient)
    hint = mail_notify.mask_email(recipient)
    return {"ok": True, "message": f"Código enviado para {hint}", "email_hint": hint}
|
|
|
|
|
|
@router.post("/verify-email")
def verify_recovery_email(body: VerifyEmailRequest, request: Request):
    """Exchange a valid email OTP for a recovery session.

    Passes the client host (or ``"unknown"``) to
    ``auth.check_login_rate_limit``, resolves the username from the MFA
    token, then asks the recovery store to start a session for the OTP.

    Returns:
        A dict with ``ok``, ``recovery_token``, ``expires_in``, the masked
        ``email`` and an ``otpauth_uri`` built from the session's email and
        pending TOTP secret.

    Raises:
        HTTPException: 401 when the MFA token does not resolve to a user;
            400 when the store raises ``ValueError``.
    """
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    username = auth.peek_mfa_token(body.mfa_token)
    if not username:
        raise HTTPException(401, "sessão 2FA expirada — faça login novamente")

    try:
        with auth.db() as conn:
            recovery = mfa_recovery_store.start_recovery_session(
                conn,
                username,
                body.email_otp,
                mfa_token=body.mfa_token,
            )
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc

    address = recovery["email"]
    pending_secret = recovery["totp_secret_pending"]
    payload = {
        "ok": True,
        "recovery_token": recovery["recovery_token"],
        "expires_in": recovery["expires_in"],
        "email": mail_notify.mask_email(address),
        "otpauth_uri": otpauth_uri(address, pending_secret),
    }
    return payload
|
|
|
|
|
|
@router.get("/setup")
def recovery_setup(recovery_token: str = Query(..., min_length=10)):
    """Return the username and provisioning URI for a recovery session.

    The URI label is the user's email when the user row has a truthy
    ``email``; otherwise it falls back to the session's username.

    Returns:
        A dict with ``username`` and ``otpauth_uri``; the URI is ``None``
        when the session has no pending TOTP secret.

    Raises:
        HTTPException: 401 when the store returns no session for the token.
    """
    with auth.db() as conn:
        pending = mfa_recovery_store.get_recovery_session(conn, recovery_token)
    if not pending:
        raise HTTPException(401, "sessão de recuperação inválida ou expirada")

    username = pending["username"]
    account = auth._user_row(username)
    # NOTE(review): this is the only place the user row is read with .get();
    # everywhere else it is indexed. Confirm _user_row returns a dict-like
    # object that supports .get().
    label = account["email"] if account and account.get("email") else username

    secret = pending.get("totp_secret_pending")
    uri = otpauth_uri(label, secret) if secret else None
    return {"username": username, "otpauth_uri": uri}
|
|
|
|
|
|
@router.post("/complete")
def complete_recovery(body: CompleteRecoveryRequest, request: Request):
    """Finish MFA recovery, issue new backup codes and log the user in.

    Steps, in order: pass the client host (or ``"unknown"``) to
    ``auth.check_login_rate_limit``; inside one DB context complete the
    recovery in the store, generate and store backup codes, and commit;
    consume the original MFA token if the store returned one; check the user
    row is active; record the login, create an access token and send the
    "recovery completed" notification.

    Returns:
        A dict with ``ok``, the bearer ``access_token`` and ``expires_in``,
        the user's ``username``/``role``/``display_name``, the new
        ``backup_codes`` and a user-facing ``message``.

    Raises:
        HTTPException: 400 when the store or backup-code step raises
            ``ValueError``; 401 when the user row is missing or inactive
            (checked after the commit).
    """
    peer = request.client
    auth.check_login_rate_limit(peer.host if peer else "unknown")

    try:
        with auth.db() as conn:
            outcome = mfa_recovery_store.complete_recovery(
                conn,
                body.recovery_token,
                body.totp_code,
            )
            codes = backup_codes.generate_backup_codes()
            backup_codes.store_backup_codes(conn, outcome["username"], codes)
            conn.commit()
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc

    # Consume the MFA token this recovery was started from, if there is one.
    spent_token = outcome.get("mfa_token")
    if spent_token:
        auth.consume_mfa_token(spent_token)

    account = auth._user_row(outcome["username"])
    if not (account and account["active"]):
        raise HTTPException(401, "usuário inativo")

    user = auth.DeskUser(
        username=account["username"],
        role=account["role"],
        display_name=account["display_name"],
        active=True,
    )
    auth.touch_last_login(user.username)
    token, expires_in = auth.create_access_token(user)
    mail_notify.notify_mfa_recovery_completed(outcome["username"])

    payload = {
        "ok": True,
        "access_token": token,
        "token_type": "bearer",
        "expires_in": expires_in,
        "username": user.username,
        "role": user.role,
        "display_name": user.display_name,
        "backup_codes": codes,
        "message": "2FA reconfigurado. Guarde os novos códigos de backup.",
    }
    return payload
|