Saltar al contenido principal

Fases 5-6: OCR y Extracción JSON

Propósito

Las Fases 5 y 6 del pipeline IRIS constituyen la etapa final donde se extrae el texto de los documentos clasificados y se estructura la información en formato JSON utilizable. La Fase 5 aplica OCR especializado según el tipo de documento, mientras que la Fase 6 extrae campos específicos y los organiza en estructuras de datos coherentes.

Arquitectura del Servicio

Componentes Principales

Tecnologías Utilizadas

  • PaddleOCR: Motor principal de OCR multiidioma
  • OpenCV: Procesamiento adicional de imágenes
  • spaCy: Reconocimiento de entidades nombradas
  • Regex: Patrones para extracción de campos específicos
  • FastAPI: API REST para servicios de extracción
  • Pydantic: Validación y serialización de datos

Funcionalidades

Fase 5: OCR Especializado

1. Configuración Dinámica de OCR

El sistema utiliza configuraciones específicas de PaddleOCR optimizadas para cada tipo de documento identificado en la Fase 4.

OCR Configuration Manager:

import paddle
from paddleocr import PaddleOCR
import cv2
import numpy as np
from typing import Dict, List, Tuple, Optional

class OCRConfigurationManager:
"""
Gestiona configuraciones específicas de OCR para diferentes tipos de documentos
"""

def __init__(self):
self.ocr_instances = {}
self.document_configs = {
'ficha_residencia': {
'lang': 'es',
'det_db_thresh': 0.3,
'det_db_box_thresh': 0.5,
'det_db_unclip_ratio': 1.6,
'rec_batch_num': 6,
'use_angle_cls': True,
'use_space_char': True,
'drop_score': 0.3
},
'cedula_identidad': {
'lang': 'es',
'det_db_thresh': 0.2,
'det_db_box_thresh': 0.6,
'det_db_unclip_ratio': 2.0,
'rec_batch_num': 8,
'use_angle_cls': True,
'use_space_char': True,
'drop_score': 0.2
},
'pasaporte': {
'lang': 'en',
'det_db_thresh': 0.3,
'det_db_box_thresh': 0.6,
'det_db_unclip_ratio': 1.8,
'rec_batch_num': 6,
'use_angle_cls': True,
'use_space_char': True,
'drop_score': 0.3
},
'default': {
'lang': 'es',
'det_db_thresh': 0.3,
'det_db_box_thresh': 0.5,
'det_db_unclip_ratio': 1.5,
'rec_batch_num': 6,
'use_angle_cls': True,
'use_space_char': True,
'drop_score': 0.3
}
}

def get_ocr_instance(self, document_type: str) -> PaddleOCR:
"""
Obtiene o crea una instancia de OCR específica para el tipo de documento

Args:
document_type: Tipo de documento clasificado

Returns:
ocr_instance: Instancia de PaddleOCR configurada
"""
# Normalizar tipo de documento
doc_type = document_type.lower().replace(' ', '_')

# Usar configuración específica o default
config = self.document_configs.get(doc_type, self.document_configs['default'])

# Crear clave única para cache
cache_key = f"{doc_type}_{hash(str(sorted(config.items())))}"

# Verificar si ya existe en cache
if cache_key not in self.ocr_instances:
print(f"[OCR] Creando nueva instancia para {document_type}")
self.ocr_instances[cache_key] = PaddleOCR(**config)

return self.ocr_instances[cache_key]

def preprocess_image_for_ocr(self, image: np.ndarray, document_type: str) -> np.ndarray:
"""
Aplica preprocesamiento específico según el tipo de documento

Args:
image: Imagen de entrada
document_type: Tipo de documento

Returns:
processed_image: Imagen optimizada para OCR
"""
doc_type = document_type.lower().replace(' ', '_')

if doc_type == 'cedula_identidad':
# Para cédulas: mejorar contraste y nitidez
processed = self._enhance_id_card(image)
elif doc_type == 'pasaporte':
# Para pasaportes: manejar textos pequeños y marcas de agua
processed = self._enhance_passport(image)
elif doc_type == 'ficha_residencia':
# Para fichas: balance entre texto impreso y manuscrito
processed = self._enhance_residence_form(image)
else:
# Procesamiento general
processed = self._general_enhancement(image)

return processed

def _enhance_id_card(self, image: np.ndarray) -> np.ndarray:
"""Optimización específica para cédulas de identidad"""
# Convertir a escala de grises
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image

