API Integration Guide
This guide covers how to integrate IRIS OCR into your applications: the API reference, authentication, error handling, and best practices.
API Overview
IRIS provides both high-level and granular APIs:
- Complete Pipeline API: Process documents end-to-end in one call
- Phase-by-Phase APIs: Control each processing phase individually
- Utility APIs: Health checks, service status, and configuration
Base URL and Versioning
- Production: `https://api.iris-ocr.com/v1`
- Development: `http://localhost:8000`
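A common pattern is to read the base URL from an environment variable so the same integration code runs against both environments. A minimal sketch (the variable name `IRIS_BASE_URL` is illustrative, not mandated by the service):

```python
import os

# Illustrative variable name; defaults to the local development server
BASE_URL = os.getenv("IRIS_BASE_URL", "http://localhost:8000")
```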
Authentication
API Key Authentication (Production)
```bash
# Include the API key in the Authorization header
curl -X POST "https://api.iris-ocr.com/v1/process" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: multipart/form-data" \
  -F "file=@document.jpg"
```
No Authentication (Development)
Local development doesn't require authentication:
```bash
curl -X POST "http://localhost:8000/process" \
  -F "file=@document.jpg"
```
Complete Pipeline API
POST /process
Process a document through the complete 6-phase pipeline.
Request Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `file` | File | ✅ Yes | Image file (JPG, PNG, PDF) |
| `document_type` | String | ❌ No | Document type hint (`cedula_identidad`, `ficha_residencia`, `pasaporte`) |
| `apply_unwarping` | Boolean | ❌ No | Apply geometric correction (default: `true`) |
| `enhance_quality` | Boolean | ❌ No | Apply image enhancement (default: `true`) |
| `ocr_confidence` | Float | ❌ No | OCR confidence threshold, 0.0-1.0 (default: `0.3`) |
| `perform_clustering` | Boolean | ❌ No | Include clustering analysis (default: `false`) |
Example Requests
Basic Processing:
```python
import requests

def process_document(image_path):
    url = "http://localhost:8000/process"
    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)
    return response.json()

result = process_document("cedula.jpg")
print(f"Success: {result['success']}")
print(f"Document Type: {result['document_type']}")
```
Advanced Processing:
```python
import requests

def process_document_advanced(image_path, document_type=None, confidence=0.3):
    url = "http://localhost:8000/process"
    data = {
        'apply_unwarping': True,
        'enhance_quality': True,
        'ocr_confidence': confidence,
        'perform_clustering': False
    }
    if document_type:
        data['document_type'] = document_type
    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files, data=data, timeout=60)
    response.raise_for_status()
    return response.json()

# Process with a specific document type
result = process_document_advanced(
    "ficha_residencia.jpg",
    document_type="ficha_residencia",
    confidence=0.2
)
```
Response Format
```json
{
  "success": true,
  "pipeline_id": "12345678-1234-5678-9012-123456789012",
  "processing_time": 8.45,
  "document_type": "ficha_residencia",
  "final_extraction": {
    "personal_info": {
      "nombres": "JUAN CARLOS",
      "apellidos": "RODRIGUEZ MARTINEZ",
      "nombre_completo": "JUAN CARLOS RODRIGUEZ MARTINEZ",
      "cedula_numero": "12.345.678-9",
      "fecha_nacimiento": "15/08/1985"
    },
    "contact_info": {
      "direccion": "CALLE 123 #45-67 BARRIO CENTRO",
      "telefono": "+57 300 123 4567",
      "email": "juan.rodriguez@email.com"
    },
    "additional_info": {
      "estado_civil": "CASADO",
      "ocupacion": "INGENIERO",
      "nacionalidad": "COLOMBIANA"
    },
    "metadata": {
      "document_type": "ficha_residencia",
      "extraction_timestamp": "2024-01-15T14:30:22.123Z",
      "processing_time": 4.23,
      "overall_confidence": 0.87,
      "field_confidence": {
        "nombres": 0.94,
        "apellidos": 0.91,
        "cedula_numero": 0.89,
        "direccion": 0.82,
        "telefono": 0.76,
        "email": 0.85
      },
      "missing_required_fields": [],
      "quality_assessment": {
        "quality_score": 0.87,
        "quality_level": "excellent",
        "completeness_ratio": 0.85,
        "recommendations": [
          "Extracción de alta calidad - proceder con confianza"
        ]
      }
    }
  },
  "phases": {
    "phase_1": {
      "status": "completed",
      "processing_time": 1.2,
      "result": {
        "unwarping_applied": true,
        "enhancement_applied": true,
        "output_dimensions": [2048, 1536]
      }
    },
    "phase_2": {
      "status": "completed",
      "processing_time": 2.1,
      "result": {
        "embedding_dimension": 768,
        "clustering_performed": false
      }
    },
    "phase_3_4": {
      "status": "completed",
      "processing_time": 1.8,
      "result": {
        "predicted_class": "ficha_residencia",
        "confidence": 0.94,
        "top_3_predictions": [
          {"class": "ficha_residencia", "confidence": 0.94},
          {"class": "cedula_identidad", "confidence": 0.04},
          {"class": "pasaporte", "confidence": 0.02}
        ]
      }
    },
    "phase_5_6": {
      "status": "completed",
      "processing_time": 3.35,
      "result": {
        "text_blocks_found": 15,
        "average_ocr_confidence": 0.89,
        "fields_extracted": 8
      }
    }
  },
  "pipeline_summary": {
    "phases_completed": 4,
    "total_phases": 4,
    "average_phase_time": 2.11,
    "bottleneck_phase": "phase_5_6"
  }
}
```
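A natural first step after receiving this response is to gate downstream use on the reported confidences. The sketch below walks `field_confidence` and flags weak fields; the field paths follow the response above, and the 0.8 threshold is just an example:

```python
def review_extraction(result, threshold=0.8):
    """Return fields whose reported confidence falls below the threshold."""
    metadata = result['final_extraction']['metadata']
    low_confidence = {
        field: score
        for field, score in metadata['field_confidence'].items()
        if score < threshold
    }
    if low_confidence:
        print(f"Fields needing manual review: {low_confidence}")
    return low_confidence

# With the example response above, this would flag 'telefono' (0.76)
flagged = review_extraction(result)
```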
Phase-by-Phase APIs
For applications requiring more granular control, you can call each phase individually. A sketch that chains the phases end-to-end appears at the end of this section.
Phase 1: Image Processing
POST /image-processor/process
Preprocess and correct document images.
```python
import requests

def phase1_image_processing(image_path):
    url = "http://localhost:8001/process"
    with open(image_path, 'rb') as f:
        files = {'file': f}
        data = {
            'apply_unwarping': True,
            'enhance_quality': True,
            'target_dpi': 300
        }
        response = requests.post(url, files=files, data=data)
    return response.json()

# Process an image
result = phase1_image_processing("document.jpg")
processed_image_data = result['processed_image_data']
```
Phase 2: Embeddings and Clustering
POST /ml-embeddings/embed
Generate embeddings for document images.
```python
def phase2_generate_embeddings(image_path):
    url = "http://localhost:8002/embed"
    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)
    return response.json()

# Generate embeddings
result = phase2_generate_embeddings("processed_image.jpg")
embedding_vector = result['embedding']  # 768-dimensional vector
```
POST /ml-embeddings/cluster
Perform clustering analysis on multiple embeddings.
```python
def phase2_clustering(embeddings_list, k=3):
    url = "http://localhost:8002/cluster"
    data = {
        'embeddings': embeddings_list,
        'k': k,
        'method': 'kmeans'
    }
    response = requests.post(url, json=data)
    return response.json()

# Cluster multiple embeddings
cluster_result = phase2_clustering([embedding1, embedding2, embedding3])
cluster_labels = cluster_result['cluster_labels']
```
Phase 3-4: Document Classification
POST /ml-classifier/classify
Classify document type using trained models.
```python
def phase3_4_classification(image_path):
    url = "http://localhost:8003/classify"
    with open(image_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)
    return response.json()

# Classify a document
result = phase3_4_classification("processed_image.jpg")
document_type = result['predicted_class']
confidence = result['confidence']
```
Phase 5-6: OCR and JSON Extraction
POST /ocr-extractor/extract
Extract text using specialized OCR.
```python
def phase5_ocr_extraction(image_path, document_type):
    url = "http://localhost:8004/extract"
    with open(image_path, 'rb') as f:
        files = {'file': f}
        data = {
            'document_type': document_type,
            'confidence_threshold': 0.3
        }
        response = requests.post(url, files=files, data=data)
    return response.json()

# Extract text
ocr_result = phase5_ocr_extraction("processed_image.jpg", "ficha_residencia")
text_blocks = ocr_result['text_blocks']
```
POST /ocr-extractor/extract_json
Complete OCR with structured JSON extraction.
```python
def phase6_json_extraction(image_path, document_type):
    url = "http://localhost:8004/extract_json"
    with open(image_path, 'rb') as f:
        files = {'file': f}
        data = {'document_type': document_type}
        response = requests.post(url, files=files, data=data)
    return response.json()

# Extract structured data
result = phase6_json_extraction("processed_image.jpg", "ficha_residencia")
structured_data = result['structured_document']
```
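The phase endpoints above can be chained into a manual pipeline when you need to inspect or branch on intermediate results. The sketch below assumes Phase 1's `processed_image_data` is base64-encoded so it can be written to disk and re-uploaded; verify that contract (and the 0.5 threshold, which is arbitrary) against your deployment:

```python
import base64

def run_manual_pipeline(image_path):
    # Phase 1: preprocess, then persist the result for re-upload.
    # Assumes processed_image_data is base64-encoded (check your deployment).
    phase1 = phase1_image_processing(image_path)
    processed_path = "processed_image.jpg"
    with open(processed_path, 'wb') as f:
        f.write(base64.b64decode(phase1['processed_image_data']))

    # Phase 3-4: classify, and branch on confidence before running OCR.
    classification = phase3_4_classification(processed_path)
    if classification['confidence'] < 0.5:  # arbitrary example threshold
        raise ValueError(f"Ambiguous document type: {classification}")

    # Phase 5-6: OCR with the predicted type as a hint.
    return phase6_json_extraction(processed_path, classification['predicted_class'])
```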
Error Handling
Error Response Format
```json
{
  "success": false,
  "error": {
    "code": "IMAGE_PROCESSING_FAILED",
    "message": "Failed to process image",
    "details": "Image format not supported or corrupted",
    "retryable": true,
    "timestamp": "2024-01-15T14:30:22.123Z"
  },
  "pipeline_id": "12345678-1234-5678-9012-123456789012"
}
```
Error Codes Reference
| Error Code | Description | Retryable | Action |
|---|---|---|---|
| `IMAGE_PROCESSING_FAILED` | Image preprocessing failed | ✅ Yes | Check image format/quality |
| `EMBEDDING_GENERATION_FAILED` | ML embedding failed | ✅ Yes | Retry with a different image |
| `CLASSIFICATION_FAILED` | Document classification failed | ❌ No | Check whether the document type is supported |
| `OCR_EXTRACTION_FAILED` | OCR text extraction failed | ✅ Yes | Retry with a better-quality image |
| `SERVICE_UNAVAILABLE` | Microservice temporarily down | ✅ Yes | Retry after a brief delay |
| `VALIDATION_ERROR` | Input validation failed | ❌ No | Fix request parameters |
| `FILE_TOO_LARGE` | Image file exceeds size limit | ❌ No | Reduce image file size |
| `UNSUPPORTED_FORMAT` | File format not supported | ❌ No | Convert to JPG/PNG |
| `RATE_LIMIT_EXCEEDED` | Too many requests | ✅ Yes | Wait and retry |
Implementing Robust Error Handling
```python
import time
import requests
from typing import Dict, Any, Optional

class IRISClient:
    def __init__(self, base_url: str = "http://localhost:8000", api_key: Optional[str] = None):
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def process_document(self, image_path: str, max_retries: int = 3, **kwargs) -> Dict[str, Any]:
        """Process a document, retrying automatically on transient errors."""
        for attempt in range(max_retries):
            try:
                with open(image_path, 'rb') as f:
                    files = {'file': f}
                    response = self.session.post(
                        f"{self.base_url}/process",
                        files=files,
                        data=kwargs,
                        timeout=60
                    )
                if response.status_code == 200:
                    result = response.json()
                    if result['success']:
                        return result
                    elif self._is_retryable_error(result):
                        print(f"Retryable error (attempt {attempt + 1}): {result['error']['message']}")
                        if attempt < max_retries - 1:
                            time.sleep(2 ** attempt)  # Exponential backoff
                            continue
                    else:
                        raise IRISAPIError(result['error'])
                elif response.status_code == 429:  # Rate limit
                    if attempt < max_retries - 1:
                        time.sleep(5 * (attempt + 1))
                        continue
                    else:
                        raise IRISAPIError({"code": "RATE_LIMIT_EXCEEDED", "message": "Too many requests"})
                elif response.status_code >= 500:  # Server error
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
                        continue
                    else:
                        raise IRISAPIError({"code": "SERVER_ERROR", "message": f"Server error: {response.status_code}"})
                else:
                    # Client error - don't retry
                    try:
                        error_data = response.json()
                    except ValueError:
                        error_data = {"code": "HTTP_ERROR", "message": f"HTTP {response.status_code}"}
                    raise IRISAPIError(error_data)
            except requests.RequestException as e:
                if attempt < max_retries - 1:
                    print(f"Network error (attempt {attempt + 1}): {str(e)}")
                    time.sleep(2 ** attempt)
                    continue
                else:
                    raise IRISAPIError({"code": "NETWORK_ERROR", "message": str(e)})
        raise IRISAPIError({"code": "MAX_RETRIES_EXCEEDED", "message": "All retry attempts failed"})

    def _is_retryable_error(self, result: Dict[str, Any]) -> bool:
        """Check whether an error is retryable based on its error code."""
        error_code = result.get('error', {}).get('code', '')
        retryable_codes = {
            'IMAGE_PROCESSING_FAILED',
            'EMBEDDING_GENERATION_FAILED',
            'OCR_EXTRACTION_FAILED',
            'SERVICE_UNAVAILABLE'
        }
        return error_code in retryable_codes

class IRISAPIError(Exception):
    def __init__(self, error_data: Dict[str, Any]):
        self.code = error_data.get('code', 'UNKNOWN')
        self.message = error_data.get('message', 'Unknown error')
        self.details = error_data.get('details')
        self.retryable = error_data.get('retryable', False)
        super().__init__(f"{self.code}: {self.message}")

# Usage example
client = IRISClient("http://localhost:8000")

try:
    result = client.process_document(
        "my_document.jpg",
        document_type="ficha_residencia",
        ocr_confidence=0.3
    )
    print("Processing successful!")
    print(f"Extracted name: {result['final_extraction']['personal_info']['nombre_completo']}")
except IRISAPIError as e:
    print(f"API Error: {e}")
    print(f"Error code: {e.code}")
    print(f"Retryable: {e.retryable}")
```
Batch Processing
Processing Multiple Documents
```python
import concurrent.futures
from typing import List, Dict, Any

class IRISBatchProcessor:
    def __init__(self, base_url: str = "http://localhost:8000", max_workers: int = 3):
        self.client = IRISClient(base_url)
        self.max_workers = max_workers

    def process_batch(self, image_paths: List[str], **kwargs) -> Dict[str, Any]:
        """Process multiple documents concurrently."""
        results = {}

        def process_single_document(path: str) -> tuple:
            try:
                result = self.client.process_document(path, **kwargs)
                return path, {"success": True, "data": result}
            except IRISAPIError as e:
                return path, {"success": False, "error": str(e), "code": e.code}
            except Exception as e:
                return path, {"success": False, "error": str(e), "code": "UNKNOWN"}

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(process_single_document, path): path
                for path in image_paths
            }
            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_path):
                path, result = future.result()
                results[path] = result

        return self._generate_batch_summary(results)

    def _generate_batch_summary(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Generate summary statistics for a batch run."""
        total = len(results)
        successful = sum(1 for r in results.values() if r['success'])
        failed = total - successful

        error_breakdown = {}
        processing_times = []
        for path, result in results.items():
            if result['success']:
                processing_times.append(result['data']['processing_time'])
            else:
                error_code = result.get('code', 'UNKNOWN')
                error_breakdown[error_code] = error_breakdown.get(error_code, 0) + 1

        return {
            "summary": {
                "total_documents": total,
                "successful": successful,
                "failed": failed,
                "success_rate": round(successful / total * 100, 2) if total > 0 else 0,
                "average_processing_time": round(sum(processing_times) / len(processing_times), 2) if processing_times else 0
            },
            "error_breakdown": error_breakdown,
            "detailed_results": results
        }

# Usage example
processor = IRISBatchProcessor(max_workers=5)

image_files = [
    "documents/cedula_001.jpg",
    "documents/ficha_002.jpg",
    "documents/pasaporte_003.jpg"
]

batch_results = processor.process_batch(
    image_files,
    apply_unwarping=True,
    ocr_confidence=0.3
)

print(f"Processed {batch_results['summary']['total_documents']} documents")
print(f"Success rate: {batch_results['summary']['success_rate']}%")
print(f"Average processing time: {batch_results['summary']['average_processing_time']}s")
```
Utility APIs
Health Checks
GET /health
Check API Gateway health.
```python
import requests

def check_health():
    response = requests.get("http://localhost:8000/health")
    return response.json()

health = check_health()
print(f"Status: {health['status']}")
```
GET /services
Check all microservices health.
```python
def check_all_services():
    response = requests.get("http://localhost:8000/services")
    return response.json()

services = check_all_services()
for service, status in services.items():
    print(f"{service}: {status['status']}")
```
Pipeline Status
GET /pipeline/status/{pipeline_id}
Check the status of a running pipeline.
```python
def check_pipeline_status(pipeline_id):
    response = requests.get(f"http://localhost:8000/pipeline/status/{pipeline_id}")
    return response.json()

status = check_pipeline_status("12345678-1234-5678-9012-123456789012")
print(f"Status: {status['status']}")
print(f"Current phase: {status['current_phase']}")
```
Production Considerations
Rate Limiting
Production environments enforce per-tier rate limits (a client-side throttling sketch follows this list):
- Free tier: 100 requests per hour
- Basic tier: 1,000 requests per hour
- Premium tier: 10,000 requests per hour
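When batching close to your tier's quota, it is cheaper to throttle client-side than to rely on 429 retries. A minimal sketch that spaces calls evenly across the hour (the quota figure mirrors the Basic tier above):

```python
import time

class HourlyThrottle:
    """Space calls evenly so an hourly quota is never exceeded."""
    def __init__(self, requests_per_hour: int):
        self.min_interval = 3600.0 / requests_per_hour
        self.last_call = 0.0

    def wait(self):
        elapsed = time.time() - self.last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call = time.time()

throttle = HourlyThrottle(requests_per_hour=1000)  # Basic tier quota
# Call throttle.wait() before each client.process_document(...) request
```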
Image Size Limits
- Maximum file size: 10MB
- Recommended resolution: 1500-3000px on longer side
- Supported formats: JPG, PNG, PDF (first page only)
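To stay inside these limits, it helps to validate and, if needed, downscale images before upload. A sketch using Pillow (`pip install Pillow`); the 10MB and 3000px figures come from the limits above:

```python
import os
from PIL import Image

MAX_BYTES = 10 * 1024 * 1024  # 10MB upload limit
MAX_SIDE = 3000               # recommended upper bound for the longer side

def prepare_image(path: str, out_path: str = "upload_ready.jpg") -> str:
    """Downscale an image to the recommended bounds and return the upload path."""
    with Image.open(path) as img:
        longest = max(img.size)
        if longest > MAX_SIDE:
            scale = MAX_SIDE / longest
            img = img.resize((int(img.width * scale), int(img.height * scale)))
        img.convert("RGB").save(out_path, "JPEG", quality=90)
    if os.path.getsize(out_path) > MAX_BYTES:
        raise ValueError(f"{out_path} still exceeds the 10MB limit")
    return out_path
```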
Security Best Practices
```python
# Always validate file types before upload
import mimetypes

def validate_image_file(file_path):
    mime_type, _ = mimetypes.guess_type(file_path)
    allowed_types = ['image/jpeg', 'image/png', 'application/pdf']
    return mime_type in allowed_types

# Keep API keys in environment variables, never in source code
import os
api_key = os.getenv('IRIS_API_KEY')

# Always set request timeouts
response = requests.post(url, files=files, timeout=60)
```
Monitoring and Logging
```python
import logging
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_with_logging(image_path):
    start_time = time.time()
    try:
        logger.info(f"Starting processing for {image_path}")
        result = client.process_document(image_path)
        processing_time = time.time() - start_time
        logger.info(f"Successfully processed {image_path} in {processing_time:.2f}s")
        logger.info(f"Document type: {result['document_type']}")
        logger.info(f"Quality level: {result['final_extraction']['metadata']['quality_assessment']['quality_level']}")
        return result
    except IRISAPIError as e:
        logger.error(f"Failed to process {image_path}: {e}")
        raise
```
SDK and Libraries
Python SDK (Recommended)
```bash
pip install iris-ocr-sdk
```

```python
from iris_ocr import IRISClient

client = IRISClient(api_key="your-api-key")
result = client.process("document.jpg")
```
JavaScript/Node.js SDK
```bash
npm install iris-ocr-js
```

```javascript
const IRIS = require('iris-ocr-js');
const client = new IRIS({ apiKey: 'your-api-key' });

// await needs an async context in CommonJS
(async () => {
  const result = await client.process('document.jpg');
})();
```
Support and Resources
- API Documentation: Complete reference at the `/docs` endpoint
- Postman Collection: Import from `docs/postman/IRIS_API.json`
- Code Examples: Available in the `examples/` directory
- GitHub Issues: Report integration issues
- Developer Support: Contact support for integration help
This guide provides everything you need to integrate IRIS OCR into your applications. Start with the complete pipeline API and move to phase-by-phase processing as your requirements grow more complex.