"""Registration and activation routes for Desk administrators.""" from __future__ import annotations from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field from app import auth, desk_tickets, mail_notify, registration_store from app.permissions import ASSIGNABLE_ROLES, can_manage_users from app import ntfy_notify from app.totp_util import otpauth_uri router = APIRouter(prefix="/api/v1/auth", tags=["registration"]) class RegisterRequest(BaseModel): email: str = Field(min_length=5) password: str = Field(min_length=8) display_name: str | None = None class ApproveRequest(BaseModel): role: str class RejectRequest(BaseModel): reason: str | None = None class PhoneOtpRequest(BaseModel): token: str phone: str = Field(min_length=8) class ActivateRequest(BaseModel): token: str email_otp: str | None = Field(default=None, min_length=6, max_length=6) phone_otp: str | None = Field(default=None, min_length=6, max_length=6) totp_code: str | None = Field(default=None, min_length=6, max_length=6) @router.post("/register") def register(body: RegisterRequest): email = registration_store.normalize_email(body.email) if "@" not in email: raise HTTPException(400, "invalid email") try: with auth.db() as conn: row = registration_store.create_request(conn, email, body.password, body.display_name) ticket_id = desk_tickets.ticket_registration_pending( conn, row["id"], email, body.display_name ) except ValueError as exc: raise HTTPException(400, str(exc)) from exc mail_notify.notify_root_registration_pending(email, row["id"]) return { "ok": True, "message": "Pedido enviado. Aguarde aprovação do root.", "request_id": row["id"], "ticket_id": ticket_id, } @router.get("/registration-requests") def list_registration_requests(user: auth.DeskUser = Depends(auth.require_roles("super_admin"))): with auth.db() as conn: items = registration_store.list_requests(conn) pending = sum(1 for i in items if i["status"] == "pending") return {"requests": items, "pending_count": pending} @router.post("/registration-requests/{request_id}/approve") def approve_registration( request_id: int, body: ApproveRequest, user: auth.DeskUser = Depends(auth.require_roles("super_admin")), ): if body.role not in ASSIGNABLE_ROLES: raise HTTPException(400, f"role must be one of: {', '.join(sorted(ASSIGNABLE_ROLES))}") try: with auth.db() as conn: row = registration_store.approve_request(conn, request_id, body.role, user.username) except ValueError as exc: raise HTTPException(400, str(exc)) from exc token = row.get("activation_token") url = f"{mail_notify.DESK_PUBLIC_URL}/activate.html?token={token}" with auth.db() as conn: ticket_id = desk_tickets.ticket_registration_approved( conn, request_id, row["email"], body.role, url, row.get("display_name"), ) mail_notify.notify_candidate_approved(row["email"], url, body.role) return { "ok": True, "request": registration_store.public_request(row), "ticket_id": ticket_id, } @router.post("/registration-requests/{request_id}/reject") def reject_registration( request_id: int, body: RejectRequest, user: auth.DeskUser = Depends(auth.require_roles("super_admin")), ): try: with auth.db() as conn: row = registration_store.reject_request(conn, request_id, user.username, body.reason) except ValueError as exc: raise HTTPException(400, str(exc)) from exc mail_notify.notify_candidate_rejected(row["email"], body.reason) return {"ok": True, "request": registration_store.public_request(row)} @router.get("/activate") def validate_activation_token(token: str = Query(..., min_length=10)): with auth.db() as conn: row = registration_store.get_request_by_token(conn, token) if not row or row["status"] != "approved": raise HTTPException(400, "invalid or expired activation token") row = registration_store.ensure_activation_secrets(conn, row["id"]) secret = row.get("totp_secret_pending") or "" return { "email": row["email"], "role": row.get("role"), "display_name": row.get("display_name"), "otpauth_uri": otpauth_uri(row["email"], secret) if secret else None, "ntfy_topic": row.get("ntfy_topic"), "ntfy_subscribe_url": ntfy_notify.subscribe_url(row["ntfy_topic"]) if row.get("ntfy_topic") else None, "factors": registration_store.factor_status(row), "required_factors": registration_store.REQUIRED_FACTORS, } @router.post("/activate/send-email-otp") def send_email_otp(token: str = Query(..., min_length=10)): with auth.db() as conn: row = registration_store.get_request_by_token(conn, token) if not row or row["status"] != "approved": raise HTTPException(400, "invalid activation token") code, _ = registration_store.set_email_otp(conn, row["id"]) sent = mail_notify.send_otp_email(row["email"], code, "ativação de conta (e-mail)") if not sent: raise HTTPException(502, "falha ao enviar e-mail - verifique Postfix") topic = row.get("ntfy_topic") if topic: try: ntfy_notify.push(topic, "Codigo e-mail - Ligbox Ops", f"Seu codigo: {code}") except Exception: pass return {"ok": True, "message": "Código enviado para seu e-mail"} @router.post("/activate/send-phone-otp") def send_phone_otp(body: PhoneOtpRequest): with auth.db() as conn: row = registration_store.get_request_by_token(conn, body.token) if not row or row["status"] != "approved": raise HTTPException(400, "invalid activation token") code, _ = registration_store.set_phone_otp(conn, row["id"], body.phone) # MVP: SMS via email até integração SMS dedicada sent = mail_notify.send_otp_email( row["email"], code, f"ativação de conta (telefone {body.phone})", ) if not sent: raise HTTPException(502, "failed to send phone verification") topic = row.get("ntfy_topic") if topic: try: ntfy_notify.push(topic, "Codigo telefone - Ligbox Ops", f"Seu codigo: {code}") except Exception: pass return {"ok": True, "message": "Código de telefone enviado (verifique o e-mail)"} @router.post("/activate") def complete_activation(body: ActivateRequest): with auth.db() as conn: row = registration_store.get_request_by_token(conn, body.token) if not row: raise HTTPException(400, "invalid activation token") if not any([body.email_otp, body.phone_otp, body.totp_code]): raise HTTPException(400, "informe códigos de pelo menos 2 fatores") try: row = registration_store.complete_activation( conn, row["id"], email_otp=body.email_otp, phone_otp=body.phone_otp, totp_code=body.totp_code, ) except ValueError as exc: raise HTTPException(400, str(exc)) from exc backup_codes_list = row.get("backup_codes") if isinstance(row, dict) else None provision_result = None try: from app.vm123 import provision as vm123_provision with auth.db() as conn: provision_result = vm123_provision.provision_desk_user( conn, desk_username=row["email"], desk_role=row.get("role") or "", display_name=row.get("display_name") or row["email"], email=row["email"], ) except Exception: provision_result = {"skipped": True, "reason": "provisionamento VM123 falhou — ver logs"} if backup_codes_list and row.get("email"): mail_notify.send_backup_codes_email(row["email"], backup_codes_list) return { "ok": True, "message": "Conta ativa. Você já pode entrar com seu e-mail e senha.", "totp_login_required": bool(body.totp_code), "backup_codes": backup_codes_list, "vm123_provision": provision_result, }