# Ecualización adaptativa para manejar reflectores
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)

# Sharpening para texto pequeño
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(enhanced, -1, kernel)

# Reducción de ruido manteniendo bordes
denoised = cv2.bilateralFilter(sharpened, 9, 75, 75)

return denoised

def _enhance_passport(self, image: np.ndarray) -> np.ndarray:
"""Optimización específica para pasaportes"""
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image

# Eliminar marcas de agua y patrones de fondo
# Filtro morfológico para eliminar líneas finas
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
processed = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)

# Threshold adaptativo para manejar iluminación irregular
adaptive = cv2.adaptiveThreshold(
processed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)

# Invertir si es necesario (texto oscuro sobre fondo claro)
if np.mean(adaptive) < 127:
adaptive = cv2.bitwise_not(adaptive)

return adaptive

def _enhance_residence_form(self, image: np.ndarray) -> np.ndarray:
"""Optimización específica para fichas de residencia"""
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image

# Balance para texto impreso y manuscrito
# Ecualización de histograma
equalized = cv2.equalizeHist(gray)

# Filtro Gaussiano suave para reducir ruido sin perder detalles
blurred = cv2.GaussianBlur(equalized, (3, 3), 0)

# Sharpening moderado
kernel = np.array([[0,-1,0], [-1,5,-1], [0,-1,0]])
sharpened = cv2.filter2D(blurred, -1, kernel)

return sharpened

def _general_enhancement(self, image: np.ndarray) -> np.ndarray:
"""Procesamiento general para documentos no específicos"""
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image

# Ecualización adaptativa estándar
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)

# Reducción de ruido básica
denoised = cv2.fastNlMeansDenoising(enhanced)

return denoised

2. Motor de OCR Especializado

Implementación del extractor de texto que maneja diferentes configuraciones y idiomas.

OCR Text Extractor:

class OCRTextExtractor:
"""
Extractor de texto especializado que utiliza configuraciones específicas
"""

def __init__(self):
self.config_manager = OCRConfigurationManager()
self.extraction_history = []

def extract_text(self, image: np.ndarray, document_type: str,
confidence_threshold: float = 0.3) -> Dict:
"""
Extrae texto de una imagen usando OCR especializado

Args:
image: Imagen del documento
document_type: Tipo de documento clasificado
confidence_threshold: Umbral mínimo de confianza

Returns:
extraction_result: Resultado completo de la extracción
"""
start_time = time.time()

# Preprocesar imagen
processed_image = self.config_manager.preprocess_image_for_ocr(
image, document_type
)

# Obtener instancia OCR específica
ocr_instance = self.config_manager.get_ocr_instance(document_type)

# Ejecutar OCR
raw_result = ocr_instance.ocr(processed_image, cls=True)

# Procesar resultados
extraction_result = self._process_ocr_results(
raw_result, confidence_threshold, document_type
)

# Agregar metadata
extraction_result.update({
'document_type': document_type,
'processing_time': time.time() - start_time,
'image_dimensions': {
'width': image.shape[1],
'height': image.shape[0]
},
'confidence_threshold': confidence_threshold
})

# Guardar en historial
self.extraction_history.append({
'timestamp': time.time(),
'document_type': document_type,
'success': extraction_result['success'],
'text_blocks_found': len(extraction_result['text_blocks'])
})

return extraction_result

def _process_ocr_results(self, raw_result: List, confidence_threshold: float,
document_type: str) -> Dict:
"""
Procesa los resultados crudos de PaddleOCR

Args:
raw_result: Resultado crudo de PaddleOCR
confidence_threshold: Umbral de confianza
document_type: Tipo de documento

Returns:
processed_result: Resultado procesado y estructurado
"""
if not raw_result or not raw_result[0]:
return {
'success': False,
'error': 'No text detected',
'text_blocks': [],
'full_text': '',
'average_confidence': 0.0
}

text_blocks = []
all_confidences = []
full_text_lines = []

for line_result in raw_result[0]:
# Extraer coordenadas de la caja
box_coords = line_result[0]
text_info = line_result[1]

text = text_info[0] if isinstance(text_info, tuple) else text_info
confidence = text_info[1] if isinstance(text_info, tuple) and len(text_info) > 1 else 1.0

