Saltar al contenido principal

Fase 2: Embeddings y Clustering

Propósito

La Fase 2 del pipeline IRIS se encarga de generar representaciones vectoriales (embeddings) de las imágenes y descubrir automáticamente tipos de documentos mediante clustering no supervisado. Esta fase es clave para la clasificación automática sin necesidad de etiquetas manuales.

Arquitectura del Servicio

Componentes Principales

Tecnologías Utilizadas

  • PyTorch: Framework de machine learning principal
  • TIMM: Modelos pre-entrenados de visión computacional
  • scikit-learn: Algoritmos de clustering y métricas
  • FAISS: Base de datos vectorial para embeddings
  • FastAPI: Framework web para la API REST
  • Transformers: Modelos Vision Transformer (ViT)

Funcionalidades

1. Generación de Embeddings

El servicio utiliza modelos Vision Transformer pre-entrenados para generar representaciones vectoriales densas de las imágenes de documentos.

Modelo Principal: Vision Transformer (ViT)

import torch
import timm
from PIL import Image
import numpy as np

class EmbeddingGenerator:
def __init__(self, model_name="vit_base_patch16_224", device="cuda"):
"""
Inicializa el generador de embeddings con Vision Transformer

Args:
model_name: Nombre del modelo pre-entrenado a usar
device: Dispositivo de computación (cuda/cpu)
"""
self.device = device
self.model = timm.create_model(
model_name,
pretrained=True,
num_classes=0 # Sin clasificación, solo embeddings
)
self.model.to(device)
self.model.eval()

# Obtener transformaciones de imagen requeridas
self.transform = timm.data.resolve_data_config({}, model=self.model)
self.transform = timm.data.create_transform(**self.transform)

def generate_embedding(self, image: Image.Image) -> np.ndarray:
"""
Genera embedding vectorial para una imagen

Args:
image: Imagen PIL del documento

Returns:
embedding: Vector de características (768 dimensiones para ViT-base)
"""
# Preparar imagen
input_tensor = self.transform(image).unsqueeze(0).to(self.device)

# Generar embedding
with torch.no_grad():
embedding = self.model(input_tensor)

# Convertir a numpy y normalizar
embedding = embedding.cpu().numpy().squeeze()
embedding = embedding / np.linalg.norm(embedding) # L2 normalization

return embedding

Características de los Embeddings:

  • Dimensionalidad: 768 dimensiones (ViT-base) o 1024 (ViT-large)
  • Normalización: Vectores L2-normalizados para similitud coseno
  • Reproducibilidad: Embeddings consistentes para la misma imagen
  • Transferencia: Conocimiento pre-entrenado en ImageNet
  • Robustez: Invariante a rotaciones menores y cambios de iluminación

2. Clustering Automático

Una vez generados los embeddings, el sistema aplica algoritmos de clustering para descubrir automáticamente tipos de documentos sin supervisión.

Algoritmo K-means con Selección Automática de K:

from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import numpy as np

class AutomaticClustering:
def __init__(self, min_clusters=2, max_clusters=10, random_state=42):
"""
Clustering automático con selección óptima de K

Args:
min_clusters: Número mínimo de clusters a evaluar
max_clusters: Número máximo de clusters a evaluar
random_state: Semilla para reproducibilidad
"""
self.min_clusters = min_clusters
self.max_clusters = max_clusters
self.random_state = random_state
self.best_k = None
self.best_model = None
self.silhouette_scores = {}

def find_optimal_clusters(self, embeddings: np.ndarray) -> dict:
"""
Encuentra el número óptimo de clusters usando silhouette score

Args:
embeddings: Matriz de embeddings (n_samples, n_features)

Returns:
results: Diccionario con resultados del clustering
"""
best_score = -1
results = {
"silhouette_scores": {},
"cluster_assignments": {},
"optimal_k": None,
"best_silhouette_score": None
}

for k in range(self.min_clusters, min(self.max_clusters + 1, len(embeddings))):
# Entrenar K-means
kmeans = KMeans(
n_clusters=k,
random_state=self.random_state,
n_init=10,
max_iter=300
)
cluster_labels = kmeans.fit_predict(embeddings)

# Calcular silhouette score
silhouette_avg = silhouette_score(embeddings, cluster_labels)

results["silhouette_scores"][k] = silhouette_avg
results["cluster_assignments"][k] = cluster_labels.tolist()

