Serialize agentic ticks and retry SQLite writes under concurrent load.

A Redis lock prevents overlapping agentic worker ticks; the auth login path retries its last-login update when the SQLite database is locked.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Ligbox Spec Hub 2026-06-19 23:36:41 +00:00
parent 33db764c74
commit d066586023
4 changed files with 29 additions and 13 deletions

View file

@ -14,6 +14,7 @@ from typing import Any
from fastapi import Depends, Header, HTTPException, Request from fastapi import Depends, Header, HTTPException, Request
from jose import JWTError, jwt from jose import JWTError, jwt
import bcrypt import bcrypt
import time
from app.totp_util import verify_code as verify_totp_code from app.totp_util import verify_code as verify_totp_code
@ -55,7 +56,7 @@ def db() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH, timeout=30.0) conn = sqlite3.connect(DB_PATH, timeout=30.0)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA busy_timeout=60000")
return conn return conn
@ -188,12 +189,19 @@ def check_credentials(username: str, password: str) -> tuple[DeskUser | None, sq
def touch_last_login(username: str) -> None:
    """Stamp ``last_login_at`` and ``updated_at`` for *username* with the current UTC time.

    The write is retried when SQLite reports the database as locked, sleeping
    a little longer after each failed attempt (0.25 s, 0.5 s, ...). Each
    attempt uses a fresh connection from ``db()`` and always closes it:
    ``with conn:`` only commits or rolls back the transaction, it does not
    close the connection, so without the explicit ``close()`` every login
    (and every retry) would leak a connection.

    Args:
        username: Value matched against ``desk_users.username``.

    Raises:
        sqlite3.OperationalError: For any operational error that is not a
            "locked" condition, or when the database is still locked on the
            final attempt.
    """
    max_attempts = 8
    now = datetime.now(timezone.utc).isoformat()
    for attempt in range(1, max_attempts + 1):
        conn = None
        try:
            # Opening the connection stays inside the try so a lock error
            # raised while db() configures the connection is retried too.
            conn = db()
            with conn:  # commits on success, rolls back on error
                conn.execute(
                    "UPDATE desk_users SET last_login_at = ?, updated_at = ? WHERE username = ?",
                    (now, now, username),
                )
            return
        except sqlite3.OperationalError as exc:
            if "locked" not in str(exc).lower() or attempt >= max_attempts:
                raise
        finally:
            if conn is not None:
                conn.close()
        # Linear backoff before the next attempt.
        time.sleep(0.25 * attempt)
def authenticate_user(username: str, password: str) -> DeskUser | None: def authenticate_user(username: str, password: str) -> DeskUser | None:

View file

@ -145,7 +145,7 @@ def db():
conn = sqlite3.connect(DB_PATH, timeout=30.0) conn = sqlite3.connect(DB_PATH, timeout=30.0)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA busy_timeout=60000")
return conn return conn
@ -190,7 +190,7 @@ def init_db():
init_purge_auth_schema(conn) init_purge_auth_schema(conn)
init_agent_schema(conn) init_agent_schema(conn)
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA busy_timeout=60000")
conn.commit() conn.commit()

View file

@ -31,7 +31,7 @@ services:
environment: environment:
OPS_API_URL: http://api-staging:8080 OPS_API_URL: http://api-staging:8080
REDIS_URL: redis://redis-staging:6379/0 REDIS_URL: redis://redis-staging:6379/0
AGENTIC_INTERVAL_SEC: "300" AGENTIC_INTERVAL_SEC: "600"
depends_on: [redis-staging, api-staging] depends_on: [redis-staging, api-staging]
networks: [agentic-staging] networks: [agentic-staging]
frontend-staging: frontend-staging:

View file

@ -41,12 +41,17 @@ def poll_vm112() -> None:
print(f"[worker] vm112 ERROR: {exc}", flush=True) print(f"[worker] vm112 ERROR: {exc}", flush=True)
def agentic_tick(redis_client=None) -> None:
    """Spec 029 — run all agent scenarios (T0 checks + T1 advisor).

    Posts to the ops API's internal tick endpoint and prints the outcome.
    Does nothing when ``OPS_INTERNAL_TOKEN`` is unset. Errors from the HTTP
    call are printed, not raised.

    Args:
        redis_client: Optional Redis client used to hold a lock for the
            duration of the tick so that ticks do not overlap. ``None``
            disables locking.
            NOTE(review): assumed to be a redis-py client (``set(nx=, ex=)``
            and ``eval(script, numkeys, *keys_and_args)``) — confirm against
            the caller.
    """
    import uuid

    if not OPS_INTERNAL_TOKEN:
        return
    lock_key = "ops:agentic:tick:lock"
    # A unique value per acquisition lets the release step tell our lock apart
    # from one taken by another worker after ours expired.
    lock_token = uuid.uuid4().hex
    # Delete the key only if it still holds our token; running this as a
    # script makes the compare-and-delete a single atomic step on the server.
    release_script = (
        'if redis.call("get", KEYS[1]) == ARGV[1] then '
        'return redis.call("del", KEYS[1]) else return 0 end'
    )
    if redis_client is not None:
        if not redis_client.set(lock_key, lock_token, nx=True, ex=900):
            print("[worker] agentic tick skipped (lock held)", flush=True)
            return
    try:
        with httpx.Client(timeout=600.0) as client:
            response = client.post(
                f"{OPS_API_URL}/api/v1/agents/internal/tick",
                headers={"X-Ops-Internal-Token": OPS_INTERNAL_TOKEN},
            )
            print(f"[worker] agentic tick {response.status_code}: {response.text[:200]}", flush=True)
    except Exception as exc:
        print(f"[worker] agentic tick ERROR: {exc}", flush=True)
    finally:
        if redis_client is not None:
            try:
                redis_client.eval(release_script, 1, lock_key, lock_token)
            except Exception as exc:
                # Best effort: the 900-second expiry frees the lock anyway, so
                # a failed release is reported instead of raised.
                print(f"[worker] agentic tick lock release ERROR: {exc}", flush=True)
def check_integration_gap() -> None: def check_integration_gap() -> None:
@ -114,7 +122,7 @@ def main() -> None:
check_integration_gap() check_integration_gap()
last_lead_sync = now last_lead_sync = now
if now - last_agentic >= AGENTIC_INTERVAL_SEC: if now - last_agentic >= AGENTIC_INTERVAL_SEC:
agentic_tick() agentic_tick(redis_client)
last_agentic = now last_agentic = now
time.sleep(WORKER_INTERVAL) time.sleep(WORKER_INTERVAL)