# Filtrar por confianza
if confidence >= confidence_threshold:
text_block = {
'text': text.strip(),
'confidence': float(confidence),
'bbox': {
'x1': int(min([point[0] for point in box_coords])),
'y1': int(min([point[1] for point in box_coords])),
'x2': int(max([point[0] for point in box_coords])),
'y2': int(max([point[1] for point in box_coords]))
},
'coordinates': [[int(point[0]), int(point[1])] for point in box_coords]
}

text_blocks.append(text_block)
all_confidences.append(confidence)
full_text_lines.append(text.strip())

# Ordenar bloques de texto por posición (top-to-bottom, left-to-right)
text_blocks.sort(key=lambda block: (block['bbox']['y1'], block['bbox']['x1']))

return {
'success': len(text_blocks) > 0,
'text_blocks': text_blocks,
'full_text': '\n'.join([block['text'] for block in text_blocks]),
'average_confidence': np.mean(all_confidences) if all_confidences else 0.0,
'total_blocks': len(text_blocks),
'filtered_blocks': len(raw_result[0]) - len(text_blocks) if raw_result[0] else 0
}

Fase 6: Extracción y Estructuración JSON

1. Sistema de Patrones de Extracción

Utiliza patrones específicos para extraer campos estructurados según el tipo de documento.

Field Extraction Engine:

import re
import spacy
from datetime import datetime
from typing import Dict, List, Optional, Any

class FieldExtractionEngine:
"""
Motor de extracción de campos específicos usando patrones y NLP
"""

def __init__(self):
# Cargar modelo de spaCy para NER
try:
self.nlp = spacy.load("es_core_news_sm")
except OSError:
print("[WARNING] Modelo spaCy no encontrado. Usando patrones básicos.")
self.nlp = None

# Patrones regex para diferentes tipos de campos
self.field_patterns = {
'cedula_numero': [
r'\b\d{1,2}[-.]?\d{3}[-.]?\d{3}[-.]?\d{1}\b', # Formato estándar
r'\b[A-Z]{1,2}[-.]?\d{6,8}\b', # Con prefijo de letra
r'\bC\.?I\.?:?\s*([A-Z]?[-.]?\d{6,8})\b' # Con prefijo C.I.
],
'nombres': [
r'NOMBRES?:?\s*([A-ZÁÉÍÓÚÑ][a-záéíóúñ]+(?:\s+[A-ZÁÉÍÓÚÑ][a-záéíóúñ]+)*)',
r'PRIMER\s+NOMBRE:?\s*([A-ZÁÉÍÓÚÑ][a-záéíóúñ]+)',
r'NOMBRE:?\s*([A-ZÁÉÍÓÚÑ][a-záéíóúñ]+(?:\s+[A-ZÁÉÍÓÚÑ][a-záéíóúñ]+)*)'
],
'apellidos': [
r'APELLIDOS?:?\s*([A-ZÁÉÍÓÚÑ][a-záéíóúñ]+(?:\s+[A-ZÁÉÍÓÚÑ][a-záéíóúñ]+)*)',
r'PRIMER\s+APELLIDO:?\s*([A-ZÁÉÍÓÚÑ][a-záéíóúñ]+)',
r'APELLIDO:?\s*([A-ZÁÉÍÓÚÑ][a-záéíóúñ]+(?:\s+[A-ZÁÉÍÓÚÑ][a-záéíóúñ]+)*)'
],
'fecha_nacimiento': [
r'FECHA\s+(?:DE\s+)?NACIMIENTO:?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{4})',
r'NACIMIENTO:?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{4})',
r'F\.?\s*NAC\.?:?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{4})'
],
'direccion': [
r'DIRECCIÓN:?\s*([^\\n]+(?:Calle|Avenida|Carrera|Diagonal|Transversal)[^\\n]*)',
r'DOMICILIO:?\s*([^\\n]+)',
r'RESIDENCIA:?\s*([^\\n]+)'
],
'telefono': [
r'TELÉFONO:?\s*(\+?\d{1,4}[-.\s]?\d{3,4}[-.\s]?\d{3,4}[-.\s]?\d{2,4})',
r'TEL\.?:?\s*(\+?\d{1,4}[-.\s]?\d{3,4}[-.\s]?\d{3,4}[-.\s]?\d{2,4})',
r'CELULAR:?\s*(\+?\d{1,4}[-.\s]?\d{3,4}[-.\s]?\d{3,4}[-.\s]?\d{2,4})'
],
'email': [
r'E-?MAIL:?\s*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
r'CORREO:?\s*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
],
'estado_civil': [
r'ESTADO\s+CIVIL:?\s*(SOLTERO|CASADO|DIVORCIADO|VIUDO|UNIÓN\s+LIBRE)',
r'E\.?\s*CIVIL:?\s*(SOLTERO|CASADO|DIVORCIADO|VIUDO|UNIÓN\s+LIBRE)'
]
}

