"""Assist / takeover API — Spec 010 Phase A.""" from __future__ import annotations import os import secrets from datetime import datetime, timezone import httpx from fastapi import APIRouter, Depends, HTTPException from app import assist_catalog, assist_store, auth from app.permissions import ( can_assist_handoff, can_assist_takeover, can_read_assist, ) router = APIRouter(prefix="/api/v1/assist", tags=["assist"]) VM112_ASSIST_API = os.getenv("VM112_ASSIST_API_URL", os.getenv("VM112_API_URL", "http://10.10.10.112:8090")) VM112_ASSIST_TOKEN = os.getenv("VM112_ASSIST_SERVICE_TOKEN", "") DESK_ASSIST_ENABLED = os.getenv("DESK_ASSIST_ENABLED", "true").lower() in ("1", "true", "yes") VM112_ASSIST_CALL = os.getenv("VM112_ASSIST_CALL_VM112", "false").lower() in ("1", "true", "yes") def _now() -> str: return datetime.now(timezone.utc).isoformat() def _main(): from app import main as m return m def _session_meta(conn, session_id: str) -> dict: m = _main() meta = assist_store.session_funnel_meta( conn, session_id, m.FUNNEL_EVENT_RANK, m.FUNNEL_STAGE_BY_RANK, m.ONBOARD_SOURCE ) if not meta.get("session_id"): raise HTTPException(404, "sessão não encontrada no funil") if meta["funnel_rank"] == 0 and not meta.get("last_event_at"): raise HTTPException(404, "sessão não encontrada no funil") return meta def _parse_payload(raw: str | None) -> dict: m = _main() return m._parse_payload(raw) def _build_session_view(conn, session_id: str, user: auth.DeskUser) -> dict: m = _main() meta = _session_meta(conn, session_id) assist = assist_store.get_active_assist(conn, session_id) ticket_row = assist_store.find_ticket_by_session(conn, session_id) ticket = None if ticket_row: full = conn.execute( f"SELECT {m.TICKET_COLUMNS}, session_id, assist_mode, assisted_by, assisted_at, client_paused FROM tickets WHERE id = ?", (int(ticket_row["id"]),), ).fetchone() if full: ticket = m._visible_ticket(m._enrich_ticket(full), user) view = { **meta, "assist_status": assist["status"] if assist else None, "assist_session_id": assist["id"] if assist else None, "assisted_by": assist.get("initiated_by_user") if assist else (ticket or {}).get("assisted_by"), "ticket_id": int(ticket_row["id"]) if ticket_row else None, "ticket_status": ticket_row["status"] if ticket_row else None, "assigned_to": ticket_row["assigned_to"] if ticket_row else None, "can_takeover": meta["can_escalate"] and can_assist_takeover(user.role) and not (assist and assist.get("status") == "active"), } if ticket: view["ticket"] = ticket return view def _call_vm112( method: str, path: str, json_body: dict | None = None, session_id: str | None = None, ) -> dict | None: if not VM112_ASSIST_CALL: return None headers: dict[str, str] = {} if VM112_ASSIST_TOKEN: headers["X-Desk-Assist-Token"] = VM112_ASSIST_TOKEN if session_id: headers["X-Onboarding-Session"] = session_id try: with httpx.Client(timeout=30.0) as client: r = client.request(method, f"{VM112_ASSIST_API.rstrip('/')}{path}", json=json_body, headers=headers) if r.status_code >= 400: return {"error": r.text[:200], "http_status": r.status_code} return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True} except Exception as exc: return {"error": str(exc)} @router.get("/sessions") def list_assist_sessions(user: auth.DeskUser = Depends(auth.get_current_user)): if not DESK_ASSIST_ENABLED: raise HTTPException(503, "assistência desactivada") if not can_read_assist(user.role): raise HTTPException(403, "permissão insuficiente") m = _main() with m.db() as conn: funnel = m._funnel_summary(conn, window_hours=48) session_ids = [s["session_id"] for s in funnel.get("active_sessions", []) if s.get("session_id")] assist_map = assist_store.get_assist_state_map(conn, session_ids) sessions = [] for item in funnel.get("active_sessions", []): sid = item.get("session_id") if not sid: continue assist = assist_map.get(sid) ticket_row = assist_store.find_ticket_by_session(conn, sid) status = "observing" if assist and assist.get("status") == "active": status = "assisting" elif ticket_row and ticket_row["status"] in ("escalated", "assisting"): status = ticket_row["status"] meta = assist_store.session_funnel_meta( conn, sid, m.FUNNEL_EVENT_RANK, m.FUNNEL_STAGE_BY_RANK, m.ONBOARD_SOURCE ) sessions.append({ **item, "assist_status": status, "assisted_by": assist.get("initiated_by_user") if assist else None, "assigned_to": ticket_row["assigned_to"] if ticket_row else None, "can_escalate": meta.get("can_escalate", False), }) return {"sessions": sessions, "window_hours": funnel.get("window_hours", 48)} @router.get("/sessions/{session_id}") def get_assist_session(session_id: str, user: auth.DeskUser = Depends(auth.get_current_user)): if not can_read_assist(user.role): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() m = _main() with m.db() as conn: view = _build_session_view(conn, sid, user) timeline = m._session_timeline(conn, sid) from app.funnel_timing import apply_module_timing enriched, timing_meta = apply_module_timing(timeline) view["timeline"] = enriched if timing_meta: view["timing"] = timing_meta actions = [] if view.get("assist_session_id"): rows = conn.execute( """ SELECT actor, action, payload, created_at FROM assist_actions WHERE assist_session_id = ? ORDER BY id ASC LIMIT 50 """, (view["assist_session_id"],), ).fetchall() actions = [ { "actor": r["actor"], "action": r["action"], "payload": _parse_payload(r["payload"]), "created_at": r["created_at"], } for r in rows ] view["actions"] = actions return view @router.post("/sessions/{session_id}/escalate") def escalate_session(session_id: str, user: auth.DeskUser = Depends(auth.get_current_user)): if not can_assist_takeover(user.role): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() now = _now() m = _main() with m.db() as conn: meta = _session_meta(conn, sid) if not meta["can_escalate"]: raise HTTPException(400, "escalada só após validação de domínio") active = assist_store.get_active_assist(conn, sid) open_assist = assist_store.get_open_assist(conn, sid) if active: raise HTTPException( 409, detail={ "message": "sessão já em assistência", "assisted_by": active.get("initiated_by_user"), }, ) if open_assist and open_assist.get("status") == "escalated" and open_assist.get("initiated_by_user") not in (None, user.username): raise HTTPException( 409, detail={ "message": "sessão já escalada por outro técnico", "assisted_by": open_assist.get("initiated_by_user"), }, ) ticket_id = assist_store.ensure_onboard_ticket(conn, sid, meta.get("domain")) conn.execute( """ UPDATE tickets SET status = 'escalated', assigned_to = ?, assigned_at = ?, session_id = ? WHERE id = ? """, (user.username, now, sid, ticket_id), ) if open_assist and open_assist.get("status") == "escalated": assist_id = int(open_assist["id"]) conn.execute( "UPDATE assist_sessions SET initiated_by_user = ?, initiated_by = 'technician' WHERE id = ?", (user.username, assist_id), ) else: cur = conn.execute( """ INSERT INTO assist_sessions (session_id, ticket_id, initiated_by, initiated_by_user, status, funnel_stage, domain, started_at) VALUES (?, ?, 'technician', ?, 'escalated', ?, ?, ?) """, (sid, ticket_id, user.username, meta.get("funnel_stage"), meta.get("domain"), now), ) assist_id = int(cur.lastrowid) assist_store.log_action(conn, assist_id, user.username, "escalate", {"source": "desk"}) conn.commit() view = _build_session_view(conn, sid, user) return {"status": "escalated", "ticket_id": ticket_id, "session": view} @router.post("/sessions/{session_id}/takeover") def takeover_session(session_id: str, user: auth.DeskUser = Depends(auth.get_current_user)): if not can_assist_takeover(user.role): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() now = _now() m = _main() with m.db() as conn: meta = _session_meta(conn, sid) if not meta["can_escalate"]: raise HTTPException(400, "takeover só após validação de domínio") active = assist_store.get_active_assist(conn, sid) open_assist = assist_store.get_open_assist(conn, sid) if active and active.get("initiated_by_user") not in (None, user.username): raise HTTPException( 409, detail={ "message": "sessão já assumida por outro técnico", "assisted_by": active.get("initiated_by_user"), }, ) ticket_id = assist_store.ensure_onboard_ticket(conn, sid, meta.get("domain")) token_hash = assist_store.hash_token(secrets.token_urlsafe(32)) if open_assist and open_assist.get("status") == "escalated": assist_id = int(open_assist["id"]) conn.execute( """ UPDATE assist_sessions SET status = 'active', initiated_by_user = ?, takeover_token_hash = ? WHERE id = ? """, (user.username, token_hash, assist_id), ) elif active: assist_id = int(active["id"]) else: cur = conn.execute( """ INSERT INTO assist_sessions (session_id, ticket_id, initiated_by, initiated_by_user, status, funnel_stage, domain, takeover_token_hash, started_at) VALUES (?, ?, 'technician', ?, 'active', ?, ?, ?, ?) """, (sid, ticket_id, user.username, meta.get("funnel_stage"), meta.get("domain"), token_hash, now), ) assist_id = int(cur.lastrowid) conn.execute( """ UPDATE tickets SET status = 'assisting', assigned_to = ?, assigned_at = ?, session_id = ?, assist_mode = 'asm', assisted_by = ?, assisted_at = ?, client_paused = 1 WHERE id = ? """, (user.username, now, sid, user.username, now, ticket_id), ) assist_store.log_action(conn, assist_id, user.username, "takeover", {"phase": "A"}) conn.commit() _call_vm112("POST", f"/onboarding/sessions/{sid}/pause", session_id=sid) vm112_result = _call_vm112( "POST", f"/onboarding/sessions/{sid}/takeover", {"technician": user.username}, session_id=sid, ) if vm112_result and vm112_result.get("takeover_url"): takeover_url = vm112_result["takeover_url"] else: base = os.getenv("VM112_WIZARD_URL", "https://onboard.ibytera.com") takeover_url = f"{base.rstrip('/')}/assist/{sid}?desk=1" return { "status": "assisting", "ticket_id": ticket_id, "takeover_url": takeover_url, "client_paused": True, "vm112": vm112_result, "note": "ASM activo — wizard VM112 pausado para o cliente", } @router.post("/sessions/{session_id}/handoff") def handoff_session(session_id: str, user: auth.DeskUser = Depends(auth.get_current_user)): if not can_assist_handoff(user.role, user.username): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() now = _now() m = _main() with m.db() as conn: active = assist_store.get_active_assist(conn, sid) if not active: raise HTTPException(404, "nenhuma assistência activa") if user.role == "technician" and active.get("initiated_by_user") not in (None, user.username): raise HTTPException(403, "sessão de outro técnico") ticket_row = assist_store.find_ticket_by_session(conn, sid) ticket_id = int(ticket_row["id"]) if ticket_row else active.get("ticket_id") conn.execute( "UPDATE assist_sessions SET status = 'ended', ended_at = ? WHERE id = ?", (now, int(active["id"])), ) if ticket_id: conn.execute( """ UPDATE tickets SET status = 'resolved', client_paused = 0, assist_mode = NULL WHERE id = ? """, (ticket_id,), ) assist_store.log_action(conn, int(active["id"]), user.username, "handoff", {}) conn.commit() _call_vm112("POST", f"/onboarding/sessions/{sid}/resume", session_id=sid) return {"status": "handoff", "ticket_id": ticket_id} @router.get("/sessions/{session_id}/links") def session_external_links(session_id: str, user: auth.DeskUser = Depends(auth.get_current_user)): if not can_read_assist(user.role): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() m = _main() with m.db() as conn: meta = _session_meta(conn, sid) return { "session_id": sid, "domain": meta.get("domain"), "links": assist_catalog.external_links(meta.get("domain")), } @router.get("/sessions/{session_id}/actions") def list_session_actions(session_id: str, user: auth.DeskUser = Depends(auth.get_current_user)): if not can_read_assist(user.role): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() m = _main() with m.db() as conn: meta = _session_meta(conn, sid) assist = assist_store.get_active_assist(conn, sid) ticket_row = assist_store.find_ticket_by_session(conn, sid) is_assisting = bool(assist and assist.get("status") == "active") or ( ticket_row and ticket_row["status"] == "assisting" ) actions = assist_catalog.list_actions_for_session( meta.get("funnel_stage"), is_assisting, user.role ) return {"session_id": sid, "actions": actions, "is_assisting": is_assisting} @router.post("/sessions/{session_id}/actions/{action_id}") def run_session_action( session_id: str, action_id: str, user: auth.DeskUser = Depends(auth.get_current_user), ): if not can_assist_takeover(user.role): raise HTTPException(403, "permissão insuficiente") sid = session_id.strip() action = action_id.strip() now = _now() m = _main() with m.db() as conn: meta = _session_meta(conn, sid) domain = (meta.get("domain") or "").strip() if not domain: raise HTTPException(400, "domínio não disponível na sessão") assist = assist_store.get_active_assist(conn, sid) open_assist = assist_store.get_open_assist(conn, sid) assist_id = int(assist["id"]) if assist else (int(open_assist["id"]) if open_assist else None) ticket_row = assist_store.find_ticket_by_session(conn, sid) ticket_payload = _parse_payload(ticket_row["payload"]) if ticket_row else {} is_assisting = bool(assist and assist.get("status") == "active") if action == assist_catalog.ABORT_ACTION: if user.role not in ("super_admin", "ops_lead"): raise HTTPException(403, "abortar só ops_lead+") if ticket_row: conn.execute( "UPDATE tickets SET status = 'closed', client_paused = 0, assist_mode = NULL WHERE id = ?", (int(ticket_row["id"]),), ) if assist: conn.execute( "UPDATE assist_sessions SET status = 'ended', ended_at = ? WHERE id = ?", (now, int(assist["id"])), ) if assist_id: assist_store.log_action(conn, assist_id, user.username, f"action.{action}", {"domain": domain}) conn.commit() _call_vm112("POST", f"/onboarding/sessions/{sid}/resume", session_id=sid) return {"status": "aborted", "action": action, "session_id": sid} if action == assist_catalog.MARK_STEP_ACTION: if not is_assisting: raise HTTPException(400, "acção só em modo assistindo") if assist_id: assist_store.log_action(conn, assist_id, user.username, f"action.{action}", {"domain": domain}) conn.commit() return {"status": "ok", "action": action, "note": "passo marcado no audit log"} if action not in assist_catalog.DESK_ACTIONS: raise HTTPException(400, f"acção inválida: {action}") spec = assist_catalog.DESK_ACTIONS[action] if assist_catalog.funnel_rank(meta.get("funnel_stage")) < spec["min_rank"]: raise HTTPException(400, "etapa do funil insuficiente para esta acção") if action not in ("dns.revalidate", "account.retry_sync") and not is_assisting: raise HTTPException(400, "assuma a sessão antes desta acção") method, path, body = assist_catalog.build_vm112_request(action, domain, ticket_payload) vm112_result = _call_vm112(method, path, body, session_id=sid) if not assist_id: ticket_id = assist_store.ensure_onboard_ticket(conn, sid, domain) cur = conn.execute( """ INSERT INTO assist_sessions (session_id, ticket_id, initiated_by, initiated_by_user, status, funnel_stage, domain, started_at) VALUES (?, ?, 'technician', ?, 'escalated', ?, ?, ?) """, (sid, ticket_id, user.username, meta.get("funnel_stage"), domain, now), ) assist_id = int(cur.lastrowid) assist_store.log_action( conn, assist_id, user.username, f"action.{action}", {"domain": domain, "vm112": vm112_result}, ) conn.commit() if vm112_result is None and not VM112_ASSIST_CALL: return { "status": "logged", "action": action, "note": "VM112_ASSIST_CALL_VM112=false — acção registada no audit", } if vm112_result and vm112_result.get("http_status", 0) >= 400: raise HTTPException(502, detail={"message": "VM112 rejeitou acção", "vm112": vm112_result}) return {"status": "ok", "action": action, "vm112": vm112_result} @router.get("/technicians/ranking") def technicians_ranking( window_days: int = 30, user: auth.DeskUser = Depends(auth.get_current_user), ): if not can_read_assist(user.role): raise HTTPException(403, "permissão insuficiente") if user.role == "noc": raise HTTPException(403, "permissão insuficiente") days = max(1, min(window_days, 365)) m = _main() with m.db() as conn: ranking = assist_catalog.technician_ranking(conn, window_days=days) return {"window_days": days, "ranking": ranking, "total": len(ranking)} def process_assist_started(conn, body, now: str) -> dict: m = _main() sid = (body.session_id or "").strip() technician = (body.data or {}).get("technician") meta = assist_store.session_funnel_meta( conn, sid, m.FUNNEL_EVENT_RANK, m.FUNNEL_STAGE_BY_RANK, m.ONBOARD_SOURCE ) ticket_id = assist_store.ensure_onboard_ticket(conn, sid, body.domain or meta.get("domain")) conn.execute( """ UPDATE tickets SET status = 'assisting', session_id = ?, assist_mode = 'asm', assisted_by = ?, assisted_at = ?, client_paused = 1 WHERE id = ? """, (sid, technician, now, ticket_id), ) return {"handled": True, "ticket_id": ticket_id, "session_id": sid} def process_assist_ended(conn, body, now: str) -> dict: sid = (body.session_id or "").strip() ticket_row = assist_store.find_ticket_by_session(conn, sid) if ticket_row: conn.execute( "UPDATE tickets SET status = 'resolved', client_paused = 0, assist_mode = NULL WHERE id = ?", (int(ticket_row["id"]),), ) return {"handled": True, "ticket_id": int(ticket_row["id"]), "session_id": sid} return {"handled": False, "session_id": sid} def process_escalation_webhook(conn, body, now: str) -> dict: """Handle onboarding.escalated / onboarding.failed from VM112.""" m = _main() sid = (body.session_id or "").strip() if not sid: return {"handled": False, "reason": "missing session_id"} meta = assist_store.session_funnel_meta( conn, sid, m.FUNNEL_EVENT_RANK, m.FUNNEL_STAGE_BY_RANK, m.ONBOARD_SOURCE ) ticket_id = assist_store.ensure_onboard_ticket(conn, sid, body.domain or meta.get("domain")) conn.execute( "UPDATE tickets SET status = 'escalated', session_id = ? WHERE id = ?", (sid, ticket_id), ) if not assist_store.get_open_assist(conn, sid): cur = conn.execute( """ INSERT INTO assist_sessions (session_id, ticket_id, initiated_by, initiated_by_user, status, funnel_stage, domain, started_at) VALUES (?, ?, 'client', NULL, 'escalated', ?, ?, ?) """, (sid, ticket_id, meta.get("funnel_stage"), body.domain or meta.get("domain"), now), ) assist_store.log_action(conn, int(cur.lastrowid), "client", "escalate", {"event": body.event}) return {"handled": True, "ticket_id": ticket_id, "session_id": sid}