ligbox-ops-platform/projects/ops-desk/api/tests/test_agents_030.py
Ligbox Spec Hub fd491e5859 Implement Spec 030 Agentic Ops Mission Board (UI-A/B/C).
Add agent_incidents dedup, overview/incidents/timeline API, mission board UI with fleet rail, kanban, context panel, mobile tabs, poll and keyboard shortcuts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-20 06:49:38 +00:00

78 lines
3.1 KiB
Python

"""Tests Agentic Ops UI — Spec 030."""
from __future__ import annotations
import sqlite3
from app.agents import store
def _mem_conn():
    """Build an in-memory SQLite connection pre-seeded for agent-store tests.

    The connection uses ``sqlite3.Row`` as its row factory, has the agent
    schema initialised via ``store.init_agent_schema`` and holds two
    committed scenarios (``desk.api.health`` and ``vm123.finance.stack``).

    NOTE: as a side effect this rebinds ``store.sync_registry`` on the
    imported module to a function that ignores its argument and returns
    ``None``. The rebinding is never undone here, so it remains in place
    for whatever runs afterwards in the same process.

    Returns:
        The open connection; closing it is left to the caller.
    """
    seed_scenarios = (
        {"id": "desk.api.health", "title": "Desk", "severity_default": "warn"},
        {"id": "vm123.finance.stack", "title": "FOSS", "severity_default": "high"},
    )

    db = sqlite3.connect(":memory:", check_same_thread=False)
    db.row_factory = sqlite3.Row
    store.init_agent_schema(db)

    # Module-level rebind (not restored): sync_registry becomes a no-op.
    store.sync_registry = lambda c: None

    for scenario in seed_scenarios:
        store.upsert_scenario(db, scenario)
    db.commit()
    return db
def test_agent_incidents_table():
    """``store.init_agent_schema`` must create the ``agent_incidents`` table."""
    conn = sqlite3.connect(":memory:")
    try:
        store.init_agent_schema(conn)
        rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = {row[0] for row in rows}
        assert "agent_incidents" in tables
    finally:
        # Close explicitly, even when the assertion fails: an unclosed
        # sqlite3 connection leaks its handle and triggers a ResourceWarning
        # on recent Python versions.
        conn.close()
def test_upsert_incident_deduplicates_scenario():
    """Two identical findings for one scenario must map to a single incident.

    Records two findings with the same title/severity under the same run and
    upserts an incident for each. Verifies that both upserts return the same
    incident id, that ``occurrence_count`` is 2 after the second upsert, that
    the second element of the returned pair is ``True`` for the first call and
    ``False`` for the second (presumably a "newly created" flag -- confirm
    against ``store.upsert_incident``), and that exactly one open incident is
    listed.
    """
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        store.init_agent_schema(conn)
        store.upsert_scenario(
            conn,
            {"id": "test.scenario", "title": "Test", "severity_default": "warn"},
        )
        run_id = store.create_run(conn, "test.scenario", "test")

        f1 = store.add_finding(
            conn, run_id, severity="warn", category="api",
            title="Alert A", human_action="fix A",
        )
        inc1, n1 = store.upsert_incident(
            conn, scenario_id="test.scenario", finding_id=f1, title="Alert A",
            severity="warn", primary_agent="sentinel",
            suggested_human_action="fix A",
        )

        f2 = store.add_finding(
            conn, run_id, severity="warn", category="api",
            title="Alert A", human_action="fix A",
        )
        inc2, n2 = store.upsert_incident(
            conn, scenario_id="test.scenario", finding_id=f2, title="Alert A",
            severity="warn", primary_agent="sentinel",
            suggested_human_action="fix A",
        )

        assert inc1["id"] == inc2["id"]
        assert inc2["occurrence_count"] == 2
        assert n1 is True
        assert n2 is False
        assert len(store.list_incidents(conn, status="open")) == 1
    finally:
        # Release the connection even if an assertion above fails; unclosed
        # sqlite3 connections raise ResourceWarning on recent Pythons.
        conn.close()
def test_ack_incident_closes_open():
    """Acknowledging the only incident must leave no incidents with status "open".

    Creates one scenario, run, finding and incident, acknowledges that
    incident as ``"root"``, commits, and checks that listing incidents with
    ``status="open"`` returns an empty list.
    """
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        store.init_agent_schema(conn)
        store.upsert_scenario(
            conn,
            {"id": "x.scenario", "title": "X", "severity_default": "high"},
        )
        run_id = store.create_run(conn, "x.scenario", "test")
        fid = store.add_finding(
            conn, run_id, severity="high", category="api",
            title="Down", human_action="check",
        )
        store.upsert_incident(
            conn, scenario_id="x.scenario", finding_id=fid, title="Down",
            severity="high", primary_agent="sentinel",
            suggested_human_action="check",
        )

        # Only one incident was created above, so the first listed entry is it.
        inc = store.list_incidents(conn)[0]
        store.ack_incident(conn, inc["id"], "root")
        conn.commit()

        assert store.list_incidents(conn, status="open") == []
    finally:
        # Release the connection even if a step above fails; unclosed
        # sqlite3 connections raise ResourceWarning on recent Pythons.
        conn.close()
def test_overview_structure():
    """``store.get_overview`` must expose the keys the test relies on.

    With a single scenario registered, the overview mapping is expected to
    contain ``tier``, ``incidents_open`` and ``scenarios_total``. Only key
    presence is checked, not the values.
    """
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        store.init_agent_schema(conn)
        store.upsert_scenario(
            conn,
            {"id": "desk.api.health", "title": "Desk", "severity_default": "warn"},
        )
        ov = store.get_overview(conn)
        for key in ("tier", "incidents_open", "scenarios_total"):
            assert key in ov
    finally:
        # Release the connection even if an assertion above fails; unclosed
        # sqlite3 connections raise ResourceWarning on recent Pythons.
        conn.close()