ligbox-ops-platform/projects/ops-desk/api/app/audit_store.py
Ligbox Spec Hub 821675ab4a Reorganize monorepo into projects/wizard, ops-desk, finance
Specs stay at repo root (cross-VM). Move deploy and code into logical
projects with README per domain, updated manifest.yaml, and symlinks at
legacy paths for VM122 backward compatibility.
2026-06-19 18:55:03 +00:00

508 lines
16 KiB
Python

"""SQLite persistence for audit domains and checks."""
from __future__ import annotations
import json
import sqlite3
from datetime import datetime, timezone
from typing import Any
from app.collectors.base import CHECK_LABELS
# Webhook event types whose payload ``domain`` is registered for auditing.
ONBOARD_DOMAIN_EVENTS = frozenset(("account.created", "onboarding.completed"))

# Tenant id that domains synced from onboarding webhooks are stored under.
TENANT_ONBOARD = 1

# Tenant id -> ``webhook_events.source`` value holding that tenant's events.
TENANT_WEBHOOK_SOURCE = {
    1: "vm112-onboard",
    2: "wazuh",
}

# Single source of truth for the onboarding funnel:
# (rank, webhook event type, funnel stage key). The highest rank seen in a
# timeline wins; "onboarding.failed" uses 99 so it outranks every step.
_FUNNEL_STEPS = (
    (1, "onboarding.started", "started"),
    (2, "domain.validated", "domain_validated"),
    (3, "dns.applied", "dns_applied"),
    (4, "account.created", "account_created"),
    (5, "infra.synced", "infra_synced"),
    (6, "onboarding.completed", "completed"),
    (7, "company.validated", "company_validated"),
    (8, "webmail.released", "webmail_released"),
    (99, "onboarding.failed", "failed"),
)

# Webhook event type -> funnel rank.
FUNNEL_EVENT_RANK = {event: rank for rank, event, _stage in _FUNNEL_STEPS}

# Funnel rank -> funnel stage key.
FUNNEL_STAGE_BY_RANK = {rank: stage for rank, _event, stage in _FUNNEL_STEPS}

# Display label (Portuguese) per funnel stage key, including the non-funnel
# "registered" and "unknown" fallbacks.
FUNNEL_STAGE_LABELS = {
    "started": "Iniciado",
    "domain_validated": "Domínio OK",
    "dns_applied": "DNS aplicado",
    "account_created": "Conta criada",
    "infra_synced": "Infra sync",
    "completed": "Concluído",
    "company_validated": "Empresa validada",
    "webmail_released": "Webmail liberado",
    "failed": "Falhou",
    "registered": "Registado",
    "unknown": "Sem dados",
}

# Severity of a single check status; a higher value is worse.
STATUS_RANK = {
    status: severity
    for severity, status in enumerate(("pass", "skip", "warn", "error", "fail"))
}
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``+00:00`` offset."""
    utc_moment = datetime.now(tz=timezone.utc)
    return utc_moment.isoformat()
def _parse_payload(raw: str | None) -> dict:
    """Decode a JSON object stored in a TEXT column.

    Args:
        raw: JSON text (or ``None`` / empty for "no payload").

    Returns:
        The decoded dict, or ``{}`` when *raw* is empty, is not valid JSON, is
        not text at all, or decodes to something other than a JSON object
        (list, number, string, ...). Callers can therefore always use ``.get``
        on the result.
    """
    if not raw:
        return {}
    try:
        decoded = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError and undecodable bytes;
        # TypeError covers values json.loads cannot accept at all.
        return {}
    # Valid JSON that is not an object would break every caller's ``.get``.
    return decoded if isinstance(decoded, dict) else {}
# DDL for the audit tables. ``IF NOT EXISTS`` lets the script run repeatedly.
_AUDIT_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS audit_domains (
id INTEGER PRIMARY KEY,
tenant_id INTEGER NOT NULL,
domain TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'onboarding',
created_at TEXT NOT NULL,
UNIQUE(tenant_id, domain)
);
CREATE TABLE IF NOT EXISTS audit_checks (
id INTEGER PRIMARY KEY,
tenant_id INTEGER NOT NULL,
domain TEXT NOT NULL,
check_id TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
evidence TEXT,
checked_at TEXT NOT NULL,
UNIQUE(tenant_id, domain, check_id)
);
"""