# Configuraciones específicas por tipo de documento
self.document_field_configs = {
'ficha_residencia': {
'required_fields': ['nombres', 'apellidos', 'cedula_numero', 'direccion'],
'optional_fields': ['fecha_nacimiento', 'telefono', 'email', 'estado_civil'],
'field_order': ['cedula_numero', 'nombres', 'apellidos', 'fecha_nacimiento',
'direccion', 'telefono', 'email', 'estado_civil']
},
'cedula_identidad': {
'required_fields': ['nombres', 'apellidos', 'cedula_numero', 'fecha_nacimiento'],
'optional_fields': ['direccion'],
'field_order': ['cedula_numero', 'nombres', 'apellidos', 'fecha_nacimiento', 'direccion']
},
'pasaporte': {
'required_fields': ['nombres', 'apellidos', 'fecha_nacimiento'],
'optional_fields': ['cedula_numero'],
'field_order': ['nombres', 'apellidos', 'fecha_nacimiento', 'cedula_numero']
}
}

def extract_fields(self, text: str, document_type: str,
text_blocks: List[Dict] = None) -> Dict:
"""
Extrae campos específicos del texto usando patrones y NLP

Args:
text: Texto completo extraído por OCR
document_type: Tipo de documento
text_blocks: Bloques de texto con posiciones (opcional)

Returns:
extracted_fields: Campos extraídos y estructurados
"""
# Obtener configuración para el tipo de documento
doc_type = document_type.lower().replace(' ', '_')
config = self.document_field_configs.get(doc_type, {
'required_fields': list(self.field_patterns.keys()),
'optional_fields': [],
'field_order': list(self.field_patterns.keys())
})

extracted_fields = {}
field_confidence = {}
extraction_metadata = {
'document_type': document_type,
'extraction_method': 'pattern_matching',
'total_patterns_tried': 0,
'successful_extractions': 0
}

# Extraer campos usando patrones regex
for field_name in config['field_order']:
if field_name in self.field_patterns:
field_value, confidence = self._extract_field_with_patterns(
text, field_name, text_blocks
)

if field_value:
extracted_fields[field_name] = field_value
field_confidence[field_name] = confidence
extraction_metadata['successful_extractions'] += 1

extraction_metadata['total_patterns_tried'] += len(self.field_patterns[field_name])

# Validar campos requeridos
missing_required = [
field for field in config['required_fields']
if field not in extracted_fields
]

# Usar NLP como fallback para campos faltantes
if missing_required and self.nlp:
nlp_fields = self._extract_with_nlp(text, missing_required)
for field, value in nlp_fields.items():
if field not in extracted_fields:
extracted_fields[field] = value
field_confidence[field] = 0.7 # Confianza moderada para NLP
extraction_metadata['extraction_method'] = 'hybrid'

# Calcular confianza general
overall_confidence = np.mean(list(field_confidence.values())) if field_confidence else 0.0

return {
'success': len(extracted_fields) > 0,
'extracted_fields': extracted_fields,
'field_confidence': field_confidence,
'overall_confidence': overall_confidence,
'missing_required_fields': [
field for field in config['required_fields']
if field not in extracted_fields
],
'extraction_metadata': extraction_metadata
}

def _extract_field_with_patterns(self, text: str, field_name: str,
text_blocks: List[Dict] = None) -> Tuple[Optional[str], float]:
"""
Extrae un campo específico usando patrones regex

Args:
text: Texto donde buscar
field_name: Nombre del campo a extraer
text_blocks: Bloques de texto con metadata de posición

Returns:
field_value: Valor extraído (None si no se encuentra)
confidence: Confianza de la extracción (0.0-1.0)
"""
patterns = self.field_patterns.get(field_name, [])

for i, pattern in enumerate(patterns):
matches = re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE)

for match in matches:
extracted_value = match.group(1) if match.groups() else match.group(0)
extracted_value = extracted_value.strip()

