obsidian-vault/ligbox-ops-platform/worker/audit_runner.py
2026-06-19 17:26:42 +00:00

27 lines
815 B
Python

"""Periodic audit cycle via Ops API."""
from __future__ import annotations
import os
import time
import httpx
# Base URL of the Ops API; run_cycle() appends the audit endpoint path to it.
API_URL = os.environ.get("OPS_API_URL", "http://api:8080")
# Interval in seconds, parsed as an int at import time (default 600).
# NOTE(review): presumably the pause between audit cycles -- the code that
# consumes this value is not visible in this section; confirm against the caller.
AUDIT_INTERVAL_SEC = int(os.environ.get("AUDIT_INTERVAL_SEC", "600"))
# Optional token; when non-empty, run_cycle() sends it in the
# X-Ops-Internal-Token request header. Empty string when the variable is unset.
OPS_INTERNAL_TOKEN = os.environ.get("OPS_INTERNAL_TOKEN", "")
def run_cycle() -> None:
    """Trigger a single audit cycle on the Ops API and log the outcome.

    Sends a POST to ``/api/v1/audit/cycle`` under ``API_URL`` (any trailing
    slash on the base URL is stripped first) with a 120-second client timeout.
    The ``X-Ops-Internal-Token`` header is attached only when
    ``OPS_INTERNAL_TOKEN`` is non-empty.

    The HTTP status code and the first 300 characters of the response body
    are printed to stdout. Non-2xx responses are logged the same way as
    successful ones. Any exception raised along the way is caught and printed
    instead of propagating, so this function never raises ``Exception``
    subclasses to its caller.
    """
    try:
        # Only send the token header when a token is actually configured.
        auth_headers = (
            {"X-Ops-Internal-Token": OPS_INTERNAL_TOKEN} if OPS_INTERNAL_TOKEN else {}
        )
        endpoint = f"{API_URL.rstrip('/')}/api/v1/audit/cycle"
        with httpx.Client(timeout=120.0) as session:
            reply = session.post(endpoint, headers=auth_headers)
        status = reply.status_code
        # Truncate the body so a large response cannot flood the log.
        snippet = reply.text[:300]
        print(f"[audit] cycle {status}: {snippet}", flush=True)
    except Exception as err:
        # Best-effort boundary: report the failure and let the caller carry on.
        print(f"[audit] cycle ERROR: {err}", flush=True)