Saltar al contenido principal

Comunicación Entre Microservicios

Arquitectura de Comunicación

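IRIS utiliza una arquitectura de microservicios con comunicación síncrona orquestada por el API Gateway. Cada microservicio expone su propia API REST y el gateway coordina las llamadas secuenciales del pipeline de 6 fases, que se ejecutan en cuatro etapas de servicio: preprocesamiento de imagen (fase 1), embeddings y clustering (fase 2), clasificación (fases 3-4) y OCR con extracción JSON (fases 5-6).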

Patrón de Orquestación

API Gateway como Orquestador Central

El API Gateway actúa como el director de orquesta, coordinando la ejecución secuencial de las fases del pipeline.

Configuración de Servicios

import asyncio
import functools
import logging
import os
import time
from typing import Any, Dict, List, Optional

import httpx
from fastapi import HTTPException
from pydantic import Field

def _env_float(name: str, default: float) -> float:
    """Read a positive, finite float from environment variable *name*.

    Returns *default* when the variable is unset, blank, not a number, or not
    a positive finite value, so a bad environment value cannot stop the module
    from importing or produce an unusable timeout.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = float(raw)
    except ValueError:
        logging.warning("Ignoring invalid %s=%r; using default %s", name, raw, default)
        return default
    # The chained comparison is False for NaN, zero, negatives and infinity.
    if not 0 < value < float("inf"):
        logging.warning("Ignoring invalid %s=%r; using default %s", name, raw, default)
        return default
    return value


# Per-service connection settings. "url" and "timeout" (seconds) can be
# overridden through <SERVICE>_URL / <SERVICE>_TIMEOUT environment variables;
# "retries" is the number of additional attempts after the first call.
SERVICES = {
    "image_processor": {
        "url": os.getenv("IMAGE_PROCESSOR_URL", "http://localhost:8001"),
        "timeout": _env_float("IMAGE_PROCESSOR_TIMEOUT", 30.0),
        "retries": 3,
    },
    "ml_embeddings": {
        "url": os.getenv("ML_EMBEDDINGS_URL", "http://localhost:8002"),
        "timeout": _env_float("ML_EMBEDDINGS_TIMEOUT", 45.0),
        "retries": 2,
    },
    "ml_classifier": {
        "url": os.getenv("ML_CLASSIFIER_URL", "http://localhost:8003"),
        "timeout": _env_float("ML_CLASSIFIER_TIMEOUT", 30.0),
        "retries": 3,
    },
    "ocr_extractor": {
        "url": os.getenv("OCR_EXTRACTOR_URL", "http://localhost:8004"),
        "timeout": _env_float("OCR_EXTRACTOR_TIMEOUT", 60.0),
        "retries": 2,
    },
}

class ServiceCommunicationManager:
    """
    Handles calls from the gateway to the microservices listed in SERVICES.

    Each call goes through a per-service circuit breaker, uses the timeout
    configured for that service, is retried with exponential backoff, and
    updates per-service request statistics.
    """

    # HTTP methods that call_service accepts.
    _SUPPORTED_METHODS = ("GET", "POST", "PUT")

    def __init__(self):
        # One shared client so connections are pooled across calls; the
        # per-service timeout is applied per request in call_service.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        )
        self.service_stats = {
            service: {"requests": 0, "failures": 0, "avg_response_time": 0.0}
            for service in SERVICES
        }
        # One breaker per service, tripped by transport-level failures.
        self.circuit_breakers = {
            service: CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=60,
                expected_exception=httpx.RequestError,
            )
            for service in SERVICES
        }

    async def aclose(self) -> None:
        """Close the shared HTTP client and release its pooled connections."""
        await self.client.aclose()

    @staticmethod
    def _is_retryable_status(status_code: int) -> bool:
        """Return True when an HTTP error status is worth retrying.

        Server errors (5xx), 408 and 429 are treated as transient; any other
        status (for example 400, 404 or 422) would fail identically on a
        retry, so it is reported to the caller immediately.
        """
        return status_code >= 500 or status_code in (408, 429)

    async def call_service(self, service_name: str, endpoint: str,
                           method: str = "POST", **kwargs) -> Dict[str, Any]:
        """
        Call a microservice endpoint and return its decoded JSON body.

        Args:
            service_name: Key of the target service in SERVICES.
            endpoint: Path appended to the service base URL.
            method: HTTP method; GET, POST and PUT are supported.
            **kwargs: Extra arguments forwarded to the httpx request
                (e.g. ``files``, ``data``, ``json``). A ``timeout`` given
                here overrides the service's configured timeout.

        Returns:
            The JSON-decoded response body.

        Raises:
            HTTPException: 400 for an unknown service; the upstream status
                code when the service answers with an HTTP error; 503 when
                the service cannot be reached or its circuit breaker is open.
            ValueError: If ``method`` is not supported.
        """
        if service_name not in SERVICES:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown service: {service_name}"
            )

        http_method = method.upper()
        if http_method not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method}")

        service_config = SERVICES[service_name]
        url = f"{service_config['url']}{endpoint}"
        kwargs.setdefault("timeout", httpx.Timeout(service_config["timeout"]))

        # The breaker wraps only the raw request, so it observes the original
        # httpx.RequestError. Translating to HTTPException happens outside it;
        # doing it inside would hide every failure from the breaker.
        @self.circuit_breakers[service_name]
        async def make_request():
            response = await self.client.request(http_method, url, **kwargs)
            response.raise_for_status()
            return response

        max_retries = max(0, service_config["retries"])
        for attempt in range(max_retries + 1):
            started = time.perf_counter()
            try:
                # An HTTPException raised here comes from an open breaker and
                # is deliberately not caught: retrying while the breaker is
                # open would only add backoff delay.
                response = await make_request()
            except httpx.HTTPStatusError as e:
                self._update_service_stats(service_name, False, time.perf_counter() - started)
                status_code = e.response.status_code
                logging.error(f"HTTP error calling {service_name}: {e.response.status_code} - {e.response.text}")
                if attempt == max_retries or not self._is_retryable_status(status_code):
                    raise HTTPException(
                        status_code=status_code,
                        detail=f"Service {service_name} returned error: {e.response.text}"
                    ) from e
            except httpx.RequestError as e:
                self._update_service_stats(service_name, False, time.perf_counter() - started)
                logging.error(f"Request error calling {service_name}: {str(e)}")
                if attempt == max_retries:
                    raise HTTPException(
                        status_code=503,
                        detail=f"Service {service_name} is unavailable"
                    ) from e
            else:
                self._update_service_stats(service_name, True, time.perf_counter() - started)
                return response.json()

            # Exponential backoff before the next attempt: 1s, 2s, 4s, ...
            wait_time = (2 ** attempt) * 1.0
            logging.warning(f"Retry {attempt + 1}/{max_retries} for {service_name} in {wait_time}s")
            await asyncio.sleep(wait_time)

    def _update_service_stats(self, service_name: str, success: bool, response_time: float):
        """Record one attempt against ``service_name`` in ``service_stats``."""
        stats = self.service_stats[service_name]
        stats["requests"] += 1

        if not success:
            stats["failures"] += 1

        # Exponentially weighted moving average (10% weight on the newest
        # sample); the very first sample seeds the average.
        if stats["requests"] == 1:
            stats["avg_response_time"] = response_time
        else:
            stats["avg_response_time"] = (stats["avg_response_time"] * 0.9) + (response_time * 0.1)

    async def health_check_all_services(self) -> Dict[str, Dict[str, Any]]:
        """
        Probe ``GET <url>/health`` on every configured service concurrently.

        Returns:
            A mapping of service name to a dict with ``status`` ("healthy" on
            HTTP 200, "unhealthy" on any other status, "unavailable" when the
            probe raised), ``response_time``, ``details`` and ``error``.
        """
        async def check_service_health(service_name: str, service_config: Dict):
            started = time.perf_counter()
            try:
                response = await self.client.get(
                    f"{service_config['url']}/health",
                    timeout=httpx.Timeout(5.0),
                )
                response_time = time.perf_counter() - started
                healthy = response.status_code == 200
                return service_name, {
                    "status": "healthy" if healthy else "unhealthy",
                    "response_time": response_time,
                    "details": response.json() if healthy else None,
                    "error": None
                }
            except Exception as e:
                # Best-effort probe: any failure is reported, never raised.
                return service_name, {
                    "status": "unavailable",
                    "response_time": None,
                    "details": None,
                    "error": str(e)
                }

        results = await asyncio.gather(*(
            check_service_health(name, config)
            for name, config in SERVICES.items()
        ))
        return dict(results)

    def get_service_statistics(self) -> Dict[str, Any]:
        """
        Return the per-service request statistics and circuit breaker states.
        """
        return {
            "service_stats": self.service_stats,
            "circuit_breaker_states": {
                name: {
                    "state": breaker.current_state,
                    "failure_count": breaker.failure_count,
                    "last_failure_time": breaker.last_failure_time
                }
                for name, breaker in self.circuit_breakers.items()
            }
        }

class CircuitBreaker:
    """
    Circuit breaker usable as a decorator on async callables.

    States: CLOSED (calls pass through), OPEN (calls are rejected until
    ``recovery_timeout`` seconds have passed since the last failure) and
    HALF_OPEN (a trial call is allowed; success closes the breaker).
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60,
                 expected_exception: type = Exception):
        """
        Args:
            failure_threshold: Failure count at which the breaker opens.
            recovery_timeout: Seconds to wait after the last failure before
                a trial call is allowed.
            expected_exception: Exception type (or tuple of types) counted
                as a failure; other exceptions pass through uncounted.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def __call__(self, func):
        """Decorate async callable ``func`` so each call is guarded.

        This must be a plain (non-async) method: a decorator has to return
        the wrapped callable immediately. An ``async def __call__`` would
        hand back an un-awaited coroutine instead of a function.

        Raises (from the returned wrapper):
            HTTPException: 503 while the breaker is open.
        """
        @functools.wraps(func)
        async def guarded(*args, **kwargs):
            if self.state == "OPEN":
                if self._should_attempt_reset():
                    self.state = "HALF_OPEN"
                else:
                    raise HTTPException(
                        status_code=503,
                        detail="Service temporarily unavailable (circuit breaker open)"
                    )

            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._on_failure()
                raise
            self._on_success()
            return result

        return guarded

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` has elapsed since the last failure."""
        if self.last_failure_time is None:
            return False
        return time.time() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self):
        """Reset the failure count and close the breaker."""
        self.failure_count = 0
        self.state = "CLOSED"

    def _on_failure(self):
        """Count a failure and open the breaker once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

    @property
    def current_state(self) -> str:
        """Current state name: "CLOSED", "OPEN" or "HALF_OPEN"."""
        return self.state

Pipeline de Comunicación Secuencial

Orquestación del Pipeline Completo

import time
import uuid
from fastapi import UploadFile
from typing import Dict, Any, Optional

class PipelineOrchestrator:
    """
    Runs the document pipeline by calling each microservice in sequence.

    The six phases are executed as four service stages (phase 1, phase 2,
    phases 3-4, phases 5-6). Progress of every run is kept in
    ``active_pipelines``, keyed by pipeline ID.
    """

    def __init__(self):
        self.comm_manager = ServiceCommunicationManager()
        # NOTE(review): entries are never removed, so this dict grows with
        # every pipeline run for the lifetime of the orchestrator.
        self.active_pipelines = {}

    def _begin_phase(self, pipeline_id: str, phase: int, label: str) -> None:
        """Log the start of a stage and record it as the current phase."""
        logging.info(f"[PIPELINE-{pipeline_id}] Iniciando {label}")
        self.active_pipelines[pipeline_id]["current_phase"] = phase

    @staticmethod
    def _require_success(result: Dict[str, Any], message: str) -> None:
        """Raise RuntimeError(message) unless the service reported success."""
        if not result.get("success"):
            raise RuntimeError(message)

    @staticmethod
    def _phase_record(timing_source: Dict[str, Any], payload: Any) -> Dict[str, Any]:
        """Build the per-phase entry stored under ``pipeline_result["phases"]``."""
        return {
            "status": "completed",
            "processing_time": timing_source.get("processing_time", 0),
            "result": payload
        }

    async def execute_complete_pipeline(self, image_file: "UploadFile",
                                        pipeline_config: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Run the complete pipeline for one uploaded image.

        Args:
            image_file: Uploaded image; its content, filename and content
                type are forwarded to the services.
            pipeline_config: Optional overrides. Keys read here are
                ``apply_unwarping``, ``enhance_quality``,
                ``perform_clustering``, ``cluster_k`` and ``ocr_confidence``.

        Returns:
            On success, a dict with ``success=True``, the per-phase results,
            the classified ``document_type``, ``final_extraction`` and a
            ``pipeline_summary``. On any failure, a dict with
            ``success=False``, the ``error`` message, ``failed_at_phase`` and
            the phases completed so far. Failures are reported in the
            returned dict rather than raised.
        """
        pipeline_id = str(uuid.uuid4())
        start_time = time.time()
        config = pipeline_config or {}

        # Register the run so its status can be queried while it executes.
        self.active_pipelines[pipeline_id] = {
            "start_time": start_time,
            "current_phase": 0,
            "status": "running"
        }

        # Created before the try block so the error handler can always read
        # the phases completed so far, even if reading the upload fails.
        pipeline_result = {
            "pipeline_id": pipeline_id,
            "phases": {},
            "total_processing_time": 0,
            "success": False
        }

        try:
            # Read the upload once; the same bytes are reused across stages.
            file_content = await image_file.read()
            files = {"file": (image_file.filename, file_content, image_file.content_type)}

            # PHASE 1: image preprocessing
            self._begin_phase(pipeline_id, 1, "Fase 1: Image Processing")
            phase1_result = await self.comm_manager.call_service(
                "image_processor",
                "/process",
                method="POST",
                files=files,
                data={
                    "apply_unwarping": config.get("apply_unwarping", True),
                    "enhance_quality": config.get("enhance_quality", True)
                }
            )
            self._require_success(phase1_result, "Phase 1 failed: Image processing")
            pipeline_result["phases"]["phase_1"] = self._phase_record(phase1_result, phase1_result)

            # Later stages use the processed image when phase 1 returned one;
            # otherwise they keep working on the original upload.
            processed_image_data = phase1_result.get("processed_image_data")
            if processed_image_data:
                files = {"file": ("processed_image.jpg", processed_image_data, "image/jpeg")}

            # PHASE 2: embeddings and (optional) clustering
            self._begin_phase(pipeline_id, 2, "Fase 2: Embeddings & Clustering")
            embeddings_result = await self.comm_manager.call_service(
                "ml_embeddings",
                "/embed",
                method="POST",
                files=files
            )
            self._require_success(embeddings_result, "Phase 2 failed: Embeddings generation")

            # Clustering only runs when explicitly requested in the config.
            cluster_result = None
            if config.get("perform_clustering", False):
                cluster_result = await self.comm_manager.call_service(
                    "ml_embeddings",
                    "/cluster",
                    method="POST",
                    json={
                        "embeddings": [embeddings_result["embedding"]],
                        "k": config.get("cluster_k", 3)
                    }
                )

            pipeline_result["phases"]["phase_2"] = self._phase_record(
                embeddings_result,
                {
                    "embeddings": embeddings_result,
                    "clustering": cluster_result
                }
            )

            # PHASES 3-4: classification
            self._begin_phase(pipeline_id, 3, "Fase 3-4: Classification")
            classification_result = await self.comm_manager.call_service(
                "ml_classifier",
                "/classify",
                method="POST",
                files=files
            )
            self._require_success(classification_result, "Phase 3-4 failed: Classification")
            pipeline_result["phases"]["phase_3_4"] = self._phase_record(
                classification_result, classification_result
            )

            # The predicted class drives the OCR extraction stage.
            document_type = classification_result.get("predicted_class", "unknown")

            # PHASES 5-6: OCR and JSON extraction
            self._begin_phase(pipeline_id, 5, "Fase 5-6: OCR & JSON Extraction")
            ocr_result = await self.comm_manager.call_service(
                "ocr_extractor",
                "/extract_json",
                method="POST",
                files=files,
                data={
                    "document_type": document_type,
                    "confidence_threshold": config.get("ocr_confidence", 0.3)
                }
            )
            self._require_success(ocr_result, "Phase 5-6 failed: OCR extraction")
            pipeline_result["phases"]["phase_5_6"] = self._phase_record(ocr_result, ocr_result)

            # Assemble the final result.
            total_time = time.time() - start_time
            pipeline_result.update({
                "success": True,
                "total_processing_time": total_time,
                "document_type": document_type,
                "final_extraction": ocr_result.get("structured_document"),
                "pipeline_summary": {
                    "phases_completed": 4,
                    "total_phases": 4,
                    "average_phase_time": total_time / 4,
                    "bottleneck_phase": self._identify_bottleneck_phase(pipeline_result["phases"])
                }
            })

            self.active_pipelines[pipeline_id]["status"] = "completed"
            self.active_pipelines[pipeline_id]["end_time"] = time.time()

            return pipeline_result

        except Exception as e:
            # Any failure is turned into a result dict instead of propagating.
            error_time = time.time()
            failed_phase = self.active_pipelines[pipeline_id]["current_phase"]
            failure_result = {
                "pipeline_id": pipeline_id,
                "success": False,
                "error": str(e),
                "failed_at_phase": failed_phase,
                "total_processing_time": error_time - start_time,
                "phases": pipeline_result.get("phases", {})
            }

            self.active_pipelines[pipeline_id]["status"] = "failed"
            self.active_pipelines[pipeline_id]["error"] = str(e)
            self.active_pipelines[pipeline_id]["end_time"] = error_time

            logging.error(f"[PIPELINE-{pipeline_id}] Failed at phase {self.active_pipelines[pipeline_id]['current_phase']}: {str(e)}")

            return failure_result

    def _identify_bottleneck_phase(self, phases: Dict[str, Dict]) -> str:
        """Return the name of the phase with the largest ``processing_time``.

        Returns "unknown" when there are no phases or none reports a
        processing time above zero.
        """
        max_time = 0
        bottleneck_phase = None

        for phase_name, phase_data in phases.items():
            phase_time = phase_data.get("processing_time", 0)
            if phase_time > max_time:
                max_time = phase_time
                bottleneck_phase = phase_name

        return bottleneck_phase or "unknown"

    async def get_pipeline_status(self, pipeline_id: str) -> Dict[str, Any]:
        """Return the tracked status of a pipeline, or an error dict if unknown."""
        if pipeline_id not in self.active_pipelines:
            return {"error": "Pipeline not found"}

        pipeline_info = self.active_pipelines[pipeline_id]
        current_time = time.time()

        return {
            "pipeline_id": pipeline_id,
            "status": pipeline_info["status"],
            "current_phase": pipeline_info["current_phase"],
            "elapsed_time": current_time - pipeline_info["start_time"],
            "estimated_remaining_time": self._estimate_remaining_time(pipeline_info),
            "error": pipeline_info.get("error")
        }

    def _estimate_remaining_time(self, pipeline_info: Dict) -> Optional[float]:
        """Estimate the seconds left for a running pipeline; None otherwise."""
        if pipeline_info["status"] != "running":
            return None

        current_phase = pipeline_info["current_phase"]

        # Fixed per-stage estimate. current_phase uses the 1/2/3/5 numbering
        # set by execute_complete_pipeline, so phase 5 gives a negative stage
        # count, which is clamped to zero below.
        avg_time_per_phase = 8.0  # estimated seconds per stage
        remaining_phases = 4 - current_phase

        return max(0, remaining_phases * avg_time_per_phase)

    def get_active_pipelines_summary(self) -> Dict[str, Any]:
        """Summarise all tracked pipelines: counts per status, the average
        duration of finished runs and the list of those still running."""
        current_time = time.time()

        summary = {
            "total_pipelines": len(self.active_pipelines),
            "running": 0,
            "completed": 0,
            "failed": 0,
            "average_processing_time": 0,
            "active_pipelines": []
        }

        total_processing_time = 0
        completed_count = 0

        for pipeline_id, info in self.active_pipelines.items():
            status = info["status"]
            summary[status] += 1

            # Completed and failed runs both count towards the average.
            if status in ["completed", "failed"] and "end_time" in info:
                total_processing_time += info["end_time"] - info["start_time"]
                completed_count += 1

            if status == "running":
                summary["active_pipelines"].append({
                    "pipeline_id": pipeline_id,
                    "current_phase": info["current_phase"],
                    "elapsed_time": current_time - info["start_time"]
                })

        if completed_count > 0:
            summary["average_processing_time"] = total_processing_time / completed_count

        return summary

Protocolos de Comunicación

Formato de Mensajes Estándar

Todos los servicios utilizan un formato de respuesta consistente:

from pydantic import BaseModel
from typing import Optional, Dict, Any
from datetime import datetime

class ServiceResponse(BaseModel):
    """Standard response envelope exchanged between services."""
    success: bool
    processing_time: float
    # default_factory makes the timestamp reflect when each response object
    # is created; a plain default expression would be evaluated only once,
    # when the class is defined, and reused by every instance.
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    service_name: str
    service_version: str = "1.0.0"
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    warnings: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None

class ServiceHealthResponse(BaseModel):
    """Response format for service health checks."""
    status: str  # healthy, degraded, unhealthy
    # default_factory makes the timestamp reflect when each response object
    # is created; a plain default expression would be evaluated only once,
    # when the class is defined, and reused by every instance.
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    service_name: str
    version: str
    uptime_seconds: float
    checks: Dict[str, str]  # dependency checks
    metrics: Optional[Dict[str, Any]] = None

Manejo de Errores Inter-Servicios

class ServiceErrorHandler:
    """
    Builds standard inter-service error payloads from a table of error codes.
    """

    # Error type -> code, human-readable message and whether a retry may help.
    ERROR_CODES = {
        error_type: {"code": code, "message": message, "retry": retry}
        for error_type, code, message, retry in (
            ("IMAGE_PROCESSING_FAILED", "IP001", "Failed to process image", True),
            ("EMBEDDING_GENERATION_FAILED", "ML001", "Failed to generate embeddings", True),
            ("CLASSIFICATION_FAILED", "ML002", "Failed to classify document", False),
            ("OCR_EXTRACTION_FAILED", "OCR001", "Failed to extract text", True),
            ("SERVICE_UNAVAILABLE", "SVC001", "Service temporarily unavailable", True),
            ("VALIDATION_ERROR", "VAL001", "Input validation failed", False),
        )
    }

    @classmethod
    def create_error_response(cls, error_type: str, details: str = None) -> Dict[str, Any]:
        """Return the standard error payload for ``error_type``.

        Error types missing from ERROR_CODES are reported with the generic,
        non-retryable code "GEN001". The payload carries the UTC time at
        which it was built.
        """
        try:
            spec = cls.ERROR_CODES[error_type]
        except KeyError:
            spec = {"code": "GEN001", "message": "Unknown error", "retry": False}

        error_body = {
            "code": spec["code"],
            "message": spec["message"],
            "details": details,
            "retryable": spec["retry"],
            "timestamp": datetime.utcnow().isoformat(),
        }
        return {"success": False, "error": error_body}

Monitoreo y Observabilidad

Trazabilidad de Requests

import uuid
from contextvars import ContextVar
from fastapi import Request, Response
import time

# Context variable holding the ID of the request currently being handled.
request_id_var: ContextVar[str] = ContextVar('request_id', default='')


class RequestTrackingMiddleware:
    """
    ASGI middleware that assigns every HTTP request an ID.

    The ID is taken from the incoming ``X-Request-ID`` header when present,
    otherwise a new UUID4 is generated. It is published through
    ``request_id_var`` while the request is handled and echoed back in the
    response headers.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection; non-HTTP scopes pass through untouched."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request_id = self._extract_or_generate_request_id(scope)
        token = request_id_var.set(request_id)

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                # ASGI requires lowercased header names; latin-1 mirrors the
                # decoding used when the ID was read from the request.
                headers.append((b"x-request-id", request_id.encode("latin-1")))
                # Forward a copy so the application's own message dict is
                # not modified.
                message = {**message, "headers": headers}
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            # Restore the previous value so the ID cannot leak past this request.
            request_id_var.reset(token)

    def _extract_or_generate_request_id(self, scope) -> str:
        """Return the request ID from the scope headers, or a new UUID4 string."""
        headers = dict(scope.get("headers", []))

        request_id_header = headers.get(b"x-request-id") or headers.get(b"X-Request-ID")
        if request_id_header:
            # latin-1 maps every byte to a character, so an arbitrary
            # client-supplied header value can never raise a decode error.
            return request_id_header.decode("latin-1")

        return str(uuid.uuid4())

class ServiceMetricsCollector:
    """
    Accumulates in-memory metrics about calls made to other services.
    """

    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "average_response_time": 0.0,
            "service_calls": {},
            "error_rates": {}
        }

    def record_service_call(self, service_name: str, success: bool,
                            response_time: float, error_code: str = None):
        """Record the outcome and duration of one call to ``service_name``.

        Updates the global counters and running mean, the per-service
        counters and running mean, and, for a failed call that carries an
        ``error_code``, the per-service count for that code.
        """
        totals = self.metrics
        outcome_key = "successful" if success else "failed"

        # Global counters and running mean.
        totals["total_requests"] += 1
        totals[f"{outcome_key}_requests"] += 1
        seen = totals["total_requests"]
        totals["average_response_time"] = (
            (totals["average_response_time"] * (seen - 1) + response_time) / seen
        )

        # Per-service counters, created on the first call to that service.
        per_service = totals["service_calls"].setdefault(service_name, {
            "total": 0,
            "successful": 0,
            "failed": 0,
            "avg_response_time": 0.0
        })
        per_service["total"] += 1
        per_service[outcome_key] += 1

        # Error codes are only counted for failed calls.
        if not success and error_code:
            code_counts = totals["error_rates"].setdefault(service_name, {})
            code_counts[error_code] = code_counts.get(error_code, 0) + 1

        # Per-service running mean.
        calls = per_service["total"]
        per_service["avg_response_time"] = (
            (per_service["avg_response_time"] * (calls - 1) + response_time) / calls
        )

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return an overview (total, success rate in percent, mean response
        time converted to milliseconds) plus the per-service and per-error
        breakdowns."""
        total = self.metrics["total_requests"]
        if total > 0:
            success_rate = self.metrics["successful_requests"] / total * 100
        else:
            success_rate = 0

        overview = {
            "total_requests": total,
            "success_rate_percent": round(success_rate, 2),
            "average_response_time_ms": round(self.metrics["average_response_time"] * 1000, 2)
        }
        return {
            "overview": overview,
            "by_service": self.metrics["service_calls"],
            "error_breakdown": self.metrics["error_rates"]
        }

Configuración de Servicios

Variables de Entorno para Comunicación

# Service base URLs
IMAGE_PROCESSOR_URL=http://localhost:8001
ML_EMBEDDINGS_URL=http://localhost:8002
ML_CLASSIFIER_URL=http://localhost:8003
OCR_EXTRACTOR_URL=http://localhost:8004

# Timeout configuration (seconds)
DEFAULT_SERVICE_TIMEOUT=30
IMAGE_PROCESSOR_TIMEOUT=30
ML_EMBEDDINGS_TIMEOUT=45
ML_CLASSIFIER_TIMEOUT=30
OCR_EXTRACTOR_TIMEOUT=60

# Retry configuration
DEFAULT_RETRY_COUNT=3
CIRCUIT_BREAKER_THRESHOLD=5
CIRCUIT_BREAKER_TIMEOUT=60

# Connection configuration
MAX_CONNECTIONS_PER_SERVICE=20
CONNECTION_KEEPALIVE_TIMEOUT=30

# Monitoring
ENABLE_REQUEST_TRACKING=true
ENABLE_METRICS_COLLECTION=true
METRICS_RETENTION_HOURS=24

Docker Compose para Servicios

version: '3.8'

services:
  # Gateway: the only service that calls the others, addressed by their
  # service names on the shared network.
  api-gateway:
    build: ./packages/api-gateway
    ports:
      - "8000:8000"
    environment:
      - IMAGE_PROCESSOR_URL=http://image-processor:8001
      - ML_EMBEDDINGS_URL=http://ml-embeddings:8002
      - ML_CLASSIFIER_URL=http://ml-classifier:8003
      - OCR_EXTRACTOR_URL=http://ocr-extractor:8004
    depends_on:
      - image-processor
      - ml-embeddings
      - ml-classifier
      - ocr-extractor
    networks:
      - iris-network

  image-processor:
    build: ./packages/image-processor
    ports:
      - "8001:8001"
    networks:
      - iris-network

  ml-embeddings:
    build: ./packages/ml-embeddings
    ports:
      - "8002:8002"
    networks:
      - iris-network

  ml-classifier:
    build: ./packages/ml-classifier
    ports:
      - "8003:8003"
    volumes:
      # Host directory mounted into the container at /app/data/models.
      - ./data/models:/app/data/models
    networks:
      - iris-network

  ocr-extractor:
    build: ./packages/ocr-extractor
    ports:
      - "8004:8004"
    volumes:
      # Named volume, so its contents persist across container restarts.
      - paddle_models:/root/.paddlex
    networks:
      - iris-network

networks:
  iris-network:
    driver: bridge

volumes:
  paddle_models:

La comunicación entre microservicios en IRIS está diseñada para ser robusta, escalable y observable, proporcionando un pipeline confiable para el procesamiento completo de documentos.