ligbox-ops-platform/api/app/security_routes.py
Ligbox Spec Hub 3a2c64834b Initial import: ligbox-ops-platform + specs + LAPTOP + obsidian merge (CT130)
Source: VM122 /opt + obsidian-infra + LAPTOP
Hub: CT130 spec-hub 10.10.10.130
2026-06-19 17:26:41 +00:00

136 lines
3.9 KiB
Python

"""Rotas segurança wizard — Spec 021."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request
from pydantic import BaseModel, Field
from app import auth, security_store
from app.permissions import can_read_audit_overview
router = APIRouter(prefix="/api/v1/security", tags=["wizard-security"])
class SecurityWebhookBody(BaseModel):
event: str = Field(..., min_length=3)
domain: str | None = None
session_id: str | None = None
data: dict | None = None
class SecurityAuditTestBody(BaseModel):
field: str = "domain"
value: str = Field(..., min_length=1)
def _require_security_reader(user: auth.DeskUser = Depends(auth.get_current_user)) -> auth.DeskUser:
if not can_read_audit_overview(user.role):
raise HTTPException(403, "permissão insuficiente")
return user
def _client_ip(request: Request) -> str | None:
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return None
def _module_enabled() -> bool:
from app.modules import store as module_store
return module_store.is_module_enabled("wizard-security")
@router.post("/csp-report")
async def csp_report(request: Request):
if not _module_enabled():
return {"accepted": False, "reason": "module_disabled"}
try:
body = await request.json()
except Exception:
body = {}
conn = auth.db()
try:
return security_store.ingest_csp_report(conn, body if isinstance(body, dict) else {}, _client_ip(request))
finally:
conn.close()
@router.post("/webhook")
def security_webhook(
body: SecurityWebhookBody,
request: Request,
x_webhook_secret: str | None = Header(default=None),
):
from app.main import INTEGRATION_SECRETS, _verify_secret
if not _module_enabled():
return {"accepted": False, "reason": "module_disabled"}
_verify_secret("onboard", x_webhook_secret)
if not security_store.is_security_event(body.event):
raise HTTPException(400, "event must start with security.")
conn = auth.db()
try:
return security_store.ingest_event(
conn,
event=body.event,
session_id=body.session_id,
domain=body.domain,
data=body.data,
client_ip=_client_ip(request),
)
finally:
conn.close()
@router.get("/summary")
def security_summary(
window_hours: int = Query(24, ge=1, le=168),
user: auth.DeskUser = Depends(_require_security_reader),
):
if not _module_enabled():
return {"enabled": False, "window_hours": window_hours, "total": 0}
conn = auth.db()
try:
return security_store.build_summary(conn, window_hours=window_hours)
finally:
conn.close()
@router.get("/events")
def security_events(
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
window_hours: int = Query(168, ge=1, le=720),
session_id: str = "",
user: auth.DeskUser = Depends(_require_security_reader),
):
if not _module_enabled():
return {"events": [], "total": 0, "enabled": False}
conn = auth.db()
try:
return security_store.list_events(
conn,
limit=limit,
offset=offset,
window_hours=window_hours,
session_id=session_id.strip() or None,
)
finally:
conn.close()
@router.post("/audit-test")
def security_audit_test(
body: SecurityAuditTestBody,
user: auth.DeskUser = Depends(_require_security_reader),
):
"""Teste interno — simula heurística de input (sem gravar)."""
if user.role not in ("super_admin", "ops_lead"):
raise HTTPException(403, "apenas admin")
return security_store.audit_field_value(body.value, field=body.field)