#!/usr/bin/env python3
"""OpenPanel Community -> FOSSBilling API bridge (Ligbox re-engineering).

Replaces the Enterprise API by shelling out to ``opencli`` (user-add,
domains-add and account management commands).

LAN only - default port 18087.
"""
from __future__ import annotations

import hmac
import json
import os
import re
import subprocess
import sys
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse

# Built-in fallbacks, kept for backward compatibility with existing deployments.
# NOTE(review): these credentials are committed in source; every deployment
# should override them through the BRIDGE_* environment variables.
_DEFAULT_ADMIN_PASS = "LbOpen805353"
_DEFAULT_TOKEN = "ligbox-community-bridge-token"

HOST = os.environ.get("BRIDGE_HOST", "0.0.0.0")
PORT = int(os.environ.get("BRIDGE_PORT", "18087"))
ADMIN_USER = os.environ.get("BRIDGE_ADMIN_USER", "ligboxadmin")
ADMIN_PASS = os.environ.get("BRIDGE_ADMIN_PASS", _DEFAULT_ADMIN_PASS)
TOKEN = os.environ.get("BRIDGE_TOKEN", _DEFAULT_TOKEN)
DEFAULT_PLAN = os.environ.get("BRIDGE_DEFAULT_PLAN", "ligbox-site-cms")

# Usernames accepted for provisioning: a lowercase letter followed by 2-15
# lowercase letters or digits.
USER_RE = re.compile(r"^[a-z][a-z0-9]{2,15}$")

# Username segment accepted in /api/users/<username> routes.
USER_PATH_RE = re.compile(r"/api/users/([a-z0-9]+)")

# Upper bound on accepted request bodies; the JSON payloads here are tiny.
MAX_BODY_BYTES = 1024 * 1024


class BadRequest(ValueError):
    """Raised when a request body is not a well-formed JSON object."""


def run_opencli(*args: str, timeout: int = 300) -> tuple[int, str, str]:
    """Run ``opencli`` with *args* and return ``(returncode, stdout, stderr)``.

    Both output streams are stripped. Instead of raising, a missing
    executable is reported as exit code 127 and a timeout as exit code 124,
    each with an explanatory message in the stderr slot, so callers can treat
    them like any other command failure.
    """
    cmd = ["opencli", *args]
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout, check=False
        )
    except FileNotFoundError:
        return 127, "", "opencli executable not found"
    except subprocess.TimeoutExpired:
        return 124, "", f"opencli timed out after {timeout}s"
    out = (proc.stdout or "").strip()
    err = (proc.stderr or "").strip()
    return proc.returncode, out, err


def normalize_domain(domain: str) -> str:
    """Fix up FOSS domains (e.g. test94812ligbox.com.br -> test94812.ligbox.com.br).

    The input is stripped and lower-cased. A name ending in ``.ligbox`` gets
    ``.com.br`` appended; a single label glued directly onto
    ``ligbox.com.br`` gets the missing dot inserted. Anything else is
    returned as-is (after strip/lower).
    """
    d = (domain or "").strip().lower()
    if not d:
        return d
    if d.endswith(".ligbox"):
        return f"{d}.com.br"
    # FOSS sometimes concatenates sld+tld without the dot before ligbox.com.br.
    # The prefix must start and end with an alphanumeric so the rewrite never
    # produces a label with a leading or trailing hyphen ("my-.ligbox.com.br").
    m = re.fullmatch(r"([a-z0-9](?:[a-z0-9-]*[a-z0-9])?)ligbox\.com\.br", d)
    if m:
        return f"{m.group(1)}.ligbox.com.br"
    return d


def panel_domain_for(domain: str) -> str:
    """Return the domain name to register in the panel for *domain*."""
    return normalize_domain(domain)


