"""MFA recovery sessions — e-mail OTP + TOTP re-enrollment."""

from __future__ import annotations

import secrets
import sqlite3
from datetime import datetime, timedelta, timezone

from app.totp_util import generate_secret, verify_code

# Lifetime of a recovery session (the window in which the new TOTP secret
# must be confirmed), in minutes.
RECOVERY_TOKEN_TTL_MIN = 15

# Lifetime of the one-time code sent by e-mail, in minutes.
EMAIL_OTP_TTL_MIN = 10


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _expires(minutes: int) -> str:
    """Return the ISO-8601 UTC timestamp *minutes* from now."""
    return (datetime.now(timezone.utc) + timedelta(minutes=minutes)).isoformat()


def _is_expired(expires: str | None) -> bool:
    """Return True when *expires* is missing, unparsable, or in the past.

    Naive timestamps are interpreted as UTC.
    """
    if not expires:
        return True
    try:
        deadline = datetime.fromisoformat(expires)
    except (TypeError, ValueError):
        # A value we cannot parse is treated as expired (fail closed).
        return True
    if deadline.tzinfo is None:
        deadline = deadline.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) > deadline


def init_recovery_schema(conn: sqlite3.Connection) -> None:
    """Create the ``desk_mfa_recovery`` table if it does not exist yet."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS desk_mfa_recovery (
            recovery_token TEXT PRIMARY KEY,
            username TEXT NOT NULL,
            totp_secret_pending TEXT NOT NULL,
            mfa_token TEXT,
            created_at TEXT NOT NULL,
            expires_at TEXT NOT NULL
        )
        """
    )


def _user_email(row: dict) -> str | None:
    """Return the lower-cased e-mail address for a user row, or None.

    Uses the ``email`` field, falling back to ``username``; the value only
    counts as an address when it contains ``@``.
    """
    email = (row.get("email") or row.get("username") or "").strip()
    if "@" in email:
        return email.lower()
    return None


def set_recovery_email_otp(
    conn: sqlite3.Connection, username: str
) -> tuple[str, str | None]:
    """Generate and store a 6-digit e-mail OTP for an active user.

    Returns:
        ``(code, target)`` — the generated code and the address it should
        be sent to. Sending the e-mail is left to the caller.

    Raises:
        ValueError: if the user does not exist / is inactive, or has no
            usable e-mail address.
    """
    row = conn.execute(
        "SELECT username, email FROM desk_users WHERE username = ? AND active = 1",
        (username,),
    ).fetchone()
    if not row:
        raise ValueError("usuário não encontrado")

    # NOTE(review): dict(row) assumes the connection uses a mapping-style
    # row factory such as sqlite3.Row — confirm at the call site.
    target = _user_email(dict(row))
    if not target:
        raise ValueError("conta sem e-mail cadastrado — contacte o root")

    code = f"{secrets.randbelow(1_000_000):06d}"
    # `with conn` commits on success and rolls back if the statement fails.
    with conn:
        conn.execute(
            """
            UPDATE desk_users
            SET recovery_email_otp = ?, recovery_email_otp_expires = ?, updated_at = ?
            WHERE username = ?
            """,
            (code, _expires(EMAIL_OTP_TTL_MIN), _now(), username),
        )
    return code, target


def _otp_valid(stored: str | None, expires: str | None, provided: str) -> bool:
    """Return True when *provided* matches *stored* and has not expired.

    Surrounding whitespace is ignored on both values. The comparison is
    constant-time so response timing does not leak how much of the code
    matched.
    """
    if not stored or not expires or not provided:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    matches = secrets.compare_digest(
        stored.strip().encode("utf-8"),
        provided.strip().encode("utf-8"),
    )
    if not matches:
        return False
    return not _is_expired(expires)


