Cliente: Centrica Soluciones
Proyecto: Nebula ERP - Componente DataVault
Version: 1.0
Fecha: 16 de marzo, 2026
Autor: Carlos Torres (Arquitecto de Software)
Estado: VIGENTE
DataVault opera mediante daemons (procesos de larga duracion) que ejecutan tareas automaticas sin intervencion humana.
| Daemon | Archivo | Funcion | Frecuencia |
|---|---|---|---|
| Watcher | watcher_datavault.py |
Detecta y procesa nuevos SIPs | Cada 5s (configurable) |
| Cloud Sync DIP | cloud_sync_daemon.py |
Sincroniza DIPs a Azure | Cada 5 min |
| Cloud Sync SIP | cloud_sync_daemon_sip.py |
Sincroniza SIPs a Azure | Cada 1 min |
| Cloud Sync AIP | cloud_sync_daemon_aip.py |
Sincroniza AIPs a Azure | Cada 5 min |
| Preservation Verifier | preservation_verifier.py |
Verificacion periodica de integridad | Manual/Cron |
El watcher es el corazon operativo de DataVault. Monitorea continuamente el directorio de ingesta y dispara el pipeline OAIS cuando detecta nuevos paquetes.
Archivo: app/watcher_datavault.py (~500+ lineas)
Directorio vigilado: $DATAVAULT_REPO_ROOT/ingest_sip/ (default: /home/egs/datavault/Repositorio/ingest_sip/)
INICIO
|
v
[Cargar configuracion desde .env]
|
v
[Conectar a base de datos]
|
v
+---> [Escanear ingest_sip/]
| |
| v
| [Para cada carpeta encontrada:]
| |
| +-- Es subdirectorio prohibido (AIP, DIP, SIP, metadata)?
| | |
| | Si -> Eliminar y continuar
| | No -> Continuar
| |
| +-- La carpeta es estable? (tamano/archivos no cambian en 5s)
| | |
| | No -> Saltar (siguiente ciclo)
| | Si -> Continuar
| |
| +-- Detectar proyecto por prefijo de nombre
| |
| +-- Invocar IngestService.process()
| | |
| | Exito -> Mover a _procesados/
| | Error -> Mover a _errores/ (con backup)
| |
| +-- Actualizar oais_status.json
| |
| +-- Log de resultado
| |
| v
| [Siguiente carpeta]
| |
| v
+--- [Esperar WATCH_INTERVAL_SECONDS]
El watcher implementa reconexion automatica ante fallos de BD:
# Pseudocodigo del manejo de sesion
try:
session = get_session()
procesar_carpeta(session)
session.commit()
except SQLAlchemyError:
session.rollback()
session = recrear_sesion() # Reconexion automatica
# Salida dual: archivo + stderr
logging.basicConfig(
filename=os.environ.get('DATAVAULT_WATCHER_LOG', '/home/egs/datavault/watcher_err.log'),
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Tambien escribe a stderr para captura por systemd
| Variable | Default | Descripcion |
|---|---|---|
WATCH_INTERVAL_SECONDS |
5 | Intervalo de escaneo |
DATAVAULT_REPO_ROOT |
/home/egs/datavault/Repositorio |
Raiz del repositorio |
DATAVAULT_WATCHER_LOG |
/home/egs/datavault/watcher_err.log |
Archivo de log |
DATABASE_URL |
SQLite local | Conexion a BD |
| Aspecto | Hallazgo | Impacto |
|---|---|---|
| Mecanismo de deteccion | Polling (os.listdir + os.path.getsize) cada N segundos | CPU desperdiciado. En Linux usar inotify via watchdog |
| Procesamiento secuencial | Un SIP a la vez. Sin paralelismo | Cuello de botella con volumen alto |
| Sin PID file | No hay mecanismo de lock. Multiples instancias pueden ejecutarse simultaneamente | Procesamiento duplicado de SIPs |
| Sin health endpoint | No expone estado de salud | No integrable con Prometheus/monitoring |
| Limpieza de subdirectorios | Elimina AIP/DIP/SIP/metadata dentro de ingest_sip/ | Proteccion correcta pero podria eliminar archivos validos si el usuario nombra una carpeta "SIP_lote_01" |
| Reconexion de BD | Implementada | Buena practica para daemons de larga duracion |
# /etc/systemd/system/datavault-watcher.service
[Unit]
Description=DataVault OAIS Watcher Daemon
After=network.target postgresql.service
[Service]
Type=simple
User=egs
Group=egs
WorkingDirectory=/home/egs/datavault
Environment=DATAVAULT_REPO_ROOT=/home/egs/datavault/Repositorio
Environment=WATCH_INTERVAL_SECONDS=5
ExecStart=/usr/bin/python3 datavault/app/watcher_datavault.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
cloud_sync_daemon.py)Funcion: Sincroniza archivos DIP (ZIPs) a Azure Blob Storage.
Ciclo:
Cada 5 minutos:
1. Escanear directorio DIP de cada proyecto
2. Para cada ZIP encontrado:
a. Calcular SHA-256
b. Buscar en cloud_sync.db si ya fue sincronizado
c. Si hash cambio o es nuevo -> upload a Azure
d. Actualizar cloud_sync.db
3. Log de resumen
cloud_sync_daemon_sip.py)Funcion: Sincroniza carpetas SIP completas a Azure.
Diferencias con DIP daemon:
cloud_sync_daemon_aip.py)Funcion: Sincroniza archivos AIP (ZIPs) a Azure.
Identico al DIP daemon excepto por el directorio fuente.
Todos los daemons comparten (o tienen instancias separadas de) una base SQLite:
logs/cloud_sync.db
logs/cloud_sync_aip.log
logs/cloud_sync.log
| Problema | Severidad | Detalle |
|---|---|---|
| Tres daemons para la misma tarea | MEDIA | Deberia ser un unico daemon parametrizable por tipo (SIP/AIP/DIP) |
| SQLite compartido | ALTA | Si multiples daemons escriben a cloud_sync.db simultaneamente, riesgo de "database locked" |
| Sin backpressure | MEDIA | Si hay 10,000 archivos pendientes, el daemon intenta procesar todos en un ciclo |
| Sin metricas | MEDIA | No hay contadores de bytes transferidos, latencia, o tasa de error |
| Sin dead letter queue | MEDIA | Archivos que fallan repetidamente siguen intentandose indefinidamente |
# Propuesta: cloud_sync_unified.py
class CloudSyncDaemon:
def __init__(self, sync_type: str, interval_seconds: int):
"""
sync_type: 'SIP' | 'AIP' | 'DIP'
interval_seconds: frecuencia de escaneo
"""
def run(self):
"""Loop principal unificado"""
# Ejecucion via argumentos
# python cloud_sync_unified.py --type SIP --interval 60
# python cloud_sync_unified.py --type AIP --interval 300
# python cloud_sync_unified.py --type DIP --interval 300
Archivo: preservation_verifier.py
Ejecucion: Manual o via cron job
Recorre todos los paquetes AIP del repositorio y verifica su integridad mediante checksums.
1. Listar todos los proyectos activos
2. Para cada proyecto:
a. Listar AIPs en {PROJECT}/AIP/
b. Para cada AIP:
- Leer checksum almacenado
- Calcular checksum actual
- Comparar
- Registrar resultado en preservation_status.json
3. Generar reporte de estado
Recomendacion cron:
# Verificacion diaria a las 2AM
0 2 * * * /usr/bin/python3 /home/egs/datavault/datavault/preservation_verifier.py >> /var/log/datavault/preservation.log 2>&1
| Script | Funcion |
|---|---|
check_project.py |
Inspecciona estado de un proyecto especifico |
check_projects.py |
Lista todos los proyectos y su estado |
check_oais_status.py |
Muestra oais_status.json formateado |
check_server_status.py |
Verifica estado general del servidor |
init_db.py |
Inicializa esquema de base de datos |
monitor_cloud_sync.py |
Muestra estado de sincronizacion cloud |
test_imports.py |
Verifica que las dependencias estan instaladas |
Observacion: Estos scripts deberian consolidarse en un CLI unificado:
# Propuesta con Click/Typer
datavault project list
datavault project check DIGITALIZACION
datavault oais status
datavault server health
datavault db init
datavault cloud status
Ubuntu Server (192.168.0.16)
|
+-- PostgreSQL (puerto 5432)
| datavault (base de datos)
|
+-- Python Processes:
| |
| +-- watcher_datavault.py [PID: ???] (systemd service)
| +-- cloud_sync_daemon_sip.py [PID: ???] (systemd service)
| +-- cloud_sync_daemon_aip.py [PID: ???] (systemd service)
| +-- cloud_sync_daemon.py [PID: ???] (systemd service)
| +-- FastAPI (uvicorn) [PID: ???] (puerto 8000)
|
+-- Filesystem:
/home/egs/datavault/
Repositorio/
ingest_sip/ (WATCH_DIR)
_procesados/
_errores/
DEFAULT/
DIGITALIZACION/
TRASFERENCIA_2022/
logs/
cloud_sync.db
cloud_sync.log
cloud_sync_aip.log
| # | Daemon | Hallazgo | Prioridad | Esfuerzo |
|---|---|---|---|---|
| 1 | Watcher | Polling en lugar de inotify | P2 | 2 dias |
| 2 | Watcher | Sin PID file / lock | P1 | 0.5 dias |
| 3 | Watcher | Sin health endpoint | P2 | 1 dia |
| 4 | Cloud Sync (todos) | Tres daemons duplicados | P3 | 3 dias |
| 5 | Cloud Sync (todos) | SQLite concurrente | P2 | 1 dia |
| 6 | Preservation Verifier | Sin ejecucion automatica | P1 | 0.5 dias |
| 7 | Preservation Verifier | Sin notificaciones | P2 | 1 dia |
| 8 | Scripts utilitarios | Sin CLI unificado | P3 | 2 dias |
| 9 | Todos | Sin systemd services configurados | P1 | 1 dia |
| 10 | Todos | Sin monitoring (Prometheus) | P2 | 2 dias |
Departamento de Arquitectura — Centrica Soluciones