import os import time import httpx import redis from audit_runner import run_cycle REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") VM112_API = os.getenv("VM112_API_URL", "http://10.10.10.112:8090") OPS_API_URL = os.getenv("OPS_API_URL", "http://api:8080") OPS_INTERNAL_TOKEN = os.getenv("OPS_INTERNAL_TOKEN", "") WORKER_INTERVAL = int(os.getenv("WORKER_INTERVAL", "120")) AUDIT_INTERVAL_SEC = int(os.getenv("AUDIT_INTERVAL_SEC", "600")) LEAD_SYNC_INTERVAL_SEC = int(os.getenv("LEAD_SYNC_INTERVAL_SEC", "900")) WEBHOOK_GAP_ALERT_MIN = int(os.getenv("WEBHOOK_GAP_ALERT_MIN", "15")) AGENTIC_INTERVAL_SEC = int(os.getenv("AGENTIC_INTERVAL_SEC", "300")) OPS_NTFY_TOPIC = os.getenv("DESK_OPS_NTFY_TOPIC", "").strip() def sync_stale_leads() -> None: if not OPS_INTERNAL_TOKEN: return try: with httpx.Client(timeout=30.0) as client: response = client.post( f"{OPS_API_URL}/api/v1/crm/leads/sync", headers={"X-Ops-Internal-Token": OPS_INTERNAL_TOKEN}, ) print(f"[worker] leads sync {response.status_code}: {response.text[:160]}", flush=True) except Exception as exc: print(f"[worker] leads sync ERROR: {exc}", flush=True) def poll_vm112() -> None: try: with httpx.Client(timeout=10.0) as client: response = client.get(f"{VM112_API}/api/onboarding/health") print(f"[worker] vm112 {response.status_code}: {response.text[:120]}", flush=True) except Exception as exc: print(f"[worker] vm112 ERROR: {exc}", flush=True) def agentic_tick() -> None: """Spec 029 — run all agent scenarios (T0 checks + T1 advisor).""" if not OPS_INTERNAL_TOKEN: return try: with httpx.Client(timeout=180.0) as client: response = client.post( f"{OPS_API_URL}/api/v1/agents/internal/tick", headers={"X-Ops-Internal-Token": OPS_INTERNAL_TOKEN}, ) print(f"[worker] agentic tick {response.status_code}: {response.text[:200]}", flush=True) except Exception as exc: print(f"[worker] agentic tick ERROR: {exc}", flush=True) def check_integration_gap() -> None: if not OPS_INTERNAL_TOKEN: return try: with httpx.Client(timeout=15.0) as client: response = client.get( f"{OPS_API_URL}/api/v1/integrations/health", headers={"X-Ops-Internal-Token": OPS_INTERNAL_TOKEN}, ) if response.status_code != 200: print(f"[worker] integration health {response.status_code}", flush=True) return report = response.json() gap = (report.get("vm112_onboard") or {}).get("gap_minutes") alerts = report.get("alerts") or [] if gap is not None and gap > WEBHOOK_GAP_ALERT_MIN: print(f"[worker] ALERT webhook gap {int(gap)}min", flush=True) if OPS_NTFY_TOPIC: try: import urllib.request body = f"Sem webhook VM112 ha {int(gap)} min" req = urllib.request.Request( f"https://ntfy.sh/{OPS_NTFY_TOPIC}", data=body.encode("utf-8"), method="POST", headers={"Title": "Ligbox Ops - integration.gap", "Priority": "high"}, ) urllib.request.urlopen(req, timeout=8) except Exception as exc: print(f"[worker] ntfy gap alert ERROR: {exc}", flush=True) for alert in alerts: if alert.get("level") == "critical": print(f"[worker] CRITICAL {alert.get('message')}", flush=True) except Exception as exc: print(f"[worker] integration gap ERROR: {exc}", flush=True) def main() -> None: redis_client = redis.from_url(REDIS_URL) print("[worker] started", flush=True) last_audit = 0.0 last_lead_sync = 0.0 last_agentic = 0.0 while True: event = redis_client.rpop("ops:events") if event: print(f"[worker] event={event.decode()}", flush=True) poll_vm112() now = time.time() if now - last_audit >= AUDIT_INTERVAL_SEC: run_cycle() last_audit = now if now - last_lead_sync >= LEAD_SYNC_INTERVAL_SEC: sync_stale_leads() check_integration_gap() last_lead_sync = now if now - last_agentic >= AGENTIC_INTERVAL_SEC: agentic_tick() last_agentic = now time.sleep(WORKER_INTERVAL) if __name__ == "__main__": main()