262 lines
9.4 KiB
Python
Executable file
262 lines
9.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""OpenPanel Community → FOSSBilling API bridge (Ligbox re-engenharia).
|
|
|
|
Substitui API Enterprise: opencli user-add + domains-add + gestão contas.
|
|
LAN only — porta 18087.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
# --- Runtime configuration -------------------------------------------------
# Every setting is read once at import time from the environment, falling
# back to the literal default shown here.

# Address the HTTP server binds to. "0.0.0.0" listens on every interface.
# NOTE(review): the module docstring says "LAN only" — confirm that a firewall
# or an explicit BRIDGE_HOST enforces that in production.
HOST = os.getenv("BRIDGE_HOST", "0.0.0.0")

# TCP port; a non-numeric BRIDGE_PORT makes the import fail with ValueError.
PORT = int(os.getenv("BRIDGE_PORT", "18087"))

# Credentials accepted by POST /api in exchange for TOKEN.
# NOTE(review): the fallback password and token are hard-coded in source;
# presumably real deployments override them via the environment — confirm.
ADMIN_USER = os.getenv("BRIDGE_ADMIN_USER", "ligboxadmin")
ADMIN_PASS = os.getenv("BRIDGE_ADMIN_PASS", "LbOpen805353")

# Static bearer token expected in the Authorization header.
TOKEN = os.getenv("BRIDGE_TOKEN", "ligbox-community-bridge-token")

# Plan name used when a provisioning request does not carry "plan_name".
DEFAULT_PLAN = os.getenv("BRIDGE_DEFAULT_PLAN", "ligbox-site-cms")

# Accepted usernames: a lowercase letter followed by 2-15 lowercase
# letters/digits (3-16 characters in total).
USER_RE = re.compile(r"^[a-z][a-z0-9]{2,15}$")
|
|
|
|
|
|
def run_opencli(*args: str, timeout: int = 300) -> tuple[int, str, str]:
|
|
cmd = ["opencli", *args]
|
|
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
out = (proc.stdout or "").strip()
|
|
err = (proc.stderr or "").strip()
|
|
return proc.returncode, out, err
|
|
|
|
|
|
def normalize_domain(domain: str) -> str:
    """Repair domain names as received from FOSSBilling.

    The input is trimmed and lower-cased, then:

    * a name ending in ``.ligbox`` gets ``.com.br`` appended;
    * a name where the label is glued directly onto ``ligbox.com.br``
      (e.g. ``test94812ligbox.com.br``) gets the missing dot inserted
      (``test94812.ligbox.com.br``);
    * anything else, including an empty or ``None`` input (returned as
      ``""``), is returned as cleaned.
    """
    cleaned = (domain or "").strip().lower()

    if cleaned.endswith(".ligbox"):
        return cleaned + ".com.br"

    # FOSS sometimes concatenates sld+tld without the dot before ligbox.com.br.
    # The prefix pattern excludes dots, so an already-correct
    # "name.ligbox.com.br" does not match and falls through unchanged.
    glued = re.fullmatch(r"([a-z0-9-]+)ligbox\.com\.br", cleaned)
    if glued is None:
        return cleaned
    return glued.group(1) + ".ligbox.com.br"
|
|
|
|
|
|
def panel_domain_for(domain: str) -> str:
    """Return *domain* in the form that is handed to the panel CLI.

    This is a direct delegation to ``normalize_domain``; it exists as a
    separate name for the call sites that build ``opencli`` arguments.
    """
    panel_name = normalize_domain(domain)
    return panel_name
|
|
|
|
|
|
def username_from_domain(domain: str) -> str:
    """Derive a candidate account name from *domain*.

    The domain is lower-cased and every character outside ``a-z0-9`` is
    removed. A trailing ``ligbox`` is then dropped, and the result is cut to
    15 characters. If fewer than 3 characters remain, ``""`` is returned.

    Note that the ``ligbox`` suffix is only removed when it is the very end
    of the sanitized string (e.g. ``name.ligbox``); for ``name.ligbox.com.br``
    the sanitized string ends in ``combr`` and nothing is stripped.
    The result is not guaranteed to start with a letter.
    """
    suffix = "ligbox"
    alnum = re.sub(r"[^a-z0-9]", "", domain.lower())
    if alnum.endswith(suffix):
        alnum = alnum[: -len(suffix)]
    if len(alnum) < 3:
        return ""
    return alnum[:15]