def init_audit_schema(conn: sqlite3.Connection) -> None:
    """Create the ``audit_domains`` and ``audit_checks`` tables when missing.

    ``audit_domains`` holds one row per (tenant, domain); ``audit_checks``
    holds the latest result per (tenant, domain, check_id).
    """
    schema_script = _AUDIT_SCHEMA_SQL
    conn.executescript(schema_script)
def sync_domains_from_webhooks(conn: sqlite3.Connection) -> int:
    """Register onboarding domains found in recent webhook events.

    Reads the 500 newest (highest id) events of the onboarding tenant's webhook
    source whose type is in ``ONBOARD_DOMAIN_EVENTS``. Each event's payload
    ``domain`` is normalized with ``strip().lower()`` and inserted into
    ``audit_domains`` for ``TENANT_ONBOARD``. Domains shorter than three
    characters and non-string domains are skipped. Already registered domains
    are left untouched (``INSERT OR IGNORE``). Commits the connection.

    Args:
        conn: Connection whose rows support ``row["column"]`` access
            (e.g. ``row_factory = sqlite3.Row``).

    Returns:
        Number of domains newly inserted.
    """
    source = TENANT_WEBHOOK_SOURCE[TENANT_ONBOARD]
    event_types = sorted(ONBOARD_DOMAIN_EVENTS)
    placeholders = ", ".join("?" for _ in event_types)
    # Filter by event type in SQL so the 500-row window only counts events that
    # can carry a domain, instead of being consumed by unrelated funnel events.
    rows = conn.execute(
        f"""
        SELECT payload FROM webhook_events
        WHERE source = ? AND event_type IN ({placeholders})
        ORDER BY id DESC LIMIT 500
        """,
        (source, *event_types),
    ).fetchall()

    added = 0
    now = _now()
    seen: set[str] = set()
    for row in rows:
        payload = _parse_payload(row["payload"])
        raw_domain = payload.get("domain") if isinstance(payload, dict) else None
        # Webhook payloads are external JSON; ``.strip()`` on a non-string
        # domain would abort the whole sync.
        if not isinstance(raw_domain, str):
            continue
        domain = raw_domain.strip().lower()
        if len(domain) < 3 or domain in seen:
            continue
        seen.add(domain)
        cur = conn.execute(
            """
            INSERT OR IGNORE INTO audit_domains (tenant_id, domain, source, created_at)
            VALUES (?, ?, 'onboarding', ?)
            """,
            (TENANT_ONBOARD, domain, now),
        )
        if cur.rowcount:
            added += 1
    conn.commit()
    return added
def list_audit_domains(conn: sqlite3.Connection, tenant_id: int | None = None) -> list[dict]:
    """Return registered audit domains as plain dicts.

    Args:
        conn: Connection whose rows convert with ``dict(row)``
            (e.g. ``row_factory = sqlite3.Row``).
        tenant_id: Only list this tenant's domains; ``None`` lists all tenants.

    Returns:
        Dicts with ``tenant_id``, ``domain``, ``source`` and ``created_at``,
        ordered by domain (by tenant first when listing every tenant).
    """
    # Compare with None explicitly: a falsy id such as 0 is still a tenant
    # filter and must not fall through to the all-tenants query.
    if tenant_id is not None:
        rows = conn.execute(
            "SELECT tenant_id, domain, source, created_at FROM audit_domains WHERE tenant_id = ? ORDER BY domain",
            (tenant_id,),
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT tenant_id, domain, source, created_at FROM audit_domains ORDER BY tenant_id, domain"
        ).fetchall()
    return [dict(r) for r in rows]
# Insert a check result, or overwrite the existing one for the same key.
_UPSERT_CHECK_SQL = """
INSERT INTO audit_checks (tenant_id, domain, check_id, status, message, evidence, checked_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(tenant_id, domain, check_id) DO UPDATE SET
status = excluded.status,
message = excluded.message,
evidence = excluded.evidence,
checked_at = excluded.checked_at
"""


