57 lines
1.5 KiB
Python
57 lines
1.5 KiB
Python
"""Cliente webhook segurança VM112 → VM122 (Spec 021)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
log = logging.getLogger("wizard.security")
|
|
|
|
WEBHOOK_URL = os.environ.get(
|
|
"DESK_SECURITY_WEBHOOK_URL",
|
|
"https://desk.ligbox.com.br/api/v1/webhooks/security",
|
|
)
|
|
WEBHOOK_SECRET = os.environ.get("DESK_WEBHOOK_SECRET", "")
|
|
|
|
|
|
def emit_security_event(
|
|
event: str,
|
|
session_id: str | None = None,
|
|
domain: str | None = None,
|
|
data: dict | None = None,
|
|
) -> bool:
|
|
if not event.startswith("security."):
|
|
log.warning("ignored non-security event: %s", event)
|
|
return False
|
|
if not WEBHOOK_SECRET:
|
|
log.warning("DESK_WEBHOOK_SECRET not set — skip %s", event)
|
|
return False
|
|
|
|
body = json.dumps({
|
|
"event": event,
|
|
"session_id": session_id,
|
|
"domain": domain,
|
|
"data": data or {},
|
|
}).encode("utf-8")
|
|
|
|
req = urllib.request.Request(
|
|
WEBHOOK_URL,
|
|
data=body,
|
|
method="POST",
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"X-Webhook-Secret": WEBHOOK_SECRET,
|
|
},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=8) as resp:
|
|
ok = 200 <= resp.status < 300
|
|
if not ok:
|
|
log.error("security webhook HTTP %s for %s", resp.status, event)
|
|
return ok
|
|
except urllib.error.URLError as exc:
|
|
log.error("security webhook failed %s: %s", event, exc)
|
|
return False
|