From d066586023b26d3eccb010fc2b75fcf8ca5d02a3 Mon Sep 17 00:00:00 2001 From: Ligbox Spec Hub Date: Fri, 19 Jun 2026 23:36:41 +0000 Subject: [PATCH] Serialize agentic ticks and retry SQLite writes under concurrent load. Redis lock prevents overlapping worker ticks; auth login retries on DB locked. Co-authored-by: Cursor --- projects/ops-desk/api/app/auth.py | 22 +++++++++++++------ projects/ops-desk/api/app/main.py | 4 ++-- .../docker-compose.agentic-staging.yml | 2 +- projects/ops-desk/worker/worker.py | 14 +++++++++--- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/projects/ops-desk/api/app/auth.py b/projects/ops-desk/api/app/auth.py index 2255214..7543928 100644 --- a/projects/ops-desk/api/app/auth.py +++ b/projects/ops-desk/api/app/auth.py @@ -14,6 +14,7 @@ from typing import Any from fastapi import Depends, Header, HTTPException, Request from jose import JWTError, jwt import bcrypt +import time from app.totp_util import verify_code as verify_totp_code @@ -55,7 +56,7 @@ def db() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH, timeout=30.0) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA busy_timeout=30000") + conn.execute("PRAGMA busy_timeout=60000") return conn @@ -188,12 +189,19 @@ def check_credentials(username: str, password: str) -> tuple[DeskUser | None, sq def touch_last_login(username: str) -> None: now = datetime.now(timezone.utc).isoformat() - with db() as conn: - conn.execute( - "UPDATE desk_users SET last_login_at = ?, updated_at = ? WHERE username = ?", - (now, now, username), - ) - conn.commit() + for attempt in range(8): + try: + with db() as conn: + conn.execute( + "UPDATE desk_users SET last_login_at = ?, updated_at = ? WHERE username = ?", + (now, now, username), + ) + conn.commit() + return + except sqlite3.OperationalError as exc: + if "locked" not in str(exc).lower() or attempt >= 7: + raise + time.sleep(0.25 * (attempt + 1)) def authenticate_user(username: str, password: str) -> DeskUser | None: diff --git a/projects/ops-desk/api/app/main.py b/projects/ops-desk/api/app/main.py index 836e037..e64988d 100644 --- a/projects/ops-desk/api/app/main.py +++ b/projects/ops-desk/api/app/main.py @@ -145,7 +145,7 @@ def db(): conn = sqlite3.connect(DB_PATH, timeout=30.0) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA busy_timeout=30000") + conn.execute("PRAGMA busy_timeout=60000") return conn @@ -190,7 +190,7 @@ def init_db(): init_purge_auth_schema(conn) init_agent_schema(conn) conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA busy_timeout=30000") + conn.execute("PRAGMA busy_timeout=60000") conn.commit() diff --git a/projects/ops-desk/docker-compose.agentic-staging.yml b/projects/ops-desk/docker-compose.agentic-staging.yml index 0be7c50..174d307 100644 --- a/projects/ops-desk/docker-compose.agentic-staging.yml +++ b/projects/ops-desk/docker-compose.agentic-staging.yml @@ -31,7 +31,7 @@ services: environment: OPS_API_URL: http://api-staging:8080 REDIS_URL: redis://redis-staging:6379/0 - AGENTIC_INTERVAL_SEC: "300" + AGENTIC_INTERVAL_SEC: "600" depends_on: [redis-staging, api-staging] networks: [agentic-staging] frontend-staging: diff --git a/projects/ops-desk/worker/worker.py b/projects/ops-desk/worker/worker.py index 289b41a..d940e51 100644 --- a/projects/ops-desk/worker/worker.py +++ b/projects/ops-desk/worker/worker.py @@ -41,12 +41,17 @@ def poll_vm112() -> None: print(f"[worker] vm112 ERROR: {exc}", flush=True) -def agentic_tick() -> None: +def agentic_tick(redis_client=None) -> None: """Spec 029 — run all agent scenarios (T0 checks + T1 advisor).""" if not OPS_INTERNAL_TOKEN: return + lock_key = "ops:agentic:tick:lock" + if redis_client is not None: + if not redis_client.set(lock_key, "1", nx=True, ex=900): + print("[worker] agentic tick skipped (lock held)", flush=True) + return try: - with httpx.Client(timeout=180.0) as client: + with httpx.Client(timeout=600.0) as client: response = client.post( f"{OPS_API_URL}/api/v1/agents/internal/tick", headers={"X-Ops-Internal-Token": OPS_INTERNAL_TOKEN}, @@ -54,6 +59,9 @@ def agentic_tick() -> None: print(f"[worker] agentic tick {response.status_code}: {response.text[:200]}", flush=True) except Exception as exc: print(f"[worker] agentic tick ERROR: {exc}", flush=True) + finally: + if redis_client is not None: + redis_client.delete(lock_key) def check_integration_gap() -> None: @@ -114,7 +122,7 @@ def main() -> None: check_integration_gap() last_lead_sync = now if now - last_agentic >= AGENTIC_INTERVAL_SEC: - agentic_tick() + agentic_tick(redis_client) last_agentic = now time.sleep(WORKER_INTERVAL)