def upsert_check(
    conn: sqlite3.Connection,
    tenant_id: int,
    domain: str,
    check_id: str,
    status: str,
    message: str,
    evidence: dict | None,
    checked_at: str | None = None,
) -> None:
    """Store the latest result of one audit check for a domain.

    Rows are keyed by ``(tenant_id, domain, check_id)``; an existing row has its
    status, message, evidence and timestamp replaced. Does not commit.

    Args:
        conn: Open connection with the ``audit_checks`` table.
        tenant_id: Owning tenant.
        domain: Audited domain; normalized with ``strip().lower()`` (the same
            normalization used when domains are synced and looked up), so stray
            whitespace cannot create a duplicate row.
        check_id: Check identifier.
        status: Check status string.
        message: Human-readable result message.
        evidence: JSON-serializable details; ``None`` is stored as ``{}``.
        checked_at: Timestamp to record; defaults to the current UTC time.
    """
    conn.execute(
        _UPSERT_CHECK_SQL,
        (
            tenant_id,
            domain.strip().lower(),
            check_id,
            status,
            message,
            json.dumps(evidence or {}),
            checked_at or _now(),
        ),
    )
# Stored checks of one (tenant, domain), in a stable check_id order.
_GET_CHECKS_SQL = """
SELECT check_id, status, message, evidence, checked_at
FROM audit_checks WHERE tenant_id = ? AND domain = ?
ORDER BY check_id
"""


def get_checks(conn: sqlite3.Connection, tenant_id: int, domain: str) -> list[dict]:
    """Load the stored audit checks of one domain, ordered by ``check_id``.

    The domain is lower-cased before matching. Each result dict carries the
    table columns plus a display ``label`` from ``CHECK_LABELS`` (the check id
    itself when no label exists); ``evidence`` is JSON-decoded via
    ``_parse_payload``.
    """
    lookup = (tenant_id, domain.lower())
    records = conn.execute(_GET_CHECKS_SQL, lookup).fetchall()

    def _decorate(record) -> dict:
        entry = dict(record)
        check_key = entry["check_id"]
        entry["label"] = CHECK_LABELS.get(check_key, check_key)
        entry["evidence"] = _parse_payload(entry.get("evidence"))
        return entry

    return [_decorate(record) for record in records]
def aggregate_score(checks: list[dict]) -> dict[str, Any]:
    """Summarize a list of check results into counts and an overall status.

    A missing or empty ``status`` counts as ``"skip"``. The worst status by
    ``STATUS_RANK`` decides the overall status: ``fail``/``error`` ->
    ``"critical"``, ``warn`` -> ``"degraded"``, anything else -> ``"healthy"``;
    with no checks at all the result is ``"unknown"``.

    Returns:
        Dict with ``pass``, ``warn``, ``fail``, ``error``, ``skip`` counts,
        ``total`` (the number of known checks in ``CHECK_LABELS``) and
        ``overall_status``.
    """
    total = len(CHECK_LABELS)
    tally_keys = ("pass", "warn", "fail", "error", "skip")
    tally = dict.fromkeys(tally_keys, 0)
    worst_status = "pass"
    for check in checks:
        status = check.get("status") or "skip"
        tally[status] = tally.get(status, 0) + 1
        if STATUS_RANK.get(status, 0) > STATUS_RANK.get(worst_status, 0):
            worst_status = status

    overall_by_worst = {"fail": "critical", "error": "critical", "warn": "degraded"}
    if worst_status in overall_by_worst:
        overall = overall_by_worst[worst_status]
    else:
        overall = "healthy" if checks else "unknown"

    summary: dict[str, Any] = {key: tally[key] for key in tally_keys}
    summary["total"] = total
    summary["overall_status"] = overall
    return summary
