"""Auth API routes.""" from __future__ import annotations from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel, Field from pydantic import model_validator from app import auth, backup_codes, mail_notify from app.permissions import ROLES, can_manage_users router = APIRouter(prefix="/api/v1/auth", tags=["auth"]) class LoginRequest(BaseModel): username: str password: str class LoginMfaRequest(BaseModel): mfa_token: str totp_code: str | None = Field(default=None, min_length=6, max_length=6) backup_code: str | None = Field(default=None, min_length=8, max_length=12) @model_validator(mode="after") def require_one_factor(self): has_totp = bool(self.totp_code and self.totp_code.strip()) has_backup = bool(self.backup_code and self.backup_code.strip()) if has_totp == has_backup: raise ValueError("informe o código 2FA ou um código de backup") return self class UserUpdateRequest(BaseModel): role: str | None = None active: bool | None = None password: str | None = Field(default=None, min_length=6) display_name: str | None = None class ChangePasswordRequest(BaseModel): current_password: str = Field(min_length=1) new_password: str = Field(min_length=8) totp_code: str | None = Field(default=None, min_length=6, max_length=6) @router.post("/login") def login(body: LoginRequest, request: Request): client_ip = request.client.host if request.client else "unknown" auth.check_login_rate_limit(client_ip) user, row = auth.check_credentials(body.username, body.password) if not user: raise HTTPException(401, "invalid credentials") if auth.user_requires_totp(row): mfa_token = auth.create_mfa_token(user.username) return { "mfa_required": True, "mfa_token": mfa_token, "expires_in": auth.MFA_TOKEN_TTL_SEC, "username": user.username, } auth.touch_last_login(user.username) token, expires_in = auth.create_access_token(user) return { "access_token": token, "token_type": "bearer", "expires_in": expires_in, "username": user.username, "role": user.role, "display_name": user.display_name, } @router.post("/login/mfa") def login_mfa(body: LoginMfaRequest, request: Request): client_ip = request.client.host if request.client else "unknown" auth.check_login_rate_limit(client_ip) username = auth.consume_mfa_token(body.mfa_token) if not username: raise HTTPException(401, "invalid or expired mfa session") if body.backup_code: with auth.db() as conn: ok = backup_codes.consume_backup_code(conn, username, body.backup_code.strip()) conn.commit() if not ok: raise HTTPException(401, "código de backup inválido ou já utilizado") elif not auth.verify_user_totp(username, body.totp_code or ""): raise HTTPException(401, "invalid authenticator code") row = auth._user_row(username) if not row or not row["active"]: raise HTTPException(401, "user inactive") user = auth.DeskUser( username=row["username"], role=row["role"], display_name=row["display_name"], active=True, ) auth.touch_last_login(user.username) token, expires_in = auth.create_access_token(user) return { "access_token": token, "token_type": "bearer", "expires_in": expires_in, "username": user.username, "role": user.role, "display_name": user.display_name, } @router.post("/logout") def logout(user: auth.DeskUser = Depends(auth.get_current_user)): return {"ok": True, "username": user.username} @router.get("/me") def me(user: auth.DeskUser = Depends(auth.get_current_user)): with auth.db() as conn: row = conn.execute( """ SELECT username, role, display_name, active, last_login_at, created_at, updated_at, email, phone, mfa_enabled, totp_enabled FROM desk_users WHERE username = ? """, (user.username,), ).fetchone() if not row: raise HTTPException(404, "user not found") out = auth.user_public_dict(row) with auth.db() as conn: out["backup_codes_remaining"] = backup_codes.count_remaining(conn, user.username) return out @router.post("/change-password") def change_password( body: ChangePasswordRequest, user: auth.DeskUser = Depends(auth.get_current_user), ): row = auth._user_row(user.username) if not row or not row["active"]: raise HTTPException(401, "user inactive or not found") if not auth.verify_password(body.current_password, row["password_hash"]): raise HTTPException(401, "senha atual incorreta") if body.current_password == body.new_password: raise HTTPException(400, "a nova senha deve ser diferente da atual") if auth.user_requires_totp(row): code = (body.totp_code or "").strip() if not code: raise HTTPException(400, "código 2FA obrigatório") if not auth.verify_user_totp(user.username, code): raise HTTPException(401, "código 2FA inválido") now = datetime.now(timezone.utc).isoformat() with auth.db() as conn: conn.execute( "UPDATE desk_users SET password_hash = ?, updated_at = ? WHERE username = ?", (auth.hash_password(body.new_password), now, user.username), ) conn.commit() return {"ok": True, "message": "Senha alterada com sucesso"} @router.get("/users") def list_users(user: auth.DeskUser = Depends(auth.get_current_user)): if not can_manage_users(user.role): raise HTTPException(403, "insufficient permissions") with auth.db() as conn: rows = conn.execute( """ SELECT u.username, u.role, u.display_name, u.active, u.last_login_at, u.created_at, u.updated_at, u.email, u.phone, u.mfa_enabled, u.totp_enabled, (SELECT COUNT(*) FROM desk_backup_codes b WHERE b.username = u.username AND b.used_at IS NULL) AS backup_codes_remaining FROM desk_users u ORDER BY u.username """ ).fetchall() users = [] for row in rows: item = auth.user_public_dict(row) item["totp_enabled"] = bool(row["totp_enabled"]) item["backup_codes_remaining"] = int(row["backup_codes_remaining"] or 0) users.append(item) return {"users": users} @router.patch("/users/{username}") def update_user( username: str, body: UserUpdateRequest, user: auth.DeskUser = Depends(auth.get_current_user), ): if not can_manage_users(user.role): raise HTTPException(403, "insufficient permissions") target = username.strip() if target.lower() != "root": target = target.lower() updates: list[str] = [] params: list[object] = [] if body.role is not None: if body.role not in ROLES: raise HTTPException(400, "invalid role") updates.append("role = ?") params.append(body.role) if body.active is not None: updates.append("active = ?") params.append(1 if body.active else 0) if body.display_name is not None: updates.append("display_name = ?") params.append(body.display_name) if body.password: updates.append("password_hash = ?") params.append(auth.hash_password(body.password)) if not updates: raise HTTPException(400, "no fields to update") now = datetime.now(timezone.utc).isoformat() updates.append("updated_at = ?") params.append(now) params.append(target) with auth.db() as conn: cur = conn.execute( f"UPDATE desk_users SET {', '.join(updates)} WHERE username = ?", params, ) conn.commit() if cur.rowcount == 0: raise HTTPException(404, "user not found") row = conn.execute( """ SELECT u.username, u.role, u.display_name, u.active, u.last_login_at, u.created_at, u.updated_at, u.email, u.phone, u.mfa_enabled, u.totp_enabled, (SELECT COUNT(*) FROM desk_backup_codes b WHERE b.username = u.username AND b.used_at IS NULL) AS backup_codes_remaining FROM desk_users u WHERE u.username = ? """, (target,), ).fetchone() item = auth.user_public_dict(row) item["totp_enabled"] = bool(row["totp_enabled"]) item["backup_codes_remaining"] = int(row["backup_codes_remaining"] or 0) return {"user": item} @router.post("/users/{username}/reset-2fa") def reset_user_2fa( username: str, user: auth.DeskUser = Depends(auth.get_current_user), ): if not can_manage_users(user.role): raise HTTPException(403, "insufficient permissions") target = username.strip() if target.lower() != "root": target = target.lower() if target == "root": raise HTTPException(400, "não é possível resetar 2FA do root por aqui") now = datetime.now(timezone.utc).isoformat() with auth.db() as conn: row = conn.execute( "SELECT username, email, totp_enabled FROM desk_users WHERE username = ?", (target,), ).fetchone() if not row: raise HTTPException(404, "user not found") if not row["totp_enabled"]: raise HTTPException(400, "utilizador não tem 2FA ativo") conn.execute( """ UPDATE desk_users SET totp_secret = NULL, totp_enabled = 0, mfa_enabled = 0, updated_at = ? WHERE username = ? """, (now, target), ) conn.execute("DELETE FROM desk_backup_codes WHERE username = ?", (target,)) conn.commit() email = row["email"] or target mail_notify.notify_admin_2fa_reset(target, email, user.username) return {"ok": True, "message": f"2FA resetado para {target}. O utilizador pode reconfigurar no login."}