def username_from_domain(domain: str) -> str:
    """Derive a candidate username from *domain*.

    Keeps only ``a-z0-9`` of the lower-cased domain, drops a trailing
    ``ligbox`` and truncates to 15 characters. Returns ``""`` when fewer than
    three characters remain. The result is not guaranteed to satisfy
    ``USER_RE`` (it may start with a digit); callers validate it.
    """
    base = re.sub(r"[^a-z0-9]", "", domain.lower())
    if base.endswith("ligbox"):
        base = base[:-6]
    return base[:15] if len(base) >= 3 else ""


def ok_message(*parts: str) -> str:
    """Join the non-empty *parts* with newlines."""
    return "\n".join(p for p in parts if p)


def str_field(data: dict, *keys: str, strip: bool = True) -> str:
    """Return the first non-empty string found under *keys* in *data*.

    Non-string values are treated as missing so that a payload such as
    ``{"username": 123}`` yields a validation error instead of an
    ``AttributeError``/``TypeError`` further down. Returns ``""`` when no key
    holds a usable string.
    """
    for key in keys:
        value = data.get(key)
        if isinstance(value, str):
            text = value.strip() if strip else value
            if text:
                return text
    return ""


def is_safe_token(value: str) -> bool:
    """Return True if *value* can be passed to ``opencli`` as one positional word.

    Rejects empty values, values starting with ``-`` (which the CLI could
    parse as an option), and values containing spaces or non-printable
    characters.
    """
    return (
        bool(value)
        and not value.startswith("-")
        and " " not in value
        and value.isprintable()
    )


def constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings without leaking the mismatch position via timing."""
    # Compare as bytes: hmac.compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


class Handler(BaseHTTPRequestHandler):
    """HTTP handler exposing the subset of the panel API that FOSSBilling uses.

    Routes:
        POST   /api                    exchange admin credentials for the token
        GET    /api                    health check
        GET    /api/users              list users
        POST   /api/users              create a user (optionally with a domain)
        GET    /api/users/<username>   list a user's domains
        PATCH  /api/users/<username>   suspend / unsuspend / change password
        DELETE /api/users/<username>   delete a user
        POST   /api/domains            add a domain to an existing user

    Everything except ``POST /api`` requires ``Authorization: Bearer <TOKEN>``.
    """

    server_version = "LigboxOpenPanelBridge/2.0"

    def log_message(self, fmt: str, *args) -> None:
        """Write one access-log line to stderr."""
        sys.stderr.write("%s - %s\n" % (self.address_string(), fmt % args))

    def _read_json(self) -> dict:
        """Read and decode the request body as a JSON object.

        Returns ``{}`` when there is no body (missing or zero Content-Length,
        or a whitespace-only body).

        Raises:
            BadRequest: on an invalid, negative or oversized Content-Length,
                a body that is not valid UTF-8 JSON, or JSON that is not an
                object.
        """
        try:
            length = int(self.headers.get("Content-Length", 0) or 0)
        except (TypeError, ValueError):
            raise BadRequest("Invalid Content-Length") from None
        # A negative length would make rfile.read() block until EOF.
        if length < 0:
            raise BadRequest("Invalid Content-Length")
        if length > MAX_BODY_BYTES:
            raise BadRequest("Request body too large")
        if not length:
            return {}
        try:
            raw = self.rfile.read(length).decode("utf-8")
            data = json.loads(raw) if raw.strip() else {}
        except (UnicodeDecodeError, json.JSONDecodeError):
            raise BadRequest("Request body is not valid JSON") from None
        if not isinstance(data, dict):
            raise BadRequest("Request body must be a JSON object")
        return data

    def _read_json_or_reject(self) -> dict | None:
        """Return the parsed body, or send a 400 response and return None."""
        try:
            return self._read_json()
        except BadRequest as exc:
            self._send(400, {"success": False, "error": str(exc)})
            return None

    def _send(self, code: int, payload: dict) -> None:
        """Send *payload* as a JSON response with HTTP status *code*."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _auth_ok(self) -> bool:
        """Return True if the request carries the expected bearer token."""
        supplied = self.headers.get("Authorization", "") or ""
        return constant_time_equals(supplied, f"Bearer {TOKEN}")

    def _username_from_path(self) -> str | None:
        """Return the username of a /api/users/<username> route, else None."""
        path = urlparse(self.path).path.rstrip("/") or "/"
        m = USER_PATH_RE.fullmatch(path)
        return m.group(1) if m else None

    def _provision_user(self, data: dict) -> tuple[int, dict]:
        """Create a panel user and, when a domain is given, attach the domain.

        Reads ``username``, ``password``, ``email``, ``plan_name`` and
        ``domain``/``domain_name`` from *data*. A missing username is derived
        from the domain and a missing email defaults to ``hosting@<domain>``.

        Returns:
            ``(http_status, payload)``. All input validation happens before
            any ``opencli`` call, so a 400 never leaves a half-created account.
        """
        username = str_field(data, "username").lower()
        password = str_field(data, "password", strip=False)
        email = str_field(data, "email")
        plan = str_field(data, "plan_name") or DEFAULT_PLAN
        domain = str_field(data, "domain", "domain_name").lower()

        if not username and domain:
            username = username_from_domain(domain)
        if not email and domain:
            email = f"hosting@{domain}"

        if not USER_RE.fullmatch(username or ""):
            return 400, {"success": False, "error": f"Invalid username: {username!r}"}
        if not password:
            return 400, {"success": False, "error": "password required"}
        if not email:
            return 400, {"success": False, "error": "email required"}

        # Refuse values the CLI could interpret as options.
        # NOTE(review): the password is still passed on the command line
        # (visible in the process list) and is not checked here because a
        # leading "-" is legitimate in a password.
        panel_dom = panel_domain_for(domain) if domain else ""
        if domain and not is_safe_token(panel_dom):
            return 400, {"success": False, "error": f"Invalid domain: {domain!r}"}
        if not is_safe_token(email):
            return 400, {"success": False, "error": f"Invalid email: {email!r}"}
        if plan.startswith("-"):
            return 400, {"success": False, "error": f"Invalid plan_name: {plan!r}"}

        code, out, err = run_opencli(
            "user-add", username, password, email, plan, "--no-sentinel"
        )
        msg = ok_message(out, err)
        # The CLI output is consulted as well as the exit status: a non-zero
        # exit still counts as success when it reports the user was added or
        # already exists.
        if (
            code != 0
            and "Successfully added user" not in msg
            and "already exists" not in msg.lower()
        ):
            return 500, {"success": False, "error": msg}

        domain_msg = ""
        if domain:
            dcode, dout, derr = run_opencli("domains-add", panel_dom, username)
            domain_msg = ok_message(dout, derr)
            if (
                dcode != 0
                and "already exists" not in domain_msg.lower()
                and "success" not in domain_msg.lower()
            ):
                return 500, {
                    "success": False,
                    "error": f"user OK but domain failed: {domain_msg}",
                    "username": username,
                }

        return 200, {
            "success": True,
            "response": {
                "message": ok_message(msg, domain_msg) or f"Provisioned {username}",
                "username": username,
                "domain": panel_dom if domain else None,
            },
        }

    def do_POST(self) -> None:
        """Handle login (/api), user creation (/api/users) and /api/domains."""
        path = urlparse(self.path).path.rstrip("/") or "/"

        if path == "/api":
            data = self._read_json_or_reject()
            if data is None:
                return
            # Evaluate both comparisons so timing does not reveal which failed.
            user_ok = constant_time_equals(
                str_field(data, "username", strip=False), ADMIN_USER
            )
            pass_ok = constant_time_equals(
                str_field(data, "password", strip=False), ADMIN_PASS
            )
            if user_ok and pass_ok:
                self._send(200, {"access_token": TOKEN})
            else:
                self._send(401, {"error": "Invalid credentials"})
            return

        if not self._auth_ok():
            self._send(401, {"error": "Unauthorized"})
            return

        if path == "/api/users":
            data = self._read_json_or_reject()
            if data is None:
                return
            code, payload = self._provision_user(data)
            self._send(code, payload)
            return

        if path == "/api/domains":
            data = self._read_json_or_reject()
            if data is None:
                return
            domain = panel_domain_for(str_field(data, "domain", "domain_name"))
            username = str_field(data, "username").lower()
            if not is_safe_token(domain) or not USER_RE.fullmatch(username):
                self._send(400, {"success": False, "error": "domain + username required"})
                return
            code, out, err = run_opencli("domains-add", domain, username)
            msg = ok_message(out, err)
            if code == 0 or "success" in msg.lower() or "already exists" in msg.lower():
                self._send(
                    200,
                    {"success": True, "response": {"message": msg, "domain": domain}},
                )
            else:
                self._send(500, {"success": False, "error": msg})
            return

        self._send(404, {"error": "This api route does not exist."})

    def do_GET(self) -> None:
        """Handle the health check, the user list and a user's domain list."""
        if not self._auth_ok():
            self._send(401, {"error": "Unauthorized"})
            return

        path = urlparse(self.path).path.rstrip("/") or "/"

        if path == "/api":
            self._send(200, {"message": "API is working!", "bridge": "ligbox-v2"})
            return

        if path == "/api/users":
            code, out, err = run_opencli("user-list", "--json")
            if code == 0 and out:
                try:
                    users = json.loads(out)
                except json.JSONDecodeError:
                    # Output was not JSON; fall through to the raw reply below.
                    pass
                else:
                    self._send(200, {"success": True, "users": users})
                    return
            # Report the real outcome instead of claiming success on failure.
            self._send(200, {"success": code == 0, "raw": ok_message(out, err)})
            return

        username = self._username_from_path()
        if username is not None:
            code, out, err = run_opencli("domains-user", username)
            self._send(200, {
                "success": code == 0,
                "username": username,
                "domains": out,
                "error": err or None,
            })
            return

        self._send(404, {"error": "This api route does not exist."})

    def do_PATCH(self) -> None:
        """Suspend, unsuspend or change the password of /api/users/<username>.

        ``action`` takes precedence over ``password`` when both are present.
        """
        if not self._auth_ok():
            self._send(401, {"error": "Unauthorized"})
            return
        username = self._username_from_path()
        if username is None:
            self._send(404, {"error": "This api route does not exist."})
            return
        data = self._read_json_or_reject()
        if data is None:
            return

        action = data.get("action")
        if action == "suspend":
            code, out, err = run_opencli("user-suspend", username)
        elif action == "unsuspend":
            code, out, err = run_opencli("user-unsuspend", username)
        elif "password" in data:
            password = str_field(data, "password", strip=False)
            if not password:
                self._send(
                    400,
                    {"success": False, "error": "password must be a non-empty string"},
                )
                return
            code, out, err = run_opencli("user-password", username, password)
        else:
            self._send(400, {"success": False, "error": "Unknown action"})
            return

        if code == 0:
            self._send(200, {"success": True})
        else:
            self._send(500, {"success": False, "error": ok_message(out, err)})

    def do_DELETE(self) -> None:
        """Delete the user named in /api/users/<username>."""
        if not self._auth_ok():
            self._send(401, {"error": "Unauthorized"})
            return
        username = self._username_from_path()
        if username is None:
            self._send(404, {"error": "This api route does not exist."})
            return
        code, out, err = run_opencli("user-delete", username, "-y")
        if code == 0:
            self._send(200, {"success": True})
        else:
            self._send(500, {"success": False, "error": ok_message(out, err)})


def main() -> None:
    """Start the bridge and serve requests until the process is stopped."""
    if ADMIN_PASS == _DEFAULT_ADMIN_PASS or TOKEN == _DEFAULT_TOKEN:
        print(
            "WARNING: running with built-in default credentials; "
            "set BRIDGE_ADMIN_PASS and BRIDGE_TOKEN.",
            file=sys.stderr,
            flush=True,
        )
    # The context manager closes the listening socket on exit.
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f"Ligbox OpenPanel bridge v2 on http://{HOST}:{PORT}", flush=True)
        httpd.serve_forever()


if __name__ == "__main__":
    main()