# Severity of the per-domain ``overall_status`` values produced by
# ``aggregate_score``; higher is worse. STATUS_RANK cannot be reused here: it is
# keyed by per-check statuses ("pass", "warn", ...), not overall statuses.
_OVERALL_STATUS_RANK = {"unknown": 0, "healthy": 1, "degraded": 2, "critical": 3}


def tenant_overview(conn: sqlite3.Connection, tenant_id: int, name: str, ip: str) -> dict:
    """Summarize one tenant's audit health for the overview listing.

    Tenant 2 is delegated to ``wazuh_tenant_overview`` when the ``wazuh-soc``
    module is enabled. Otherwise the tenant's ``status`` and ``score`` are those
    of its worst audited domain. Domains without stored checks are ignored for
    scoring but still counted in ``domains_count``.

    Returns:
        Dict with ``tenant_id``, ``name``, ``ip``, ``status``, ``score``
        (``pass``/``warn``/``fail``/``total``, where ``fail`` includes
        ``error``), ``domains_count``, ``last_audit_at`` (latest check
        timestamp or ``None``) and up to five ``top_issues``, most severe first.
    """
    if tenant_id == 2:
        from app.modules import store as module_store

        if module_store.is_module_enabled("wazuh-soc"):
            from app.wazuh_soc_store import wazuh_tenant_overview

            return wazuh_tenant_overview(conn, tenant_id, name, ip)

    domains = list_audit_domains(conn, tenant_id)
    if not domains:
        return {
            "tenant_id": tenant_id,
            "name": name,
            "ip": ip,
            "status": "unknown",
            # Same denominator aggregate_score reports, not a hard-coded count.
            "score": {"pass": 0, "warn": 0, "fail": 0, "total": len(CHECK_LABELS)},
            "domains_count": 0,
            "last_audit_at": None,
            "top_issues": [],
        }

    last_audit = None
    top_issues: list[dict] = []
    domain_scores: list[dict] = []
    for d in domains:
        checks = get_checks(conn, tenant_id, d["domain"])
        if not checks:
            continue  # never audited: excluded from scoring
        domain_scores.append(aggregate_score(checks))
        for c in checks:
            if c["checked_at"] and (not last_audit or c["checked_at"] > last_audit):
                last_audit = c["checked_at"]
            if c["status"] in ("fail", "error", "warn"):
                top_issues.append({
                    "domain": d["domain"],
                    "check_id": c["check_id"],
                    "status": c["status"],
                    "message": c.get("message"),
                })

    if domain_scores:
        # The tenant is as healthy as its worst domain (first one wins on ties).
        score = max(
            domain_scores,
            key=lambda s: _OVERALL_STATUS_RANK.get(s["overall_status"], 0),
        )
    else:
        # Domains are registered but none has stored checks yet.
        score = aggregate_score([])

    # Most severe issues first; the sort is stable (also with reverse=True), so
    # equal severities keep their domain/check order.
    top_issues.sort(key=lambda issue: STATUS_RANK.get(issue["status"], 0), reverse=True)

    return {
        "tenant_id": tenant_id,
        "name": name,
        "ip": ip,
        "status": score["overall_status"],
        "score": {
            "pass": score["pass"],
            "warn": score["warn"],
            "fail": score["fail"] + score["error"],
            "total": score["total"],
        },
        "domains_count": len(domains),
        "last_audit_at": last_audit,
        "top_issues": top_issues[:5],
    }
def build_overview(conn: sqlite3.Connection) -> dict:
    """Return ``tenant_overview`` summaries for every tenant, ordered by id.

    ``generated_at`` is stamped before the per-tenant summaries are built.
    """
    tenant_rows = conn.execute("SELECT id, name, ip FROM tenants ORDER BY id").fetchall()
    generated_at = _now()
    summaries = [
        tenant_overview(conn, row["id"], row["name"], row["ip"])
        for row in tenant_rows
    ]
    return {"generated_at": generated_at, "tenants": summaries}
