API Integration Guide

This comprehensive guide covers how to integrate IRIS OCR into your applications, including detailed API references, authentication, error handling, and best practices.

API Overview

IRIS provides both high-level and granular APIs:

  • Complete Pipeline API: Process documents end-to-end in one call
  • Phase-by-Phase APIs: Control each processing phase individually
  • Utility APIs: Health checks, service status, and configuration

Base URL and Versioning

Production: https://api.iris-ocr.com/v1
Development: http://localhost:8000
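
If you target both environments, reading the base URL from configuration keeps the rest of your integration code identical. A minimal sketch; the IRIS_BASE_URL variable name is a convention of this example, not something the API defines:

import os

# Hypothetical convention: select the gateway via an environment variable,
# falling back to the local development server.
BASE_URL = os.getenv("IRIS_BASE_URL", "http://localhost:8000")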

Authentication

API Key Authentication (Production)

# Include API key in header
curl -X POST "https://api.iris-ocr.com/v1/process" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: multipart/form-data" \
-F "file=@document.jpg"

No Authentication (Development)

Local development doesn't require authentication:

curl -X POST "http://localhost:8000/process" \
-F "file=@document.jpg"

Complete Pipeline API

POST /process

Process a document through the complete 6-phase pipeline.

Request Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| file | File | ✅ Yes | Image file (JPG, PNG, PDF) |
| document_type | String | ❌ No | Document type hint (cedula_identidad, ficha_residencia, pasaporte) |
| apply_unwarping | Boolean | ❌ No | Apply geometric correction (default: true) |
| enhance_quality | Boolean | ❌ No | Apply image enhancement (default: true) |
| ocr_confidence | Float | ❌ No | OCR confidence threshold, 0.0-1.0 (default: 0.3) |
| perform_clustering | Boolean | ❌ No | Include clustering analysis (default: false) |
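
For example, a curl request that sets several of these parameters explicitly (the values are illustrative):

curl -X POST "http://localhost:8000/process" \
  -F "file=@document.jpg" \
  -F "document_type=ficha_residencia" \
  -F "apply_unwarping=true" \
  -F "ocr_confidence=0.2"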

Example Requests

Basic Processing:

import requests

def process_document(image_path):
    url = "http://localhost:8000/process"

    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)

    return response.json()

result = process_document("cedula.jpg")
print(f"Success: {result['success']}")
print(f"Document Type: {result['document_type']}")

Advanced Processing:

import requests

def process_document_advanced(image_path, document_type=None, confidence=0.3):
    url = "http://localhost:8000/process"

    data = {
        'apply_unwarping': True,
        'enhance_quality': True,
        'ocr_confidence': confidence,
        'perform_clustering': False
    }

    if document_type:
        data['document_type'] = document_type

    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files, data=data, timeout=60)

    response.raise_for_status()
    return response.json()

# Process with a specific document type
result = process_document_advanced(
    "ficha_residencia.jpg",
    document_type="ficha_residencia",
    confidence=0.2
)

Response Format

{
  "success": true,
  "pipeline_id": "12345678-1234-5678-9012-123456789012",
  "processing_time": 8.45,
  "document_type": "ficha_residencia",

  "final_extraction": {
    "personal_info": {
      "nombres": "JUAN CARLOS",
      "apellidos": "RODRIGUEZ MARTINEZ",
      "nombre_completo": "JUAN CARLOS RODRIGUEZ MARTINEZ",
      "cedula_numero": "12.345.678-9",
      "fecha_nacimiento": "15/08/1985"
    },
    "contact_info": {
      "direccion": "CALLE 123 #45-67 BARRIO CENTRO",
      "telefono": "+57 300 123 4567",
      "email": "juan.rodriguez@email.com"
    },
    "additional_info": {
      "estado_civil": "CASADO",
      "ocupacion": "INGENIERO",
      "nacionalidad": "COLOMBIANA"
    },
    "metadata": {
      "document_type": "ficha_residencia",
      "extraction_timestamp": "2024-01-15T14:30:22.123Z",
      "processing_time": 4.23,
      "overall_confidence": 0.87,
      "field_confidence": {
        "nombres": 0.94,
        "apellidos": 0.91,
        "cedula_numero": 0.89,
        "direccion": 0.82,
        "telefono": 0.76,
        "email": 0.85
      },
      "missing_required_fields": [],
      "quality_assessment": {
        "quality_score": 0.87,
        "quality_level": "excellent",
        "completeness_ratio": 0.85,
        "recommendations": [
          "Extracción de alta calidad - proceder con confianza"
        ]
      }
    }
  },

  "phases": {
    "phase_1": {
      "status": "completed",
      "processing_time": 1.2,
      "result": {
        "unwarping_applied": true,
        "enhancement_applied": true,
        "output_dimensions": [2048, 1536]
      }
    },
    "phase_2": {
      "status": "completed",
      "processing_time": 2.1,
      "result": {
        "embedding_dimension": 768,
        "clustering_performed": false
      }
    },
    "phase_3_4": {
      "status": "completed",
      "processing_time": 1.8,
      "result": {
        "predicted_class": "ficha_residencia",
        "confidence": 0.94,
        "top_3_predictions": [
          {"class": "ficha_residencia", "confidence": 0.94},
          {"class": "cedula_identidad", "confidence": 0.04},
          {"class": "pasaporte", "confidence": 0.02}
        ]
      }
    },
    "phase_5_6": {
      "status": "completed",
      "processing_time": 3.35,
      "result": {
        "text_blocks_found": 15,
        "average_ocr_confidence": 0.89,
        "fields_extracted": 8
      }
    }
  },

  "pipeline_summary": {
    "phases_completed": 4,
    "total_phases": 4,
    "average_phase_time": 2.11,
    "bottleneck_phase": "phase_5_6"
  }
}
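
Most consumers only need a handful of fields from this payload. A minimal sketch of pulling out the extracted data and flagging weak fields, based on the response shape shown above (the 0.8 review threshold is illustrative, not defined by the API):

def summarize_result(result):
    extraction = result['final_extraction']
    metadata = extraction['metadata']

    print(f"Document type: {result['document_type']}")
    print(f"Full name: {extraction['personal_info']['nombre_completo']}")
    print(f"Overall confidence: {metadata['overall_confidence']:.2f}")

    # Flag low-confidence fields for manual review
    for field, confidence in metadata['field_confidence'].items():
        if confidence < 0.8:
            print(f"Review suggested: {field} ({confidence:.2f})")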

Phase-by-Phase APIs

For applications requiring more granular control, you can call each phase individually.

Phase 1: Image Processing

POST /image-processor/process

Preprocess and correct document images.

import requests

def phase1_image_processing(image_path):
    url = "http://localhost:8001/process"

    with open(image_path, 'rb') as f:
        files = {'file': f}
        data = {
            'apply_unwarping': True,
            'enhance_quality': True,
            'target_dpi': 300
        }
        response = requests.post(url, files=files, data=data)

    return response.json()

# Process image
result = phase1_image_processing("document.jpg")
processed_image_data = result['processed_image_data']

Phase 2: Embeddings and Clustering

POST /ml-embeddings/embed

Generate embeddings for document images.

def phase2_generate_embeddings(image_path):
    url = "http://localhost:8002/embed"

    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)

    return response.json()

# Generate embeddings
result = phase2_generate_embeddings("processed_image.jpg")
embedding_vector = result['embedding']  # 768-dimensional vector
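
Embeddings are also useful outside clustering, for example to compare two documents directly. A small sketch using cosine similarity (NumPy is an added dependency, not required by the API):

import numpy as np

def cosine_similarity(a, b):
    # Cosine of the angle between two embedding vectors, in [-1, 1]
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

emb_a = phase2_generate_embeddings("doc_a.jpg")['embedding']
emb_b = phase2_generate_embeddings("doc_b.jpg")['embedding']
print(f"Similarity: {cosine_similarity(emb_a, emb_b):.3f}")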

POST /ml-embeddings/cluster

Perform clustering analysis on multiple embeddings.

def phase2_clustering(embeddings_list, k=3):
    url = "http://localhost:8002/cluster"

    data = {
        'embeddings': embeddings_list,
        'k': k,
        'method': 'kmeans'
    }

    response = requests.post(url, json=data)
    return response.json()

# Cluster multiple embeddings
cluster_result = phase2_clustering([embedding1, embedding2, embedding3])
cluster_labels = cluster_result['cluster_labels']

Phase 3-4: Document Classification

POST /ml-classifier/classify

Classify document type using trained models.

def phase3_4_classification(image_path):
    url = "http://localhost:8003/classify"

    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)

    return response.json()

# Classify document
result = phase3_4_classification("processed_image.jpg")
document_type = result['predicted_class']
confidence = result['confidence']

Phase 5-6: OCR and JSON Extraction

POST /ocr-extractor/extract

Extract text using specialized OCR.

def phase5_ocr_extraction(image_path, document_type):
    url = "http://localhost:8004/extract"

    with open(image_path, 'rb') as f:
        files = {'file': f}
        data = {
            'document_type': document_type,
            'confidence_threshold': 0.3
        }
        response = requests.post(url, files=files, data=data)

    return response.json()

# Extract text
ocr_result = phase5_ocr_extraction("processed_image.jpg", "ficha_residencia")
text_blocks = ocr_result['text_blocks']

POST /ocr-extractor/extract_json

Complete OCR with structured JSON extraction.

def phase6_json_extraction(image_path, document_type):
    url = "http://localhost:8004/extract_json"

    with open(image_path, 'rb') as f:
        files = {'file': f}
        data = {'document_type': document_type}
        response = requests.post(url, files=files, data=data)

    return response.json()

# Extract structured data
result = phase6_json_extraction("processed_image.jpg", "ficha_residencia")
structured_data = result['structured_document']
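
Chaining the phases yourself looks like the sketch below. It assumes processed_image_data is base64-encoded image bytes; check the actual payload format of your deployment. Phase 2 embeddings are optional and omitted here:

import base64

def run_pipeline_manually(image_path):
    # Phase 1: preprocess and persist the corrected image
    phase1 = phase1_image_processing(image_path)
    processed_path = "processed_image.jpg"
    with open(processed_path, 'wb') as f:
        f.write(base64.b64decode(phase1['processed_image_data']))

    # Phases 3-4: classify the corrected image
    classification = phase3_4_classification(processed_path)
    document_type = classification['predicted_class']

    # Phases 5-6: OCR plus structured extraction, guided by the predicted type
    extraction = phase6_json_extraction(processed_path, document_type)
    return extraction['structured_document']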

Error Handling

Error Response Format

{
  "success": false,
  "error": {
    "code": "IMAGE_PROCESSING_FAILED",
    "message": "Failed to process image",
    "details": "Image format not supported or corrupted",
    "retryable": true,
    "timestamp": "2024-01-15T14:30:22.123Z"
  },
  "pipeline_id": "12345678-1234-5678-9012-123456789012"
}

Error Codes Reference

| Error Code | Description | Retryable | Action |
|---|---|---|---|
| IMAGE_PROCESSING_FAILED | Image preprocessing failed | ✅ Yes | Check image format/quality |
| EMBEDDING_GENERATION_FAILED | ML embedding failed | ✅ Yes | Retry with different image |
| CLASSIFICATION_FAILED | Document classification failed | ❌ No | Check if document type is supported |
| OCR_EXTRACTION_FAILED | OCR text extraction failed | ✅ Yes | Try with better quality image |
| SERVICE_UNAVAILABLE | Microservice temporarily down | ✅ Yes | Retry after brief delay |
| VALIDATION_ERROR | Input validation failed | ❌ No | Fix request parameters |
| FILE_TOO_LARGE | Image file exceeds size limit | ❌ No | Reduce image file size |
| UNSUPPORTED_FORMAT | File format not supported | ❌ No | Convert to JPG/PNG |
| RATE_LIMIT_EXCEEDED | Too many requests | ✅ Yes | Wait and retry |

Implementing Robust Error Handling

import time
import requests
from typing import Dict, Any, Optional

class IRISClient:
    def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None):
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()

        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def process_document(self, image_path: str, max_retries: int = 3, **kwargs) -> Dict[str, Any]:
        """Process a document, automatically retrying transient errors."""
        for attempt in range(max_retries):
            try:
                with open(image_path, 'rb') as f:
                    files = {'file': f}
                    response = self.session.post(
                        f"{self.base_url}/process",
                        files=files,
                        data=kwargs,
                        timeout=60
                    )

                if response.status_code == 200:
                    result = response.json()
                    if result['success']:
                        return result
                    elif self._is_retryable_error(result):
                        print(f"Retryable error (attempt {attempt + 1}): {result['error']['message']}")
                        if attempt < max_retries - 1:
                            time.sleep(2 ** attempt)  # Exponential backoff
                            continue
                    else:
                        raise IRISAPIError(result['error'])

                elif response.status_code == 429:  # Rate limit
                    if attempt < max_retries - 1:
                        time.sleep(5 * (attempt + 1))
                        continue
                    else:
                        raise IRISAPIError({"code": "RATE_LIMIT_EXCEEDED", "message": "Too many requests"})

                elif response.status_code >= 500:  # Server error
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
                        continue
                    else:
                        raise IRISAPIError({"code": "SERVER_ERROR", "message": f"Server error: {response.status_code}"})

                else:
                    # Client error: don't retry
                    try:
                        error_data = response.json()
                    except ValueError:
                        error_data = {"code": "HTTP_ERROR", "message": f"HTTP {response.status_code}"}
                    raise IRISAPIError(error_data)

            except requests.RequestException as e:
                if attempt < max_retries - 1:
                    print(f"Network error (attempt {attempt + 1}): {str(e)}")
                    time.sleep(2 ** attempt)
                    continue
                else:
                    raise IRISAPIError({"code": "NETWORK_ERROR", "message": str(e)})

        raise IRISAPIError({"code": "MAX_RETRIES_EXCEEDED", "message": "All retry attempts failed"})

    def _is_retryable_error(self, result: Dict[str, Any]) -> bool:
        """Check whether the error code is in the retryable set."""
        error_code = result.get('error', {}).get('code', '')
        retryable_codes = {
            'IMAGE_PROCESSING_FAILED',
            'EMBEDDING_GENERATION_FAILED',
            'OCR_EXTRACTION_FAILED',
            'SERVICE_UNAVAILABLE'
        }
        return error_code in retryable_codes

class IRISAPIError(Exception):
    def __init__(self, error_data: Dict[str, Any]):
        self.code = error_data.get('code', 'UNKNOWN')
        self.message = error_data.get('message', 'Unknown error')
        self.details = error_data.get('details')
        self.retryable = error_data.get('retryable', False)

        super().__init__(f"{self.code}: {self.message}")

# Usage example
client = IRISClient("http://localhost:8000")

try:
    result = client.process_document(
        "my_document.jpg",
        document_type="ficha_residencia",
        ocr_confidence=0.3
    )
    print("Processing successful!")
    print(f"Extracted name: {result['final_extraction']['personal_info']['nombre_completo']}")

except IRISAPIError as e:
    print(f"API Error: {e}")
    print(f"Error code: {e.code}")
    print(f"Retryable: {e.retryable}")

Batch Processing

Processing Multiple Documents

import concurrent.futures
from typing import List, Dict, Any

class IRISBatchProcessor:
    def __init__(self, base_url: str = "http://localhost:8000", max_workers: int = 3):
        self.client = IRISClient(base_url)
        self.max_workers = max_workers

    def process_batch(self, image_paths: List[str], **kwargs) -> Dict[str, Any]:
        """Process multiple documents concurrently."""
        results = {}

        def process_single_document(path: str) -> tuple:
            try:
                result = self.client.process_document(path, **kwargs)
                return path, {"success": True, "data": result}
            except IRISAPIError as e:
                return path, {"success": False, "error": str(e), "code": e.code}
            except Exception as e:
                return path, {"success": False, "error": str(e), "code": "UNKNOWN"}

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(process_single_document, path): path
                for path in image_paths
            }

            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_path):
                path, result = future.result()
                results[path] = result

        return self._generate_batch_summary(results)

    def _generate_batch_summary(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate summary statistics for a batch run."""
        total = len(results)
        successful = sum(1 for r in results.values() if r['success'])
        failed = total - successful

        error_breakdown = {}
        processing_times = []

        for path, result in results.items():
            if result['success']:
                processing_times.append(result['data']['processing_time'])
            else:
                error_code = result.get('code', 'UNKNOWN')
                error_breakdown[error_code] = error_breakdown.get(error_code, 0) + 1

        return {
            "summary": {
                "total_documents": total,
                "successful": successful,
                "failed": failed,
                "success_rate": round(successful / total * 100, 2) if total > 0 else 0,
                "average_processing_time": round(sum(processing_times) / len(processing_times), 2) if processing_times else 0
            },
            "error_breakdown": error_breakdown,
            "detailed_results": results
        }

# Usage example
processor = IRISBatchProcessor(max_workers=5)

image_files = [
    "documents/cedula_001.jpg",
    "documents/ficha_002.jpg",
    "documents/pasaporte_003.jpg"
]

batch_results = processor.process_batch(
    image_files,
    apply_unwarping=True,
    ocr_confidence=0.3
)

print(f"Processed {batch_results['summary']['total_documents']} documents")
print(f"Success rate: {batch_results['summary']['success_rate']}%")
print(f"Average processing time: {batch_results['summary']['average_processing_time']}s")

Utility APIs

Health Checks

GET /health

Check API Gateway health.

import requests

def check_health():
    response = requests.get("http://localhost:8000/health")
    return response.json()

health = check_health()
print(f"Status: {health['status']}")

GET /services

Check all microservices health.

def check_all_services():
    response = requests.get("http://localhost:8000/services")
    return response.json()

services = check_all_services()
for service, status in services.items():
    print(f"{service}: {status['status']}")

Pipeline Status

GET /pipeline/status/{pipeline_id}

Check the status of a running pipeline.

def check_pipeline_status(pipeline_id):
    response = requests.get(f"http://localhost:8000/pipeline/status/{pipeline_id}")
    return response.json()

status = check_pipeline_status("12345678-1234-5678-9012-123456789012")
print(f"Status: {status['status']}")
print(f"Current phase: {status['current_phase']}")

Production Considerations

Rate Limiting

Production environments implement rate limiting:

  • Free tier: 100 requests per hour
  • Basic tier: 1,000 requests per hour
  • Premium tier: 10,000 requests per hour
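
To stay under your tier's quota, you can throttle requests client-side rather than relying on 429 retries. A minimal sketch that spaces calls evenly across the hour (for the free tier, 100 requests/hour means roughly one request every 36 seconds):

import time

class Throttle:
    """Spaces out calls so a requests-per-hour budget is never exceeded."""

    def __init__(self, requests_per_hour: int):
        self.min_interval = 3600.0 / requests_per_hour
        self.last_call = 0.0

    def wait(self):
        elapsed = time.time() - self.last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call = time.time()

throttle = Throttle(requests_per_hour=100)  # free tier
# Call throttle.wait() before each client.process_document(...) request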

Image Size Limits

  • Maximum file size: 10MB
  • Recommended resolution: 1500-3000px on longer side
  • Supported formats: JPG, PNG, PDF (first page only)
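
Checking these limits before upload avoids spending requests on files the API will reject. A sketch using Pillow (an added dependency) to enforce the 10MB cap and downscale images whose longer side exceeds the recommended range:

import os
from PIL import Image  # Pillow, an added dependency

MAX_BYTES = 10 * 1024 * 1024  # 10MB upload limit
MAX_SIDE = 3000               # recommended upper bound for the longer side

def prepare_image(path: str) -> str:
    if os.path.getsize(path) > MAX_BYTES:
        raise ValueError(f"{path} exceeds the 10MB upload limit")

    with Image.open(path) as img:
        if max(img.size) > MAX_SIDE:
            # Downscale while preserving aspect ratio, then re-encode as JPEG
            img.thumbnail((MAX_SIDE, MAX_SIDE))
            resized_path = f"{os.path.splitext(path)[0]}_resized.jpg"
            img.convert("RGB").save(resized_path, "JPEG", quality=90)
            return resized_path
    return path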

Security Best Practices

# Always validate file types before upload
import mimetypes

def validate_image_file(file_path):
    mime_type, _ = mimetypes.guess_type(file_path)
    allowed_types = ['image/jpeg', 'image/png', 'application/pdf']
    return mime_type in allowed_types

# Use environment variables for API keys
import os
api_key = os.getenv('IRIS_API_KEY')

# Implement request timeouts
response = requests.post(url, files=files, timeout=60)

Monitoring and Logging

import logging
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_with_logging(image_path):
    start_time = time.time()

    try:
        logger.info(f"Starting processing for {image_path}")
        result = client.process_document(image_path)

        processing_time = time.time() - start_time
        logger.info(f"Successfully processed {image_path} in {processing_time:.2f}s")
        logger.info(f"Document type: {result['document_type']}")
        logger.info(f"Quality level: {result['final_extraction']['metadata']['quality_assessment']['quality_level']}")

        return result

    except IRISAPIError as e:
        logger.error(f"Failed to process {image_path}: {e}")
        raise

SDK and Libraries

Python SDK

pip install iris-ocr-sdk

from iris_ocr import IRISClient

client = IRISClient(api_key="your-api-key")
result = client.process("document.jpg")

JavaScript/Node.js SDK

npm install iris-ocr-js

const IRIS = require('iris-ocr-js');

const client = new IRIS({apiKey: 'your-api-key'});

(async () => {
  const result = await client.process('document.jpg');
})();

Support and Resources

  • API Documentation: Complete reference at /docs endpoint
  • Postman Collection: Import from docs/postman/IRIS_API.json
  • Code Examples: Available in examples/ directory
  • GitHub Issues: Report integration issues
  • Developer Support: Contact support for integration help

This guide provides everything you need to integrate IRIS OCR into your applications. Start with the simple pipeline API and move to phase-by-phase processing as your requirements become more complex.