"""Segurança wizard VM112 — telemetria Spec 021.""" from __future__ import annotations import json import re from datetime import datetime, timedelta, timezone from typing import Any SECURITY_SOURCE = "vm112-security" SECURITY_PREFIX = "security." VM112_TENANT_ID = 1 AUTO_TICKET_EVENTS = frozenset({ "security.input_blocked", "security.handoff_rejected", "security.session_anomaly", }) SEVERITY_BY_EVENT = { "security.csp_violation": "warn", "security.input_warn": "info", "security.input_blocked": "high", "security.rate_limited": "warn", "security.handoff_created": "info", "security.handoff_consumed": "info", "security.handoff_rejected": "high", "security.handoff_expired": "info", "security.auth_failed": "warn", "security.session_anomaly": "high", } FORBIDDEN_PAYLOAD_KEYS = frozenset({ "password", "root_password", "new_password", "current_password", "handoff_token", "token", "secret", }) SQLI_PATTERNS = [ re.compile(r"'\s*or\s+", re.I), re.compile(r"union\s+select", re.I), re.compile(r";\s*drop\s+", re.I), re.compile(r"1\s*=\s*1", re.I), re.compile(r"--\s*$"), ] XSS_PATTERNS = [ re.compile(r"<\s*script", re.I), re.compile(r"javascript\s*:", re.I), re.compile(r"onerror\s*=", re.I), re.compile(r"onload\s*=", re.I), ] PATH_PATTERNS = [ re.compile(r"\.\./"), re.compile(r"%2e%2e", re.I), ] def _now() -> str: return datetime.now(timezone.utc).isoformat() def _parse_payload(raw: str | None) -> dict: if not raw: return {} try: return json.loads(raw) except json.JSONDecodeError: return {} def _scrub_data(data: dict | None) -> dict: if not isinstance(data, dict): return {} out: dict[str, Any] = {} for key, val in data.items(): if key.lower() in FORBIDDEN_PAYLOAD_KEYS: continue if isinstance(val, str) and len(val) > 500: out[key] = val[:500] + "…" else: out[key] = val return out def is_security_event(event: str) -> bool: return bool(event) and event.startswith(SECURITY_PREFIX) def audit_field_value(value: str, *, field: str = "") -> dict[str, Any]: """Heurística local (VM122) — espelho do middleware VM112.""" text = (value or "").strip() if not text: return {"ok": True} if len(text) > 2000: return {"ok": False, "reason": "oversize", "pattern_id": "field_too_long", "severity": "high"} for pat in SQLI_PATTERNS: if pat.search(text): return {"ok": False, "reason": "sql_injection_pattern", "pattern_id": pat.pattern[:40], "severity": "high"} for pat in XSS_PATTERNS: if pat.search(text): return {"ok": False, "reason": "xss_pattern", "pattern_id": pat.pattern[:40], "severity": "high"} for pat in PATH_PATTERNS: if pat.search(text): return {"ok": False, "reason": "path_traversal", "pattern_id": pat.pattern[:40], "severity": "high"} return {"ok": True} def _enrich_row(row) -> dict[str, Any]: payload = _parse_payload(row["payload"]) data = payload.get("data") or {} return { "id": row["id"], "event_type": row["event_type"], "source": row["source"], "created_at": row["created_at"], "session_id": payload.get("session_id"), "domain": payload.get("domain"), "severity": data.get("severity") or SEVERITY_BY_EVENT.get(row["event_type"], "info"), "client_ip": data.get("client_ip") or payload.get("ingress_client_ip"), "endpoint": data.get("endpoint"), "reason": data.get("reason"), "payload": payload, } def ingest_event( conn, *, event: str, session_id: str | None = None, domain: str | None = None, data: dict | None = None, client_ip: str | None = None, ) -> dict[str, Any]: if not is_security_event(event): raise ValueError(f"not a security event: {event}") now = _now() clean_data = _scrub_data(data) if client_ip and not clean_data.get("client_ip"): clean_data["client_ip"] = client_ip if "severity" not in clean_data: clean_data["severity"] = SEVERITY_BY_EVENT.get(event, "info") stored = { "event": event, "source": SECURITY_SOURCE, "session_id": session_id, "domain": domain, "data": clean_data, } if client_ip: stored["ingress_client_ip"] = client_ip payload = json.dumps(stored, ensure_ascii=False) cur = conn.execute( "INSERT INTO webhook_events (event_type, source, payload, created_at) VALUES (?,?,?,?)", (event, SECURITY_SOURCE, payload, now), ) event_id = int(cur.lastrowid) ticket_id = None if event in AUTO_TICKET_EVENTS: domain_label = domain or "sem domínio" subject = f"[security] {domain_label} — {event.replace('security.', '')}" cur2 = conn.execute( """ INSERT INTO tickets (tenant_id, subject, status, payload, created_at, session_id) VALUES (?, ?, 'escalated', ?, ?, ?) """, (VM112_TENANT_ID, subject, payload, now, session_id), ) ticket_id = int(cur2.lastrowid) conn.commit() return { "accepted": True, "event_id": event_id, "event": event, "ticket_id": ticket_id, } def ingest_csp_report(conn, body: dict, client_ip: str | None = None) -> dict[str, Any]: report = body.get("csp-report") or body.get("csp_report") or body if not isinstance(report, dict): report = {} data = { "document_uri": report.get("document-uri") or report.get("document_uri"), "violated_directive": report.get("violated-directive") or report.get("violated_directive"), "blocked_uri": report.get("blocked-uri") or report.get("blocked_uri"), "source_file": report.get("source-file") or report.get("source_file"), "line_number": report.get("line-number") or report.get("line_number"), "severity": "warn", "client_ip": client_ip, } return ingest_event( conn, event="security.csp_violation", data=data, client_ip=client_ip, ) def build_summary(conn, *, window_hours: int = 24) -> dict[str, Any]: cutoff = (datetime.now(timezone.utc) - timedelta(hours=window_hours)).isoformat() rows = conn.execute( """ SELECT event_type, payload, created_at FROM webhook_events WHERE source = ? AND created_at >= ? ORDER BY id DESC """, (SECURITY_SOURCE, cutoff), ).fetchall() counts: dict[str, int] = {} sessions: set[str] = set() for row in rows: counts[row["event_type"]] = counts.get(row["event_type"], 0) + 1 p = _parse_payload(row["payload"]) sid = (p.get("session_id") or "").strip() if sid: sessions.add(sid) recent = list_events(conn, limit=8, offset=0, window_hours=window_hours)["events"] return { "window_hours": window_hours, "total": len(rows), "csp_violations": counts.get("security.csp_violation", 0), "inputs_blocked": counts.get("security.input_blocked", 0), "inputs_warn": counts.get("security.input_warn", 0), "handoffs_rejected": counts.get("security.handoff_rejected", 0), "rate_limited": counts.get("security.rate_limited", 0), "sessions_with_alerts": len(sessions), "by_event": counts, "recent": recent, "enabled": True, } def list_events( conn, *, limit: int = 100, offset: int = 0, window_hours: int = 168, session_id: str | None = None, ) -> dict[str, Any]: limit = max(1, min(int(limit), 500)) offset = max(0, int(offset)) cutoff = (datetime.now(timezone.utc) - timedelta(hours=window_hours)).isoformat() if session_id: rows = conn.execute( """ SELECT id, event_type, source, payload, created_at FROM webhook_events WHERE source = ? AND created_at >= ? AND payload LIKE ? ORDER BY id DESC LIMIT ? OFFSET ? """, (SECURITY_SOURCE, cutoff, f'%"{session_id}"%', limit, offset), ).fetchall() total = conn.execute( """ SELECT COUNT(*) FROM webhook_events WHERE source = ? AND created_at >= ? AND payload LIKE ? """, (SECURITY_SOURCE, cutoff, f'%"{session_id}"%',), ).fetchone()[0] else: rows = conn.execute( """ SELECT id, event_type, source, payload, created_at FROM webhook_events WHERE source = ? AND created_at >= ? ORDER BY id DESC LIMIT ? OFFSET ? """, (SECURITY_SOURCE, cutoff, limit, offset), ).fetchall() total = conn.execute( "SELECT COUNT(*) FROM webhook_events WHERE source = ? AND created_at >= ?", (SECURITY_SOURCE, cutoff), ).fetchone()[0] return { "events": [_enrich_row(r) for r in rows], "total": int(total), "limit": limit, "offset": offset, "window_hours": window_hours, }