def scorecard(conn: sqlite3.Connection, tenant_id: int, domain: str) -> dict:
    """Return the audit scorecard of one tenant domain.

    The domain is lower-cased and stripped before lookup. ``checked_at`` is the
    latest check timestamp (``None`` without checks) and ``overall_status``
    comes from ``aggregate_score``.
    """
    normalized = domain.lower().strip()
    checks = get_checks(conn, tenant_id, normalized)
    overall_status = aggregate_score(checks)["overall_status"]
    timestamps = [check["checked_at"] for check in checks]
    latest = max(timestamps) if timestamps else None
    return {
        "tenant_id": tenant_id,
        "domain": normalized,
        "checked_at": latest,
        "overall_status": overall_status,
        "checks": checks,
    }
# Candidate client-IP keys, in priority order.
_CLIENT_IP_KEYS = ("client_ip", "user_ip", "remote_ip", "srcip", "ip", "agent_ip")


def _extract_client_ip(payload: dict, data: dict | None = None) -> str | None:
    """Return the client IP reported by a webhook payload, as a string.

    Keys are tried in ``_CLIENT_IP_KEYS`` order; for each key the nested *data*
    dict is consulted before the top-level *payload*, and falsy values are
    skipped. If nothing matches, ``payload["ingress_client_ip"]`` is used;
    ``None`` is returned when that is missing or falsy as well.
    """
    nested = data if data else {}
    for key in _CLIENT_IP_KEYS:
        candidate = nested.get(key) or payload.get(key)
        if candidate:
            return str(candidate)
    ingress = payload.get("ingress_client_ip")
    if not ingress:
        return None
    return str(ingress)
def _funnel_stage_from_events(events: list[dict]) -> str:
    """Return the furthest funnel stage a domain's timeline has reached.

    Each event type is ranked via ``FUNNEL_EVENT_RANK`` (unknown types rank 0)
    and the highest rank is translated through ``FUNNEL_STAGE_BY_RANK``
    (``"unknown"`` when that table lacks the rank). Without any ranked event
    the stage is ``"registered"``.
    """
    ranks = [FUNNEL_EVENT_RANK.get(ev.get("event") or "", 0) for ev in events]
    top_rank = max([0, *ranks])
    if not top_rank:
        return "registered"
    return FUNNEL_STAGE_BY_RANK.get(top_rank, "unknown")
def _execution_status(events: list[dict]) -> str:
    """Classify a domain's onboarding execution from its webhook timeline.

    Precedence: any ``onboarding.failed`` event -> ``"failed"``; otherwise any
    ``onboarding.completed`` event -> ``"completed"``; otherwise any event at
    all -> ``"in_progress"``; an empty timeline -> ``"registered"``.
    """
    seen_types = {ev.get("event") for ev in events}
    if "onboarding.failed" in seen_types:
        return "failed"
    if "onboarding.completed" in seen_types:
        return "completed"
    # Any event, whether a known funnel step or not, means work has started.
    return "in_progress" if events else "registered"
# The 500 most recent tickets (highest id first); matching happens in Python.
_RECENT_TICKETS_SQL = """
SELECT id, subject, status, session_id, payload, created_at
FROM tickets ORDER BY id DESC LIMIT 500
"""


def _tickets_for_domain(conn: sqlite3.Connection, domain: str) -> list[dict]:
    """Return recent tickets whose payload ``domain`` matches *domain*.

    Only the 500 highest-id tickets are scanned, and results keep that
    descending-id order. Matching ignores case and surrounding whitespace.
    Payload shapes are not validated upstream, so a payload that is not an
    object, a non-string ``domain``, or a non-object ``data`` is tolerated
    rather than raising.

    Returns:
        Dicts with ``ticket_id``, ``status``, ``subject``, ``session_id``,
        ``email``, ``crm_track`` and ``created_at``.
    """
    target = domain.lower().strip()
    rows = conn.execute(_RECENT_TICKETS_SQL).fetchall()
    matches: list[dict] = []
    for row in rows:
        payload = _parse_payload(row["payload"])
        if not isinstance(payload, dict):
            continue
        payload_domain = payload.get("domain")
        # ``.strip()`` on a non-string domain would raise and abort the view.
        if not isinstance(payload_domain, str) or payload_domain.strip().lower() != target:
            continue
        data = payload.get("data")
        if not isinstance(data, dict):
            data = {}
        matches.append({
            "ticket_id": row["id"],
            "status": row["status"],
            "subject": row["subject"],
            "session_id": row["session_id"] or payload.get("session_id"),
            "email": data.get("email") or payload.get("account_email"),
            "crm_track": payload.get("crm_track"),
            "created_at": row["created_at"],
        })
    return matches
