"""SQLite store for email migration jobs — Spec 019 / 013.""" from __future__ import annotations import json from datetime import datetime, timezone from typing import Any def _now() -> str: return datetime.now(timezone.utc).isoformat() def init_schema(conn) -> None: conn.executescript( """ CREATE TABLE IF NOT EXISTS migration_jobs ( id INTEGER PRIMARY KEY, tenant_id INTEGER NOT NULL DEFAULT 1, ticket_id INTEGER, domain TEXT NOT NULL, phase TEXT NOT NULL DEFAULT 'discovered', migration_gate TEXT NOT NULL DEFAULT 'blocked', source_server_label TEXT, dest_imap_host TEXT, notes TEXT, approved_by TEXT, approved_at TEXT, dns_cutover_at TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS migration_mailboxes ( id INTEGER PRIMARY KEY, job_id INTEGER NOT NULL, email TEXT NOT NULL, source_type TEXT NOT NULL DEFAULT 'imap', source_host TEXT, source_user TEXT, credentials_ref TEXT, pst_path TEXT, folder_map_json TEXT, messages_source INTEGER NOT NULL DEFAULT 0, messages_dest INTEGER NOT NULL DEFAULT 0, bytes_source INTEGER NOT NULL DEFAULT 0, bytes_dest INTEGER NOT NULL DEFAULT 0, sync_percent REAL NOT NULL DEFAULT 0, last_error TEXT, status TEXT NOT NULL DEFAULT 'pending', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (job_id) REFERENCES migration_jobs(id) ); CREATE TABLE IF NOT EXISTS migration_runs ( id INTEGER PRIMARY KEY, job_id INTEGER NOT NULL, mailbox_id INTEGER, run_type TEXT NOT NULL, tool TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'queued', exit_code INTEGER, log_path TEXT, stats_json TEXT, started_at TEXT NOT NULL, finished_at TEXT, triggered_by TEXT, FOREIGN KEY (job_id) REFERENCES migration_jobs(id) ); CREATE TABLE IF NOT EXISTS migration_gate_checks ( id INTEGER PRIMARY KEY, job_id INTEGER NOT NULL, check_id TEXT NOT NULL, status TEXT NOT NULL, message TEXT, checked_at TEXT NOT NULL, FOREIGN KEY (job_id) REFERENCES migration_jobs(id) ); CREATE TABLE IF NOT EXISTS migration_credentials ( id TEXT PRIMARY KEY, mailbox_id INTEGER NOT NULL, secret_blob BLOB NOT NULL, created_at TEXT NOT NULL, expires_at TEXT ); CREATE INDEX IF NOT EXISTS idx_migration_jobs_domain ON migration_jobs(domain); CREATE INDEX IF NOT EXISTS idx_migration_mailboxes_job ON migration_mailboxes(job_id); """ ) def _job_dict(row) -> dict[str, Any]: return { "id": row["id"], "tenant_id": row["tenant_id"], "ticket_id": row["ticket_id"], "domain": row["domain"], "phase": row["phase"], "migration_gate": row["migration_gate"], "source_server_label": row["source_server_label"], "dest_imap_host": row["dest_imap_host"], "notes": row["notes"], "approved_by": row["approved_by"], "approved_at": row["approved_at"], "dns_cutover_at": row["dns_cutover_at"], "created_at": row["created_at"], "updated_at": row["updated_at"], } def _mailbox_dict(row) -> dict[str, Any]: return { "id": row["id"], "job_id": row["job_id"], "email": row["email"], "source_type": row["source_type"], "source_host": row["source_host"], "source_user": row["source_user"], "credentials_ref": row["credentials_ref"], "pst_path": row["pst_path"], "folder_map_json": row["folder_map_json"], "messages_source": row["messages_source"], "messages_dest": row["messages_dest"], "bytes_source": row["bytes_source"], "bytes_dest": row["bytes_dest"], "sync_percent": row["sync_percent"], "last_error": row["last_error"], "status": row["status"], "created_at": row["created_at"], "updated_at": row["updated_at"], } def _run_dict(row) -> dict[str, Any]: stats = {} if row["stats_json"]: try: stats = json.loads(row["stats_json"]) except json.JSONDecodeError: stats = {} return { "id": row["id"], "job_id": row["job_id"], "mailbox_id": row["mailbox_id"], "run_type": row["run_type"], "tool": row["tool"], "status": row["status"], "exit_code": row["exit_code"], "log_path": row["log_path"], "stats": stats, "started_at": row["started_at"], "finished_at": row["finished_at"], "triggered_by": row["triggered_by"], } def list_jobs(conn, *, domain: str | None = None, limit: int = 100) -> dict[str, Any]: limit = max(1, min(limit, 500)) if domain: rows = conn.execute( "SELECT * FROM migration_jobs WHERE domain = ? ORDER BY id DESC LIMIT ?", (domain.strip().lower(), limit), ).fetchall() total = conn.execute( "SELECT COUNT(*) FROM migration_jobs WHERE domain = ?", (domain.strip().lower(),), ).fetchone()[0] else: rows = conn.execute( "SELECT * FROM migration_jobs ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() total = conn.execute("SELECT COUNT(*) FROM migration_jobs").fetchone()[0] return {"jobs": [_job_dict(r) for r in rows], "total": total} def get_job(conn, job_id: int) -> dict[str, Any] | None: row = conn.execute("SELECT * FROM migration_jobs WHERE id = ?", (job_id,)).fetchone() if not row: return None job = _job_dict(row) mboxes = conn.execute( "SELECT * FROM migration_mailboxes WHERE job_id = ? ORDER BY id", (job_id,), ).fetchall() runs = conn.execute( "SELECT * FROM migration_runs WHERE job_id = ? ORDER BY id DESC LIMIT 20", (job_id,), ).fetchall() checks = conn.execute( "SELECT * FROM migration_gate_checks WHERE job_id = ? ORDER BY id DESC LIMIT 20", (job_id,), ).fetchall() job["mailboxes"] = [_mailbox_dict(m) for m in mboxes] job["runs"] = [_run_dict(r) for r in runs] job["gate_checks"] = [ { "id": c["id"], "check_id": c["check_id"], "status": c["status"], "message": c["message"], "checked_at": c["checked_at"], } for c in checks ] if job["mailboxes"]: avg = sum(m["sync_percent"] for m in job["mailboxes"]) / len(job["mailboxes"]) job["sync_percent_avg"] = round(avg, 2) else: job["sync_percent_avg"] = 0.0 return job def create_job( conn, *, domain: str, tenant_id: int = 1, ticket_id: int | None = None, source_server_label: str = "", dest_imap_host: str = "", notes: str = "", mailboxes: list[dict] | None = None, ) -> dict[str, Any]: now = _now() dom = domain.strip().lower() cur = conn.execute( """ INSERT INTO migration_jobs (tenant_id, ticket_id, domain, phase, migration_gate, source_server_label, dest_imap_host, notes, created_at, updated_at) VALUES (?, ?, ?, 'discovered', 'blocked', ?, ?, ?, ?, ?) """, (tenant_id, ticket_id, dom, source_server_label[:200], dest_imap_host[:200], notes[:2000], now, now), ) job_id = int(cur.lastrowid) for mb in mailboxes or []: email = (mb.get("email") or "").strip().lower() if not email: continue conn.execute( """ INSERT INTO migration_mailboxes (job_id, email, source_type, source_host, source_user, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, 'pending', ?, ?) """, ( job_id, email, mb.get("source_type") or "imap", (mb.get("source_host") or "")[:200] or None, (mb.get("source_user") or email)[:200] or None, now, now, ), ) conn.commit() return get_job(conn, job_id) or {} def update_job(conn, job_id: int, **fields) -> dict[str, Any] | None: allowed = { "phase", "migration_gate", "source_server_label", "dest_imap_host", "notes", "approved_by", "approved_at", "dns_cutover_at", "ticket_id", } sets = [] params: list[Any] = [] for key, val in fields.items(): if key in allowed: sets.append(f"{key} = ?") params.append(val) if not sets: return get_job(conn, job_id) sets.append("updated_at = ?") params.append(_now()) params.append(job_id) conn.execute(f"UPDATE migration_jobs SET {', '.join(sets)} WHERE id = ?", params) conn.commit() return get_job(conn, job_id) def add_run( conn, *, job_id: int, run_type: str, tool: str, triggered_by: str, mailbox_id: int | None = None, status: str = "running", stats: dict | None = None, exit_code: int | None = None, log_path: str | None = None, ) -> dict[str, Any]: now = _now() cur = conn.execute( """ INSERT INTO migration_runs (job_id, mailbox_id, run_type, tool, status, exit_code, log_path, stats_json, started_at, finished_at, triggered_by) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( job_id, mailbox_id, run_type, tool, status, exit_code, log_path, json.dumps(stats or {}), now, now if status != "running" else None, triggered_by, ), ) conn.commit() row = conn.execute("SELECT * FROM migration_runs WHERE id = ?", (int(cur.lastrowid),)).fetchone() return _run_dict(row) def finish_run(conn, run_id: int, *, status: str, exit_code: int | None = None, stats: dict | None = None) -> None: conn.execute( """ UPDATE migration_runs SET status = ?, exit_code = ?, stats_json = COALESCE(?, stats_json), finished_at = ? WHERE id = ? """, (status, exit_code, json.dumps(stats) if stats else None, _now(), run_id), ) conn.commit() def update_mailbox_sync( conn, mailbox_id: int, *, messages_source: int, messages_dest: int, sync_percent: float, status: str = "ok", last_error: str | None = None, ) -> None: conn.execute( """ UPDATE migration_mailboxes SET messages_source = ?, messages_dest = ?, sync_percent = ?, status = ?, last_error = ?, updated_at = ? WHERE id = ? """, (messages_source, messages_dest, sync_percent, status, last_error, _now(), mailbox_id), ) conn.commit() def add_gate_check(conn, job_id: int, check_id: str, status: str, message: str) -> None: conn.execute( """ INSERT INTO migration_gate_checks (job_id, check_id, status, message, checked_at) VALUES (?, ?, ?, ?, ?) """, (job_id, check_id, status, message[:500], _now()), ) conn.commit() def get_gate_for_domain(conn, domain: str) -> dict[str, Any]: dom = domain.strip().lower() row = conn.execute( """ SELECT * FROM migration_jobs WHERE domain = ? AND phase NOT IN ('closed', 'failed') ORDER BY id DESC LIMIT 1 """, (dom,), ).fetchone() if not row: return { "domain": dom, "gate": "ready_for_dns", "reason": "no_active_migration_job", "job_id": None, } job = _job_dict(row) return { "domain": dom, "gate": job["migration_gate"], "phase": job["phase"], "job_id": job["id"], "approved_by": job["approved_by"], "sync_percent_avg": get_job(conn, job["id"]).get("sync_percent_avg", 0) if job["id"] else 0, }