|
|
|
|
|
|
def ok_message(*parts: str) -> str:
    """Join the non-empty *parts* with newlines.

    Falsy entries (empty strings, ``None``) are skipped; with nothing left
    the result is ``""``.
    """
    kept = [piece for piece in parts if piece]
    return "\n".join(kept)
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler exposing a small JSON API on top of ``opencli``.

    Routes (all JSON in / JSON out):

    * ``POST /api``                 exchange admin credentials for the token
    * ``GET  /api``                 authenticated health check
    * ``POST /api/users``           run ``user-add`` (+ optional ``domains-add``)
    * ``GET  /api/users``           run ``user-list --json``
    * ``GET  /api/users/<name>``    run ``domains-user <name>``
    * ``PATCH /api/users/<name>``   suspend / unsuspend / change password
    * ``DELETE /api/users/<name>``  run ``user-delete <name> -y``
    * ``POST /api/domains``         run ``domains-add <domain> <name>``

    Every route except ``POST /api`` requires ``Authorization: Bearer <TOKEN>``.

    NOTE(review): values taken from the request (password, e-mail, plan,
    domain) are passed to ``opencli`` as positional arguments. No shell is
    involved, but a value beginning with ``-`` may be read as an option by the
    CLI, and passwords are visible in the process list — confirm this is
    acceptable for the deployment.
    """

    server_version = "LigboxOpenPanelBridge/2.0"

    # Path of a single-user resource; the captured group is the username.
    _USER_PATH_RE = re.compile(r"^/api/users/([a-z0-9]+)$")

    # ------------------------------------------------------------------
    # Infrastructure helpers
    # ------------------------------------------------------------------

    def log_message(self, fmt: str, *args) -> None:
        """Write one access/error log line to stderr, prefixed by client address."""
        sys.stderr.write("%s - %s\n" % (self.address_string(), fmt % args))

    def _read_json(self) -> dict:
        """Read the request body and decode it as a JSON object.

        Returns ``{}`` when there is no body (missing, zero or negative
        ``Content-Length``, or whitespace only).

        Raises:
            ValueError: non-numeric ``Content-Length``, body that is not
                valid UTF-8 / JSON, or JSON whose top level is not an object.
        """
        length = int(self.headers.get("Content-Length", 0))
        # A negative length would make read() block until the peer closes.
        if length <= 0:
            return {}
        raw = self.rfile.read(length).decode("utf-8")
        if not raw.strip():
            return {}
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            raise ValueError("JSON body must be an object")
        return parsed

    def _read_json_or_400(self) -> "dict | None":
        """Return the JSON body, or answer 400 and return ``None`` if it is malformed."""
        try:
            return self._read_json()
        except ValueError as exc:
            self._send(400, {"success": False, "error": f"Invalid JSON body: {exc}"})
            return None

    def _send(self, code: int, payload: dict) -> None:
        """Send *payload* as a JSON response with HTTP status *code*."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _same_secret(supplied, expected: str) -> bool:
        """Compare two secrets in constant time; non-string input never matches."""
        import hmac  # local import: the module header does not import hmac

        if not isinstance(supplied, str):
            return False
        # Compare bytes: compare_digest() rejects non-ASCII str arguments.
        return hmac.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    def _auth_ok(self) -> bool:
        """Return True when the Authorization header carries the bearer token."""
        supplied = str(self.headers.get("Authorization", "") or "")
        return self._same_secret(supplied, f"Bearer {TOKEN}")

    def _require_auth(self) -> bool:
        """Answer 401 and return False unless the request is authenticated."""
        if self._auth_ok():
            return True
        self._send(401, {"error": "Unauthorized"})
        return False

    @staticmethod
    def _str_field(data: dict, *keys: str) -> str:
        """Return the first non-empty string found under *keys*, else ``""``.

        Values of any other JSON type (numbers, lists, ...) are treated as
        missing so they can never reach ``str`` methods or ``subprocess``.
        """
        for key in keys:
            value = data.get(key)
            if isinstance(value, str) and value:
                return value
        return ""

    def _opencli(self, *args: str) -> tuple[int, str, str]:
        """Call ``run_opencli`` and turn launch failures into a failure tuple.

        A timeout, a missing binary or an argument the OS refuses would
        otherwise abort the handler before any response is written. The
        exception detail is logged, not returned, so it cannot be confused
        with CLI output by the success-keyword checks of the callers.
        """
        try:
            return run_opencli(*args)
        except subprocess.TimeoutExpired:
            self.log_error("opencli %s timed out", args[0] if args else "")
            return 124, "", "opencli timed out"
        except (OSError, ValueError) as exc:
            self.log_error("opencli %s failed to start: %r", args[0] if args else "", exc)
            return 127, "", "opencli could not be executed"

    def _user_from_path(self) -> "str | None":
        """Return the username from ``/api/users/<name>`` or ``None`` if no match."""
        path = urlparse(self.path).path.rstrip("/") or "/"
        found = self._USER_PATH_RE.match(path)
        return found.group(1) if found else None

    # ------------------------------------------------------------------
    # Provisioning
    # ------------------------------------------------------------------

    def _provision_user(self, data: dict) -> tuple[int, dict]:
        """Create an account and, if a domain is given, attach the domain.

        Reads ``username``, ``password``, ``email``, ``plan_name`` and
        ``domain``/``domain_name`` from *data*. A missing username is derived
        from the domain; a missing e-mail defaults to ``hosting@<domain>``;
        a missing plan defaults to ``DEFAULT_PLAN``.

        Returns:
            ``(http_status, payload)`` — 400 for invalid input, 500 when
            ``opencli`` reports a failure, 200 otherwise.
        """
        username = self._str_field(data, "username").strip().lower()
        password = self._str_field(data, "password")
        email = self._str_field(data, "email")
        plan = self._str_field(data, "plan_name") or DEFAULT_PLAN
        domain = self._str_field(data, "domain", "domain_name").strip().lower()

        if not username and domain:
            username = username_from_domain(domain)
        if not email and domain:
            email = f"hosting@{domain}"

        if not USER_RE.fullmatch(username or ""):
            return 400, {"success": False, "error": f"Invalid username: {username!r}"}
        if not password:
            return 400, {"success": False, "error": "password required"}
        if not email:
            return 400, {"success": False, "error": "email required"}

        code, out, err = self._opencli(
            "user-add", username, password, email, plan, "--no-sentinel"
        )
        msg = ok_message(out, err)
        # A non-zero exit is tolerated when the output says the user was
        # added or already exists, which keeps repeated requests idempotent.
        if code != 0 and "Successfully added user" not in msg and "already exists" not in msg.lower():
            return 500, {"success": False, "error": msg}

        domain_msg = ""
        panel_dom = panel_domain_for(domain) if domain else None
        if panel_dom:
            dcode, dout, derr = self._opencli("domains-add", panel_dom, username)
            domain_msg = ok_message(dout, derr)
            if dcode != 0 and "already exists" not in domain_msg.lower() and "success" not in domain_msg.lower():
                return 500, {
                    "success": False,
                    "error": f"user OK but domain failed: {domain_msg}",
                    "username": username,
                }

        return 200, {
            "success": True,
            "response": {
                "message": ok_message(msg, domain_msg) or f"Provisioned {username}",
                "username": username,
                "domain": panel_dom,
            },
        }

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    def do_POST(self) -> None:
        """Handle ``POST /api``, ``POST /api/users`` and ``POST /api/domains``."""
        path = urlparse(self.path).path.rstrip("/") or "/"

        if path == "/api":
            # Login: the only route reachable without a bearer token.
            data = self._read_json_or_400()
            if data is None:
                return
            user_ok = self._same_secret(data.get("username"), ADMIN_USER)
            pass_ok = self._same_secret(data.get("password"), ADMIN_PASS)
            if user_ok and pass_ok:
                self._send(200, {"access_token": TOKEN})
            else:
                self._send(401, {"error": "Invalid credentials"})
            return

        if not self._require_auth():
            return

        if path == "/api/users":
            data = self._read_json_or_400()
            if data is None:
                return
            code, payload = self._provision_user(data)
            self._send(code, payload)
            return

        if path == "/api/domains":
            data = self._read_json_or_400()
            if data is None:
                return
            domain = panel_domain_for(self._str_field(data, "domain", "domain_name"))
            username = self._str_field(data, "username").strip().lower()
            if not domain or not USER_RE.fullmatch(username):
                self._send(400, {"success": False, "error": "domain + username required"})
                return
            code, out, err = self._opencli("domains-add", domain, username)
            msg = ok_message(out, err)
            if code == 0 or "success" in msg.lower() or "already exists" in msg.lower():
                self._send(200, {"success": True, "response": {"message": msg, "domain": domain}})
            else:
                self._send(500, {"success": False, "error": msg})
            return

        self._send(404, {"error": "This api route does not exist."})

    def do_GET(self) -> None:
        """Handle ``GET /api``, ``GET /api/users`` and ``GET /api/users/<name>``."""
        if not self._require_auth():
            return

        path = urlparse(self.path).path.rstrip("/") or "/"

        if path == "/api":
            self._send(200, {"message": "API is working!", "bridge": "ligbox-v2"})
            return

        if path == "/api/users":
            code, out, err = self._opencli("user-list", "--json")
            if code == 0 and out:
                try:
                    users = json.loads(out)
                except json.JSONDecodeError:
                    pass  # fall through and return the raw text instead
                else:
                    self._send(200, {"success": True, "users": users})
                    return
            # Raw fallback: report the real outcome instead of always True.
            self._send(200, {"success": code == 0, "raw": ok_message(out, err)})
            return

        username = self._user_from_path()
        if username is not None:
            code, out, err = self._opencli("domains-user", username)
            self._send(200, {
                "success": code == 0,
                "username": username,
                "domains": out,
                "error": err or None,
            })
            return

        self._send(404, {"error": "This api route does not exist."})

    def do_PATCH(self) -> None:
        """Handle ``PATCH /api/users/<name>``: suspend, unsuspend or set password.

        The body selects the operation: ``{"action": "suspend"}``,
        ``{"action": "unsuspend"}`` or ``{"password": "<new>"}``.
        """
        if not self._require_auth():
            return
        username = self._user_from_path()
        if username is None:
            self._send(404, {"error": "This api route does not exist."})
            return
        data = self._read_json_or_400()
        if data is None:
            return

        action = data.get("action")
        if action == "suspend":
            code, out, err = self._opencli("user-suspend", username)
        elif action == "unsuspend":
            code, out, err = self._opencli("user-unsuspend", username)
        elif "password" in data:
            new_password = data["password"]
            if not isinstance(new_password, str) or not new_password:
                self._send(400, {"success": False, "error": "password must be a non-empty string"})
                return
            code, out, err = self._opencli("user-password", username, new_password)
        else:
            self._send(400, {"success": False, "error": "Unknown action"})
            return

        if code == 0:
            self._send(200, {"success": True})
        else:
            self._send(500, {"success": False, "error": ok_message(out, err)})

    def do_DELETE(self) -> None:
        """Handle ``DELETE /api/users/<name>`` by running ``user-delete <name> -y``."""
        if not self._require_auth():
            return
        username = self._user_from_path()
        if username is None:
            self._send(404, {"error": "This api route does not exist."})
            return
        code, out, err = self._opencli("user-delete", username, "-y")
        if code == 0:
            self._send(200, {"success": True})
        else:
            self._send(500, {"success": False, "error": ok_message(out, err)})
|
|
|
|
|
|
def main() -> None:
    """Start the bridge on ``HOST:PORT`` and serve until interrupted.

    The server is used as a context manager so the listening socket is
    closed on the way out, and Ctrl-C ends the process without a traceback.
    """
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f"Ligbox OpenPanel bridge v2 on http://{HOST}:{PORT}", flush=True)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Deliberate: an operator stop is a normal shutdown, not an error.
            pass


if __name__ == "__main__":
    main()
|