def _domain_webhook_events(conn: sqlite3.Connection, source: str | None, domain: str) -> list[dict]:
    """Build the chronological webhook timeline of *domain* for one source.

    Args:
        conn: Connection whose rows support ``row["column"]`` access.
        source: ``webhook_events.source`` to read; a falsy value yields ``[]``.
        domain: Domain matched against each payload's ``domain`` field,
            ignoring case and surrounding whitespace.

    Returns:
        One dict per matching event, oldest first, with ``event``, ``at``,
        ``session_id``, ``email``, ``client_ip`` and ``detail``.
    """
    if not source:
        return []
    target = domain.lower().strip()
    # ``id`` breaks ties between events that share a ``created_at`` value, so
    # the timeline order (and thus its first/last event) is deterministic.
    rows = conn.execute(
        """
        SELECT event_type, payload, created_at FROM webhook_events
        WHERE source = ?
        ORDER BY created_at ASC, id ASC
        """,
        (source,),
    ).fetchall()
    events: list[dict] = []
    for row in rows:
        payload = _parse_payload(row["payload"])
        if not isinstance(payload, dict):
            continue
        payload_domain = payload.get("domain")
        # ``.strip()`` on a non-string domain would raise and abort the view.
        if not isinstance(payload_domain, str) or payload_domain.strip().lower() != target:
            continue
        data = payload.get("data")
        if not isinstance(data, dict):
            data = {}
        events.append({
            "event": row["event_type"],
            "at": row["created_at"],
            "session_id": payload.get("session_id"),
            "email": data.get("email"),
            # _extract_client_ip already checks data["srcip"] and
            # data["agent_ip"], so no source-specific fallback is needed.
            "client_ip": _extract_client_ip(payload, data),
            "detail": data.get("step") or data.get("description") or data.get("agent"),
        })
    return events
def _latest_event_value(timeline: list[dict], field: str) -> Any:
    """Return the truthy *field* of the newest timeline event that has one, else None."""
    for event in reversed(timeline):
        value = event.get(field)
        if value:
            return value
    return None


def _domain_detail(conn: sqlite3.Connection, tenant_id: int, domain_row: dict) -> dict:
    """Assemble the drill-down view of one registered audit domain.

    Combines the domain's stored checks, its webhook timeline (from the
    tenant's source in ``TENANT_WEBHOOK_SOURCE``) and matching tickets.

    ``email``, ``session_id`` and ``client_ip`` come from the newest timeline
    event that carries a value; ``email`` and ``session_id`` fall back to the
    highest-id matching ticket. ``client_ips`` lists every distinct IP seen,
    sorted.

    Args:
        conn: Open connection.
        tenant_id: Owning tenant.
        domain_row: A row dict as returned by ``list_audit_domains``.
    """
    domain = domain_row["domain"]
    checks = get_checks(conn, tenant_id, domain)
    score = aggregate_score(checks)
    issues = [
        {
            "check_id": c["check_id"],
            "label": c.get("label") or CHECK_LABELS.get(c["check_id"], c["check_id"]),
            "status": c["status"],
            "message": c.get("message"),
            "checked_at": c.get("checked_at"),
            "evidence": c.get("evidence") or {},
        }
        for c in checks
        if c.get("status") in ("fail", "error", "warn")
    ]
    source = TENANT_WEBHOOK_SOURCE.get(tenant_id)
    timeline = _domain_webhook_events(conn, source, domain)
    tickets = _tickets_for_domain(conn, domain)
    # _tickets_for_domain returns tickets by descending id: [0] is the latest.
    ticket = tickets[0] if tickets else {}
    funnel_stage = _funnel_stage_from_events(timeline)
    execution_status = _execution_status(timeline)
    client_ips = sorted({ev["client_ip"] for ev in timeline if ev.get("client_ip")})
    last_event = timeline[-1] if timeline else {}
    started_at = timeline[0]["at"] if timeline else domain_row.get("created_at")
    return {
        "domain": domain,
        "source": domain_row.get("source"),
        "registered_at": domain_row.get("created_at"),
        "email": _latest_event_value(timeline, "email") or ticket.get("email"),
        "session_id": _latest_event_value(timeline, "session_id") or ticket.get("session_id"),
        # Most recently seen IP; ``client_ips`` is sorted, so its last element
        # is merely the lexicographically largest address.
        "client_ip": _latest_event_value(timeline, "client_ip"),
        "client_ips": client_ips,
        "funnel_stage": funnel_stage,
        "funnel_stage_label": FUNNEL_STAGE_LABELS.get(funnel_stage, funnel_stage),
        "execution_status": execution_status,
        "last_event": last_event.get("event"),
        "last_event_at": last_event.get("at"),
        "started_at": started_at,
        "audit_status": score["overall_status"],
        "score": {
            "pass": score["pass"],
            "warn": score["warn"],
            "fail": score["fail"] + score["error"],
            "total": score["total"],
        },
        "issue_count": len(issues),
        "issues": issues,
        "ticket_id": ticket.get("ticket_id"),
        "ticket_status": ticket.get("status"),
        "tickets_count": len(tickets),
        "timeline": timeline,
        "last_audit_at": max((c["checked_at"] for c in checks), default=None),
    }
