Cliente: Centrica Soluciones
Proyecto: Nebula ERP - Componente DataVault
Version: 1.0
Fecha: 16 de marzo, 2026
Autor: Carlos Torres (Arquitecto de Software)
Estado: VIGENTE
El backend de DataVault sigue una arquitectura de servicios desacoplados, donde cada servicio tiene una responsabilidad especifica dentro del ciclo OAIS.
app/
services/
ingest_service.py # Core: Procesamiento SIP -> AIP -> DIP
project_service.py # CRUD de proyectos
preservation_service.py # Estado de preservacion (semaforo)
integrity_checker.py # Verificacion checksums SHA-256/MD5
oais_status_service.py # Estado global OAIS (JSON)
consulta_service.py # Ciclo de vida de consultas DIP
cloud_storage.py # Abstraccion de almacenamiento cloud
company_service.py # Gestion de companias
utils.py # Utilidades generales
clients/
azure_client.py # Cliente Azure Blob Storage
sftp_client.py # Cliente SFTP (Paramiko)
archivematica_client.py # Cliente Archivematica (opcional)
Archivo: app/services/ingest_service.py (~662 lineas)
Responsabilidad: Procesamiento completo del pipeline OAIS
class IngestService:
def __init__(self, db_session, settings):
self.db = db_session
self.settings = settings
def process(self, input_path, user_id, user_fullname, upload_id, project_id):
"""
Pipeline principal:
1. Detecta proyecto
2. Genera ID unico
3. Verifica duplicados
4. Crea SIP con metadata
5. Genera AIP (ZIP)
6. Genera DIP (copia)
7. Actualiza BD
8. Actualiza estado OAIS
"""
| Metodo | Lineas aprox. | Responsabilidad |
|---|---|---|
_generate_oais_id() |
~30 | Genera ID normalizado unico |
_check_duplicates() |
~25 | Verifica filesystem + BD |
_create_sip_structure() |
~80 | Crea directorios y copia archivos |
_generate_descriptive_metadata() |
~40 | Dublin Core JSON |
_generate_technical_metadata() |
~50 | Hashes, tamanos, MIME types |
_generate_preservation_metadata() |
~45 | Politica retencion, checksums |
_create_manifest_xml() |
~35 | Manifiesto METS en XML |
_create_aip() |
~40 | Compresion ZIP del SIP |
_create_dip() |
~30 | Copia del AIP para distribucion |
_update_db_record() |
~25 | Actualiza tabla ingest |
Complejidad ciclomatica: Alta. El metodo process() maneja demasiadas responsabilidades en un solo flujo. Deberia descomponerse en un patron de pipeline o chain of responsibility.
Acoplamiento a filesystem: Toda la logica depende de rutas absolutas en disco. No hay abstraccion de storage que permita migrar a S3-compatible o cualquier otro backend.
Sin retry logic: Si la compresion ZIP falla, no hay reintentos. El error se registra y se continua. Para archivos grandes esto puede ser problematico.
Sesion de BD: La sesion de base de datos se pasa por constructor, pero no hay transaccionalidad explicita en el pipeline completo. Un fallo a mitad de proceso puede dejar estado inconsistente.
Archivo: app/services/project_service.py
Responsabilidad: CRUD de proyectos OAIS
class ProjectService:
def get_all_projects(self) -> List[Project]:
"""Lista todos los proyectos (activos e inactivos)"""
def get_active_projects(self) -> List[Project]:
"""Solo proyectos con is_active=True"""
def get_project_by_name(self, name: str) -> Optional[Project]:
"""Busca proyecto por nombre exacto"""
def create_project(self, name, description, folder_path) -> Project:
"""Crea proyecto y genera estructura de directorios"""
def update_project(self, project_id, **kwargs) -> Project:
"""Actualiza campos del proyecto"""
def delete_project(self, project_id) -> bool:
"""Elimina proyecto (hard delete)"""
SIP/, AIP/, DIP/ en disco. Si la operacion de BD falla despues, quedan directorios huerfanos.Archivo: app/services/preservation_service.py (~364 lineas)
Responsabilidad: Verificacion y reporte del estado de preservacion
El servicio clasifica cada archivo en cuatro niveles:
| Estado | Color | Significado | Criterio |
|---|---|---|---|
| OPTIMAL | GREEN | Intacto y verificado | Archivo existe + checksum valido + metadata completa + retencion activa |
| ATTENTION | YELLOW | Requiere revision | Metadata incompleta o verificacion antigua (>30 dias) |
| CRITICAL | RED | Accion inmediata | Archivo faltante o checksum invalido o retencion expirada |
| UNVERIFIED | GRAY | Sin verificar | Nunca se ha ejecutado verificacion |
class PreservationService:
def verify_file_preservation(self, file_path) -> dict:
"""Ejecuta verificacion completa de un archivo"""
# 1. Verifica existencia del archivo
# 2. Valida checksum contra almacenado
# 3. Verifica completitud de metadata
# 4. Evalua politica de retencion
# 5. Retorna estado con semaforo
def get_critical_files(self) -> List[dict]:
"""Retorna archivos en estado RED"""
def get_attention_files(self) -> List[dict]:
"""Retorna archivos en estado YELLOW"""
def update_file_preservation_status(self, file_id, status):
"""Actualiza preservation_status.json"""
Fortaleza: El sistema de semaforo es un enfoque pragmatico y facil de comunicar a stakeholders no tecnicos.
Debilidad: El estado se persiste en JSON en disco (preservation_status.json), no en base de datos. Con miles de archivos, la lectura/escritura del JSON completo en cada operacion se vuelve un cuello de botella.
Riesgo: No hay mecanismo de notificacion proactiva cuando un archivo cambia a RED. La deteccion depende de que alguien consulte el estado.
Archivo: app/services/integrity_checker.py (~306 lineas)
Responsabilidad: Generacion y verificacion de checksums
El checker soporta tres fuentes de checksums, en orden de prioridad:
.checksum.json (preferido): Archivo dedicado junto al archivo verificadomanifest.txt: Manifiesto de checksums para AIPsmetadata.json: Fallback en metadata del paqueteclass IntegrityChecker:
def calculate_checksum(self, file_path, algorithm='sha256') -> str:
"""Calcula checksum de un archivo individual"""
def calculate_zip_checksum(self, zip_path) -> dict:
"""Calcula checksums de todos los archivos dentro de un ZIP"""
def verify_file_integrity(self, file_path) -> dict:
"""Compara checksum actual vs almacenado"""
# Retorna: {valid: bool, expected: str, actual: str, algorithm: str}
def verify_zip_integrity(self, zip_path) -> dict:
"""Verifica integridad de todos los miembros del ZIP"""
def store_checksum(self, file_path, checksum, algorithm) -> bool:
"""Almacena checksum en .checksum.json"""
def scan_directory(self, directory_path) -> List[dict]:
"""Verificacion masiva de un directorio completo"""
Fortaleza: El soporte de multiples fuentes de checksums es robusto y resiliente ante migraciones de formato.
Debilidad: La lectura de archivos completos para calcular checksums es bloqueante. Para archivos de 10GB+, esto congela el daemon durante minutos. Se recomienda lectura por chunks con yield para progreso incremental.
Riesgo: No hay validacion del algoritmo de checksum al leer desde las fuentes. Si un .checksum.json contiene un hash MD5 pero se espera SHA-256, la verificacion fallara silenciosamente.
Archivo: app/services/consulta_service.py
Responsabilidad: Gestion del ciclo de vida de solicitudes de acceso a DIPs
# Configuracion SMTP
EMAIL_HOST = "smtp.gmail.com"
EMAIL_PORT = 587
EMAIL_USER = "micorreo@gmail.com"
EMAIL_PASS = "tu_app_password_de_16_caracteres"
EMAIL_TO = "equipo-bogota@dominio.com"
Eventos que disparan email:
Problema de seguridad: Las credenciales SMTP estan en .env como texto plano. Para produccion, se requiere integracion con un vault de secretos.
Sin autenticacion de solicitante: El user_id se acepta como string sin validacion contra un directorio de usuarios. Cualquiera puede crear consultas.
Expiracion manual: expire_old_consultas() debe ser invocado explicitamente (no hay cron job configurado). Las consultas pueden permanecer en estado respondido indefinidamente si nadie ejecuta la limpieza.
Archivo: app/services/oais_status_service.py
Responsabilidad: Mantiene archivo JSON con el estado global del sistema
class OAISStatusService:
def update_project_status(self, project_name, file_id):
"""Agrega archivo al inventario del proyecto"""
def add_activity(self, action, file_id, project_name):
"""Registra actividad reciente (max 100 entradas)"""
def update_statistics(self, sips=0, aips=0, dips=0, errors=0):
"""Incrementa contadores globales"""
def cleanup_old_entries(self, max_age_days=30):
"""Elimina registros de actividad mayores a 30 dias"""
Debilidad fundamental: Este servicio mantiene estado critico en un archivo JSON plano. Es vulnerable a:
Recomendacion: Migrar completamente a PostgreSQL. El JSON en disco deberia ser un cache efimero, no la fuente de verdad.
class AzureClient:
def __init__(self, connection_string, container_name):
self.blob_service = BlobServiceClient(...)
self.container = container_name
def upload_blob(self, blob_name, local_file_path) -> bool:
"""Upload con retry implicito del SDK de Azure"""
class SFTPClient:
def __init__(self, host, port, username, password):
self.transport = paramiko.Transport((host, port))
def test_connection(self) -> bool:
"""Verifica conectividad"""
def upload(self, local_path, remote_subdir) -> bool:
"""Upload con barra de progreso (tqdm)"""
Cliente para integracion opcional con Archivematica (sistema OAIS externo). Actualmente no se usa en produccion.
| # | Servicio | Hallazgo | Prioridad |
|---|---|---|---|
| 1 | IngestService | Sin transaccionalidad en pipeline | P1 |
| 2 | IngestService | Complejidad ciclomatica alta en process() | P2 |
| 3 | PreservationService | Estado en JSON, no en BD | P1 |
| 4 | ConsultaService | Credenciales SMTP en texto plano | P1 |
| 5 | ConsultaService | Sin autenticacion de solicitante | P1 |
| 6 | IntegrityChecker | Calculo bloqueante para archivos grandes | P2 |
| 7 | OAISStatusService | Estado critico en JSON plano | P1 |
| 8 | ProjectService | Hard delete sin soft-delete | P3 |
| 9 | Todos | Sin tests unitarios ni de integracion | P1 |
| 10 | Todos | Sin multi-tenancy | P1 |
Departamento de Arquitectura — Centrica Soluciones