def start_recovery_session(
    conn: sqlite3.Connection,
    username: str,
    email_otp: str,
    mfa_token: str | None = None,
) -> dict:
    """Validate the e-mail OTP and open a TOTP re-enrollment session.

    On success a new pending TOTP secret is generated, a recovery session
    row is inserted and the user's e-mail OTP is cleared so it cannot be
    reused.

    Returns:
        A dict with ``recovery_token``, ``username``, ``email``,
        ``totp_secret_pending`` and ``expires_in`` (seconds).

    Raises:
        ValueError: if the user does not exist / is inactive, 2FA is not
            enabled for the account, or the OTP is wrong or expired.
    """
    row = conn.execute(
        """
        SELECT username, email, recovery_email_otp, recovery_email_otp_expires,
               totp_enabled
        FROM desk_users
        WHERE username = ? AND active = 1
        """,
        (username,),
    ).fetchone()
    if not row:
        raise ValueError("usuário não encontrado")

    # NOTE(review): dict(row) assumes a mapping-style row factory
    # (e.g. sqlite3.Row) — confirm at the call site.
    user = dict(row)
    if not user.get("totp_enabled"):
        raise ValueError("2FA não está ativo nesta conta")
    if not _otp_valid(
        user.get("recovery_email_otp"),
        user.get("recovery_email_otp_expires"),
        email_otp,
    ):
        raise ValueError("código de e-mail inválido ou expirado")

    recovery_token = secrets.token_urlsafe(32)
    secret = generate_secret()
    now = _now()
    expires = _expires(RECOVERY_TOKEN_TTL_MIN)

    # Both writes go in one transaction: either the session exists AND the
    # OTP is consumed, or neither happened.
    with conn:
        conn.execute(
            """
            INSERT INTO desk_mfa_recovery
                (recovery_token, username, totp_secret_pending, mfa_token,
                 created_at, expires_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (recovery_token, username, secret, mfa_token, now, expires),
        )
        conn.execute(
            """
            UPDATE desk_users
            SET recovery_email_otp = NULL, recovery_email_otp_expires = NULL,
                updated_at = ?
            WHERE username = ?
            """,
            (now, username),
        )

    return {
        "recovery_token": recovery_token,
        "username": username,
        "email": _user_email(user) or username,
        "totp_secret_pending": secret,
        "expires_in": RECOVERY_TOKEN_TTL_MIN * 60,
    }


def get_recovery_session(conn: sqlite3.Connection, recovery_token: str) -> dict | None:
    """Return the recovery session for *recovery_token*, or None.

    An expired session is deleted as a side effect and reported as None.
    """
    row = conn.execute(
        "SELECT * FROM desk_mfa_recovery WHERE recovery_token = ?",
        (recovery_token,),
    ).fetchone()
    if not row:
        return None

    data = dict(row)
    if _is_expired(data.get("expires_at")):
        with conn:
            conn.execute(
                "DELETE FROM desk_mfa_recovery WHERE recovery_token = ?",
                (recovery_token,),
            )
        return None
    return data


def complete_recovery(conn: sqlite3.Connection, recovery_token: str, totp_code: str) -> dict:
    """Confirm the new authenticator and activate the pending TOTP secret.

    The pending secret becomes the user's ``totp_secret`` only after
    *totp_code* verifies against it; the recovery session is then removed.

    Returns:
        A dict with ``username`` and the ``mfa_token`` stored on the session
        (may be None).

    Raises:
        ValueError: if the session is unknown or expired, or the TOTP code
            does not verify against the pending secret.
    """
    session = get_recovery_session(conn, recovery_token)
    if not session:
        raise ValueError("sessão de recuperação inválida ou expirada")

    secret = session.get("totp_secret_pending") or ""
    if not verify_code(secret, totp_code):
        raise ValueError("código do autenticador inválido — confirme o novo QR")

    username = session["username"]
    # Activate the secret and consume the session atomically.
    with conn:
        conn.execute(
            """
            UPDATE desk_users
            SET totp_secret = ?, totp_enabled = 1, mfa_enabled = 1, updated_at = ?
            WHERE username = ?
            """,
            (secret, _now(), username),
        )
        conn.execute(
            "DELETE FROM desk_mfa_recovery WHERE recovery_token = ?",
            (recovery_token,),
        )

    return {
        "username": username,
        "mfa_token": session.get("mfa_token"),
    }