# Actualizar mejor resultado
if silhouette_avg > best_score:
best_score = silhouette_avg
self.best_k = k
self.best_model = kmeans
results["optimal_k"] = k
results["best_silhouette_score"] = silhouette_avg

return results

def predict_cluster(self, embedding: np.ndarray) -> int:
"""
Predice el cluster para un nuevo embedding

Args:
embedding: Vector de características

Returns:
cluster_id: ID del cluster asignado
"""
if self.best_model is None:
raise ValueError("Modelo no entrenado. Ejecutar find_optimal_clusters primero.")

return self.best_model.predict(embedding.reshape(1, -1))[0]

Métricas de Evaluación:

  • Silhouette Score: Medida de cohesión y separación de clusters
  • Inertia: Suma de distancias cuadráticas intra-cluster
  • Calinski-Harabasz Index: Ratio de dispersión entre/dentro clusters
  • Davies-Bouldin Index: Medida de similaridad promedio entre clusters

3. Análisis de Clustering

El sistema proporciona análisis detallado de los clusters descubiertos para interpretar los tipos de documentos.

Análisis de Clusters:

def analyze_clusters(embeddings: np.ndarray, cluster_labels: np.ndarray, 
image_paths: list) -> dict:
"""
Analiza las características de los clusters encontrados

Args:
embeddings: Embeddings de las imágenes
cluster_labels: Etiquetas de cluster asignadas
image_paths: Rutas de las imágenes correspondientes

Returns:
analysis: Análisis detallado de clusters
"""
n_clusters = len(np.unique(cluster_labels))
analysis = {
"n_clusters": n_clusters,
"cluster_sizes": {},
"cluster_centroids": {},
"cluster_spread": {},
"representative_images": {},
"silhouette_per_cluster": {}
}

for cluster_id in range(n_clusters):
# Índices de imágenes en este cluster
cluster_mask = cluster_labels == cluster_id
cluster_embeddings = embeddings[cluster_mask]
cluster_images = [image_paths[i] for i in np.where(cluster_mask)[0]]

# Estadísticas básicas
analysis["cluster_sizes"][cluster_id] = len(cluster_embeddings)

# Centroide del cluster
centroid = np.mean(cluster_embeddings, axis=0)
analysis["cluster_centroids"][cluster_id] = centroid.tolist()

# Dispersión del cluster (desviación estándar promedio)
distances_to_centroid = np.linalg.norm(
cluster_embeddings - centroid, axis=1
)
analysis["cluster_spread"][cluster_id] = {
"mean_distance": float(np.mean(distances_to_centroid)),
"std_distance": float(np.std(distances_to_centroid)),
"max_distance": float(np.max(distances_to_centroid))
}

# Imagen más representativa (más cercana al centroide)
closest_idx = np.argmin(distances_to_centroid)
original_idx = np.where(cluster_mask)[0][closest_idx]
analysis["representative_images"][cluster_id] = {
"image_path": image_paths[original_idx],
"distance_to_centroid": float(distances_to_centroid[closest_idx])
}

# Silhouette score para este cluster
if len(cluster_embeddings) > 1:
cluster_silhouette = silhouette_score(
embeddings, cluster_labels, sample_size=len(embeddings)
)
analysis["silhouette_per_cluster"][cluster_id] = cluster_silhouette

return analysis

API Endpoints

POST /embed

Genera embeddings para una o múltiples imágenes.

Request:

curl -X POST "http://localhost:8002/embed" \
-H "Content-Type: multipart/form-data" \
-F "file=@document.jpg"

Batch Request:

curl -X POST "http://localhost:8002/embed" \
-H "Content-Type: multipart/form-data" \
-F "files=@doc1.jpg" \
-F "files=@doc2.jpg" \
-F "files=@doc3.jpg"

Response:

{
"success": true,
"processing_time": 0.45,
"embeddings": [
{
"filename": "document.jpg",
"embedding": [0.123, -0.456, 0.789, ...],
"embedding_dim": 768,
"model_version": "vit_base_patch16_224"
}
],
"total_processed": 1
}

POST /cluster

Ejecuta clustering automático en un conjunto de embeddings.

Request:

curl -X POST "http://localhost:8002/cluster" \
-H "Content-Type: application/json" \
-d '{
"embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...], ...],
"min_clusters": 2,
"max_clusters": 8,
"image_paths": ["img1.jpg", "img2.jpg", ...]
}'

