"""Registration requests for Desk administrators.""" from __future__ import annotations import secrets import sqlite3 from datetime import datetime, timedelta, timezone from app import auth from app import backup_codes from app.permissions import ASSIGNABLE_ROLES from app.totp_util import generate_secret, ntfy_topic, verify_code STATUSES = frozenset({"pending", "approved", "rejected", "active"}) REQUIRED_FACTORS = 2 def _now() -> str: return datetime.now(timezone.utc).isoformat() def _otp_expires(minutes: int = 10) -> str: return (datetime.now(timezone.utc) + timedelta(minutes=minutes)).isoformat() def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None: cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()} if column not in cols: conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}") def init_registration_schema(conn: sqlite3.Connection) -> None: conn.execute( """ CREATE TABLE IF NOT EXISTS desk_registration_requests ( id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, display_name TEXT, status TEXT NOT NULL DEFAULT 'pending', role TEXT, activation_token TEXT UNIQUE, phone TEXT, email_otp TEXT, email_otp_expires TEXT, phone_otp TEXT, phone_otp_expires TEXT, approved_by TEXT, rejected_by TEXT, rejection_reason TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, approved_at TEXT ) """ ) for col, ddl in [ ("totp_secret_pending", "totp_secret_pending TEXT"), ("ntfy_topic", "ntfy_topic TEXT"), ("email_verified", "email_verified INTEGER NOT NULL DEFAULT 0"), ("phone_verified", "phone_verified INTEGER NOT NULL DEFAULT 0"), ("totp_verified", "totp_verified INTEGER NOT NULL DEFAULT 0"), ]: _ensure_column(conn, "desk_registration_requests", col, ddl) for col, ddl in [ ("email", "email TEXT"), ("phone", "phone TEXT"), ("mfa_enabled", "mfa_enabled INTEGER NOT NULL DEFAULT 0"), ("totp_secret", "totp_secret TEXT"), ("totp_enabled", "totp_enabled INTEGER NOT NULL DEFAULT 0"), ]: _ensure_column(conn, "desk_users", col, ddl) def normalize_email(email: str) -> str: return email.strip().lower() def create_request(conn: sqlite3.Connection, email: str, password: str, display_name: str | None) -> dict: email = normalize_email(email) existing = conn.execute( "SELECT id FROM desk_users WHERE username = ? OR email = ?", (email, email), ).fetchone() if existing: raise ValueError("e-mail já cadastrado") pending = conn.execute( "SELECT id FROM desk_registration_requests WHERE email = ? AND status IN ('pending', 'approved')", (email,), ).fetchone() if pending: raise ValueError("já existe pedido pendente para este e-mail") now = _now() cur = conn.execute( """ INSERT INTO desk_registration_requests (email, password_hash, display_name, status, created_at, updated_at) VALUES (?, ?, ?, 'pending', ?, ?) """, (email, auth.hash_password(password), display_name, now, now), ) conn.commit() return get_request(conn, int(cur.lastrowid)) def get_request(conn: sqlite3.Connection, request_id: int) -> dict | None: row = conn.execute( "SELECT * FROM desk_registration_requests WHERE id = ?", (request_id,), ).fetchone() return dict(row) if row else None def get_request_by_token(conn: sqlite3.Connection, token: str) -> dict | None: row = conn.execute( "SELECT * FROM desk_registration_requests WHERE activation_token = ?", (token,), ).fetchone() return dict(row) if row else None def list_requests(conn: sqlite3.Connection, status: str | None = None) -> list[dict]: if status: rows = conn.execute( "SELECT * FROM desk_registration_requests WHERE status = ? ORDER BY created_at DESC", (status,), ).fetchall() else: rows = conn.execute( "SELECT * FROM desk_registration_requests ORDER BY created_at DESC" ).fetchall() return [public_request(dict(r)) for r in rows] def factor_status(row: dict) -> dict: return { "email": bool(row.get("email_verified")), "phone": bool(row.get("phone_verified")), "totp": bool(row.get("totp_verified")), "verified_count": sum( 1 for k in ("email_verified", "phone_verified", "totp_verified") if row.get(k) ), "required": REQUIRED_FACTORS, "ready": sum(1 for k in ("email_verified", "phone_verified", "totp_verified") if row.get(k)) >= REQUIRED_FACTORS, } def public_request(row: dict) -> dict: return { "id": row["id"], "email": row["email"], "display_name": row.get("display_name"), "status": row["status"], "role": row.get("role"), "phone": row.get("phone"), "approved_by": row.get("approved_by"), "rejected_by": row.get("rejected_by"), "rejection_reason": row.get("rejection_reason"), "created_at": row.get("created_at"), "updated_at": row.get("updated_at"), "approved_at": row.get("approved_at"), "factors": factor_status(row), } def ensure_activation_secrets(conn: sqlite3.Connection, request_id: int) -> dict: row = get_request(conn, request_id) if not row: raise ValueError("request not found") secret = row.get("totp_secret_pending") or generate_secret() topic = row.get("ntfy_topic") or ntfy_topic(row["email"], request_id) if not row.get("totp_secret_pending") or not row.get("ntfy_topic"): conn.execute( """ UPDATE desk_registration_requests SET totp_secret_pending = ?, ntfy_topic = ?, updated_at = ? WHERE id = ? """, (secret, topic, _now(), request_id), ) conn.commit() row = get_request(conn, request_id) return row def approve_request(conn: sqlite3.Connection, request_id: int, role: str, approved_by: str) -> dict: if role not in ASSIGNABLE_ROLES: raise ValueError("invalid role for new registration") row = get_request(conn, request_id) if not row: raise ValueError("request not found") if row["status"] != "pending": raise ValueError(f"cannot approve status {row['status']}") token = secrets.token_urlsafe(32) secret = generate_secret() topic = ntfy_topic(row["email"], request_id) now = _now() conn.execute( """ UPDATE desk_registration_requests SET status = 'approved', role = ?, activation_token = ?, approved_by = ?, approved_at = ?, updated_at = ?, totp_secret_pending = ?, ntfy_topic = ?, email_verified = 0, phone_verified = 0, totp_verified = 0 WHERE id = ? """, (role, token, approved_by, now, now, secret, topic, request_id), ) conn.commit() return get_request(conn, request_id) def reject_request( conn: sqlite3.Connection, request_id: int, rejected_by: str, reason: str | None = None ) -> dict: row = get_request(conn, request_id) if not row: raise ValueError("request not found") if row["status"] != "pending": raise ValueError(f"cannot reject status {row['status']}") now = _now() conn.execute( """ UPDATE desk_registration_requests SET status = 'rejected', rejected_by = ?, rejection_reason = ?, updated_at = ? WHERE id = ? """, (rejected_by, reason, now, request_id), ) conn.commit() return get_request(conn, request_id) def set_email_otp(conn: sqlite3.Connection, request_id: int) -> tuple[str, dict]: code = f"{secrets.randbelow(1_000_000):06d}" conn.execute( """ UPDATE desk_registration_requests SET email_otp = ?, email_otp_expires = ?, updated_at = ? WHERE id = ? """, (code, _otp_expires(), _now(), request_id), ) conn.commit() return code, get_request(conn, request_id) def set_phone_otp(conn: sqlite3.Connection, request_id: int, phone: str) -> tuple[str, dict]: code = f"{secrets.randbelow(1_000_000):06d}" conn.execute( """ UPDATE desk_registration_requests SET phone = ?, phone_otp = ?, phone_otp_expires = ?, updated_at = ? WHERE id = ? """, (phone.strip(), code, _otp_expires(), _now(), request_id), ) conn.commit() return code, get_request(conn, request_id) def _otp_valid(stored: str | None, expires: str | None, provided: str) -> bool: if not stored or not expires or not provided: return False if stored.strip() != provided.strip(): return False try: exp = datetime.fromisoformat(expires) if exp.tzinfo is None: exp = exp.replace(tzinfo=timezone.utc) except ValueError: return False return datetime.now(timezone.utc) <= exp def _count_verified(row: dict) -> int: return sum(1 for k in ("email_verified", "phone_verified", "totp_verified") if row.get(k)) def complete_activation( conn: sqlite3.Connection, request_id: int, email_otp: str | None = None, phone_otp: str | None = None, totp_code: str | None = None, ) -> dict: row = get_request(conn, request_id) if not row: raise ValueError("request not found") if row["status"] != "approved": raise ValueError("request not approved") email_verified = bool(row.get("email_verified")) phone_verified = bool(row.get("phone_verified")) totp_verified = bool(row.get("totp_verified")) if email_otp and not email_verified: if _otp_valid(row.get("email_otp"), row.get("email_otp_expires"), email_otp): email_verified = True if phone_otp and not phone_verified: if row.get("phone") and _otp_valid(row.get("phone_otp"), row.get("phone_otp_expires"), phone_otp): phone_verified = True if totp_code and not totp_verified: secret = row.get("totp_secret_pending") if secret and verify_code(secret, totp_code): totp_verified = True verified_count = sum([email_verified, phone_verified, totp_verified]) if verified_count < REQUIRED_FACTORS: conn.execute( """ UPDATE desk_registration_requests SET email_verified = ?, phone_verified = ?, totp_verified = ?, updated_at = ? WHERE id = ? """, (int(email_verified), int(phone_verified), int(totp_verified), _now(), request_id), ) conn.commit() raise ValueError(f"valide pelo menos {REQUIRED_FACTORS} fatores ({verified_count}/{REQUIRED_FACTORS})") email = row["email"] role = row["role"] if not role: raise ValueError("role not set") now = _now() display = row.get("display_name") or email.split("@")[0] totp_secret = row.get("totp_secret_pending") if totp_verified else None totp_enabled = 1 if totp_verified else 0 phone = row.get("phone") if phone_verified else None conn.execute( """ INSERT INTO desk_users (username, password_hash, role, display_name, email, phone, mfa_enabled, totp_secret, totp_enabled, active, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, 1, ?, ?) """, ( email, row["password_hash"], role, display, email, phone, totp_secret, totp_enabled, now, now, ), ) conn.execute( """ UPDATE desk_registration_requests SET status = 'active', email_verified = ?, phone_verified = ?, totp_verified = ?, updated_at = ? WHERE id = ? """, (int(email_verified), int(phone_verified), int(totp_verified), now, request_id), ) conn.commit() result = get_request(conn, request_id) if totp_enabled and totp_secret: codes = backup_codes.generate_backup_codes() backup_codes.store_backup_codes(conn, email, codes) conn.commit() result = dict(result) result["backup_codes"] = codes return result