Version: 1.0
Fecha: 25 de Marzo, 2026
Contexto: Fase 2 del plan de migracion backend — creacion del cliente HTTP para SimappeAdmin y reescritura del sistema de conexion a base de datos para soportar BD-por-tenant
Arquitecto: Carlos Alberto Torres Camargo
Clasificacion: Interno — Arquitectura
Documentar la creacion del cliente HTTP que consume el servicio de conexion de SimappeAdmin (/api/v2/database-config/connection) y la reescritura completa de database.py para gestionar pools de conexiones dinamicos por tenant, reemplazando la conexion unica fija actual.
database.py)Archivo: backend/app/database.py (35 lineas)
# Estado actual — UN solo engine, UNA sola BD
engine = create_async_engine(
settings.database_url, # ← Fija: postgresql+asyncpg://datavault_user:datavault_pass@postgres:5432/datavault
echo=settings.debug,
future=True,
pool_size=5, # 5 conexiones base por worker
max_overflow=10, # 10 adicionales en picos
pool_pre_ping=True,
pool_recycle=3600,
pool_timeout=30,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
Limitaciones:
| # | Limitacion | Impacto |
|---|---|---|
| 1 | Una sola DATABASE_URL fija |
Todos los tenants comparten la misma BD |
| 2 | Aislamiento solo por tenant_id en cada tabla |
Violacion de seguridad si falla el filtro |
| 3 | No puede conectar a BDs diferentes por tenant | Incompatible con modelo Simappe |
| 4 | Pool unico | No escala para multi-tenant aislado |
Endpoint: GET /api/v2/database-config/connection
Fuente: SimappeAdmin/v1/databaseconfig/component/DatabaseConfigComponentV2.java — metodo getConnection()
Flujo de resolucion jerarquica:
1. Extraer claims del JWT:
- clientId = customerId
- companyId
- subsidiaryId
- environment
- connectionContext
2. Buscar configuracion en orden de prioridad:
Level 1: Subsidiary-specific (companyId + subsidiaryId) ← Mayor prioridad
Level 2: Company-specific (companyId, sin subsidiaryId)
Level 3: Global (isGlobal=true)
Level 4: Fallback legacy (clientId + environment)
3. Cache resultado (Caffeine: 30 min, 100 entradas max)
4. Retornar DatabaseConfigV2Dto
Respuesta del servicio (DatabaseConfigV2Dto):
{
"id": 42,
"code": "VAULT_TENANT_1",
"name": "DataVault - Empresa ABC",
"url": "jdbc:postgresql://db-host:5432/vault_abc",
"host": "db-host",
"port": 5432,
"database": "vault_abc",
"schema": "public",
"username": "vault_user",
"password": "encrypted_password",
"type": "POSTGRESQL",
"clientId": 100,
"companyId": 200,
"subsidiaryId": null,
"environment": "dev",
"status": "ACTIVE",
"maxPoolSize": 10,
"minIdle": 2,
"connectionTimeout": 30000
}
services/simappe_client.pyArchivo: backend/app/services/simappe_client.py
Accion: Crear archivo nuevo — Cliente HTTP para consumir SimappeAdmin.
# ============================================================
# NUEVO services/simappe_client.py
# ============================================================
import httpx
import logging
import time
from typing import Optional
from dataclasses import dataclass
from app.config import settings
logger = logging.getLogger(__name__)
@dataclass
class TenantDatabaseConfig:
"""Configuracion de BD resuelta desde SimappeAdmin."""
id: int
code: str
name: str
host: str
port: int
database: str
schema: str
username: str
password: str
db_type: str # POSTGRESQL, MYSQL, etc.
client_id: int
company_id: Optional[int]
subsidiary_id: Optional[int]
environment: str
max_pool_size: int = 10
min_idle: int = 2
connection_timeout: int = 30000
@property
def asyncpg_url(self) -> str:
"""Construye URL de conexion para asyncpg."""
return (
f"postgresql+asyncpg://{self.username}:{self.password}"
f"@{self.host}:{self.port}/{self.database}"
)
@property
def cache_key(self) -> str:
"""Clave unica para cache de pool de conexiones."""
parts = [str(self.client_id), self.environment]
if self.company_id:
parts.append(str(self.company_id))
if self.subsidiary_id:
parts.append(str(self.subsidiary_id))
return ":".join(parts)
class _CacheEntry:
"""Entrada del cache con TTL."""
def __init__(self, config: TenantDatabaseConfig, ttl_seconds: int):
self.config = config
self.expires_at = time.monotonic() + ttl_seconds
@property
def is_expired(self) -> bool:
return time.monotonic() >= self.expires_at
class SimappeAdminClient:
"""
Cliente HTTP para consumir el servicio de conexion de SimappeAdmin.
Responsabilidades:
1. Llamar a GET /api/v2/database-config/connection con el JWT
2. Cachear la respuesta (TTL configurable, default 30 minutos)
3. Parsear la respuesta en TenantDatabaseConfig
4. Manejar errores de red y timeouts
Patron: Singleton por proceso (una instancia por worker FastAPI).
"""
def __init__(self):
self._cache: dict[str, _CacheEntry] = {}
self._ttl_seconds = settings.tenant_db_cache_ttl_minutes * 60
self._http_client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""Lazy init del cliente HTTP."""
if self._http_client is None or self._http_client.is_closed:
self._http_client = httpx.AsyncClient(
base_url=settings.simappe_admin_url,
timeout=httpx.Timeout(10.0, connect=5.0),
)
return self._http_client
async def get_tenant_db_config(self, jwt_token: str) -> TenantDatabaseConfig:
"""
Obtiene la configuracion de BD para el tenant del usuario.
Args:
jwt_token: Token JWT de Simappe (sin prefijo "Bearer ")
Returns:
TenantDatabaseConfig con credenciales de BD
Raises:
SimappeAdminError si la llamada falla o no hay config
"""
# 1. Verificar cache
cache_key = self._make_cache_key(jwt_token)
cached = self._cache.get(cache_key)
if cached and not cached.is_expired:
logger.debug(f"Cache hit para tenant DB config: {cache_key}")
return cached.config
# 2. Llamar a SimappeAdmin
logger.info(f"Resolviendo config BD desde SimappeAdmin para key: {cache_key}")
client = await self._get_client()
try:
response = await client.get(
"/api/v2/database-config/connection",
headers={"Authorization": f"Bearer {jwt_token}"},
)
except httpx.RequestError as e:
logger.error(f"Error de red al contactar SimappeAdmin: {e}")
raise SimappeAdminError(
f"No se pudo contactar SimappeAdmin: {e}",
status_code=503,
)
if response.status_code == 404:
raise SimappeAdminError(
"No hay configuracion de BD para este tenant. "
"Verifique que el tenant tenga una BD configurada en SimappeAdmin.",
status_code=404,
)
if response.status_code != 200:
logger.error(
f"SimappeAdmin retorno {response.status_code}: {response.text}"
)
raise SimappeAdminError(
f"Error al resolver config BD: HTTP {response.status_code}",
status_code=response.status_code,
)
# 3. Parsear respuesta
data = response.json()
config = TenantDatabaseConfig(
id=data.get("id", 0),
code=data.get("code", ""),
name=data.get("name", ""),
host=data.get("host", "localhost"),
port=data.get("port", 5432),
database=data.get("database", ""),
schema=data.get("schema", "public"),
username=data.get("username", ""),
password=data.get("password", ""),
db_type=data.get("type", "POSTGRESQL"),
client_id=data.get("clientId", 0),
company_id=data.get("companyId"),
subsidiary_id=data.get("subsidiaryId"),
environment=data.get("environment", "dev"),
max_pool_size=data.get("maxPoolSize", 10),
min_idle=data.get("minIdle", 2),
connection_timeout=data.get("connectionTimeout", 30000),
)
# 4. Validar config
if not config.host or not config.database or not config.username:
raise SimappeAdminError(
f"Config BD incompleta para tenant {config.code}: "
f"host={config.host}, db={config.database}, user={config.username}",
status_code=500,
)
# 5. Cachear
self._cache[cache_key] = _CacheEntry(config, self._ttl_seconds)
# 6. Limpiar entradas expiradas
self._cleanup_expired()
return config
def invalidate_cache(self, cache_key: Optional[str] = None):
"""Invalida cache completo o una entrada especifica."""
if cache_key:
self._cache.pop(cache_key, None)
else:
self._cache.clear()
def _make_cache_key(self, jwt_token: str) -> str:
"""Genera cache key a partir del hash del token (no almacena el token)."""
import hashlib
return hashlib.sha256(jwt_token.encode()).hexdigest()[:16]
def _cleanup_expired(self):
"""Elimina entradas expiradas del cache."""
expired_keys = [
k for k, v in self._cache.items() if v.is_expired
]
for key in expired_keys:
del self._cache[key]
async def close(self):
"""Cierra el cliente HTTP."""
if self._http_client and not self._http_client.is_closed:
await self._http_client.aclose()
class SimappeAdminError(Exception):
"""Error al comunicarse con SimappeAdmin."""
def __init__(self, message: str, status_code: int = 500):
super().__init__(message)
self.status_code = status_code
# Singleton — una instancia por proceso
simappe_admin_client = SimappeAdminClient()
Controles implementados:
| # | Control | Implementacion |
|---|---|---|
| C1 | Cache con TTL | _CacheEntry con time.monotonic() — inmune a cambios de reloj |
| C2 | Limpieza automatica de cache | _cleanup_expired() en cada get_tenant_db_config() |
| C3 | Timeout de red | httpx.Timeout(10s total, 5s connect) |
| C4 | Validacion de respuesta | Verifica host, database, username no vacios |
| C5 | No almacena JWT en cache | Usa hash SHA-256 del token como clave |
| C6 | Error descriptivo si no hay config | Codigo 404 con mensaje claro |
database.py — Pool Dinamico por TenantArchivo: backend/app/database.py
Accion: Reescritura completa — de engine unico a pool por tenant.
# ============================================================
# NUEVO database.py — Pool de conexiones dinamico por tenant
# ============================================================
import time
import logging
from typing import Optional
from collections import OrderedDict
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,
AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase
from fastapi import Depends, HTTPException
from app.config import settings
from app.services.simappe_client import (
simappe_admin_client,
TenantDatabaseConfig,
SimappeAdminError,
)
from app.utils.security import require_company_selected, SimappeUserSession
logger = logging.getLogger(__name__)
class Base(DeclarativeBase):
"""Base declarativa para modelos SQLAlchemy."""
pass
class _PoolEntry:
"""Entrada del pool con timestamp de ultimo uso."""
def __init__(self, engine: AsyncEngine, session_factory: async_sessionmaker):
self.engine = engine
self.session_factory = session_factory
self.last_used = time.monotonic()
def touch(self):
self.last_used = time.monotonic()
class TenantDatabaseManager:
"""
Gestiona pools de conexiones a base de datos por tenant.
Patron: LRU cache de engines. Cuando se alcanza max_pools,
se destruye el pool menos usado recientemente.
Importante:
- Cada tenant tiene su propio engine con su propio pool.
- Los pools se crean bajo demanda (lazy).
- Los pools inactivos se destruyen automaticamente.
- Thread-safe via OrderedDict + asyncio event loop.
"""
def __init__(self, max_pools: int = 10):
self._pools: OrderedDict[str, _PoolEntry] = OrderedDict()
self._max_pools = max_pools
async def get_session_factory(
self, db_config: TenantDatabaseConfig
) -> async_sessionmaker:
"""
Obtiene o crea un session factory para el tenant.
Args:
db_config: Configuracion de BD del tenant (desde SimappeAdmin)
Returns:
async_sessionmaker configurado para la BD del tenant
"""
cache_key = db_config.cache_key
# Verificar si ya existe
if cache_key in self._pools:
entry = self._pools[cache_key]
entry.touch()
# Mover al final (LRU)
self._pools.move_to_end(cache_key)
return entry.session_factory
# Crear nuevo pool
logger.info(
f"Creando pool de conexiones para tenant: "
f"{db_config.code} ({db_config.database}@{db_config.host}:{db_config.port})"
)
# Evictar pool mas antiguo si se alcanzo el limite
if len(self._pools) >= self._max_pools:
await self._evict_oldest()
engine = create_async_engine(
db_config.asyncpg_url,
echo=settings.debug,
future=True,
pool_size=db_config.min_idle,
max_overflow=db_config.max_pool_size - db_config.min_idle,
pool_pre_ping=True,
pool_recycle=3600,
pool_timeout=db_config.connection_timeout // 1000, # ms → s
)
session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
self._pools[cache_key] = _PoolEntry(engine, session_factory)
return session_factory
async def _evict_oldest(self):
"""Destruye el pool menos usado recientemente."""
if not self._pools:
return
oldest_key, oldest_entry = next(iter(self._pools.items()))
logger.info(f"Evictando pool de tenant: {oldest_key}")
await oldest_entry.engine.dispose()
del self._pools[oldest_key]
async def dispose_all(self):
"""Destruye todos los pools. Llamar al apagar la aplicacion."""
for key, entry in self._pools.items():
logger.info(f"Cerrando pool de tenant: {key}")
await entry.engine.dispose()
self._pools.clear()
@property
def active_pools(self) -> int:
return len(self._pools)
# Singleton global
tenant_db_manager = TenantDatabaseManager(
max_pools=settings.max_tenant_pools
)
# ============================================================
# Engine legacy para migraciones Alembic y operaciones admin
# ============================================================
_legacy_engine = create_async_engine(
settings.database_url,
echo=settings.debug,
future=True,
pool_size=3,
max_overflow=5,
pool_pre_ping=True,
)
_legacy_session_factory = async_sessionmaker(
_legacy_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_legacy_db():
"""
Sesion de BD legacy (URL fija).
Usar SOLO para migraciones Alembic y operaciones que no
requieren contexto de tenant.
"""
async with _legacy_session_factory() as session:
try:
yield session
finally:
await session.close()
# ============================================================
# Dependency injection para routers — BD del tenant
# ============================================================
async def get_tenant_db(
user_session: SimappeUserSession = Depends(require_company_selected),
):
"""
FastAPI dependency que inyecta una sesion de BD para el tenant del usuario.
Flujo:
1. Obtiene SimappeUserSession del JWT (via require_company_selected)
2. Llama a SimappeAdmin para resolver config BD (con cache)
3. Obtiene/crea pool para ese tenant
4. Yield sesion del pool correcto
Uso en routers:
@router.get("/files")
async def list_files(
user_session: SimappeUserSession = Depends(require_company_selected),
db: AsyncSession = Depends(get_tenant_db),
):
...
"""
# Obtener token del request context
# Nota: el token ya fue validado por require_company_selected
from starlette.requests import Request
from fastapi import Request as FastAPIRequest
# Re-extraer token para pasar a SimappeAdmin
# (necesitamos el token original para que SimappeAdmin resuelva la BD)
# Esto se inyecta via middleware — ver multitenancy.py
token = getattr(user_session, "_raw_token", "")
if not token:
raise HTTPException(
status_code=500,
detail="Token no disponible en contexto. Error interno.",
)
try:
db_config = await simappe_admin_client.get_tenant_db_config(token)
except SimappeAdminError as e:
logger.error(f"Error al resolver BD del tenant: {e}")
raise HTTPException(
status_code=e.status_code,
detail=str(e),
)
session_factory = await tenant_db_manager.get_session_factory(db_config)
async with session_factory() as session:
try:
yield session
finally:
await session.close()
Archivo: backend/app/main.py
Cambio: Agregar limpieza de pools al apagar.
# ============================================================
# CAMBIO en main.py — lifespan con cleanup
# ============================================================
from app.database import tenant_db_manager
from app.services.simappe_client import simappe_admin_client
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
os.makedirs(settings.upload_path, exist_ok=True)
logger.info("DataVault API iniciando...")
yield
# Shutdown — limpiar recursos
logger.info("Cerrando pools de conexiones de tenants...")
await tenant_db_manager.dispose_all()
await simappe_admin_client.close()
logger.info("DataVault API detenido.")
┌─────────────────────────────────────────┐
│ TenantDatabaseManager │
│ (max_pools = 10, LRU) │
│ │
│ ┌─────────────────────────────────┐ │
Request JWT │ │ Pool: 100:dev:200 │ │
(customerId=100, │ │ ├── Engine: pg://host1/vault_abc │ │
companyId=200) ──▶│ │ ├── pool_size: 2 │ │
│ │ ├── max_overflow: 8 │ │
│ │ └── last_used: 14:30:05 │ │
│ └─────────────────────────────────┘ │
│ │
Request JWT │ ┌─────────────────────────────────┐ │
(customerId=100, │ │ Pool: 100:dev:300 │ │
companyId=300) ──▶│ │ ├── Engine: pg://host2/vault_xyz │ │
│ │ ├── pool_size: 2 │ │
│ │ ├── max_overflow: 8 │ │
│ │ └── last_used: 14:31:12 │ │
│ └─────────────────────────────────┘ │
│ │
│ ... hasta 10 pools concurrentes │
│ Si se llega al limite → evictar LRU │
└─────────────────────────────────────────┘
| # | Riesgo | Prob. | Impacto | Mitigacion |
|---|---|---|---|---|
| R1 | SimappeAdmin no disponible | Media | Bloqueante | Cache de 30 min protege contra caidas cortas. Circuit breaker si hay 3 fallos consecutivos |
| R2 | Demasiados tenants agotan conexiones de PostgreSQL | Media | Alto | max_pools=10 limita pools activos. pool_size=2 por pool = max 20 conexiones |
| R3 | Password de BD en respuesta de SimappeAdmin interceptada | Baja | Critico | HTTPS obligatorio entre servicios. JWT como unico mecanismo de autenticacion |
| R4 | Cache key collision (SHA-256 truncado a 16 chars) | Muy baja | Medio | 16 chars hex = 2^64 combinaciones. Aceptable para <100 tenants |
| R5 | Pool zombie (tenant inactivo pero pool vivo) | Media | Bajo | LRU eviction cuando se alcanza max_pools. Futuro: TTL por pool inactivo |
| Metrica | Donde | Alerta si |
|---|---|---|
datavault.pools.active |
tenant_db_manager.active_pools |
> 8 (80% del limite) |
datavault.simappe.latency_ms |
simappe_client.get_tenant_db_config() |
> 500ms |
datavault.simappe.errors |
Contador de SimappeAdminError |
> 3 en 5 min |
datavault.cache.hit_rate |
Cache hits / total requests | < 70% |
datavault.pool.connections |
SQLAlchemy pool status | > 80% de max_pool_size |
services/simappe_client.py con SimappeAdminClientdatabase.py con TenantDatabaseManagermain.py lifespan con cleanuphttpx a requirements.txtSIMAPPE_ADMIN_URL en variables de entorno| Version | Fecha | Autor | Descripcion |
|---|---|---|---|
| 1.0.0 | 2026-03-25 | Carlos Torres | Creacion del documento de conexion BD dinamica y cliente SimappeAdmin |