Response:

{
"success": true,
"processing_time": 1.23,
"clustering_results": {
"optimal_k": 3,
"best_silhouette_score": 0.71,
"cluster_assignments": [0, 1, 0, 2, 1, 0, 2],
"silhouette_scores": {
"2": 0.65,
"3": 0.71,
"4": 0.68,
"5": 0.62
}
},
"cluster_analysis": {
"n_clusters": 3,
"cluster_sizes": {
"0": 3,
"1": 2,
"2": 2
},
"representative_images": {
"0": {
"image_path": "img1.jpg",
"distance_to_centroid": 0.15
},
"1": {
"image_path": "img2.jpg",
"distance_to_centroid": 0.12
},
"2": {
"image_path": "img4.jpg",
"distance_to_centroid": 0.18
}
}
}
}

POST /embed_and_cluster

Endpoint combinado que genera embeddings y ejecuta clustering en una sola operación.

Request:

curl -X POST "http://localhost:8002/embed_and_cluster" \
-H "Content-Type: multipart/form-data" \
-F "files=@doc1.jpg" \
-F "files=@doc2.jpg" \
-F "files=@doc3.jpg" \
-F "min_clusters=2" \
-F "max_clusters=5"

GET /models

Lista los modelos disponibles para generar embeddings.

Response:

{
"available_models": [
{
"name": "vit_base_patch16_224",
"embedding_dim": 768,
"description": "Vision Transformer Base, patches 16x16",
"pretrained": "ImageNet-21k+1k",
"status": "loaded"
},
{
"name": "vit_large_patch16_224",
"embedding_dim": 1024,
"description": "Vision Transformer Large, patches 16x16",
"pretrained": "ImageNet-21k+1k",
"status": "available"
}
],
"current_model": "vit_base_patch16_224"
}

POST /switch_model

Cambia el modelo utilizado para generar embeddings.

Request:

{
"model_name": "vit_large_patch16_224"
}

Configuración

Variables de Entorno

# Configuración del servicio
ML_EMBEDDINGS_PORT=8002
ML_EMBEDDINGS_HOST=0.0.0.0
ML_EMBEDDINGS_WORKERS=2

# Configuración de modelos
DEFAULT_MODEL=vit_base_patch16_224
MODEL_CACHE_DIR=/app/data/models/embeddings
EMBEDDING_DIMENSION=768

# Configuración de clustering
DEFAULT_MIN_CLUSTERS=2
DEFAULT_MAX_CLUSTERS=10
CLUSTERING_RANDOM_STATE=42
SILHOUETTE_SAMPLE_SIZE=1000

# Configuración de PyTorch
PYTORCH_DEVICE=cuda
TORCH_NUM_THREADS=4
CUDA_VISIBLE_DEVICES=0

# Configuración de memoria
MAX_BATCH_SIZE=32
EMBEDDING_CACHE_SIZE=1000
MODEL_MEMORY_FRACTION=0.8

Optimización de Performance

1. Batch Processing

def process_batch_embeddings(images: List[Image.Image], 
batch_size: int = 16) -> np.ndarray:
"""
Procesa múltiples imágenes en batches para mejor eficiencia
"""
embeddings = []

for i in range(0, len(images), batch_size):
batch = images[i:i + batch_size]

# Preparar tensores del batch
batch_tensors = torch.stack([
self.transform(img) for img in batch
]).to(self.device)

# Generar embeddings del batch
with torch.no_grad():
batch_embeddings = self.model(batch_tensors)

embeddings.append(batch_embeddings.cpu().numpy())

return np.vstack(embeddings)

2. Model Caching

from functools import lru_cache
import pickle
import os

class ModelCache:
def __init__(self, cache_dir: str):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)

@lru_cache(maxsize=3)
def load_model(self, model_name: str):
"""Cache de modelos cargados en memoria"""
cache_path = os.path.join(self.cache_dir, f"{model_name}.pkl")

if os.path.exists(cache_path):
with open(cache_path, 'rb') as f:
return pickle.load(f)

# Cargar y cachear modelo
model = timm.create_model(model_name, pretrained=True, num_classes=0)

with open(cache_path, 'wb') as f:
pickle.dump(model, f)

return model

