Version: 1.0
Fecha: 25 de Marzo, 2026
Contexto: Fase 1 del plan de migracion backend — reemplazo del sistema JWT propio de DataVault por validacion de tokens emitidos por SimappeOAuth2Server
Arquitecto: Carlos Alberto Torres Camargo
Clasificacion: Interno — Arquitectura
Documentar con precision tecnica los cambios necesarios para que el backend FastAPI de DataVault deje de generar y validar sus propios tokens JWT, y en su lugar valide tokens emitidos por SimappeOAuth2Server, extrayendo los claims enriquecidos que Simappe incluye en el JWT.
utils/auth.py)Archivo: backend/app/utils/auth.py (53 lineas)
# Estado actual — SERA REEMPLAZADO
from jose import JWTError, jwt
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({"exp": expire, "type": "access"})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
def create_refresh_token(data: dict):
"""Create JWT refresh token."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=settings.refresh_token_expire_days)
to_encode.update({"exp": expire, "type": "refresh"})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
def verify_token(token: str) -> Optional[dict]:
"""Verify and decode JWT token."""
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
return payload
except JWTError:
return None
Problemas identificados:
| # | Problema | Impacto |
|---|---|---|
| 1 | Usa SECRET_KEY local (HS256) — no compatible con JWT Simappe |
Bloqueante |
| 2 | Genera tokens propios (create_access_token, create_refresh_token) — innecesario |
Eliminar |
| 3 | Claims minimalistas: solo sub (user_id) y type |
No tiene customerId, companyId, etc. |
| 4 | verify_token() valida contra clave local |
No puede validar tokens externos |
Fuente: SimappeOauth2Server/v1/component/JwtHelper.java — metodo createToken()
El JWT de Simappe contiene un UserSession serializado como claims:
UserSession userSession = UserSession.builder()
.id(userDto.getId()) // Long — ID del usuario
.username(userDto.getUsername()) // String — username
.firstName(userDto.getFirstName()) // String
.lastName(userDto.getLastName()) // String
.email(userDto.getEmail()) // String
.customerId(userDto.getCustomer().getId()) // Long — clientId (tenant principal)
.cityId(userDto.getCity().getId()) // Long
.roles(userDto.getRoles()) // List<RoleDto> — roles asignados
.environment(environment) // String — "dev", "prod"
.connectionContext(determineConnectionContext(...)) // Enum — BUSINESS, ADMIN, ANALYTICS
.companyBaseCurrency(...) // String — moneda base
.customerConsolidationCurrency(...) // String — moneda consolidacion
.preferredCurrency(...) // String — moneda preferida
.build();
Claims adicionales despues de seleccionar compania (metodo addCompany()):
// Cuando el usuario selecciona una compania, el token se re-firma con:
userSession.setCompanyId(companyId); // Long — ID de la compania (= tenantId en Nebula)
userSession.setSubsidiaryId(subsidiaryId); // Long — ID de la filial (opcional)
| Claim Simappe | Tipo | Uso en DataVault | Obligatorio |
|---|---|---|---|
id |
Long | Identificador del usuario | Si |
username |
String | Nombre de usuario para auditoria | Si |
firstName + lastName |
String | Nombre completo para UI | No |
email |
String | Email del usuario | Si |
customerId |
Long | clientId — tenant principal en SimappeAdmin | Si |
companyId |
Long | tenantId — compania seleccionada (BD especifica) | Si |
subsidiaryId |
Long | Filial dentro de la compania | No |
environment |
String | Ambiente (dev, prod) para resolucion de BD |
Si |
connectionContext |
Enum | Tipo de conexion (BUSINESS, ADMIN, ANALYTICS) |
Si |
roles |
List | Roles del usuario para autorizacion | Si |
config.py — Agregar Configuracion SimappeArchivo: backend/app/config.py
Lineas afectadas: 6-14 (seccion JWT Configuration)
# ============================================================
# ANTES (estado actual)
# ============================================================
class Settings(BaseSettings):
# Database
database_url: str = "postgresql+asyncpg://datavault_user:datavault_pass@postgres:5432/datavault"
# JWT Configuration
secret_key: str = "dev-secret-key-change-in-production"
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
# ============================================================
# DESPUES (estado objetivo)
# ============================================================
class Settings(BaseSettings):
# Database — ya no se usa como URL fija, se resuelve desde SimappeAdmin
# Se mantiene como fallback para migraciones Alembic
database_url: str = "postgresql+asyncpg://datavault_user:datavault_pass@postgres:5432/datavault"
# Simappe Integration
simappe_admin_url: str = "http://simappe-admin:8081"
simappe_oauth2_url: str = "http://simappe-oauth2:8080"
simappe_jwt_secret: str = "" # Secreto compartido para validar JWT Simappe (HS256)
simappe_jwt_algorithm: str = "HS256"
# Periodo de gracia: aceptar JWT propio durante transicion
legacy_jwt_enabled: bool = True # Desactivar despues de 2 semanas
legacy_secret_key: str = "dev-secret-key-change-in-production"
# Cache
tenant_db_cache_ttl_minutes: int = 30
max_tenant_pools: int = 10
Control: Verificar que simappe_jwt_secret NO sea el valor por defecto en produccion.
utils/auth.py — Reescritura CompletaArchivo: backend/app/utils/auth.py
Accion: Reescribir completamente. Eliminar generacion de tokens, mantener solo validacion.
# ============================================================
# NUEVO utils/auth.py — Validacion de JWT Simappe
# ============================================================
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.config import settings
import logging
logger = logging.getLogger(__name__)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@dataclass
class SimappeUserSession:
"""Representacion del UserSession extraido del JWT de Simappe."""
id: int # ID del usuario en Simappe
username: str # Nombre de usuario
first_name: str = "" # Nombre
last_name: str = "" # Apellido
email: str = "" # Email
customer_id: int = 0 # clientId — tenant principal
company_id: Optional[int] = None # companyId — compania seleccionada
subsidiary_id: Optional[int] = None # subsidiaryId — filial (opcional)
environment: str = "dev" # Ambiente
connection_context: str = "BUSINESS" # Tipo de conexion
roles: list = field(default_factory=list) # Roles del usuario
@property
def full_name(self) -> str:
return f"{self.first_name} {self.last_name}".strip()
@property
def has_company(self) -> bool:
"""Verifica si el usuario ya selecciono una compania."""
return self.company_id is not None and self.company_id > 0
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash. Se mantiene para compatibilidad."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password. Se mantiene para compatibilidad."""
return pwd_context.hash(password)
def validate_simappe_jwt(token: str) -> Optional[SimappeUserSession]:
"""
Valida un JWT emitido por SimappeOAuth2Server y extrae el UserSession.
Flujo:
1. Decodifica el token con la clave de Simappe
2. Verifica expiracion
3. Extrae claims y construye SimappeUserSession
4. Valida campos obligatorios
Returns:
SimappeUserSession si el token es valido, None si no.
"""
try:
payload = jwt.decode(
token,
settings.simappe_jwt_secret,
algorithms=[settings.simappe_jwt_algorithm]
)
# Extraer claims del JWT Simappe
user_session = SimappeUserSession(
id=payload.get("id", 0),
username=payload.get("username", ""),
first_name=payload.get("firstName", ""),
last_name=payload.get("lastName", ""),
email=payload.get("email", ""),
customer_id=payload.get("customerId", 0),
company_id=payload.get("companyId"),
subsidiary_id=payload.get("subsidiaryId"),
environment=payload.get("environment", "dev"),
connection_context=payload.get("connectionContext", "BUSINESS"),
roles=_extract_roles(payload.get("roles", [])),
)
# Validar campos obligatorios
if not user_session.id or not user_session.username:
logger.warning(f"JWT valido pero sin campos obligatorios: id={user_session.id}")
return None
if not user_session.customer_id:
logger.warning(f"JWT sin customerId para usuario {user_session.username}")
return None
return user_session
except JWTError as e:
logger.debug(f"Error al validar JWT Simappe: {e}")
return None
def validate_legacy_jwt(token: str) -> Optional[dict]:
"""
Valida JWT legacy de DataVault durante periodo de gracia.
TEMPORAL — eliminar despues de 2 semanas de migracion.
"""
if not settings.legacy_jwt_enabled:
return None
try:
payload = jwt.decode(
token,
settings.legacy_secret_key,
algorithms=[settings.simappe_jwt_algorithm]
)
if payload.get("type") != "access":
return None
return payload
except JWTError:
return None
def _extract_roles(roles_data) -> list:
"""
Extrae nombres de roles del payload JWT.
Simappe envía roles como lista de objetos: [{"id": 1, "name": "ADMIN"}, ...]
o como lista de strings: ["ADMIN", "USER"]
"""
if not roles_data:
return []
result = []
for role in roles_data:
if isinstance(role, dict):
role_name = role.get("name", role.get("authority", ""))
if role_name:
result.append(role_name)
elif isinstance(role, str):
result.append(role)
return result
Validaciones implementadas:
| # | Validacion | Que pasa si falla | Codigo HTTP |
|---|---|---|---|
| V1 | Token decodificable con clave Simappe | Intenta legacy JWT. Si ambos fallan → 401 | 401 |
| V2 | Campo id presente y no-zero |
Token invalido → 401 | 401 |
| V3 | Campo username presente |
Token invalido → 401 | 401 |
| V4 | Campo customerId presente |
Sin tenant → 401 | 401 |
| V5 | Campo companyId presente |
Sin compania seleccionada → 403 | 403 |
utils/security.py — Reescritura con Claims SimappeArchivo: backend/app/utils/security.py
Accion: Reescribir para usar SimappeUserSession en lugar de buscar usuario en BD local.
# ============================================================
# NUEVO utils/security.py — Security con JWT Simappe
# ============================================================
from typing import Optional
from fastapi import Depends, Header, HTTPException, Query, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.utils.auth import (
validate_simappe_jwt,
validate_legacy_jwt,
SimappeUserSession,
)
from app.config import settings
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer()
async def get_current_user_session(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> SimappeUserSession:
"""
Obtiene la sesion del usuario desde el JWT de Simappe.
Flujo:
1. Intenta validar como JWT Simappe
2. Si falla y legacy esta habilitado, intenta JWT legacy
3. Si ambos fallan, lanza 401
Returns:
SimappeUserSession con todos los claims del token.
"""
token = credentials.credentials
# Intentar JWT Simappe primero
user_session = validate_simappe_jwt(token)
if user_session:
return user_session
# Fallback a JWT legacy durante periodo de gracia
if settings.legacy_jwt_enabled:
legacy_payload = validate_legacy_jwt(token)
if legacy_payload:
logger.warning(
f"Usuario {legacy_payload.get('sub')} usando JWT legacy. "
"Debe migrar a SimappeOAuth2."
)
# Construir SimappeUserSession minimo desde JWT legacy
return SimappeUserSession(
id=0, # No disponible en JWT legacy
username=legacy_payload.get("sub", ""),
email=legacy_payload.get("email", ""),
customer_id=0, # No disponible
environment="dev",
roles=["LEGACY_USER"],
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalido o expirado",
headers={"WWW-Authenticate": "Bearer"},
)
async def require_company_selected(
user_session: SimappeUserSession = Depends(get_current_user_session),
) -> SimappeUserSession:
"""
Verifica que el usuario ya haya seleccionado una compania.
Necesario para resolver la BD del tenant.
"""
if not user_session.has_company:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Debe seleccionar una compania antes de acceder a este recurso. "
"Use el endpoint de seleccion de compania en SimappeClient.",
)
return user_session
async def get_current_user_session_optional(
authorization: Optional[str] = Header(None, alias="Authorization"),
token: Optional[str] = Query(None),
) -> SimappeUserSession:
"""
Obtiene sesion del usuario desde Authorization header o query param.
Util para endpoints de descarga de archivos.
"""
token_value: Optional[str] = None
if authorization:
parts = authorization.split()
if len(parts) == 2 and parts[0].lower() == "bearer":
token_value = parts[1]
if not token_value and token:
token_value = token
if not token_value:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No autenticado",
headers={"WWW-Authenticate": "Bearer"},
)
user_session = validate_simappe_jwt(token_value)
if user_session:
return user_session
if settings.legacy_jwt_enabled:
legacy = validate_legacy_jwt(token_value)
if legacy:
return SimappeUserSession(
id=0,
username=legacy.get("sub", ""),
customer_id=0,
environment="dev",
roles=["LEGACY_USER"],
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalido",
headers={"WWW-Authenticate": "Bearer"},
)
Cambio critico en dependencias de los routers:
# ============================================================
# ANTES — en cada router
# ============================================================
from app.utils.security import get_current_active_user
@router.get("/files")
async def list_files(
current_user: User = Depends(get_current_active_user), # ← Busca en BD local
db: AsyncSession = Depends(get_db)
):
...
# ============================================================
# DESPUES — en cada router
# ============================================================
from app.utils.security import require_company_selected, SimappeUserSession
@router.get("/files")
async def list_files(
user_session: SimappeUserSession = Depends(require_company_selected), # ← Claims JWT
db: AsyncSession = Depends(get_tenant_db) # ← BD del tenant (ver documento 02)
):
...
api/auth.py — Reduccion de EndpointsArchivo: backend/app/api/auth.py (447 lineas → ~80 lineas)
Accion: Eliminar endpoints de login/register/refresh/logout locales. Mantener solo /me y /profile.
Endpoints a ELIMINAR:
| Endpoint | Lineas | Razon de eliminacion |
|---|---|---|
POST /register |
22-42 | El registro ocurre en SimappeAdmin |
POST /login |
44-86 | El login ocurre en SimappeOAuth2 |
POST /refresh |
88-101 | El refresh ocurre en SimappeOAuth2 |
POST /logout |
103-111 | El logout se maneja en SimappeClient |
PUT /change-password |
184-204 | Se gestiona en SimappeAdmin |
POST /password-reset/request |
207-223 | Se gestiona en SimappeAdmin |
POST /password-reset/confirm |
226-251 | Se gestiona en SimappeAdmin |
Endpoints a MANTENER (adaptados):
| Endpoint | Lineas | Adaptacion |
|---|---|---|
GET /me |
113-132 | Retornar datos directamente del JWT claims, sin buscar en BD |
PUT /profile |
134-182 | Evaluar: ¿DataVault necesita perfil propio o se usa el de Simappe? |
# ============================================================
# NUEVO api/auth.py — Solo endpoints necesarios
# ============================================================
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from app.utils.security import get_current_user_session, SimappeUserSession
router = APIRouter()
@router.get("/me")
async def get_current_user_info(
user_session: SimappeUserSession = Depends(get_current_user_session),
):
"""
Retorna informacion del usuario actual desde los claims del JWT Simappe.
No requiere consulta a base de datos.
"""
return JSONResponse(content={
"id": user_session.id,
"username": user_session.username,
"email": user_session.email,
"full_name": user_session.full_name,
"customer_id": user_session.customer_id,
"company_id": user_session.company_id,
"subsidiary_id": user_session.subsidiary_id,
"environment": user_session.environment,
"roles": user_session.roles,
})
main.py — Actualizacion de CORS y HealthArchivo: backend/app/main.py
Lineas afectadas: 26-45 (CORS), 64-69 (health)
# ============================================================
# CAMBIO EN CORS — Agregar origenes de Simappe/Nebula
# ============================================================
app.add_middleware(
CORSMiddleware,
allow_origins=[
# Desarrollo local
"http://localhost",
"http://localhost:80",
"http://localhost:3000",
"http://localhost:4200", # ← NUEVO: Angular dev server
"http://localhost:4201", # ← NUEVO: Angular secondary
# Gateway
"https://gateway.nebula.local", # ← NUEVO: Gateway interno
# Produccion
"https://datavaultweb.com",
"https://www.datavaultweb.com",
"https://nebula.centricasoluciones.com", # ← NUEVO: Dominio Nebula
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================
# CAMBIO EN HEALTH — Compatible con patron Simappe/Actuator
# ============================================================
@app.get("/actuator/health")
@app.get("/health")
async def health_check():
"""Health check compatible con Simappe/Eureka."""
return {
"status": "UP",
"components": {
"datavault": {"status": "UP"},
"environment": settings.environment,
}
}
| # | Riesgo | Control | Verificacion |
|---|---|---|---|
| R1 | Clave JWT de Simappe incorrecta → todos los requests fallan | Verificar decodificacion al arrancar el servicio | Test al levantar: decodificar token de prueba |
| R2 | Claims del JWT con formato diferente al esperado | Parseo defensivo con defaults | Tests unitarios con payloads reales de Simappe |
| R3 | Periodo de gracia permite bypass de seguridad | legacy_jwt_enabled = False despues de 2 semanas |
Feature flag en config, alarma calendario |
| R4 | Token expirado aceptado | python-jose valida exp automaticamente |
Test con token de 1 segundo de vida |
| R5 | Usuario sin companyId accede a datos |
require_company_selected guard en todos los endpoints de datos |
Test 403 sin companyId |
simappe_jwt_secret del equipo de Simappeconfig.pyutils/auth.py con SimappeUserSessionutils/security.py con dependencias Simappeapi/auth.py (eliminar login/register/refresh/reset)main.py (CORS + /actuator/health)validate_simappe_jwt()legacy_jwt_enabled tras 2 semanas| Version | Fecha | Autor | Descripcion |
|---|---|---|---|
| 1.0.0 | 2026-03-25 | Carlos Torres | Creacion del documento de migracion de autenticacion |