"""Cliente Odoo 16 XML-RPC — atribuição de perfis via res.groups / res.users.""" from __future__ import annotations import os import secrets import xmlrpc.client from typing import Any from app.vm123.role_map import DESK_ROLE_ODOO_GROUP_NAMES, DESK_ROLE_ODOO_XMLIDS ODOO_URL = os.getenv("ODOO_URL", "http://10.10.10.123:8069").rstrip("/") ODOO_DB = os.getenv("ODOO_DB", "ligbox") ODOO_LOGIN = os.getenv("ODOO_LOGIN", "admin@ligbox.com.br") ODOO_API_KEY = os.getenv("ODOO_API_KEY", os.getenv("ODOO_PASSWORD", "")) ODOO_PUBLIC_URL = os.getenv( "ODOO_PUBLIC_URL", "https://financeiro.ligbox.com.br/odoo/web/login?db=ligbox", ) class OdooConfigError(RuntimeError): pass class OdooProvisionError(RuntimeError): pass def _configured() -> bool: return bool(ODOO_API_KEY and ODOO_LOGIN and ODOO_DB) def _client(): if not _configured(): raise OdooConfigError("ODOO_LOGIN / ODOO_API_KEY não configurados no Desk") common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common", allow_none=True) uid = common.authenticate(ODOO_DB, ODOO_LOGIN, ODOO_API_KEY, {}) if not uid: raise OdooConfigError("falha autenticação Odoo — verifique credenciais") models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object", allow_none=True) return uid, models def _resolve_xmlid(uid: int, models, xmlid: str) -> int | None: if "." not in xmlid: return None module, name = xmlid.split(".", 1) rows = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "ir.model.data", "search_read", [[("module", "=", module), ("name", "=", name)]], {"fields": ["res_id"], "limit": 1}, ) if rows: return int(rows[0]["res_id"]) return None def _resolve_group_names(uid: int, models, names: tuple[str, ...]) -> list[int]: ids: list[int] = [] for label in names: rows = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.groups", "search_read", [[("full_name", "=", label)]], {"fields": ["id"], "limit": 1}, ) if not rows: rows = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.groups", "search_read", [[("name", "=", label)]], {"fields": ["id"], "limit": 1}, ) if rows: ids.append(int(rows[0]["id"])) return ids def group_ids_for_desk_role(role: str) -> list[int]: """Resolve group IDs Odoo para função Desk. Levanta se apps não instaladas.""" uid, models = _client() xmlids = DESK_ROLE_ODOO_XMLIDS.get(role, ()) group_ids: list[int] = [] missing_xmlids: list[str] = [] for xid in xmlids: gid = _resolve_xmlid(uid, models, xid) if gid: group_ids.append(gid) else: missing_xmlids.append(xid) if group_ids: return group_ids # fallback por nome names = DESK_ROLE_ODOO_GROUP_NAMES.get(role, ()) group_ids = _resolve_group_names(uid, models, names) if group_ids: return group_ids hint = ", ".join(missing_xmlids) or role raise OdooProvisionError( f"grupos Odoo não encontrados para role={role} ({hint}). " "Instale apps Sales/Accounting no Odoo ou crie grupos custom." ) def list_role_model(role: str) -> dict[str, Any]: """Introspecção — grupos mapeados e estado das apps (para Roger / debug).""" if not _configured(): return {"configured": False, "role": role, "groups": [], "note": "ODOO_API_KEY ausente"} uid, models = _client() xmlids = DESK_ROLE_ODOO_XMLIDS.get(role, ()) resolved = [] for xid in xmlids: gid = _resolve_xmlid(uid, models, xid) item: dict[str, Any] = {"xmlid": xid, "group_id": gid} if gid: g = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.groups", "read", [[gid]], {"fields": ["name", "full_name"]}, )[0] item["name"] = g.get("full_name") or g.get("name") else: item["missing"] = True resolved.append(item) installed = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "ir.module.module", "search_read", [[("name", "in", ["sale", "sale_management", "account", "crm"]), ("state", "=", "installed")]], {"fields": ["name", "state"], "limit": 20}, ) return { "configured": True, "role": role, "db": ODOO_DB, "public_url": ODOO_PUBLIC_URL, "groups": resolved, "installed_sales_account_modules": [m["name"] for m in installed], } def find_partner_by_email(email: str) -> dict[str, Any] | None: uid, models = _client() rows = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.partner", "search_read", [[("email", "=ilike", email.strip())]], {"fields": ["id", "name", "email", "vat"], "limit": 1}, ) return rows[0] if rows else None def find_user_by_login(login: str) -> dict[str, Any] | None: uid, models = _client() rows = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.users", "search_read", [[("login", "=", login.strip().lower())]], {"fields": ["id", "name", "login", "groups_id"], "limit": 1}, ) return rows[0] if rows else None def upsert_internal_user( *, email: str, name: str, desk_role: str, password: str | None = None, ) -> dict[str, Any]: """Cria ou actualiza utilizador interno Ligbox com groups_id conforme função Desk.""" uid, models = _client() login = email.strip().lower() group_ids = group_ids_for_desk_role(desk_role) # Internal User (base.group_user) — xmlid base.group_user base_user_gid = _resolve_xmlid(uid, models, "base.group_user") if base_user_gid and base_user_gid not in group_ids: group_ids = [base_user_gid, *group_ids] existing = find_user_by_login(login) groups_cmd = [(6, 0, group_ids)] if existing: models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.users", "write", [[existing["id"]], {"name": name, "groups_id": groups_cmd}], ) odoo_uid = int(existing["id"]) created = False else: pwd = password or secrets.token_urlsafe(16) odoo_uid = models.execute_kw( ODOO_DB, uid, ODOO_API_KEY, "res.users", "create", [ { "name": name, "login": login, "email": login, "password": pwd, "groups_id": groups_cmd, } ], ) created = True return { "odoo_uid": odoo_uid, "login": login, "created": created, "group_ids": group_ids, "login_url": ODOO_PUBLIC_URL, }