# Validar el valor extraído
if self._validate_field_value(field_name, extracted_value):
# Calcular confianza basada en:
# - Posición del patrón (primeros patrones = mayor confianza)
# - Claridad del match
# - Posición en el texto (si hay text_blocks)
confidence = self._calculate_extraction_confidence(
field_name, extracted_value, i, len(patterns), text_blocks, match.span()
)

return extracted_value, confidence

return None, 0.0

def _validate_field_value(self, field_name: str, value: str) -> bool:
"""
Valida que un valor extraído sea coherente para el tipo de campo
"""
if not value or len(value.strip()) < 2:
return False

validation_rules = {
'cedula_numero': lambda v: len(re.sub(r'[^0-9]', '', v)) >= 6,
'nombres': lambda v: len(v) >= 2 and re.match(r'^[A-Za-záéíóúñÁÉÍÓÚÑ\s]+$', v),
'apellidos': lambda v: len(v) >= 2 and re.match(r'^[A-Za-záéíóúñÁÉÍÓÚÑ\s]+$', v),
'fecha_nacimiento': lambda v: self._validate_date(v),
'telefono': lambda v: len(re.sub(r'[^0-9]', '', v)) >= 7,
'email': lambda v: '@' in v and '.' in v.split('@')[-1]
}

validator = validation_rules.get(field_name)
return validator(value) if validator else True

def _validate_date(self, date_str: str) -> bool:
"""Valida formato de fecha"""
try:
# Intentar parsear diferentes formatos de fecha
date_formats = ['%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d', '%Y/%m/%d']
for fmt in date_formats:
try:
parsed_date = datetime.strptime(date_str, fmt)
# Verificar que la fecha sea razonable (entre 1900 y año actual + 1)
current_year = datetime.now().year
return 1900 <= parsed_date.year <= current_year + 1
except ValueError:
continue
return False
except:
return False

def _calculate_extraction_confidence(self, field_name: str, value: str,
pattern_index: int, total_patterns: int,
text_blocks: List[Dict], match_span: Tuple[int, int]) -> float:
"""
Calcula la confianza de una extracción basada en múltiples factores
"""
base_confidence = 1.0 - (pattern_index * 0.1) # Primeros patrones = mayor confianza

# Factor de validación
validation_factor = 1.0 if self._validate_field_value(field_name, value) else 0.5

# Factor de claridad (longitud razonable, sin caracteres extraños)
clarity_factor = min(1.0, max(0.3, (len(value) - 1) / 20))

# Factor de posición (si está disponible)
position_factor = 1.0
if text_blocks:
# Buscar el bloque que contiene este match
for block in text_blocks:
if match_span[0] <= len(block['text']) and block['confidence']:
position_factor = block['confidence']
break

return min(1.0, base_confidence * validation_factor * clarity_factor * position_factor)

def _extract_with_nlp(self, text: str, missing_fields: List[str]) -> Dict[str, str]:
"""
Usa spaCy NLP como fallback para extraer campos faltantes
"""
if not self.nlp:
return {}

doc = self.nlp(text)
nlp_extractions = {}

# Extraer entidades nombradas
persons = [ent.text for ent in doc.ents if ent.label_ == "PER"]

# Mapear entidades a campos faltantes
if 'nombres' in missing_fields and persons:
# Usar la primera entidad persona como nombre
nlp_extractions['nombres'] = persons[0]

if 'apellidos' in missing_fields and len(persons) > 1:
# Usar la segunda entidad persona como apellido
nlp_extractions['apellidos'] = persons[1]

return nlp_extractions

2. Generador de JSON Estructurado

Convierte los campos extraídos en estructuras JSON bien definidas.

JSON Structure Generator:

from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, Any, List
from datetime import datetime
import json

class PersonalInfo(BaseModel):
"""Información personal básica"""
nombres: Optional[str] = Field(None, description="Nombres de la persona")
apellidos: Optional[str] = Field(None, description="Apellidos de la persona")
nombre_completo: Optional[str] = Field(None, description="Nombre completo combinado")
cedula_numero: Optional[str] = Field(None, description="Número de cédula o documento")
fecha_nacimiento: Optional[str] = Field(None, description="Fecha de nacimiento")

@validator('nombre_completo', pre=True, always=True)
def generate_full_name(cls, v, values):
if v:
return v
nombres = values.get('nombres', '')
apellidos = values.get('apellidos', '')
if nombres and apellidos:
return f"{nombres} {apellidos}".strip()
return nombres or apellidos or None

