"""Abandoned onboarding → Lead CRM — Spec 012 Phase B.

Onboarding progress is reconstructed from ``webhook_events`` rows whose
``source`` is ``ONBOARD_SOURCE``.  Tickets whose onboarding session has had no
event for longer than the stale window are re-tagged as CRM leads by setting
``crm_track = "lead"`` inside their JSON ``payload`` column.

Every function reads rows by column name (``row["payload"]``), so the
connection passed in must use ``sqlite3.Row`` as its ``row_factory``.
"""

from __future__ import annotations

import json
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Any

# Hours of funnel inactivity after which an unfinished session is treated as
# abandoned; overridable through the ONBOARD_STALE_HOURS environment variable.
ONBOARD_STALE_HOURS = int(os.getenv("ONBOARD_STALE_HOURS", "24"))
# ``webhook_events.source`` value that identifies onboarding funnel events.
ONBOARD_SOURCE = "vm112-onboard"

# Funnel progress rank per webhook event type (higher = further along).
FUNNEL_EVENT_RANK = {
    "onboarding.started": 1,
    "domain.validated": 2,
    "dns.applied": 3,
    "account.created": 4,
    "infra.synced": 5,
    "onboarding.completed": 6,
    "onboarding.failed": 99,
}
# Stage name reported for each rank in FUNNEL_EVENT_RANK.
FUNNEL_STAGE_BY_RANK = {
    1: "started",
    2: "domain_validated",
    3: "dns_applied",
    4: "account_created",
    5: "infra_synced",
    6: "completed",
    99: "failed",
}

# Ticket statuses eligible for promotion to a lead.
LEAD_PROMOTE_STATUSES = frozenset({"open", "escalated"})
# Funnel stages that are finished and therefore never considered stale.
SKIP_STAGES = frozenset({"completed", "failed"})

# How many of the most recent tickets (by id) promote_stale_leads and
# list_leads inspect per call.
_TICKET_SCAN_LIMIT = 500


def _parse_payload(raw: str | None) -> dict:
    """Decode a JSON payload column into a dict.

    Returns ``{}`` for NULL/empty values, malformed JSON, non-text values and
    JSON documents whose top level is not an object, so callers can always
    use ``.get`` on the result.
    """
    if not raw:
        return {}
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {}
    return data if isinstance(data, dict) else {}