3. Vector Database Integration

import faiss
import numpy as np

class EmbeddingDatabase:
def __init__(self, embedding_dim: int = 768):
"""
Base de datos vectorial para búsqueda eficiente de embeddings
"""
self.embedding_dim = embedding_dim
self.index = faiss.IndexFlatIP(embedding_dim) # Inner product (cosine similarity)
self.metadata = []

def add_embeddings(self, embeddings: np.ndarray, metadata: List[dict]):
"""Añadir embeddings a la base de datos"""
# Normalizar embeddings para similitud coseno
embeddings = embeddings.astype('float32')
faiss.normalize_L2(embeddings)

# Añadir al índice
self.index.add(embeddings)
self.metadata.extend(metadata)

def search_similar(self, query_embedding: np.ndarray, k: int = 5) -> List[dict]:
"""Buscar embeddings más similares"""
query_embedding = query_embedding.astype('float32').reshape(1, -1)
faiss.normalize_L2(query_embedding)

distances, indices = self.index.search(query_embedding, k)

results = []
for i, (distance, idx) in enumerate(zip(distances[0], indices[0])):
if idx < len(self.metadata):
result = self.metadata[idx].copy()
result['similarity_score'] = float(distance)
result['rank'] = i + 1
results.append(result)

return results

Interpretación de Resultados

Análisis de Silhouette Score

El silhouette score es la métrica principal para evaluar la calidad del clustering:

  • Score > 0.7: Clustering excelente, clusters bien separados
  • Score 0.5-0.7: Clustering bueno, separación razonable
  • Score 0.3-0.5: Clustering moderado, algunos overlaps
  • Score < 0.3: Clustering pobre, considerar diferentes parámetros

Interpretación de Clusters

def interpret_clusters(cluster_analysis: dict, 
confidence_threshold: float = 0.6) -> dict:
"""
Interpreta automáticamente los tipos de documentos en cada cluster
"""
interpretations = {}

for cluster_id, cluster_info in cluster_analysis["cluster_sizes"].items():
cluster_size = cluster_info
representative = cluster_analysis["representative_images"][cluster_id]
spread = cluster_analysis["cluster_spread"][cluster_id]

# Análisis de cohesión
if spread["mean_distance"] < 0.3:
cohesion = "high"
elif spread["mean_distance"] < 0.5:
cohesion = "medium"
else:
cohesion = "low"

# Análisis de tamaño
if cluster_size < 3:
size_assessment = "small"
elif cluster_size < 10:
size_assessment = "medium"
else:
size_assessment = "large"

interpretations[cluster_id] = {
"document_type": f"Type_{cluster_id}", # Placeholder
"confidence": cohesion,
"size_assessment": size_assessment,
"representative_image": representative["image_path"],
"characteristics": {
"cohesion": cohesion,
"avg_distance_to_center": spread["mean_distance"],
"cluster_density": 1.0 / (spread["std_distance"] + 1e-6)
}
}

return interpretations

Recomendaciones Automáticas

def generate_clustering_recommendations(clustering_results: dict) -> List[str]:
"""
Genera recomendaciones basadas en los resultados del clustering
"""
recommendations = []

silhouette_score = clustering_results["best_silhouette_score"]
n_clusters = clustering_results["optimal_k"]

# Recomendaciones basadas en silhouette score
if silhouette_score < 0.3:
recommendations.append(
"❌ Clustering pobre. Considerar: más datos, diferentes modelos, o preprocesamiento adicional."
)
elif silhouette_score < 0.5:
recommendations.append(
"⚠️ Clustering moderado. Puede beneficiarse de más datos o ajuste de parámetros."
)
else:
recommendations.append(
"✅ Clustering exitoso. Los clusters identificados son significativos."
)

# Recomendaciones basadas en número de clusters
if n_clusters <= 2:
recommendations.append(
"🔍 Pocos clusters detectados. Verificar si hay suficiente diversidad en los datos."
)
elif n_clusters >= 8:
recommendations.append(
"📊 Muchos clusters detectados. Considerar si algunos pueden ser sub-tipos del mismo documento."
)

return recommendations

La Fase 2 es crucial para el pipeline IRIS ya que permite el descubrimiento automático de tipos de documentos, eliminando la necesidad de clasificación manual y permitiendo que el sistema se adapte a nuevos tipos de documentos automáticamente.