class ContactInfo(BaseModel):
"""Información de contacto"""
direccion: Optional[str] = Field(None, description="Dirección de residencia")
telefono: Optional[str] = Field(None, description="Número de teléfono")
email: Optional[str] = Field(None, description="Correo electrónico")

class AdditionalInfo(BaseModel):
"""Información adicional"""
estado_civil: Optional[str] = Field(None, description="Estado civil")
ocupacion: Optional[str] = Field(None, description="Ocupación o profesión")
nacionalidad: Optional[str] = Field(None, description="Nacionalidad")

class ExtractionMetadata(BaseModel):
"""Metadata de la extracción"""
document_type: str = Field(..., description="Tipo de documento procesado")
extraction_timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
processing_time: Optional[float] = Field(None, description="Tiempo de procesamiento en segundos")
overall_confidence: float = Field(0.0, ge=0.0, le=1.0, description="Confianza general de la extracción")
field_confidence: Dict[str, float] = Field(default_factory=dict, description="Confianza por campo")
missing_required_fields: List[str] = Field(default_factory=list, description="Campos requeridos faltantes")
ocr_stats: Optional[Dict[str, Any]] = Field(None, description="Estadísticas del OCR")

class StructuredDocument(BaseModel):
"""Documento completamente estructurado"""
personal_info: PersonalInfo = Field(default_factory=PersonalInfo)
contact_info: ContactInfo = Field(default_factory=ContactInfo)
additional_info: AdditionalInfo = Field(default_factory=AdditionalInfo)
metadata: ExtractionMetadata
raw_text: Optional[str] = Field(None, description="Texto completo extraído")

class JSONStructureGenerator:
"""
Genera estructuras JSON válidas y bien formateadas a partir de campos extraídos
"""

def __init__(self):
self.field_mappings = {
# Mapeo de campos extraídos a categorías del modelo
'personal_info': ['nombres', 'apellidos', 'cedula_numero', 'fecha_nacimiento'],
'contact_info': ['direccion', 'telefono', 'email'],
'additional_info': ['estado_civil', 'ocupacion', 'nacionalidad']
}

def generate_structured_json(self, extraction_result: Dict,
raw_text: str, processing_time: float) -> Dict:
"""
Genera JSON estructurado a partir de los resultados de extracción

Args:
extraction_result: Resultado de la extracción de campos
raw_text: Texto completo extraído
processing_time: Tiempo total de procesamiento

Returns:
structured_json: JSON estructurado y validado
"""
extracted_fields = extraction_result.get('extracted_fields', {})

# Separar campos por categorías
categorized_fields = self._categorize_fields(extracted_fields)

# Crear modelos Pydantic
personal_info = PersonalInfo(**categorized_fields['personal_info'])
contact_info = ContactInfo(**categorized_fields['contact_info'])
additional_info = AdditionalInfo(**categorized_fields['additional_info'])

# Crear metadata
metadata = ExtractionMetadata(
document_type=extraction_result.get('extraction_metadata', {}).get('document_type', 'unknown'),
processing_time=processing_time,
overall_confidence=extraction_result.get('overall_confidence', 0.0),
field_confidence=extraction_result.get('field_confidence', {}),
missing_required_fields=extraction_result.get('missing_required_fields', []),
ocr_stats={
'total_blocks': extraction_result.get('total_blocks', 0),
'average_confidence': extraction_result.get('average_confidence', 0.0),
'extraction_method': extraction_result.get('extraction_metadata', {}).get('extraction_method', 'unknown')
}
)

# Crear documento estructurado
structured_doc = StructuredDocument(
personal_info=personal_info,
contact_info=contact_info,
additional_info=additional_info,
metadata=metadata,
raw_text=raw_text
)

# Convertir a diccionario JSON
json_dict = structured_doc.dict(exclude_none=True)

# Agregar información de calidad
json_dict['quality_assessment'] = self._assess_extraction_quality(
structured_doc, extraction_result
)

return json_dict

def _categorize_fields(self, extracted_fields: Dict[str, str]) -> Dict[str, Dict]:
"""
Categoriza los campos extraídos según el modelo de datos
"""
categorized = {
'personal_info': {},
'contact_info': {},
'additional_info': {}
}

