Specs stay at repo root (cross-VM). Move deploy and code into logical projects with README per domain, updated manifest.yaml, and symlinks at legacy paths for VM122 backward compatibility.
136 lines
3.9 KiB
Python
136 lines
3.9 KiB
Python
"""Rotas segurança wizard — Spec 021."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app import auth, security_store
|
|
from app.permissions import can_read_audit_overview
|
|
|
|
router = APIRouter(prefix="/api/v1/security", tags=["wizard-security"])
|
|
|
|
|
|
class SecurityWebhookBody(BaseModel):
|
|
event: str = Field(..., min_length=3)
|
|
domain: str | None = None
|
|
session_id: str | None = None
|
|
data: dict | None = None
|
|
|
|
|
|
class SecurityAuditTestBody(BaseModel):
|
|
field: str = "domain"
|
|
value: str = Field(..., min_length=1)
|
|
|
|
|
|
def _require_security_reader(user: auth.DeskUser = Depends(auth.get_current_user)) -> auth.DeskUser:
|
|
if not can_read_audit_overview(user.role):
|
|
raise HTTPException(403, "permissão insuficiente")
|
|
return user
|
|
|
|
|
|
def _client_ip(request: Request) -> str | None:
|
|
forwarded = request.headers.get("x-forwarded-for")
|
|
if forwarded:
|
|
return forwarded.split(",")[0].strip()
|
|
if request.client:
|
|
return request.client.host
|
|
return None
|
|
|
|
|
|
def _module_enabled() -> bool:
|
|
from app.modules import store as module_store
|
|
|
|
return module_store.is_module_enabled("wizard-security")
|
|
|
|
|
|
@router.post("/csp-report")
|
|
async def csp_report(request: Request):
|
|
if not _module_enabled():
|
|
return {"accepted": False, "reason": "module_disabled"}
|
|
try:
|
|
body = await request.json()
|
|
except Exception:
|
|
body = {}
|
|
conn = auth.db()
|
|
try:
|
|
return security_store.ingest_csp_report(conn, body if isinstance(body, dict) else {}, _client_ip(request))
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@router.post("/webhook")
|
|
def security_webhook(
|
|
body: SecurityWebhookBody,
|
|
request: Request,
|
|
x_webhook_secret: str | None = Header(default=None),
|
|
):
|
|
from app.main import INTEGRATION_SECRETS, _verify_secret
|
|
|
|
if not _module_enabled():
|
|
return {"accepted": False, "reason": "module_disabled"}
|
|
_verify_secret("onboard", x_webhook_secret)
|
|
if not security_store.is_security_event(body.event):
|
|
raise HTTPException(400, "event must start with security.")
|
|
conn = auth.db()
|
|
try:
|
|
return security_store.ingest_event(
|
|
conn,
|
|
event=body.event,
|
|
session_id=body.session_id,
|
|
domain=body.domain,
|
|
data=body.data,
|
|
client_ip=_client_ip(request),
|
|
)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@router.get("/summary")
|
|
def security_summary(
|
|
window_hours: int = Query(24, ge=1, le=168),
|
|
user: auth.DeskUser = Depends(_require_security_reader),
|
|
):
|
|
if not _module_enabled():
|
|
return {"enabled": False, "window_hours": window_hours, "total": 0}
|
|
conn = auth.db()
|
|
try:
|
|
return security_store.build_summary(conn, window_hours=window_hours)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@router.get("/events")
|
|
def security_events(
|
|
limit: int = Query(100, ge=1, le=500),
|
|
offset: int = Query(0, ge=0),
|
|
window_hours: int = Query(168, ge=1, le=720),
|
|
session_id: str = "",
|
|
user: auth.DeskUser = Depends(_require_security_reader),
|
|
):
|
|
if not _module_enabled():
|
|
return {"events": [], "total": 0, "enabled": False}
|
|
conn = auth.db()
|
|
try:
|
|
return security_store.list_events(
|
|
conn,
|
|
limit=limit,
|
|
offset=offset,
|
|
window_hours=window_hours,
|
|
session_id=session_id.strip() or None,
|
|
)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@router.post("/audit-test")
|
|
def security_audit_test(
|
|
body: SecurityAuditTestBody,
|
|
user: auth.DeskUser = Depends(_require_security_reader),
|
|
):
|
|
"""Teste interno — simula heurística de input (sem gravar)."""
|
|
if user.role not in ("super_admin", "ops_lead"):
|
|
raise HTTPException(403, "apenas admin")
|
|
return security_store.audit_field_value(body.value, field=body.field)
|