- deploy/vm112-wizard: main-wizard DomainAdmin route, clientSettings, FinishToolbar - deploy/vm122-desk: card Ligbox Datacenter Node VM001, audit sync - Spec 025: secao Passo Concluido CTAs - KB: Portal de gerenciamento reabria wizard Concluido
518 lines
16 KiB
Python
518 lines
16 KiB
Python
"""SQLite persistence for audit domains and checks."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from app.collectors.base import CHECK_LABELS
|
|
|
|
ONBOARD_DOMAIN_EVENTS = frozenset({
|
|
"onboarding.started",
|
|
"domain.validated",
|
|
"dns.applied",
|
|
"account.created",
|
|
"infra.synced",
|
|
"onboarding.completed",
|
|
"onboarding.escalated",
|
|
"onboarding.failed",
|
|
"webmail.released",
|
|
})
|
|
TENANT_ONBOARD = 1
|
|
|
|
TENANT_WEBHOOK_SOURCE = {
|
|
1: "vm112-onboard",
|
|
2: "wazuh",
|
|
}
|
|
|
|
FUNNEL_EVENT_RANK = {
|
|
"onboarding.started": 1,
|
|
"domain.validated": 2,
|
|
"dns.applied": 3,
|
|
"account.created": 4,
|
|
"infra.synced": 5,
|
|
"onboarding.completed": 6,
|
|
"company.validated": 7,
|
|
"webmail.released": 8,
|
|
"onboarding.failed": 99,
|
|
}
|
|
|
|
FUNNEL_STAGE_BY_RANK = {
|
|
1: "started",
|
|
2: "domain_validated",
|
|
3: "dns_applied",
|
|
4: "account_created",
|
|
5: "infra_synced",
|
|
6: "completed",
|
|
7: "company_validated",
|
|
8: "webmail_released",
|
|
99: "failed",
|
|
}
|
|
|
|
FUNNEL_STAGE_LABELS = {
|
|
"started": "Iniciado",
|
|
"domain_validated": "Domínio OK",
|
|
"dns_applied": "DNS aplicado",
|
|
"account_created": "Conta criada",
|
|
"infra_synced": "Infra sync",
|
|
"completed": "Concluído",
|
|
"company_validated": "Empresa validada",
|
|
"webmail_released": "Webmail liberado",
|
|
"failed": "Falhou",
|
|
"registered": "Registado",
|
|
"unknown": "Sem dados",
|
|
}
|
|
|
|
STATUS_RANK = {"pass": 0, "skip": 1, "warn": 2, "error": 3, "fail": 4}
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _parse_payload(raw: str | None) -> dict:
|
|
if not raw:
|
|
return {}
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
|
|
|
|
def init_audit_schema(conn: sqlite3.Connection) -> None:
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS audit_domains (
|
|
id INTEGER PRIMARY KEY,
|
|
tenant_id INTEGER NOT NULL,
|
|
domain TEXT NOT NULL,
|
|
source TEXT NOT NULL DEFAULT 'onboarding',
|
|
created_at TEXT NOT NULL,
|
|
UNIQUE(tenant_id, domain)
|
|
);
|
|
CREATE TABLE IF NOT EXISTS audit_checks (
|
|
id INTEGER PRIMARY KEY,
|
|
tenant_id INTEGER NOT NULL,
|
|
domain TEXT NOT NULL,
|
|
check_id TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
message TEXT,
|
|
evidence TEXT,
|
|
checked_at TEXT NOT NULL,
|
|
UNIQUE(tenant_id, domain, check_id)
|
|
);
|
|
""")
|
|
|
|
|
|
def sync_domains_from_webhooks(conn: sqlite3.Connection) -> int:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT event_type, payload FROM webhook_events
|
|
WHERE source = 'vm112-onboard'
|
|
ORDER BY id DESC LIMIT 500
|
|
"""
|
|
).fetchall()
|
|
added = 0
|
|
now = _now()
|
|
seen: set[tuple[int, str]] = set()
|
|
for row in rows:
|
|
if row["event_type"] not in ONBOARD_DOMAIN_EVENTS:
|
|
continue
|
|
payload = _parse_payload(row["payload"])
|
|
domain = (payload.get("domain") or "").strip().lower()
|
|
if not domain or len(domain) < 3:
|
|
continue
|
|
key = (TENANT_ONBOARD, domain)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
cur = conn.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO audit_domains (tenant_id, domain, source, created_at)
|
|
VALUES (?, ?, 'onboarding', ?)
|
|
""",
|
|
(TENANT_ONBOARD, domain, now),
|
|
)
|
|
if cur.rowcount:
|
|
added += 1
|
|
conn.commit()
|
|
return added
|
|
|
|
|
|
def list_audit_domains(conn: sqlite3.Connection, tenant_id: int | None = None) -> list[dict]:
|
|
if tenant_id:
|
|
rows = conn.execute(
|
|
"SELECT tenant_id, domain, source, created_at FROM audit_domains WHERE tenant_id = ? ORDER BY domain",
|
|
(tenant_id,),
|
|
).fetchall()
|
|
else:
|
|
rows = conn.execute(
|
|
"SELECT tenant_id, domain, source, created_at FROM audit_domains ORDER BY tenant_id, domain"
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def upsert_check(
|
|
conn: sqlite3.Connection,
|
|
tenant_id: int,
|
|
domain: str,
|
|
check_id: str,
|
|
status: str,
|
|
message: str,
|
|
evidence: dict | None,
|
|
checked_at: str | None = None,
|
|
) -> None:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO audit_checks (tenant_id, domain, check_id, status, message, evidence, checked_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(tenant_id, domain, check_id) DO UPDATE SET
|
|
status = excluded.status,
|
|
message = excluded.message,
|
|
evidence = excluded.evidence,
|
|
checked_at = excluded.checked_at
|
|
""",
|
|
(
|
|
tenant_id,
|
|
domain.lower(),
|
|
check_id,
|
|
status,
|
|
message,
|
|
json.dumps(evidence or {}),
|
|
checked_at or _now(),
|
|
),
|
|
)
|
|
|
|
|
|
def get_checks(conn: sqlite3.Connection, tenant_id: int, domain: str) -> list[dict]:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT check_id, status, message, evidence, checked_at
|
|
FROM audit_checks WHERE tenant_id = ? AND domain = ?
|
|
ORDER BY check_id
|
|
""",
|
|
(tenant_id, domain.lower()),
|
|
).fetchall()
|
|
out = []
|
|
for row in rows:
|
|
item = dict(row)
|
|
item["label"] = CHECK_LABELS.get(item["check_id"], item["check_id"])
|
|
item["evidence"] = _parse_payload(item.get("evidence"))
|
|
out.append(item)
|
|
return out
|
|
|
|
|
|
def aggregate_score(checks: list[dict]) -> dict[str, Any]:
|
|
total = len(CHECK_LABELS)
|
|
counts = {"pass": 0, "warn": 0, "fail": 0, "error": 0, "skip": 0}
|
|
worst = "pass"
|
|
for c in checks:
|
|
st = c.get("status") or "skip"
|
|
counts[st] = counts.get(st, 0) + 1
|
|
if STATUS_RANK.get(st, 0) > STATUS_RANK.get(worst, 0):
|
|
worst = st
|
|
if worst in ("fail", "error"):
|
|
overall = "critical"
|
|
elif worst == "warn":
|
|
overall = "degraded"
|
|
elif checks:
|
|
overall = "healthy"
|
|
else:
|
|
overall = "unknown"
|
|
return {
|
|
"pass": counts.get("pass", 0),
|
|
"warn": counts.get("warn", 0),
|
|
"fail": counts.get("fail", 0),
|
|
"error": counts.get("error", 0),
|
|
"skip": counts.get("skip", 0),
|
|
"total": total,
|
|
"overall_status": overall,
|
|
}
|
|
|
|
|
|
def tenant_overview(conn: sqlite3.Connection, tenant_id: int, name: str, ip: str) -> dict:
|
|
if tenant_id == 2:
|
|
from app.modules import store as module_store
|
|
|
|
if module_store.is_module_enabled("wazuh-soc"):
|
|
from app.wazuh_soc_store import wazuh_tenant_overview
|
|
|
|
return wazuh_tenant_overview(conn, tenant_id, name, ip)
|
|
domains = list_audit_domains(conn, tenant_id)
|
|
if not domains:
|
|
return {
|
|
"tenant_id": tenant_id,
|
|
"name": name,
|
|
"ip": ip,
|
|
"status": "unknown",
|
|
"score": {"pass": 0, "warn": 0, "fail": 0, "total": 8},
|
|
"domains_count": 0,
|
|
"last_audit_at": None,
|
|
"top_issues": [],
|
|
}
|
|
|
|
all_checks: list[dict] = []
|
|
last_audit = None
|
|
top_issues: list[dict] = []
|
|
domain_scores: list[dict] = []
|
|
for d in domains:
|
|
checks = get_checks(conn, tenant_id, d["domain"])
|
|
if not checks:
|
|
continue
|
|
all_checks.extend(checks)
|
|
domain_scores.append(aggregate_score(checks))
|
|
for c in checks:
|
|
if c["checked_at"] and (not last_audit or c["checked_at"] > last_audit):
|
|
last_audit = c["checked_at"]
|
|
if c["status"] in ("fail", "error", "warn"):
|
|
top_issues.append({
|
|
"domain": d["domain"],
|
|
"check_id": c["check_id"],
|
|
"status": c["status"],
|
|
"message": c.get("message"),
|
|
})
|
|
|
|
if domain_scores:
|
|
worst = max(domain_scores, key=lambda s: STATUS_RANK.get(s["overall_status"], 0))
|
|
score = worst
|
|
else:
|
|
score = aggregate_score(all_checks)
|
|
return {
|
|
"tenant_id": tenant_id,
|
|
"name": name,
|
|
"ip": ip,
|
|
"status": score["overall_status"],
|
|
"score": {
|
|
"pass": score["pass"],
|
|
"warn": score["warn"],
|
|
"fail": score["fail"] + score["error"],
|
|
"total": score["total"],
|
|
},
|
|
"domains_count": len(domains),
|
|
"last_audit_at": last_audit,
|
|
"top_issues": top_issues[:5],
|
|
}
|
|
|
|
|
|
def build_overview(conn: sqlite3.Connection) -> dict:
|
|
tenants = conn.execute("SELECT id, name, ip FROM tenants ORDER BY id").fetchall()
|
|
return {
|
|
"generated_at": _now(),
|
|
"tenants": [tenant_overview(conn, t["id"], t["name"], t["ip"]) for t in tenants],
|
|
}
|
|
|
|
|
|
def scorecard(conn: sqlite3.Connection, tenant_id: int, domain: str) -> dict:
|
|
domain = domain.lower().strip()
|
|
checks = get_checks(conn, tenant_id, domain)
|
|
score = aggregate_score(checks)
|
|
return {
|
|
"tenant_id": tenant_id,
|
|
"domain": domain,
|
|
"checked_at": max((c["checked_at"] for c in checks), default=None),
|
|
"overall_status": score["overall_status"],
|
|
"checks": checks,
|
|
}
|
|
|
|
|
|
def _extract_client_ip(payload: dict, data: dict | None = None) -> str | None:
|
|
data = data or {}
|
|
for key in ("client_ip", "user_ip", "remote_ip", "srcip", "ip", "agent_ip"):
|
|
val = data.get(key) or payload.get(key)
|
|
if val:
|
|
return str(val)
|
|
ingress = payload.get("ingress_client_ip")
|
|
return str(ingress) if ingress else None
|
|
|
|
|
|
def _funnel_stage_from_events(events: list[dict]) -> str:
|
|
best_rank = 0
|
|
for ev in events:
|
|
rank = FUNNEL_EVENT_RANK.get(ev.get("event") or "", 0)
|
|
if rank > best_rank:
|
|
best_rank = rank
|
|
if best_rank:
|
|
return FUNNEL_STAGE_BY_RANK.get(best_rank, "unknown")
|
|
return "registered"
|
|
|
|
|
|
def _execution_status(events: list[dict]) -> str:
|
|
types = {ev.get("event") for ev in events}
|
|
if "onboarding.failed" in types:
|
|
return "failed"
|
|
if "onboarding.completed" in types:
|
|
return "completed"
|
|
if types & set(FUNNEL_EVENT_RANK):
|
|
return "in_progress"
|
|
if events:
|
|
return "in_progress"
|
|
return "registered"
|
|
|
|
|
|
def _tickets_for_domain(conn: sqlite3.Connection, domain: str) -> list[dict]:
|
|
dom = domain.lower().strip()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT id, subject, status, session_id, payload, created_at
|
|
FROM tickets ORDER BY id DESC LIMIT 500
|
|
"""
|
|
).fetchall()
|
|
out = []
|
|
for row in rows:
|
|
payload = _parse_payload(row["payload"])
|
|
if (payload.get("domain") or "").strip().lower() != dom:
|
|
continue
|
|
data = payload.get("data") or {}
|
|
out.append({
|
|
"ticket_id": row["id"],
|
|
"status": row["status"],
|
|
"subject": row["subject"],
|
|
"session_id": row["session_id"] or payload.get("session_id"),
|
|
"email": data.get("email") or payload.get("account_email"),
|
|
"crm_track": payload.get("crm_track"),
|
|
"created_at": row["created_at"],
|
|
})
|
|
return out
|
|
|
|
|
|
def _domain_webhook_events(conn: sqlite3.Connection, source: str | None, domain: str) -> list[dict]:
|
|
if not source:
|
|
return []
|
|
dom = domain.lower().strip()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT event_type, payload, created_at FROM webhook_events
|
|
WHERE source = ?
|
|
ORDER BY created_at ASC
|
|
""",
|
|
(source,),
|
|
).fetchall()
|
|
events = []
|
|
for row in rows:
|
|
payload = _parse_payload(row["payload"])
|
|
if (payload.get("domain") or "").strip().lower() != dom:
|
|
continue
|
|
data = payload.get("data") or {}
|
|
client_ip = _extract_client_ip(payload, data)
|
|
detail = data.get("step") or data.get("description") or data.get("agent")
|
|
if source == "wazuh" and not client_ip:
|
|
client_ip = data.get("agent_ip") or data.get("srcip")
|
|
events.append({
|
|
"event": row["event_type"],
|
|
"at": row["created_at"],
|
|
"session_id": payload.get("session_id"),
|
|
"email": data.get("email"),
|
|
"client_ip": client_ip,
|
|
"detail": detail,
|
|
})
|
|
return events
|
|
|
|
|
|
def _domain_detail(conn: sqlite3.Connection, tenant_id: int, domain_row: dict) -> dict:
|
|
domain = domain_row["domain"]
|
|
checks = get_checks(conn, tenant_id, domain)
|
|
score = aggregate_score(checks)
|
|
issues = [
|
|
{
|
|
"check_id": c["check_id"],
|
|
"label": c.get("label") or CHECK_LABELS.get(c["check_id"], c["check_id"]),
|
|
"status": c["status"],
|
|
"message": c.get("message"),
|
|
"checked_at": c.get("checked_at"),
|
|
"evidence": c.get("evidence") or {},
|
|
}
|
|
for c in checks
|
|
if c.get("status") in ("fail", "error", "warn")
|
|
]
|
|
source = TENANT_WEBHOOK_SOURCE.get(tenant_id)
|
|
timeline = _domain_webhook_events(conn, source, domain)
|
|
tickets = _tickets_for_domain(conn, domain)
|
|
ticket = tickets[0] if tickets else None
|
|
funnel_stage = _funnel_stage_from_events(timeline)
|
|
execution_status = _execution_status(timeline)
|
|
client_ips = sorted({ev["client_ip"] for ev in timeline if ev.get("client_ip")})
|
|
last_event = timeline[-1] if timeline else None
|
|
started_at = timeline[0]["at"] if timeline else domain_row.get("created_at")
|
|
return {
|
|
"domain": domain,
|
|
"source": domain_row.get("source"),
|
|
"registered_at": domain_row.get("created_at"),
|
|
"email": (last_event or {}).get("email") or (ticket or {}).get("email"),
|
|
"session_id": (last_event or {}).get("session_id") or (ticket or {}).get("session_id"),
|
|
"client_ip": client_ips[-1] if client_ips else None,
|
|
"client_ips": client_ips,
|
|
"funnel_stage": funnel_stage,
|
|
"funnel_stage_label": FUNNEL_STAGE_LABELS.get(funnel_stage, funnel_stage),
|
|
"execution_status": execution_status,
|
|
"last_event": (last_event or {}).get("event"),
|
|
"last_event_at": (last_event or {}).get("at"),
|
|
"started_at": started_at,
|
|
"audit_status": score["overall_status"],
|
|
"score": {
|
|
"pass": score["pass"],
|
|
"warn": score["warn"],
|
|
"fail": score["fail"] + score["error"],
|
|
"total": score["total"],
|
|
},
|
|
"issue_count": len(issues),
|
|
"issues": issues,
|
|
"ticket_id": (ticket or {}).get("ticket_id"),
|
|
"ticket_status": (ticket or {}).get("status"),
|
|
"tickets_count": len(tickets),
|
|
"timeline": timeline,
|
|
"last_audit_at": max((c["checked_at"] for c in checks), default=None),
|
|
}
|
|
|
|
|
|
def _apply_funnel_timing_to_domains(domain_details: list[dict]) -> None:
|
|
from app.funnel_timing import apply_module_timing
|
|
|
|
for domain in domain_details:
|
|
timeline = domain.get("timeline") or []
|
|
if not timeline:
|
|
continue
|
|
enriched, timing_meta = apply_module_timing(timeline)
|
|
domain["timeline"] = enriched
|
|
if timing_meta:
|
|
domain["timing"] = timing_meta
|
|
|
|
|
|
def tenant_details(conn: sqlite3.Connection, tenant_id: int) -> dict | None:
|
|
row = conn.execute("SELECT id, name, ip FROM tenants WHERE id = ?", (tenant_id,)).fetchone()
|
|
if not row:
|
|
return None
|
|
if tenant_id == 2:
|
|
from app.modules import store as module_store
|
|
|
|
if module_store.is_module_enabled("wazuh-soc"):
|
|
from app.wazuh_soc_store import wazuh_tenant_details
|
|
|
|
return wazuh_tenant_details(conn, tenant_id, row["name"], row["ip"])
|
|
domains = list_audit_domains(conn, tenant_id)
|
|
domain_details = [_domain_detail(conn, tenant_id, d) for d in domains]
|
|
_apply_funnel_timing_to_domains(domain_details)
|
|
summary = {
|
|
"domains_total": len(domain_details),
|
|
"in_progress": sum(1 for d in domain_details if d["execution_status"] == "in_progress"),
|
|
"completed": sum(1 for d in domain_details if d["execution_status"] == "completed"),
|
|
"failed": sum(1 for d in domain_details if d["execution_status"] == "failed"),
|
|
"registered": sum(1 for d in domain_details if d["execution_status"] == "registered"),
|
|
"with_issues": sum(1 for d in domain_details if d["issue_count"] > 0),
|
|
}
|
|
result = {
|
|
"tenant_id": tenant_id,
|
|
"name": row["name"],
|
|
"ip": row["ip"],
|
|
"generated_at": _now(),
|
|
"summary": summary,
|
|
"domains": domain_details,
|
|
}
|
|
if tenant_id == 1:
|
|
from app.modules import store as module_store
|
|
|
|
if module_store.is_module_enabled("wizard-security"):
|
|
from app import security_store
|
|
|
|
result["security"] = security_store.build_summary(conn, window_hours=24)
|
|
return result
|