for field_name, field_value in extracted_fields.items():
# Buscar en qué categoría pertenece el campo
for category, fields in self.field_mappings.items():
if field_name in fields:
categorized[category][field_name] = field_value
break
else:
# Si no se encuentra, agregarlo a additional_info
categorized['additional_info'][field_name] = field_value

return categorized

def _assess_extraction_quality(self, structured_doc: StructuredDocument,
extraction_result: Dict) -> Dict:
"""
Evalúa la calidad general de la extracción
"""
# Contar campos completados
total_possible_fields = len(self.field_mappings['personal_info']) + \
len(self.field_mappings['contact_info']) + \
len(self.field_mappings['additional_info'])

completed_fields = 0
if structured_doc.personal_info.dict(exclude_none=True):
completed_fields += len(structured_doc.personal_info.dict(exclude_none=True))
if structured_doc.contact_info.dict(exclude_none=True):
completed_fields += len(structured_doc.contact_info.dict(exclude_none=True))
if structured_doc.additional_info.dict(exclude_none=True):
completed_fields += len(structured_doc.additional_info.dict(exclude_none=True))

completeness_ratio = completed_fields / total_possible_fields if total_possible_fields > 0 else 0

# Evaluar confianza promedio
field_confidences = list(structured_doc.metadata.field_confidence.values())
avg_confidence = np.mean(field_confidences) if field_confidences else 0.0

# Evaluar presencia de campos críticos
critical_fields = ['nombres', 'apellidos', 'cedula_numero']
critical_present = sum(1 for field in critical_fields
if getattr(structured_doc.personal_info, field, None))
critical_ratio = critical_present / len(critical_fields)

# Calcular score general de calidad
quality_score = (completeness_ratio * 0.4 + avg_confidence * 0.4 + critical_ratio * 0.2)

# Determinar nivel de calidad
if quality_score >= 0.8:
quality_level = "excellent"
elif quality_score >= 0.6:
quality_level = "good"
elif quality_score >= 0.4:
quality_level = "fair"
else:
quality_level = "poor"

return {
'quality_score': round(quality_score, 3),
'quality_level': quality_level,
'completeness_ratio': round(completeness_ratio, 3),
'average_confidence': round(avg_confidence, 3),
'critical_fields_ratio': round(critical_ratio, 3),
'total_fields_extracted': completed_fields,
'missing_critical_fields': [
field for field in critical_fields
if not getattr(structured_doc.personal_info, field, None)
],
'recommendations': self._generate_quality_recommendations(
quality_level, structured_doc.metadata.missing_required_fields
)
}

def _generate_quality_recommendations(self, quality_level: str,
missing_fields: List[str]) -> List[str]:
"""
Genera recomendaciones basadas en la calidad de extracción
"""
recommendations = []

if quality_level == "poor":
recommendations.append("Verificar calidad de la imagen de entrada")
recommendations.append("Considerar reprocesar con diferentes parámetros de OCR")
elif quality_level == "fair":
recommendations.append("Validar manualmente los campos extraídos")
recommendations.append("Verificar campos con baja confianza")

if missing_fields:
recommendations.append(f"Revisar campos faltantes: {', '.join(missing_fields)}")

if not recommendations:
recommendations.append("Extracción de alta calidad - proceder con confianza")

return recommendations

API Endpoints

POST /extract

Extrae texto usando OCR especializado.

Request:

curl -X POST "http://localhost:8004/extract" \
-H "Content-Type: multipart/form-data" \
-F "file=@document.jpg" \
-F "document_type=ficha_residencia" \
-F "confidence_threshold=0.3"

Response:

{
"success": true,
"processing_time": 3.45,
"document_type": "ficha_residencia",
"text_extraction": {
"success": true,
"text_blocks": [
{
"text": "NOMBRES: JUAN CARLOS",
"confidence": 0.94,
"bbox": {"x1": 150, "y1": 200, "x2": 400, "y2": 230},
"coordinates": [[150, 200], [400, 200], [400, 230], [150, 230]]
}
],
"full_text": "NOMBRES: JUAN CARLOS\nAPELLIDOS: RODRIGUEZ MARTINEZ\n...",
"average_confidence": 0.89,
"total_blocks": 15
}
}

POST /extract_json

Endpoint completo que extrae texto y estructura JSON.

Request:

curl -X POST "http://localhost:8004/extract_json" \
-H "Content-Type: multipart/form-data" \
-F "file=@ficha_residencia.jpg" \
-F "document_type=ficha_residencia"

Response:

