188 lines
5.4 KiB
Python
188 lines
5.4 KiB
Python
"""MFA recovery sessions — e-mail OTP + TOTP re-enrollment."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import secrets
|
|
import sqlite3
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from app.totp_util import generate_secret, verify_code
|
|
|
|
RECOVERY_TOKEN_TTL_MIN = 15
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _expires(minutes: int) -> str:
|
|
return (datetime.now(timezone.utc) + timedelta(minutes=minutes)).isoformat()
|
|
|
|
|
|
def _is_expired(expires: str | None) -> bool:
|
|
if not expires:
|
|
return True
|
|
try:
|
|
exp = datetime.fromisoformat(expires)
|
|
if exp.tzinfo is None:
|
|
exp = exp.replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
return True
|
|
return datetime.now(timezone.utc) > exp
|
|
|
|
|
|
def init_recovery_schema(conn: sqlite3.Connection) -> None:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS desk_mfa_recovery (
|
|
recovery_token TEXT PRIMARY KEY,
|
|
username TEXT NOT NULL,
|
|
totp_secret_pending TEXT NOT NULL,
|
|
mfa_token TEXT,
|
|
created_at TEXT NOT NULL,
|
|
expires_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
|
|
def _user_email(row: dict) -> str | None:
|
|
email = (row.get("email") or row.get("username") or "").strip()
|
|
if "@" in email:
|
|
return email.lower()
|
|
return None
|
|
|
|
|
|
def set_recovery_email_otp(conn: sqlite3.Connection, username: str) -> tuple[str, str | None]:
|
|
row = conn.execute(
|
|
"SELECT username, email FROM desk_users WHERE username = ? AND active = 1",
|
|
(username,),
|
|
).fetchone()
|
|
if not row:
|
|
raise ValueError("usuário não encontrado")
|
|
row_d = dict(row)
|
|
target = _user_email(row_d)
|
|
if not target:
|
|
raise ValueError("conta sem e-mail cadastrado — contacte o root")
|
|
code = f"{secrets.randbelow(1_000_000):06d}"
|
|
conn.execute(
|
|
"""
|
|
UPDATE desk_users
|
|
SET recovery_email_otp = ?, recovery_email_otp_expires = ?, updated_at = ?
|
|
WHERE username = ?
|
|
""",
|
|
(code, _expires(10), _now(), username),
|
|
)
|
|
conn.commit()
|
|
return code, target
|
|
|
|
|
|
def _otp_valid(stored: str | None, expires: str | None, provided: str) -> bool:
|
|
if not stored or not expires or not provided:
|
|
return False
|
|
if stored.strip() != provided.strip():
|
|
return False
|
|
return not _is_expired(expires)
|
|
|
|
|
|
def start_recovery_session(
|
|
conn: sqlite3.Connection,
|
|
username: str,
|
|
email_otp: str,
|
|
mfa_token: str | None = None,
|
|
) -> dict:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT username, email, recovery_email_otp, recovery_email_otp_expires, totp_enabled
|
|
FROM desk_users WHERE username = ? AND active = 1
|
|
""",
|
|
(username,),
|
|
).fetchone()
|
|
if not row:
|
|
raise ValueError("usuário não encontrado")
|
|
row_d = dict(row)
|
|
if not row_d.get("totp_enabled"):
|
|
raise ValueError("2FA não está ativo nesta conta")
|
|
if not _otp_valid(
|
|
row_d.get("recovery_email_otp"),
|
|
row_d.get("recovery_email_otp_expires"),
|
|
email_otp,
|
|
):
|
|
raise ValueError("código de e-mail inválido ou expirado")
|
|
|
|
recovery_token = secrets.token_urlsafe(32)
|
|
secret = generate_secret()
|
|
now = _now()
|
|
expires = _expires(RECOVERY_TOKEN_TTL_MIN)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO desk_mfa_recovery
|
|
(recovery_token, username, totp_secret_pending, mfa_token, created_at, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(recovery_token, username, secret, mfa_token, now, expires),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE desk_users
|
|
SET recovery_email_otp = NULL, recovery_email_otp_expires = NULL, updated_at = ?
|
|
WHERE username = ?
|
|
""",
|
|
(now, username),
|
|
)
|
|
conn.commit()
|
|
email = _user_email(row_d) or username
|
|
return {
|
|
"recovery_token": recovery_token,
|
|
"username": username,
|
|
"email": email,
|
|
"totp_secret_pending": secret,
|
|
"expires_in": RECOVERY_TOKEN_TTL_MIN * 60,
|
|
}
|
|
|
|
|
|
def get_recovery_session(conn: sqlite3.Connection, recovery_token: str) -> dict | None:
|
|
row = conn.execute(
|
|
"SELECT * FROM desk_mfa_recovery WHERE recovery_token = ?",
|
|
(recovery_token,),
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
data = dict(row)
|
|
if _is_expired(data.get("expires_at")):
|
|
conn.execute(
|
|
"DELETE FROM desk_mfa_recovery WHERE recovery_token = ?",
|
|
(recovery_token,),
|
|
)
|
|
conn.commit()
|
|
return None
|
|
return data
|
|
|
|
|
|
def complete_recovery(conn: sqlite3.Connection, recovery_token: str, totp_code: str) -> dict:
|
|
session = get_recovery_session(conn, recovery_token)
|
|
if not session:
|
|
raise ValueError("sessão de recuperação inválida ou expirada")
|
|
secret = session.get("totp_secret_pending") or ""
|
|
if not verify_code(secret, totp_code):
|
|
raise ValueError("código do autenticador inválido — confirme o novo QR")
|
|
|
|
username = session["username"]
|
|
now = _now()
|
|
conn.execute(
|
|
"""
|
|
UPDATE desk_users
|
|
SET totp_secret = ?, totp_enabled = 1, mfa_enabled = 1, updated_at = ?
|
|
WHERE username = ?
|
|
""",
|
|
(secret, now, username),
|
|
)
|
|
conn.execute(
|
|
"DELETE FROM desk_mfa_recovery WHERE recovery_token = ?",
|
|
(recovery_token,),
|
|
)
|
|
conn.commit()
|
|
return {
|
|
"username": username,
|
|
"mfa_token": session.get("mfa_token"),
|
|
}
|