def _apply_funnel_timing_to_domains(domain_details: list[dict]) -> None:
    """Enrich each domain detail's ``timeline`` in place with module timing.

    Delegates to ``app.funnel_timing.apply_module_timing``, which returns the
    enriched timeline and timing metadata. Domains with an empty timeline are
    skipped, and ``timing`` is only set when the metadata is truthy.
    """
    from app.funnel_timing import apply_module_timing

    for detail in domain_details:
        events = detail.get("timeline") or []
        if events:
            detail["timeline"], meta = apply_module_timing(events)
            if meta:
                detail["timing"] = meta
def tenant_details(conn: sqlite3.Connection, tenant_id: int) -> dict | None:
    """Return the per-domain onboarding and audit drill-down of one tenant.

    Returns ``None`` when *tenant_id* is not in ``tenants``. Tenant 2 is
    delegated to ``wazuh_tenant_details`` when the ``wazuh-soc`` module is
    enabled. For tenant 1, a 24-hour ``security`` summary is attached when the
    ``wizard-security`` module is enabled.
    """
    tenant = conn.execute(
        "SELECT id, name, ip FROM tenants WHERE id = ?", (tenant_id,)
    ).fetchone()
    if not tenant:
        return None

    if tenant_id == 2:
        from app.modules import store as module_store

        if module_store.is_module_enabled("wazuh-soc"):
            from app.wazuh_soc_store import wazuh_tenant_details

            return wazuh_tenant_details(conn, tenant_id, tenant["name"], tenant["ip"])

    details = [
        _domain_detail(conn, tenant_id, row) for row in list_audit_domains(conn, tenant_id)
    ]
    _apply_funnel_timing_to_domains(details)

    # Per-status domain counts, keyed in the same order the UI receives them.
    statuses = [detail["execution_status"] for detail in details]
    summary = {"domains_total": len(details)}
    for status_key in ("in_progress", "completed", "failed", "registered"):
        summary[status_key] = statuses.count(status_key)
    summary["with_issues"] = sum(1 for detail in details if detail["issue_count"] > 0)

    result = {
        "tenant_id": tenant_id,
        "name": tenant["name"],
        "ip": tenant["ip"],
        "generated_at": _now(),
        "summary": summary,
        "domains": details,
    }

    if tenant_id == 1:
        from app.modules import store as module_store

        if module_store.is_module_enabled("wizard-security"):
            from app import security_store

            result["security"] = security_store.build_summary(conn, window_hours=24)
    return result