Specs stay at repo root (cross-VM). Move deploy and code into logical projects with README per domain, updated manifest.yaml, and symlinks at legacy paths for VM122 backward compatibility.
272 lines
8.5 KiB
Python
272 lines
8.5 KiB
Python
"""Billing accounts store — Spec 023."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
FOSSBILLING_URL = "https://financeiro.ligbox.com.br"
|
|
ODOO_URL = "https://financeiro.ligbox.com.br/odoo/web/login?db=ligbox"
|
|
|
|
TAX_ID_RE = re.compile(r"\d")
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def init_schema(conn) -> None:
|
|
conn.executescript(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS billing_accounts (
|
|
id INTEGER PRIMARY KEY,
|
|
domain TEXT NOT NULL,
|
|
session_id TEXT,
|
|
ticket_id INTEGER,
|
|
tax_id TEXT,
|
|
legal_name TEXT,
|
|
trade_name TEXT,
|
|
email_billing TEXT,
|
|
company_profile_json TEXT,
|
|
billing_state TEXT NOT NULL DEFAULT 'awaiting_billing_validation',
|
|
recurrence_active INTEGER NOT NULL DEFAULT 0,
|
|
external_customer_id TEXT,
|
|
external_subscription_id TEXT,
|
|
payment_provider TEXT,
|
|
plan_code TEXT,
|
|
activated_at TEXT,
|
|
activated_by TEXT,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_billing_domain ON billing_accounts(domain);
|
|
"""
|
|
)
|
|
|
|
|
|
def _mask_tax_id(tax_id: str | None) -> str:
|
|
if not tax_id:
|
|
return "—"
|
|
digits = TAX_ID_RE.sub("", tax_id)
|
|
if len(digits) < 4:
|
|
return "***"
|
|
return f"{'*' * (len(digits) - 4)}{digits[-4:]}"
|
|
|
|
|
|
def _mask_email(email: str | None) -> str:
|
|
if not email or "@" not in email:
|
|
return "—"
|
|
local, dom = email.split("@", 1)
|
|
if len(local) <= 2:
|
|
return f"**@{dom}"
|
|
return f"{local[:2]}***@{dom}"
|
|
|
|
|
|
def _row_dict(row, *, mask: bool = False) -> dict[str, Any]:
|
|
profile = {}
|
|
if row["company_profile_json"]:
|
|
try:
|
|
profile = json.loads(row["company_profile_json"])
|
|
except json.JSONDecodeError:
|
|
profile = {}
|
|
out = {
|
|
"id": row["id"],
|
|
"domain": row["domain"],
|
|
"session_id": row["session_id"],
|
|
"ticket_id": row["ticket_id"],
|
|
"tax_id": _mask_tax_id(row["tax_id"]) if mask else row["tax_id"],
|
|
"legal_name": row["legal_name"],
|
|
"trade_name": row["trade_name"],
|
|
"email_billing": _mask_email(row["email_billing"]) if mask else row["email_billing"],
|
|
"company_profile": profile if not mask else _mask_profile(profile),
|
|
"billing_state": row["billing_state"],
|
|
"recurrence_active": bool(row["recurrence_active"]),
|
|
"external_customer_id": row["external_customer_id"],
|
|
"external_subscription_id": row["external_subscription_id"],
|
|
"payment_provider": row["payment_provider"],
|
|
"plan_code": row["plan_code"],
|
|
"activated_at": row["activated_at"],
|
|
"activated_by": row["activated_by"],
|
|
"created_at": row["created_at"],
|
|
"updated_at": row["updated_at"],
|
|
"links": {
|
|
"fossbilling": FOSSBILLING_URL,
|
|
"odoo": ODOO_URL,
|
|
},
|
|
}
|
|
return out
|
|
|
|
|
|
def _mask_profile(profile: dict) -> dict:
|
|
p = dict(profile)
|
|
if p.get("tax_id"):
|
|
p["tax_id"] = _mask_tax_id(str(p["tax_id"]))
|
|
if p.get("email_billing"):
|
|
p["email_billing"] = _mask_email(str(p["email_billing"]))
|
|
return p
|
|
|
|
|
|
def upsert_from_company_validated(
|
|
conn,
|
|
*,
|
|
domain: str,
|
|
session_id: str | None,
|
|
ticket_id: int | None,
|
|
data: dict | None,
|
|
) -> dict[str, Any]:
|
|
dom = domain.strip().lower()
|
|
profile = (data or {}).get("company_profile") or {}
|
|
billing_state = (data or {}).get("billing_state") or "awaiting_billing_validation"
|
|
now = _now()
|
|
existing = conn.execute(
|
|
"SELECT id FROM billing_accounts WHERE domain = ?",
|
|
(dom,),
|
|
).fetchone()
|
|
if existing:
|
|
conn.execute(
|
|
"""
|
|
UPDATE billing_accounts SET
|
|
session_id = COALESCE(?, session_id),
|
|
ticket_id = COALESCE(?, ticket_id),
|
|
tax_id = ?, legal_name = ?, trade_name = ?, email_billing = ?,
|
|
company_profile_json = ?, billing_state = ?, updated_at = ?
|
|
WHERE domain = ?
|
|
""",
|
|
(
|
|
session_id,
|
|
ticket_id,
|
|
profile.get("tax_id"),
|
|
profile.get("legal_name"),
|
|
profile.get("trade_name"),
|
|
profile.get("email_billing"),
|
|
json.dumps(profile, ensure_ascii=False),
|
|
billing_state,
|
|
now,
|
|
dom,
|
|
),
|
|
)
|
|
acc_id = int(existing["id"])
|
|
else:
|
|
cur = conn.execute(
|
|
"""
|
|
INSERT INTO billing_accounts
|
|
(domain, session_id, ticket_id, tax_id, legal_name, trade_name, email_billing,
|
|
company_profile_json, billing_state, recurrence_active, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?)
|
|
""",
|
|
(
|
|
dom,
|
|
session_id,
|
|
ticket_id,
|
|
profile.get("tax_id"),
|
|
profile.get("legal_name"),
|
|
profile.get("trade_name"),
|
|
profile.get("email_billing"),
|
|
json.dumps(profile, ensure_ascii=False),
|
|
billing_state,
|
|
now,
|
|
now,
|
|
),
|
|
)
|
|
acc_id = int(cur.lastrowid)
|
|
conn.commit()
|
|
return get_account(conn, acc_id) or {}
|
|
|
|
|
|
def get_account(conn, account_id: int, *, mask: bool = False) -> dict[str, Any] | None:
|
|
row = conn.execute("SELECT * FROM billing_accounts WHERE id = ?", (account_id,)).fetchone()
|
|
return _row_dict(row, mask=mask) if row else None
|
|
|
|
|
|
def get_by_domain(conn, domain: str, *, mask: bool = False) -> dict[str, Any] | None:
|
|
row = conn.execute(
|
|
"SELECT * FROM billing_accounts WHERE domain = ?",
|
|
(domain.strip().lower(),),
|
|
).fetchone()
|
|
return _row_dict(row, mask=mask) if row else None
|
|
|
|
|
|
def list_accounts(
|
|
conn,
|
|
*,
|
|
billing_state: str | None = None,
|
|
domain: str | None = None,
|
|
limit: int = 100,
|
|
mask: bool = False,
|
|
) -> dict[str, Any]:
|
|
limit = max(1, min(limit, 500))
|
|
clauses = []
|
|
params: list[Any] = []
|
|
if billing_state:
|
|
clauses.append("billing_state = ?")
|
|
params.append(billing_state)
|
|
if domain:
|
|
clauses.append("domain LIKE ?")
|
|
params.append(f"%{domain.strip().lower()}%")
|
|
where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
|
|
rows = conn.execute(
|
|
f"SELECT * FROM billing_accounts {where} ORDER BY updated_at DESC LIMIT ?",
|
|
(*params, limit),
|
|
).fetchall()
|
|
total = conn.execute(
|
|
f"SELECT COUNT(*) FROM billing_accounts {where}",
|
|
tuple(params),
|
|
).fetchone()[0]
|
|
return {
|
|
"accounts": [_row_dict(r, mask=mask) for r in rows],
|
|
"total": total,
|
|
}
|
|
|
|
|
|
def patch_account(conn, account_id: int, **fields) -> dict[str, Any] | None:
|
|
allowed = {
|
|
"billing_state",
|
|
"recurrence_active",
|
|
"external_customer_id",
|
|
"external_subscription_id",
|
|
"payment_provider",
|
|
"plan_code",
|
|
"activated_at",
|
|
"activated_by",
|
|
"ticket_id",
|
|
}
|
|
if fields.get("recurrence_active"):
|
|
fields.setdefault("billing_state", "billing_active")
|
|
sets = []
|
|
params: list[Any] = []
|
|
for key, val in fields.items():
|
|
if key not in allowed:
|
|
continue
|
|
if key == "recurrence_active":
|
|
val = 1 if val else 0
|
|
sets.append(f"{key} = ?")
|
|
params.append(val)
|
|
if not sets:
|
|
return get_account(conn, account_id)
|
|
sets.append("updated_at = ?")
|
|
params.append(_now())
|
|
params.append(account_id)
|
|
conn.execute(f"UPDATE billing_accounts SET {', '.join(sets)} WHERE id = ?", params)
|
|
conn.commit()
|
|
return get_account(conn, account_id)
|
|
|
|
|
|
def summary(conn) -> dict[str, Any]:
|
|
pending = conn.execute(
|
|
"SELECT COUNT(*) FROM billing_accounts WHERE billing_state = 'awaiting_billing_validation'"
|
|
).fetchone()[0]
|
|
active = conn.execute(
|
|
"SELECT COUNT(*) FROM billing_accounts WHERE recurrence_active = 1"
|
|
).fetchone()[0]
|
|
total = conn.execute("SELECT COUNT(*) FROM billing_accounts").fetchone()[0]
|
|
recent = conn.execute(
|
|
"SELECT * FROM billing_accounts ORDER BY updated_at DESC LIMIT 5"
|
|
).fetchall()
|
|
return {
|
|
"billing_pending": pending,
|
|
"billing_active": active,
|
|
"billing_total": total,
|
|
"recent_validations": [_row_dict(r) for r in recent],
|
|
}
|