
Performance Optimization and Monitoring

This guide covers performance optimization strategies, monitoring setup, and troubleshooting procedures for the IRIS OCR platform, with the goal of keeping latency predictable and resource utilization efficient.

Performance Architecture Overview

Application Performance Optimization

FastAPI Performance Tuning

# packages/api-gateway/app.py - Optimized FastAPI Configuration
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import uvicorn
import asyncio
import json
import aioredis
from contextlib import asynccontextmanager
import time
import httpx
from prometheus_client import Counter, Histogram, Gauge
import logging

# `settings` (DEBUG, ALLOWED_ORIGINS, REDIS_URL) is assumed to be the application's
# configuration object, imported from the project's config module.

# Performance Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Active connections')
ML_PROCESSING_TIME = Histogram('ml_processing_duration_seconds', 'ML processing time', ['service'])

class PerformanceMiddleware:
"""Custom ASGI middleware for performance monitoring and optimization"""

def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return

start_time = time.time()
request = Request(scope, receive)
status_code = "unknown"

# ASGI apps do not return a response object, so capture the status code
# by intercepting the outgoing "http.response.start" message
async def send_wrapper(message):
nonlocal status_code
if message["type"] == "http.response.start":
status_code = str(message["status"])
await send(message)

# Track active connections
ACTIVE_CONNECTIONS.inc()

try:
await self.app(scope, receive, send_wrapper)

finally:
# Record metrics even if the downstream app raises
duration = time.time() - start_time
REQUEST_DURATION.observe(duration)
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=status_code
).inc()
ACTIVE_CONNECTIONS.dec()

class OptimizedConnectionPool:
"""Optimized HTTP connection pool for microservice communication"""

def __init__(self):
# Connection pool with optimized settings
self.limits = httpx.Limits(
max_keepalive_connections=50,
max_connections=100,
keepalive_expiry=30.0
)

# Timeout configuration for different operations
self.timeout = httpx.Timeout(
connect=5.0, # Connection timeout
read=30.0, # Read timeout for normal operations
write=10.0, # Write timeout
pool=5.0 # Pool timeout
)

# ML operations need longer timeouts
self.ml_timeout = httpx.Timeout(
connect=10.0,
read=300.0, # 5 minutes for ML processing
write=30.0,
pool=10.0
)

# Create client with optimizations
self.client = httpx.AsyncClient(
limits=self.limits,
timeout=self.timeout,
http2=True, # Enable HTTP/2
verify=False # For internal services (use proper certs in production)
)

self.ml_client = httpx.AsyncClient(
limits=self.limits,
timeout=self.ml_timeout,
http2=True,
verify=False
)

async def make_request(self, method: str, url: str, is_ml: bool = False, **kwargs):
"""Make optimized HTTP request with retry logic"""
client = self.ml_client if is_ml else self.client
max_retries = 3

for attempt in range(max_retries):
try:
response = await client.request(method, url, **kwargs)
response.raise_for_status()
return response

except httpx.TimeoutException:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff

except httpx.HTTPStatusError as e:
if e.response.status_code in [500, 502, 503, 504] and attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
raise

# Global connection pool instance
connection_pool = OptimizedConnectionPool()

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Optimized application lifespan management"""
# Startup optimizations
print("🚀 Starting IRIS API Gateway with performance optimizations...")

# Pre-warm ML services
await pre_warm_ml_services()

# Initialize connection pools
await init_connection_pools()

# Setup caching
await setup_redis_cache()

yield

# Cleanup
await connection_pool.client.aclose()
await connection_pool.ml_client.aclose()
print("🛑 API Gateway shutdown complete")

# Optimized FastAPI application
app = FastAPI(
title="IRIS OCR API Gateway",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs" if settings.DEBUG else None, # Disable docs in production
redoc_url=None # Disable redoc
)

# Performance middleware
app.add_middleware(PerformanceMiddleware)

# Compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)

# CORS with optimizations
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
max_age=3600 # Cache preflight requests
)

# Connection pool initialization
async def init_connection_pools():
"""Initialize and warm up connection pools"""
# Test connectivity to all services
services = [
("http://image-processor:8001/health", False),
("http://ml-embeddings:8002/health", True),
("http://ml-classifier:8003/health", True),
("http://ocr-extractor:8004/health", False)
]

for url, is_ml in services:
try:
await connection_pool.make_request("GET", url, is_ml=is_ml)
print(f"✅ Connected to {url}")
except Exception as e:
print(f"❌ Failed to connect to {url}: {e}")

# Redis caching setup
redis_client = None

async def setup_redis_cache():
"""Setup Redis caching with connection pooling"""
global redis_client

redis_client = aioredis.from_url(
settings.REDIS_URL,
encoding="utf-8",
decode_responses=True,
max_connections=20,
retry_on_timeout=True
)

# Test connection
try:
await redis_client.ping()
print("✅ Redis cache connected")
except Exception as e:
print(f"❌ Redis connection failed: {e}")
redis_client = None

# Caching utilities
async def get_cached_result(cache_key: str):
"""Get result from cache"""
if not redis_client:
return None

try:
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
except Exception as e:
print(f"Cache get error: {e}")

return None

async def set_cached_result(cache_key: str, result: dict, ttl: int = 3600):
"""Set result in cache"""
if not redis_client:
return

try:
await redis_client.setex(
cache_key,
ttl,
json.dumps(result, default=str)
)
except Exception as e:
print(f"Cache set error: {e}")

# ML service pre-warming
async def pre_warm_ml_services():
"""Pre-warm ML services to reduce cold start latency"""
print("🔥 Pre-warming ML services...")

# Create small dummy image for warming
dummy_image = create_dummy_image()

# Warm up ML services
warmup_tasks = [
warm_up_embeddings_service(dummy_image),
warm_up_classifier_service(dummy_image),
warm_up_ocr_service(dummy_image)
]

await asyncio.gather(*warmup_tasks, return_exceptions=True)
print("🔥 ML services pre-warming complete")

async def warm_up_embeddings_service(dummy_image):
"""Warm up embeddings service"""
try:
files = {"file": ("dummy.jpg", dummy_image, "image/jpeg")}
await connection_pool.make_request(
"POST",
"http://ml-embeddings:8002/embed",
files=files,
is_ml=True
)
print("✅ Embeddings service warmed up")
except Exception as e:
print(f"⚠️ Embeddings warmup failed: {e}")

def create_dummy_image():
"""Create small dummy image for service warmup"""
import io
from PIL import Image

# Create 100x100 white image
img = Image.new('RGB', (100, 100), color='white')
buffer = io.BytesIO()
img.save(buffer, format='JPEG')
buffer.seek(0)
return buffer.getvalue()
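
The helpers above (connection pool, Redis cache, service warm-up) are meant to be combined inside request handlers. The following sketch shows one way an upload endpoint could cache classification results by content hash and forward work through the shared pool; the route path, downstream /classify URL, and TTL are illustrative assumptions rather than part of the gateway's actual API.

# Hypothetical endpoint combining the cache helpers and connection pool above
import hashlib
from fastapi import UploadFile, File

@app.post("/api/v1/classify")
async def classify_document(file: UploadFile = File(...)):
    content = await file.read()

    # Content-addressed cache key: identical uploads reuse earlier results
    cache_key = f"classify:{hashlib.sha256(content).hexdigest()}"
    cached = await get_cached_result(cache_key)
    if cached:
        return {"cached": True, **cached}

    # Forward to the classifier service through the shared pool (ML timeouts apply)
    start = time.time()
    response = await connection_pool.make_request(
        "POST",
        "http://ml-classifier:8003/classify",
        is_ml=True,
        files={"file": (file.filename, content, file.content_type)},
    )
    ML_PROCESSING_TIME.labels(service="ml-classifier").observe(time.time() - start)

    result = response.json()
    await set_cached_result(cache_key, result, ttl=3600)
    return {"cached": False, **result}

Because the key is derived from the file contents, repeated uploads of the same document skip the ML round-trip entirely and are served from Redis.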

Machine Learning Performance Optimization

# packages/ml-classifier/optimizations.py
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
from typing import List, Dict, Optional
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import queue

class OptimizedInferenceEngine:
"""Optimized inference engine for document classification"""

def __init__(self, model_path: str, device: str = 'cuda', batch_size: int = 8):
self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
self.batch_size = batch_size
self.model = self._load_optimized_model(model_path)
self.transform = self._create_optimized_transforms()

# Model optimization
self._optimize_model()

# Batch processing queue
self.batch_queue = queue.Queue(maxsize=100)
self.result_queues = {}
self.batch_processor_thread = None
self._start_batch_processor()

def _load_optimized_model(self, model_path: str) -> nn.Module:
"""Load and optimize model for inference"""
# Load model
model = torch.load(model_path, map_location=self.device)
model.eval()

# TorchScript compilation speeds up inference, but not every architecture is
# scriptable; fall back to eager execution if compilation fails
try:
model = torch.jit.script(model)
except Exception as e:
print(f"TorchScript compilation skipped: {e}")

return model

def _optimize_model(self):
"""Apply various model optimizations"""
# Gradient tracking is disabled per batch in _process_batch (torch.no_grad());
# torch.inference_mode() is a context manager and has no effect when called globally

# Tune cuDNN for fixed input sizes
if hasattr(torch.backends, 'cudnn'):
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = False

# Mixed precision inference via autocast (GradScaler is only needed for training)
self.use_amp = self.device.type == 'cuda'

def _create_optimized_transforms(self) -> transforms.Compose:
"""Create optimized image transforms"""
return transforms.Compose([
transforms.Resize((224, 224), antialias=True),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])

def _start_batch_processor(self):
"""Start background batch processor"""
def batch_worker():
batch_buffer = []
request_ids = []

while True:
try:
# Collect items for batch
while len(batch_buffer) < self.batch_size:
try:
item = self.batch_queue.get(timeout=0.1)
if item is None: # Shutdown signal
return

batch_buffer.append(item['image'])
request_ids.append(item['request_id'])
except queue.Empty:
if batch_buffer: # Process partial batch
break

if batch_buffer:
# Process batch
results = self._process_batch(batch_buffer)

# Return results to respective queues
for request_id, result in zip(request_ids, results):
if request_id in self.result_queues:
self.result_queues[request_id].put(result)

# Clear batch
batch_buffer.clear()
request_ids.clear()

except Exception as e:
print(f"Batch processing error: {e}")

self.batch_processor_thread = threading.Thread(target=batch_worker)
self.batch_processor_thread.daemon = True
self.batch_processor_thread.start()

def _process_batch(self, images: List[torch.Tensor]) -> List[Dict]:
"""Process batch of images efficiently"""
try:
# Stack images into batch tensor
batch_tensor = torch.stack(images).to(self.device)

with torch.no_grad():
if self.use_amp:
with torch.cuda.amp.autocast():
outputs = self.model(batch_tensor)
else:
outputs = self.model(batch_tensor)

# Get predictions
probabilities = torch.softmax(outputs, dim=1)
predictions = torch.argmax(probabilities, dim=1)
confidences = torch.max(probabilities, dim=1)[0]

# Convert to results
results = []
for i in range(len(images)):
results.append({
'prediction': predictions[i].item(),
'confidence': confidences[i].item(),
'probabilities': probabilities[i].cpu().numpy().tolist()
})

return results

except Exception as e:
print(f"Batch inference error: {e}")
return [{'error': str(e)} for _ in images]

async def classify_async(self, image: torch.Tensor) -> Dict:
"""Asynchronous classification with batching"""
import uuid
import asyncio

request_id = str(uuid.uuid4())
result_queue = queue.Queue(maxsize=1)
self.result_queues[request_id] = result_queue

# Submit to batch queue
self.batch_queue.put({
'image': image,
'request_id': request_id
})

# Wait for result asynchronously
while True:
try:
result = result_queue.get_nowait()
del self.result_queues[request_id]
return result
except queue.Empty:
await asyncio.sleep(0.01) # Small delay to prevent busy waiting

class ModelCacheManager:
"""Intelligent model caching and management"""

def __init__(self, cache_size: int = 3):
self.cache_size = cache_size
self.model_cache = {}
self.access_times = {}
self.lock = threading.Lock()

def get_model(self, model_type: str, model_path: str) -> OptimizedInferenceEngine:
"""Get model from cache or load if not cached"""
with self.lock:
cache_key = f"{model_type}_{model_path}"

if cache_key in self.model_cache:
self.access_times[cache_key] = time.time()
return self.model_cache[cache_key]

# Load new model
model = OptimizedInferenceEngine(model_path)

# Manage cache size
if len(self.model_cache) >= self.cache_size:
self._evict_least_used()

self.model_cache[cache_key] = model
self.access_times[cache_key] = time.time()

return model

def _evict_least_used(self):
"""Evict least recently used model"""
if not self.access_times:
return

oldest_key = min(self.access_times.keys(),
key=lambda k: self.access_times[k])

del self.model_cache[oldest_key]
del self.access_times[oldest_key]

# Global model cache
model_cache = ModelCacheManager()

class GPUMemoryManager:
"""GPU memory optimization and management"""

@staticmethod
def clear_cache():
"""Clear GPU cache"""
if torch.cuda.is_available():
torch.cuda.empty_cache()

@staticmethod
def get_memory_stats() -> Dict:
"""Get GPU memory statistics"""
if not torch.cuda.is_available():
return {"gpu_available": False}

return {
"gpu_available": True,
"allocated": torch.cuda.memory_allocated(),
"cached": torch.cuda.memory_reserved(),
"max_allocated": torch.cuda.max_memory_allocated(),
"device_count": torch.cuda.device_count()
}

@staticmethod
def optimize_memory():
"""Optimize GPU memory usage"""
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
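
A minimal usage sketch tying the pieces above together; the model path, label semantics, and sample image name are placeholders, and in a real service the engine would be created once at startup rather than per request.

# Hypothetical usage of the model cache, batched inference engine, and GPU manager
import asyncio
from PIL import Image

async def classify_document_image(image_path: str) -> Dict:
    # Reuse a cached engine; the model is loaded and optimized only on first access
    engine = model_cache.get_model("document_classifier", "/models/doc_classifier.pt")

    # Preprocess with the engine's optimized transforms
    image = Image.open(image_path).convert("RGB")
    tensor = engine.transform(image)

    # Batched asynchronous inference (requests are grouped by the background worker)
    result = await engine.classify_async(tensor)

    # Optionally release cached GPU memory between large workloads
    GPUMemoryManager.clear_cache()
    return result

if __name__ == "__main__":
    print(asyncio.run(classify_document_image("sample_invoice.jpg")))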

Database Performance Optimization

# Database optimization configuration
import asyncpg
import asyncio
import json
import time
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from typing import Optional, List, Dict

class OptimizedDatabaseManager:
"""Optimized database connection and query management"""

def __init__(self, database_url: str):
# Optimized engine configuration
self.engine = create_async_engine(
database_url,
# Connection pool settings
pool_size=20,
max_overflow=30,
pool_timeout=30,
pool_recycle=3600,
pool_pre_ping=True,

# Query optimization
echo=False, # Disable SQL logging in production
future=True,

# asyncpg-specific optimizations (connect_args are passed through to asyncpg.connect)
connect_args={
"server_settings": {
"jit": "off", # Disable JIT for simple queries
"application_name": "iris_ocr",
},
"command_timeout": 60, # asyncpg per-query timeout, in seconds
}
)

# Async session factory
self.async_session = sessionmaker(
self.engine,
class_=AsyncSession,
expire_on_commit=False
)

# Query cache
self.query_cache = {}
self.cache_ttl = 300 # 5 minutes

async def execute_optimized_query(self, query: str, params: Dict = None) -> List[Dict]:
"""Execute optimized database query with caching"""
cache_key = self._generate_cache_key(query, params)

# Check cache first
cached_result = self._get_from_cache(cache_key)
if cached_result:
return cached_result

async with self.async_session() as session:
try:
# Raw SQL strings must be wrapped in text() under SQLAlchemy 1.4+/2.0
result = await session.execute(text(query), params or {})
rows = result.fetchall()

# Convert rows to plain dicts
result_data = [dict(row._mapping) for row in rows]

# Cache result
self._set_in_cache(cache_key, result_data)

return result_data

except Exception:
await session.rollback()
raise

def _generate_cache_key(self, query: str, params: Dict) -> str:
"""Generate cache key for query"""
import hashlib
query_hash = hashlib.md5(f"{query}{json.dumps(params, sort_keys=True)}".encode()).hexdigest()
return f"query:{query_hash}"

def _get_from_cache(self, cache_key: str) -> Optional[List[Dict]]:
"""Get result from cache"""
if cache_key in self.query_cache:
cached_data, timestamp = self.query_cache[cache_key]
if time.time() - timestamp < self.cache_ttl:
return cached_data
else:
del self.query_cache[cache_key]
return None

def _set_in_cache(self, cache_key: str, data: List[Dict]):
"""Set result in cache"""
self.query_cache[cache_key] = (data, time.time())

# Cleanup old cache entries
if len(self.query_cache) > 1000:
self._cleanup_cache()

def _cleanup_cache(self):
"""Cleanup expired cache entries"""
current_time = time.time()
expired_keys = [
key for key, (_, timestamp) in self.query_cache.items()
if current_time - timestamp > self.cache_ttl
]

for key in expired_keys:
del self.query_cache[key]

# Database indexes for performance
DATABASE_INDEXES = """
-- Create indexes for common queries
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_created_at
ON documents(created_at DESC);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_status
ON documents(status) WHERE status IN ('processing', 'completed', 'failed');

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_type
ON documents(document_type);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_processing_logs_document_id
ON processing_logs(document_id);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_processing_logs_timestamp
ON processing_logs(timestamp DESC);

-- Partial index for recent documents; NOW() cannot be used in an index predicate
-- (it is not IMMUTABLE), so use a fixed cutoff and recreate the index periodically
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_recent
ON documents(created_at) WHERE created_at > '2024-01-01';

-- Composite indexes for complex queries
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_type_status_created
ON documents(document_type, status, created_at DESC);

-- JSON indexes for metadata searches
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_metadata_gin
ON documents USING gin(metadata);
"""

# Query optimization examples
OPTIMIZED_QUERIES = {
"get_recent_documents": """
SELECT d.id, d.filename, d.document_type, d.status, d.created_at
FROM documents d
WHERE d.created_at > $1
ORDER BY d.created_at DESC
LIMIT $2
""",

"get_processing_stats": """
SELECT
document_type,
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed,
AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_processing_time
FROM documents
WHERE created_at > $1
GROUP BY document_type
""",

"get_performance_metrics": """
SELECT
DATE_TRUNC('hour', timestamp) as hour,
service_name,
AVG(response_time) as avg_response_time,
MAX(response_time) as max_response_time,
COUNT(*) as request_count
FROM performance_logs
WHERE timestamp > $1
GROUP BY DATE_TRUNC('hour', timestamp), service_name
ORDER BY hour DESC
"""
}
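
The queries in OPTIMIZED_QUERIES use asyncpg-style positional placeholders ($1, $2), so the most direct way to run them is through an asyncpg pool. The sketch below is a hypothetical reporting helper; the DSN, pool sizes, and time window are example values only.

# Hypothetical reporting helper that runs the parameterized queries via asyncpg
from datetime import datetime, timedelta

async def get_daily_report(dsn: str = "postgresql://iris:iris@postgres:5432/iris") -> Dict:
    pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
    since = datetime.utcnow() - timedelta(days=1)

    async with pool.acquire() as conn:
        # Most recent documents (the LIMIT is the second positional parameter)
        recent = await conn.fetch(OPTIMIZED_QUERIES["get_recent_documents"], since, 50)

        # Per-type completion/failure counts and average processing time
        stats = await conn.fetch(OPTIMIZED_QUERIES["get_processing_stats"], since)

    await pool.close()
    return {
        "recent_documents": [dict(r) for r in recent],
        "processing_stats": [dict(r) for r in stats],
    }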

System Performance Monitoring

Comprehensive Monitoring Stack

# monitoring/performance_monitor.py
import asyncio
import psutil
import time
import logging
from prometheus_client import Gauge, Counter, Histogram, CollectorRegistry
from typing import Dict, List
import aiohttp
import json

# Prometheus metrics
REGISTRY = CollectorRegistry()

# System metrics
CPU_USAGE = Gauge('system_cpu_usage_percent', 'CPU usage percentage', registry=REGISTRY)
MEMORY_USAGE = Gauge('system_memory_usage_percent', 'Memory usage percentage', registry=REGISTRY)
DISK_USAGE = Gauge('system_disk_usage_percent', 'Disk usage percentage', ['mount'], registry=REGISTRY)
NETWORK_IO = Counter('system_network_io_bytes', 'Network I/O bytes', ['direction'], registry=REGISTRY)

# Application metrics
REQUEST_LATENCY = Histogram('app_request_latency_seconds', 'Request latency', ['service'], registry=REGISTRY)
ERROR_RATE = Counter('app_errors_total', 'Total errors', ['service', 'error_type'], registry=REGISTRY)
THROUGHPUT = Counter('app_requests_total', 'Total requests', ['service'], registry=REGISTRY)

# ML specific metrics
MODEL_INFERENCE_TIME = Histogram('ml_inference_duration_seconds', 'Model inference time', ['model_type'], registry=REGISTRY)
MODEL_MEMORY_USAGE = Gauge('ml_model_memory_bytes', 'Model memory usage', ['model_type'], registry=REGISTRY)
GPU_UTILIZATION = Gauge('gpu_utilization_percent', 'GPU utilization', ['gpu_id'], registry=REGISTRY)
GPU_MEMORY_USAGE = Gauge('gpu_memory_usage_percent', 'GPU memory usage', ['gpu_id'], registry=REGISTRY)

class SystemPerformanceMonitor:
"""Comprehensive system performance monitoring"""

def __init__(self, collection_interval: int = 10):
self.collection_interval = collection_interval
self.running = False

async def start_monitoring(self):
"""Start performance monitoring"""
self.running = True

# Start monitoring tasks
tasks = [
self._monitor_system_resources(),
self._monitor_network(),
self._monitor_gpu(),
self._monitor_application_metrics()
]

await asyncio.gather(*tasks)

async def _monitor_system_resources(self):
"""Monitor CPU, memory, and disk usage"""
while self.running:
try:
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
CPU_USAGE.set(cpu_percent)

# Memory usage
memory = psutil.virtual_memory()
MEMORY_USAGE.set(memory.percent)

# Disk usage
for partition in psutil.disk_partitions():
try:
disk_usage = psutil.disk_usage(partition.mountpoint)
DISK_USAGE.labels(mount=partition.mountpoint).set(
(disk_usage.used / disk_usage.total) * 100
)
except PermissionError:
continue

await asyncio.sleep(self.collection_interval)

except Exception as e:
logging.error(f"System monitoring error: {e}")
await asyncio.sleep(self.collection_interval)

async def _monitor_network(self):
"""Monitor network I/O"""
last_net_io = psutil.net_io_counters()

while self.running:
try:
current_net_io = psutil.net_io_counters()

# Calculate deltas
bytes_sent_delta = current_net_io.bytes_sent - last_net_io.bytes_sent
bytes_recv_delta = current_net_io.bytes_recv - last_net_io.bytes_recv

NETWORK_IO.labels(direction='sent').inc(bytes_sent_delta)
NETWORK_IO.labels(direction='received').inc(bytes_recv_delta)

last_net_io = current_net_io
await asyncio.sleep(self.collection_interval)

except Exception as e:
logging.error(f"Network monitoring error: {e}")
await asyncio.sleep(self.collection_interval)

async def _monitor_gpu(self):
"""Monitor GPU utilization and memory"""
try:
import pynvml
pynvml.nvmlInit()
device_count = pynvml.nvmlDeviceGetCount()

while self.running:
for i in range(device_count):
try:
handle = pynvml.nvmlDeviceGetHandleByIndex(i)

# GPU utilization
utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
GPU_UTILIZATION.labels(gpu_id=str(i)).set(utilization.gpu)

# GPU memory (tracked as its own gauge rather than overloading GPU_UTILIZATION)
memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
gpu_memory_percent = (memory_info.used / memory_info.total) * 100
GPU_MEMORY_USAGE.labels(gpu_id=str(i)).set(gpu_memory_percent)

except Exception as e:
logging.error(f"GPU {i} monitoring error: {e}")

await asyncio.sleep(self.collection_interval)

except ImportError:
logging.warning("pynvml not available, GPU monitoring disabled")
except Exception as e:
logging.error(f"GPU monitoring initialization error: {e}")

async def _monitor_application_metrics(self):
"""Monitor application-specific metrics"""
services = [
("http://api-gateway:8000/metrics", "api-gateway"),
("http://image-processor:8001/metrics", "image-processor"),
("http://ml-embeddings:8002/metrics", "ml-embeddings"),
("http://ml-classifier:8003/metrics", "ml-classifier"),
("http://ocr-extractor:8004/metrics", "ocr-extractor")
]

while self.running:
async with aiohttp.ClientSession() as session:
for url, service_name in services:
try:
async with session.get(url, timeout=5) as response:
if response.status == 200:
metrics_data = await response.text()
self._parse_and_update_metrics(metrics_data, service_name)

except Exception as e:
ERROR_RATE.labels(service=service_name, error_type='monitoring').inc()
logging.error(f"Failed to collect metrics from {service_name}: {e}")

await asyncio.sleep(self.collection_interval * 2) # Less frequent for app metrics

def _parse_and_update_metrics(self, metrics_data: str, service_name: str):
"""Parse Prometheus metrics and update local metrics"""
# This is a simplified parser - in production, use proper Prometheus client
lines = metrics_data.split('\n')

for line in lines:
if line.startswith('http_request_duration_seconds'):
# Extract latency metrics
if 'quantile="0.95"' in line:
value = float(line.split()[-1])
REQUEST_LATENCY.labels(service=service_name).observe(value)

def stop_monitoring(self):
"""Stop performance monitoring"""
self.running = False

class ApplicationProfiler:
"""Application performance profiler"""

def __init__(self):
self.profiling_data = {}
self.active_profiles = {}

def start_profile(self, profile_name: str):
"""Start profiling session"""
self.active_profiles[profile_name] = {
'start_time': time.time(),
'start_memory': psutil.Process().memory_info().rss
}

def end_profile(self, profile_name: str) -> Dict:
"""End profiling session and return results"""
if profile_name not in self.active_profiles:
return {}

start_data = self.active_profiles[profile_name]
end_time = time.time()
end_memory = psutil.Process().memory_info().rss

profile_result = {
'duration': end_time - start_data['start_time'],
'memory_delta': end_memory - start_data['start_memory'],
'timestamp': end_time
}

# Store in profiling data
if profile_name not in self.profiling_data:
self.profiling_data[profile_name] = []

self.profiling_data[profile_name].append(profile_result)

# Cleanup
del self.active_profiles[profile_name]

return profile_result

def get_profile_summary(self, profile_name: str) -> Dict:
"""Get summary statistics for profile"""
if profile_name not in self.profiling_data:
return {}

data = self.profiling_data[profile_name]
durations = [d['duration'] for d in data]
memory_deltas = [d['memory_delta'] for d in data]

return {
'count': len(data),
'avg_duration': sum(durations) / len(durations),
'max_duration': max(durations),
'min_duration': min(durations),
'avg_memory_delta': sum(memory_deltas) / len(memory_deltas),
'max_memory_delta': max(memory_deltas)
}

# Performance testing utilities
class PerformanceTester:
"""Performance testing and benchmarking"""

def __init__(self):
self.test_results = {}

async def run_load_test(self, endpoint: str, concurrent_requests: int = 10,
duration_seconds: int = 60) -> Dict:
"""Run load test against endpoint"""

async def make_request(session, url):
start_time = time.time()
try:
async with session.get(url) as response:
await response.read()
return {
'status': response.status,
'duration': time.time() - start_time,
'success': response.status == 200
}
except Exception as e:
return {
'status': 0,
'duration': time.time() - start_time,
'success': False,
'error': str(e)
}

# Load test execution
start_test_time = time.time()
all_results = []

async with aiohttp.ClientSession() as session:
while time.time() - start_test_time < duration_seconds:
# Create batch of concurrent requests
tasks = [
make_request(session, endpoint)
for _ in range(concurrent_requests)
]

batch_results = await asyncio.gather(*tasks)
all_results.extend(batch_results)

# Small delay between batches
await asyncio.sleep(0.1)

# Analyze results
successful_requests = [r for r in all_results if r['success']]
failed_requests = [r for r in all_results if not r['success']]

if successful_requests:
durations = [r['duration'] for r in successful_requests]
avg_duration = sum(durations) / len(durations)
max_duration = max(durations)
min_duration = min(durations)
p95_duration = sorted(durations)[int(len(durations) * 0.95)]
else:
avg_duration = max_duration = min_duration = p95_duration = 0

return {
'total_requests': len(all_results),
'successful_requests': len(successful_requests),
'failed_requests': len(failed_requests),
'success_rate': len(successful_requests) / len(all_results) if all_results else 0,
'avg_response_time': avg_duration,
'max_response_time': max_duration,
'min_response_time': min_duration,
'p95_response_time': p95_duration,
'requests_per_second': len(all_results) / duration_seconds,
'test_duration': duration_seconds
}

# Global performance monitor instance
performance_monitor = SystemPerformanceMonitor()
profiler = ApplicationProfiler()
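
The monitor, profiler, and load tester can be driven from a small standalone script. The sketch below is an example only: the metrics port, target endpoint, and test parameters are assumptions, and the /health route is expected to exist on the gateway.

# Hypothetical driver script for the monitoring and benchmarking utilities above
from prometheus_client import start_http_server

async def main():
    # Expose the custom registry so Prometheus can scrape these metrics
    start_http_server(9100, registry=REGISTRY)

    # Run system monitoring in the background
    monitor_task = asyncio.create_task(performance_monitor.start_monitoring())

    # Profile a short load test against the gateway
    profiler.start_profile("gateway_smoke_test")
    tester = PerformanceTester()
    results = await tester.run_load_test(
        "http://api-gateway:8000/health",
        concurrent_requests=20,
        duration_seconds=30,
    )
    print(profiler.end_profile("gateway_smoke_test"))
    print(
        f"p95: {results['p95_response_time']:.3f}s, "
        f"rps: {results['requests_per_second']:.1f}, "
        f"success rate: {results['success_rate']:.1%}"
    )

    # Stop the background monitor cleanly
    performance_monitor.stop_monitoring()
    await monitor_task

if __name__ == "__main__":
    asyncio.run(main())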

Grafana Dashboard Configuration

{
"dashboard": {
"id": null,
"title": "IRIS Performance Dashboard",
"tags": ["iris", "performance"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "System Overview",
"type": "row",
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 0}
},
{
"id": 2,
"title": "CPU Usage",
"type": "graph",
"targets": [
{
"expr": "system_cpu_usage_percent",
"legendFormat": "CPU Usage %"
}
],
"yAxes": [
{
"min": 0,
"max": 100,
"unit": "percent"
}
],
"gridPos": {"h": 8, "w": 8, "x": 0, "y": 1}
},
{
"id": 3,
"title": "Memory Usage",
"type": "graph",
"targets": [
{
"expr": "system_memory_usage_percent",
"legendFormat": "Memory Usage %"
}
],
"yAxes": [
{
"min": 0,
"max": 100,
"unit": "percent"
}
],
"gridPos": {"h": 8, "w": 8, "x": 8, "y": 1}
},
{
"id": 4,
"title": "GPU Utilization",
"type": "graph",
"targets": [
{
"expr": "gpu_utilization_percent",
"legendFormat": "GPU {{gpu_id}}"
}
],
"gridPos": {"h": 8, "w": 8, "x": 16, "y": 1}
},
{
"id": 5,
"title": "Application Performance",
"type": "row",
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 9}
},
{
"id": 6,
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "rate(app_requests_total[5m])",
"legendFormat": "{{service}} - Requests/sec"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 10}
},
{
"id": 7,
"title": "Response Time",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(app_request_latency_seconds_bucket[5m]))",
"legendFormat": "95th percentile - {{service}}"
},
{
"expr": "histogram_quantile(0.50, rate(app_request_latency_seconds_bucket[5m]))",
"legendFormat": "50th percentile - {{service}}"
}
],
"yAxes": [
{
"unit": "s"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 10}
},
{
"id": 8,
"title": "ML Performance",
"type": "row",
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 18}
},
{
"id": 9,
"title": "Model Inference Time",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(ml_inference_duration_seconds_bucket[5m]))",
"legendFormat": "{{model_type}} - 95th percentile"
}
],
"yAxes": [
{
"unit": "s"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 19}
},
{
"id": 10,
"title": "Model Memory Usage",
"type": "graph",
"targets": [
{
"expr": "ml_model_memory_bytes",
"legendFormat": "{{model_type}}"
}
],
"yAxes": [
{
"unit": "bytes"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 19}
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "10s"
}
}
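
The dashboard JSON above can be imported through the Grafana UI, or pushed programmatically with Grafana's HTTP API. The sketch below assumes the JSON has been saved as iris_dashboard.json and that a Grafana API token with dashboard write permissions is available; the URL and token are placeholders.

# Hypothetical upload of the dashboard definition via the Grafana HTTP API
import json
import httpx

def upload_dashboard(
    path: str = "iris_dashboard.json",
    grafana_url: str = "http://grafana:3000",
    api_token: str = "YOUR_GRAFANA_API_TOKEN",
) -> None:
    with open(path) as f:
        payload = json.load(f)  # expects the {"dashboard": {...}} structure shown above

    payload["overwrite"] = True  # replace an existing dashboard with the same uid/title

    response = httpx.post(
        f"{grafana_url}/api/dashboards/db",
        json=payload,
        headers={"Authorization": f"Bearer {api_token}"},
        timeout=10.0,
    )
    response.raise_for_status()
    print(f"Dashboard imported: {response.json().get('url')}")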

This guide provides the core tools and strategies for optimizing and monitoring the IRIS OCR platform so that it remains efficient and reliable under production load.