"""Encrypted credential vault — Spec 019.""" from __future__ import annotations import json import os import uuid from datetime import datetime, timezone from typing import Any from cryptography.fernet import Fernet, InvalidToken _KEY_ENV = "MIGRATION_CREDENTIALS_KEY" def _fernet() -> Fernet: raw = os.getenv(_KEY_ENV, "").strip() if not raw: raw = Fernet.generate_key().decode() if len(raw) != 44: raw = Fernet.generate_key().decode() return Fernet(raw.encode() if isinstance(raw, str) else raw) def store_secret(conn, mailbox_id: int, secret: dict[str, Any]) -> str: cred_id = str(uuid.uuid4()) blob = _fernet().encrypt(json.dumps(secret).encode()) now = datetime.now(timezone.utc).isoformat() conn.execute( """ INSERT INTO migration_credentials (id, mailbox_id, secret_blob, created_at) VALUES (?, ?, ?, ?) """, (cred_id, mailbox_id, blob, now), ) conn.execute( "UPDATE migration_mailboxes SET credentials_ref = ? WHERE id = ?", (cred_id, mailbox_id), ) conn.commit() return cred_id def load_secret(conn, cred_id: str) -> dict[str, Any] | None: row = conn.execute( "SELECT secret_blob FROM migration_credentials WHERE id = ?", (cred_id,), ).fetchone() if not row: return None try: return json.loads(_fernet().decrypt(bytes(row["secret_blob"])).decode()) except (InvalidToken, json.JSONDecodeError): return None