{
"success": true,
"processing_time": 4.23,
"structured_document": {
"personal_info": {
"nombres": "JUAN CARLOS",
"apellidos": "RODRIGUEZ MARTINEZ",
"nombre_completo": "JUAN CARLOS RODRIGUEZ MARTINEZ",
"cedula_numero": "12.345.678-9",
"fecha_nacimiento": "15/08/1985"
},
"contact_info": {
"direccion": "CALLE 123 #45-67 BARRIO CENTRO",
"telefono": "+57 300 123 4567",
"email": "juan.rodriguez@email.com"
},
"additional_info": {
"estado_civil": "CASADO"
},
"metadata": {
"document_type": "ficha_residencia",
"extraction_timestamp": "2024-01-15T14:30:22.123Z",
"processing_time": 4.23,
"overall_confidence": 0.87,
"field_confidence": {
"nombres": 0.94,
"apellidos": 0.91,
"cedula_numero": 0.89,
"direccion": 0.82
},
"missing_required_fields": [],
"ocr_stats": {
"total_blocks": 15,
"average_confidence": 0.89,
"extraction_method": "pattern_matching"
}
},
"quality_assessment": {
"quality_score": 0.87,
"quality_level": "excellent",
"completeness_ratio": 0.85,
"average_confidence": 0.87,
"critical_fields_ratio": 1.0,
"total_fields_extracted": 7,
"missing_critical_fields": [],
"recommendations": [
"Extracción de alta calidad - proceder con confianza"
]
}
}
}

GET /ocr_configs

Lista las configuraciones de OCR disponibles por tipo de documento.

Response:

{
"available_configs": {
"ficha_residencia": {
"lang": "es",
"det_db_thresh": 0.3,
"optimized_for": "mixed_text_handwritten"
},
"cedula_identidad": {
"lang": "es",
"det_db_thresh": 0.2,
"optimized_for": "small_printed_text"
},
"pasaporte": {
"lang": "en",
"det_db_thresh": 0.3,
"optimized_for": "machine_readable_text"
}
},
"cached_instances": 3,
"total_extractions": 1247
}

Configuración y Optimización

Variables de Entorno

# Configuración del servicio
OCR_EXTRACTOR_PORT=8004
OCR_EXTRACTOR_HOST=0.0.0.0
OCR_EXTRACTOR_WORKERS=2

# Configuración de PaddleOCR
PADDLE_OCR_CACHE_DIR=/app/data/models/paddle
DEFAULT_OCR_LANG=es
DEFAULT_CONFIDENCE_THRESHOLD=0.3

# Configuración de extracción de campos
ENABLE_NLP_FALLBACK=true
SPACY_MODEL=es_core_news_sm
FIELD_EXTRACTION_TIMEOUT=30

# Configuración de performance
MAX_IMAGE_SIZE=4096
OCR_BATCH_SIZE=1
ENABLE_GPU_OCR=true

Optimización de Memoria

class OCRInstanceManager:
"""Gestión optimizada de instancias OCR para evitar memory leaks"""

def __init__(self, max_instances=5):
self.max_instances = max_instances
self.instances = {}
self.usage_count = {}
self.last_used = {}

def get_instance(self, config_key):
"""Obtiene instancia con manejo de memoria"""
current_time = time.time()

if config_key in self.instances:
self.last_used[config_key] = current_time
self.usage_count[config_key] += 1
return self.instances[config_key]

# Limpiar instancias viejas si se alcanza el límite
if len(self.instances) >= self.max_instances:
self._cleanup_old_instances()

# Crear nueva instancia
config = self._get_config_for_key(config_key)
instance = PaddleOCR(**config)

self.instances[config_key] = instance
self.usage_count[config_key] = 1
self.last_used[config_key] = current_time

return instance

def _cleanup_old_instances(self):
"""Elimina instancias menos usadas y más antiguas"""
if not self.instances:
return

# Encontrar la instancia menos usada recientemente
oldest_key = min(self.last_used.keys(), key=lambda k: self.last_used[k])

# Eliminar instancia
del self.instances[oldest_key]
del self.usage_count[oldest_key]
del self.last_used[oldest_key]

# Forzar garbage collection
import gc
gc.collect()

Las Fases 5-6 completan el pipeline IRIS transformando imágenes de documentos en estructuras JSON utilizables, combinando OCR especializado con extracción inteligente de campos para obtener información estructurada de alta calidad.