def _session_key(value: Any) -> str:
    """Normalise a session id (column or payload value) to a stripped string.

    Falsy values map to ``""``; non-string ids (e.g. JSON numbers) are
    converted with ``str`` instead of crashing on ``.strip()``.
    """
    if not value:
        return ""
    return str(value).strip()


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _parse_timestamp(value: Any) -> datetime | None:
    """Parse an ISO-8601 / SQLite timestamp into an aware datetime.

    Accepts both ``T`` and space separators and a trailing ``Z``.  Returns
    ``None`` when *value* is not a string or cannot be parsed.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    if text.endswith(("Z", "z")):
        # fromisoformat only understands "Z" from Python 3.11 onwards.
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC (SQLite's
        # CURRENT_TIMESTAMP is UTC) — confirm against the webhook_events schema.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _new_progress() -> dict[str, Any]:
    """Return an empty per-session funnel accumulator."""
    return {"max_rank": 0, "last_event_at": None, "domain": None, "failed": False}


def _summarise_progress(progress: dict[str, Any]) -> dict[str, Any]:
    """Turn a funnel accumulator into the public funnel-state dict."""
    if progress["failed"]:
        stage = "failed"
    else:
        # Rank 0 (no recognised event) is reported as "started".
        stage = FUNNEL_STAGE_BY_RANK.get(progress["max_rank"], "started")
    return {
        "stage": stage,
        "last_event_at": progress["last_event_at"],
        "failed": progress["failed"],
        "domain": progress["domain"],
    }


def _funnel_states(
    conn: sqlite3.Connection,
    session_ids: set[str] | frozenset[str] | None = None,
) -> dict[str, dict[str, Any]]:
    """Replay onboarding events once and summarise each session's funnel.

    Events are replayed in ``id`` order.  When *session_ids* is given, only
    those (already normalised) session ids are tracked.  Sessions without any
    event are absent from the result.
    """
    progress_by_sid: dict[str, dict[str, Any]] = {}
    cursor = conn.execute(
        """
        SELECT event_type, payload, created_at
        FROM webhook_events
        WHERE source = ?
        ORDER BY id ASC
        """,
        (ONBOARD_SOURCE,),
    )
    for row in cursor:
        payload = _parse_payload(row["payload"])
        sid = _session_key(payload.get("session_id"))
        if not sid or (session_ids is not None and sid not in session_ids):
            continue
        progress = progress_by_sid.setdefault(sid, _new_progress())
        if payload.get("domain"):
            progress["domain"] = payload.get("domain")
        # Any event, recognised or not, counts as activity.
        progress["last_event_at"] = row["created_at"]
        event_type = row["event_type"]
        if event_type == "onboarding.failed":
            # Failure is sticky: later events cannot move the stage forward.
            progress["failed"] = True
        else:
            rank = FUNNEL_EVENT_RANK.get(event_type, 0)
            progress["max_rank"] = max(progress["max_rank"], rank)
    return {sid: _summarise_progress(p) for sid, p in progress_by_sid.items()}


def session_funnel_state(conn: sqlite3.Connection, session_id: str) -> dict[str, Any]:
    """Return the funnel state of one onboarding session.

    Returns a dict with ``stage``, ``last_event_at``, ``failed`` and
    ``domain``.  An empty session id yields stage ``"unknown"``; a session
    with no events yields stage ``"started"`` and ``last_event_at=None``.
    """
    sid = _session_key(session_id)
    if not sid:
        return {"stage": "unknown", "last_event_at": None, "failed": False, "domain": None}
    state = _funnel_states(conn, {sid}).get(sid)
    return state if state is not None else _summarise_progress(_new_progress())


def is_session_stale(last_event_at: str | None, stage: str, stale_hours: int) -> bool:
    """Return True if an unfinished session's last event is older than *stale_hours*.

    Sessions without a last event, or in a finished stage (``SKIP_STAGES``),
    are never stale.  Timestamps are compared as instants, so mixed formats
    (``T`` vs space separator, differing UTC offsets) compare correctly.
    """
    if not last_event_at or stage in SKIP_STAGES:
        return False
    cutoff = datetime.now(timezone.utc) - timedelta(hours=stale_hours)
    event_time = _parse_timestamp(last_event_at)
    if event_time is None:
        # Unparseable value: fall back to the legacy lexicographic comparison.
        return last_event_at < cutoff.isoformat()
    return event_time < cutoff


def promote_stale_leads(conn: sqlite3.Connection, stale_hours: int | None = None) -> dict[str, Any]:
    """Re-tag onboarding tickets with stale sessions as CRM leads.

    Scans the most recent ``_TICKET_SCAN_LIMIT`` tickets that have a
    session id.  A ticket is promoted when its payload ``crm_track`` is
    ``"onboarding"`` (or missing), its status is in ``LEAD_PROMOTE_STATUSES``,
    and its session is stale per ``is_session_stale``.  Promotion rewrites
    the payload JSON and commits once if anything changed.

    Args:
        conn: Connection with ``sqlite3.Row`` row factory.
        stale_hours: Stale window; defaults to ``ONBOARD_STALE_HOURS``.

    Returns:
        ``{"promoted", "ticket_ids", "stale_hours", "ran_at"}``.
    """
    hours = stale_hours if stale_hours is not None else ONBOARD_STALE_HOURS
    now = _now()
    rows = conn.execute(
        """
        SELECT id, status, session_id, subject, payload, created_at
        FROM tickets
        WHERE session_id IS NOT NULL AND session_id != ''
        ORDER BY id DESC
        LIMIT ?
        """,
        (_TICKET_SCAN_LIMIT,),
    ).fetchall()

    candidates: list[tuple[int, dict[str, Any], str]] = []
    for row in rows:
        payload = _parse_payload(row["payload"])
        if (payload.get("crm_track") or "onboarding") != "onboarding":
            continue
        if row["status"] not in LEAD_PROMOTE_STATUSES:
            continue
        sid = _session_key(row["session_id"] or payload.get("session_id"))
        if sid:
            candidates.append((int(row["id"]), payload, sid))

    promoted_ids: list[int] = []
    if candidates:
        # One pass over webhook_events for all candidate sessions instead of
        # one full scan per ticket.
        states = _funnel_states(conn, {sid for _, _, sid in candidates})
        for ticket_id, payload, sid in candidates:
            meta = states.get(sid)
            if meta is None:
                # No events at all: no last_event_at, so never stale.
                continue
            if not is_session_stale(meta.get("last_event_at"), meta.get("stage", ""), hours):
                continue
            payload["crm_track"] = "lead"
            payload["lead_detected_at"] = now
            payload["lead_reason"] = "stale_session"
            payload["lead_funnel_stage"] = meta.get("stage")
            payload["lead_last_event_at"] = meta.get("last_event_at")
            conn.execute(
                "UPDATE tickets SET payload = ? WHERE id = ?",
                (json.dumps(payload), ticket_id),
            )
            promoted_ids.append(ticket_id)

    if promoted_ids:
        conn.commit()
    return {
        "promoted": len(promoted_ids),
        "ticket_ids": promoted_ids,
        "stale_hours": hours,
        "ran_at": now,
    }


def lead_from_ticket(row: sqlite3.Row, conn: sqlite3.Connection | None = None) -> dict[str, Any]:
    """Build the lead view of a ticket row.

    Funnel stage / last event come from the ``lead_*`` payload fields written
    at promotion time; if either is missing and *conn* is given, they (and a
    missing domain) are recomputed from the session's webhook events.
    """
    payload = _parse_payload(row["payload"])
    sid = _session_key(row["session_id"] or payload.get("session_id"))
    stage = payload.get("lead_funnel_stage")
    last_event = payload.get("lead_last_event_at")
    domain = payload.get("domain")
    if conn is not None and sid and (not stage or not last_event):
        meta = session_funnel_state(conn, sid)
        stage = stage or meta.get("stage")
        last_event = last_event or meta.get("last_event_at")
        domain = domain or meta.get("domain")
    data = payload.get("data")
    email = payload.get("account_email") or (data.get("email") if isinstance(data, dict) else None)
    return {
        "ticket_id": int(row["id"]),
        "session_id": sid,
        "domain": domain,
        "email": email,
        "subject": row["subject"],
        "status": row["status"],
        "crm_track": payload.get("crm_track"),
        "lead_detected_at": payload.get("lead_detected_at"),
        "lead_reason": payload.get("lead_reason"),
        "funnel_stage": stage,
        "last_event_at": last_event,
        "created_at": row["created_at"],
        "assigned_to": row["assigned_to"] if "assigned_to" in row.keys() else None,
    }


def list_leads(conn: sqlite3.Connection, limit: int = 100) -> list[dict[str, Any]]:
    """Return up to *limit* leads, most recently detected first.

    Only the most recent ``_TICKET_SCAN_LIMIT`` tickets (by id) are
    inspected.  Leads are sorted by ``lead_detected_at`` (descending, ties
    keep ticket-id-descending order) *before* truncation, so the result is a
    true top-*limit*.  A non-positive *limit* returns ``[]``.
    """
    if limit <= 0:
        return []
    rows = conn.execute(
        """
        SELECT id, status, session_id, subject, payload, created_at, assigned_to
        FROM tickets
        ORDER BY id DESC
        LIMIT ?
        """,
        (_TICKET_SCAN_LIMIT,),
    ).fetchall()
    lead_rows: list[tuple[Any, sqlite3.Row]] = []
    for row in rows:
        payload = _parse_payload(row["payload"])
        if payload.get("crm_track") == "lead":
            lead_rows.append((payload.get("lead_detected_at") or "", row))
    # list.sort is stable, also with reverse=True.
    lead_rows.sort(key=lambda pair: pair[0], reverse=True)
    return [lead_from_ticket(row, conn) for _, row in lead_rows[:limit]]


def count_leads(conn: sqlite3.Connection) -> int:
    """Count all tickets whose payload ``crm_track`` is ``"lead"``.

    Unlike ``list_leads`` this scans every ticket, streaming rows from the
    cursor rather than loading them all at once.
    """
    cursor = conn.execute("SELECT payload FROM tickets")
    return sum(1 for row in cursor if _parse_payload(row["payload"]).get("crm_track") == "lead")