Skip to content

Advanced Examples

This page provides advanced examples and use cases for the Segmentation Robustness Framework. 🚀

🤖 Multi-Model Evaluation

Compare multiple models on the same dataset and attacks:

from segmentation_robustness_framework.pipeline import SegmentationRobustnessPipeline
from segmentation_robustness_framework.loaders import UniversalModelLoader, DatasetLoader
from segmentation_robustness_framework.metrics import MetricsCollection
from segmentation_robustness_framework.attacks import FGSM, PGD
import torch

# Shared components: one model loader and the VOC validation split.
model_loader = UniversalModelLoader()
dataset_loader = DatasetLoader(
    dict(
        name="voc",
        split="val",
        root="./data",
        image_shape=[512, 512],
        download=True,
    )
)
dataset = dataset_loader.load_dataset()

# Torchvision architectures to compare. Each entry is keyed by the same
# string that is passed as the model "name", with 21 output classes.
models = {
    arch: {"type": "torchvision", "config": {"name": arch, "num_classes": 21}}
    for arch in ("deeplabv3_resnet50", "deeplabv3_resnet101", "fcn_resnet50")
}

# Attacks are built with no model (None); a model is assigned to them later,
# once per evaluated architecture.
attacks = [FGSM(None, eps=eps) for eps in (0.02, 0.05)] + [
    PGD(None, eps=eps, alpha=0.02, iters=iters, targeted=False)
    for eps, iters in ((0.02, 10), (0.05, 20))
]

# Metric callables handed to the pipeline.
metrics = MetricsCollection(num_classes=21, ignore_index=255)
metric_functions = [
    getattr(metrics, metric_name)
    for metric_name in ("mean_iou", "pixel_accuracy", "dice_score")
]

# Run the full pipeline once per configured model, collecting results by name.
results = {}
for model_name, model_config in models.items():
    print(f"\nEvaluating {model_name}...")

    # Build the model and move it to the GPU in one step.
    model = model_loader.load_model(
        model_type=model_config["type"],
        model_config=model_config["config"],
    ).to("cuda")

    # The attack objects are shared across models, so re-point each one at
    # the model currently under evaluation.
    for attack in attacks:
        attack.model = model
        attack.set_device("cuda")

    # Each model writes its reports to its own output directory.
    pipeline = SegmentationRobustnessPipeline(
        model=model,
        dataset=dataset,
        attacks=attacks,
        metrics=metric_functions,
        batch_size=4,
        device="cuda",
        output_dir=f"./results/{model_name}",
        auto_resize_masks=True,
        output_formats=["json", "csv"],
    )

    results[model_name] = model_results = pipeline.run()

# Print a report: banner first, then clean and per-attack metrics per model.
print("\n" + "=" * 60, "MODEL COMPARISON RESULTS", "=" * 60, sep="\n")

for model_name, model_results in results.items():
    print(f"\n{model_name}:")
    print(f"  Clean IoU: {model_results['clean']['mean_iou']:.3f}")
    print(f"  Clean Pixel Accuracy: {model_results['clean']['pixel_accuracy']:.3f}")

    for attack_name, attack_results in model_results.items():
        # Only keys prefixed with 'attack_' are reported here.
        if not attack_name.startswith('attack_'):
            continue
        print(f"  {attack_name}:")
        print(f"    IoU: {attack_results['mean_iou']:.3f}")
        print(f"    Pixel Accuracy: {attack_results['pixel_accuracy']:.3f}")

⚔️ Custom Attack Implementation

Create your own adversarial attack:

import torch
import torch.nn as nn
from segmentation_robustness_framework.attacks import AdversarialAttack
from segmentation_robustness_framework.attacks.registry import register_attack

@register_attack("custom_attack")
class CustomAttack(AdversarialAttack):
    """Iterative signed-gradient attack with an L-infinity budget.

    This is an example of how to implement your own attack. Each iteration
    takes a step of size ``alpha`` along the sign of the loss gradient, then
    projects the result back into the ``eps``-ball around the clean images
    and into the ``[0, 1]`` pixel range.
    """

    def __init__(
        self,
        model: nn.Module,
        eps: float = 0.02,
        alpha: float = 0.01,
        iters: int = 10,
        ignore_index: int = 255,
    ):
        """Initialize custom attack.

        Args:
            model (nn.Module): Model to attack.
            eps (float): Maximum perturbation magnitude.
            alpha (float): Step size for each iteration.
            iters (int): Number of iterations.
            ignore_index (int): Label value excluded from the loss. Defaults
                to 255, the ignore label used by the metrics on this page.
        """
        super().__init__(model)
        self.eps = eps
        self.alpha = alpha
        self.iters = iters
        self.ignore_index = ignore_index

    def apply(self, images: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """Apply custom attack to images.

        Args:
            images (torch.Tensor): Input images [B, C, H, W].
            labels (torch.Tensor): Target labels [B, H, W].

        Returns:
            torch.Tensor: Adversarial images [B, C, H, W], detached from the
            autograd graph.
        """
        self.model.eval()

        # NOTE(review): self.device is not set in this class; presumably the
        # base class / set_device() provides it - confirm.
        images = images.to(self.device, non_blocking=True)
        # CrossEntropyLoss expects integer class indices.
        labels = labels.to(self.device, non_blocking=True).long()

        # Built once, outside the loop. Without ignore_index, pixels labelled
        # with the ignore value would be out-of-range targets for the loss.
        # Sum reduction keeps the gradient sign identical to mean reduction
        # but yields a zero (not NaN) gradient when every pixel is ignored.
        criterion = nn.CrossEntropyLoss(
            ignore_index=self.ignore_index, reduction="sum"
        )

        # Initialize adversarial images
        adv_images = images.clone().detach()

        for _ in range(self.iters):
            adv_images.requires_grad_(True)

            # Forward pass and loss
            outputs = self.model(adv_images)
            loss = criterion(outputs, labels)

            # Gradient w.r.t. the input only; model parameters are untouched.
            (grad,) = torch.autograd.grad(loss, adv_images)

            # Ascend the loss along the gradient sign
            adv_images = adv_images.detach() + self.alpha * grad.sign()

            # Project to epsilon ball, then to the valid pixel range
            delta = torch.clamp(adv_images - images, min=-self.eps, max=self.eps)
            adv_images = torch.clamp(images + delta, min=0, max=1).detach()

        return adv_images

# Use custom attack: instantiate it against `model` with a larger budget
# (eps=0.03) and more iterations (15) than the class defaults.
custom_attack = CustomAttack(model, eps=0.03, alpha=0.01, iters=15)

# Evaluate with the custom attack as the only attack and mean IoU as the
# only metric.
pipeline = SegmentationRobustnessPipeline(
    model=model,
    dataset=dataset,
    attacks=[custom_attack],
    metrics=[metrics.mean_iou],
    batch_size=4,
    device="cuda"
)

📊 Advanced Metric Configuration

Create complex metric configurations with custom averaging:

from segmentation_robustness_framework.metrics import MetricsCollection, register_custom_metric
import numpy as np

# Custom weighted IoU metric
@register_custom_metric("weighted_iou")
def weighted_iou(targets, predictions):
    """Compute a class-weighted mean IoU over the 21 VOC classes.

    Pixels whose target equals the ignore index (255) are excluded from both
    intersection and union. Classes that appear in neither the targets nor
    the predictions are skipped rather than scored as 0, and each remaining
    class keeps its own weight (background 1.0, foreground 2.0).

    Args:
        targets: Ground-truth class indices (tensor or array).
        predictions: Predicted class indices with the same shape as
            ``targets`` (tensor or array).

    Returns:
        float: Weighted average IoU over the classes present, or 0.0 when no
        class is present.
    """
    num_classes = 21  # VOC has 21 classes
    ignore_index = 255

    # Convert to numpy (detach so tensors that track gradients also work)
    if isinstance(targets, torch.Tensor):
        targets = targets.detach().cpu().numpy()
    if isinstance(predictions, torch.Tensor):
        predictions = predictions.detach().cpu().numpy()
    targets = np.asarray(targets)
    predictions = np.asarray(predictions)

    # Define class weights (e.g., more weight for foreground classes)
    class_weights = np.ones(num_classes)
    class_weights[1:] = 2.0

    # Drop ignored pixels up front; otherwise predictions made at ignored
    # locations would still be counted in the union.
    valid = targets != ignore_index
    targets = targets[valid]
    predictions = predictions[valid]

    # Collect IoU and weight together so they stay aligned when a class is
    # skipped.
    ious = []
    weights = []
    for cls in range(num_classes):
        pred = predictions == cls
        true = targets == cls

        union = np.count_nonzero(pred | true)
        if union == 0:
            # Class absent from both arrays: it carries no information.
            continue

        intersection = np.count_nonzero(pred & true)
        ious.append(intersection / union)
        weights.append(class_weights[cls])

    if not ious:
        return 0.0
    return float(np.average(ious, weights=weights))

# Create the metrics collection that provides the built-in metric callables
# (21 classes, label 255 ignored).
metrics = MetricsCollection(num_classes=21, ignore_index=255)

# Define metric configuration with different averaging strategies.
# Entries are either a bare metric name or a dict carrying an explicit
# "average" mode.
# NOTE(review): metric_config is not passed to the pipeline constructed
# below, which receives a list of callables instead - confirm where this
# dict is meant to be consumed.
metric_config = {
    "ignore_index": 255,
    "selected_metrics": [
        "mean_iou",  # Default macro averaging
        "pixel_accuracy",
        {"name": "precision", "average": "macro"},
        {"name": "precision", "average": "micro"},
        {"name": "recall", "average": "macro"},
        {"name": "recall", "average": "micro"},
        {"name": "dice_score", "average": "macro"},
        {"name": "dice_score", "average": "micro"},
        "weighted_iou"  # Custom metric
    ]
}

# Use in pipeline: two built-in metrics plus the custom weighted_iou
# function, evaluated under a single FGSM attack.
pipeline = SegmentationRobustnessPipeline(
    model=model,
    dataset=dataset,
    attacks=[FGSM(model, eps=0.02)],
    metrics=[metrics.mean_iou, metrics.pixel_accuracy, weighted_iou],
    batch_size=4,
    device="cuda"
)

📁 Custom Dataset Integration

Integrate your own dataset:

import torch
from torch.utils.data import Dataset
from PIL import Image
import os
from segmentation_robustness_framework.datasets.registry import register_dataset

@register_dataset("custom_dataset")
class CustomDataset(Dataset):
    """Custom dataset implementation.

    Expects ``<root>/images/<split>/*.jpg`` images and, for each image, a
    mask with the same base name and a ``.png`` extension under
    ``<root>/masks/<split>/``.
    """

    def __init__(self, root: str, split: str = "train", transform=None, target_transform=None):
        """Initialize custom dataset.

        Args:
            root (str): Dataset root directory.
            split (str): Dataset split ('train', 'val', 'test').
            transform: Image transformations.
            target_transform: Target transformations.
        """
        self.root = root
        self.split = split
        self.transform = transform
        self.target_transform = target_transform

        # Load image and mask paths
        self.images_dir = os.path.join(root, "images", split)
        self.masks_dir = os.path.join(root, "masks", split)

        # os.listdir returns entries in arbitrary order; sort so that sample
        # indices are stable across runs and machines.
        self.images = sorted(
            f for f in os.listdir(self.images_dir) if f.endswith('.jpg')
        )
        self.num_classes = 21  # Adjust based on your dataset

    def __len__(self):
        """Return the number of images found for this split."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``(image, mask)`` pair at ``idx``, after transforms."""
        image_name = self.images[idx]
        # Swap only the extension; str.replace would also rewrite a '.jpg'
        # occurring earlier in the file name.
        stem, _ = os.path.splitext(image_name)

        # Load image. convert()/copy() read the pixel data, so the files can
        # be closed immediately instead of staying open.
        img_path = os.path.join(self.images_dir, image_name)
        with Image.open(img_path) as img_file:
            image = img_file.convert('RGB')

        # Load mask
        mask_path = os.path.join(self.masks_dir, stem + '.png')
        with Image.open(mask_path) as mask_file:
            mask = mask_file.copy()

        # Apply transformations
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            mask = self.target_transform(mask)

        return image, mask

# Use custom dataset: "name" is the key the dataset class was registered
# under via @register_dataset("custom_dataset").
dataset_config = {
    "name": "custom_dataset",
    "root": "./path/to/your/dataset",
    "split": "val",
    "image_shape": [512, 512],
}

# Build the dataset through the same loader used for built-in datasets.
dataset_loader = DatasetLoader(dataset_config)
dataset = dataset_loader.load_dataset()

🤖 Advanced Model Integration

Integrate complex models with custom preprocessing:

import torch
import torch.nn as nn
from segmentation_robustness_framework.adapters import CustomAdapter
from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation

class CustomHuggingFaceAdapter(CustomAdapter):
    """Custom adapter for HuggingFace models with preprocessing."""

    def __init__(self, model, processor, num_classes=21):
        """Store the HuggingFace model and its image processor.

        Args:
            model: HuggingFace segmentation model.
            processor: Image processor applied to inputs before the model.
            num_classes (int): Number of classes the model predicts.
        """
        super().__init__(model, num_classes)
        self.processor = processor

    def logits(self, x):
        """Get logits with custom preprocessing.

        Autograd is intentionally left enabled here: the adapter is passed to
        gradient-based attacks, which must be able to backpropagate through
        this call. Use ``predictions`` for gradient-free inference.
        """
        # Apply processor transformations
        # NOTE(review): if the processor converts tensors to NumPy/PIL
        # internally, the returned pixel values are detached from ``x`` and
        # gradient-based attacks still cannot reach the input - confirm, and
        # replace with tensor-only preprocessing if so.
        inputs = self.processor(x, return_tensors="pt")

        # Move every tensor input to the model device
        device = self.model.device
        inputs = {
            key: value.to(device) if isinstance(value, torch.Tensor) else value
            for key, value in inputs.items()
        }

        # Forward pass
        outputs = self.model(**inputs)
        return outputs.logits

    def predictions(self, x):
        """Get predictions with custom preprocessing (no gradients tracked)."""
        # argmax is not differentiable, so the graph can be skipped here.
        with torch.no_grad():
            return torch.argmax(self.logits(x), dim=1)

# Load custom HuggingFace model and the image processor published with it.
model_name = "nvidia/segformer-b0-finetuned-ade-512-512"
processor = AutoImageProcessor.from_pretrained(model_name)
model = AutoModelForSemanticSegmentation.from_pretrained(model_name)

# Create custom adapter
# NOTE(review): the adapter is created with num_classes=150, while the
# `metrics` collection used below was built with num_classes=21 earlier on
# this page - confirm that the dataset and metrics match the model's label
# space before running this.
adapter = CustomHuggingFaceAdapter(model, processor, num_classes=150)

# Use in pipeline: the adapter is passed both as the model and as the
# attack target.
pipeline = SegmentationRobustnessPipeline(
    model=adapter,
    dataset=dataset,
    attacks=[FGSM(adapter, eps=0.02)],
    metrics=[metrics.mean_iou],
    batch_size=2,  # Smaller batch size for large models
    device="cuda"
)

⚡ Performance Optimization

Optimize for large-scale evaluation:

import torch
from segmentation_robustness_framework.pipeline import SegmentationRobustnessPipeline

# Optimize for performance
def create_optimized_pipeline(model, dataset, attacks, metrics, *, use_fp16=False):
    """Create pipeline with performance optimizations.

    Args:
        model: Model to evaluate.
        dataset: Dataset to evaluate on.
        attacks: Attacks to run.
        metrics: Metric callables to compute.
        use_fp16 (bool): Cast the model weights to FP16 when CUDA is
            available. Off by default: ``model.half()`` is a full cast, not
            mixed precision, and nothing in this function casts the inputs
            to match.

    Returns:
        SegmentationRobustnessPipeline: The configured, not-yet-run pipeline.
    """
    use_cuda = torch.cuda.is_available()
    # Derive the device instead of hard-coding "cuda", so the helper is
    # usable on CPU-only machines too.
    device = "cuda" if use_cuda else "cpu"

    if use_fp16 and use_cuda:
        model = model.half()

    # Optimize DataLoader
    pipeline = SegmentationRobustnessPipeline(
        model=model,
        dataset=dataset,
        attacks=attacks,
        metrics=metrics,
        batch_size=8,  # Larger batch size
        device=device,
        num_workers=4,  # Parallel data loading
        pin_memory=use_cuda,  # Pinned host memory only helps GPU transfers
        persistent_workers=True,  # Keep workers alive
        auto_resize_masks=True,
        output_formats=["json"],
        metric_precision=3,
    )

    return pipeline

# Memory optimization
def optimize_memory(memory_fraction=0.8):
    """Optimize GPU memory usage; does nothing when CUDA is unavailable.

    Args:
        memory_fraction (float): Fraction of device memory this process may
            use, passed to ``torch.cuda.set_per_process_memory_fraction``.
    """
    if not torch.cuda.is_available():
        return

    # Clear cache before large operations
    torch.cuda.empty_cache()

    # Set memory fraction
    torch.cuda.set_per_process_memory_fraction(memory_fraction)

    # Enable flash scaled-dot-product attention if this torch build has the
    # toggle. Check for the function itself: torch.backends.cuda exists in
    # releases that do not provide enable_flash_sdp.
    cuda_backend = getattr(getattr(torch, 'backends', None), 'cuda', None)
    enable_flash_sdp = getattr(cuda_backend, 'enable_flash_sdp', None)
    if callable(enable_flash_sdp):
        enable_flash_sdp(True)

# Use optimizations
optimize_memory()
# Pass the list of metric callables (metric_functions), as the other pipeline
# examples do, rather than the MetricsCollection object itself.
pipeline = create_optimized_pipeline(model, dataset, attacks, metric_functions)
results = pipeline.run()

🛡️ Error Handling and Logging

Implement robust error handling:

import logging
from segmentation_robustness_framework.pipeline import SegmentationRobustnessPipeline

# Configure logging: INFO and above goes both to evaluation.log and to the
# console, with timestamp, logger name and level on every line.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('evaluation.log'),
        logging.StreamHandler()
    ]
)

# Module-level logger used by the code below.
logger = logging.getLogger(__name__)

class RobustPipeline:
    """Pipeline with robust error handling.

    Tries a normal GPU run first. On a CUDA out-of-memory error it retries
    on the GPU with batch size 1; on any other error it falls back to a CPU
    run with batch size 1.
    """

    def __init__(self, model, dataset, attacks, metrics):
        """Store the components handed to every pipeline attempt.

        Args:
            model: Model to evaluate.
            dataset: Dataset to evaluate on.
            attacks: Attacks to run.
            metrics: Metric callables to compute.
        """
        self.model = model
        self.dataset = dataset
        self.attacks = attacks
        self.metrics = metrics

    def _run_pipeline(self, batch_size, device, output_dir):
        """Build a pipeline with the given settings and return its results."""
        pipeline = SegmentationRobustnessPipeline(
            model=self.model,
            dataset=self.dataset,
            attacks=self.attacks,
            metrics=self.metrics,
            batch_size=batch_size,
            device=device,
            output_dir=output_dir,
        )
        return pipeline.run()

    @staticmethod
    def _release_gpu_cache():
        """Return cached GPU memory to the driver when CUDA is available."""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def run_with_error_handling(self):
        """Run pipeline with comprehensive error handling.

        Returns:
            The pipeline results, or None when every fallback failed.
        """
        out_of_memory = False
        try:
            logger.info("Starting evaluation...")
            results = self._run_pipeline(4, "cuda", "./robust_results")
        except torch.cuda.OutOfMemoryError:
            logger.error("GPU out of memory. Trying with smaller batch size...")
            out_of_memory = True
        except Exception as e:
            # logger.exception keeps the traceback in the log.
            logger.exception("Evaluation failed: %s", e)
            logger.error("Attempting recovery...")
        else:
            logger.info("Evaluation completed successfully!")
            return results

        # Retry only after leaving the except block: while an exception is
        # being handled, its traceback keeps the failed run's tensors alive,
        # so the memory could not be freed for the retry.
        if out_of_memory:
            return self.run_with_smaller_batch()
        return self.run_with_recovery()

    def run_with_smaller_batch(self):
        """Run with reduced batch size.

        Returns:
            The pipeline results, or None when this attempt failed too.
        """
        try:
            # Free whatever the failed attempt left in the allocator cache.
            self._release_gpu_cache()

            logger.info("Running with batch size 1...")
            return self._run_pipeline(1, "cuda", "./robust_results_small_batch")

        except Exception as e:
            logger.exception("Even small batch failed: %s", e)
            return None

    def run_with_recovery(self):
        """Run with recovery mechanisms.

        Returns:
            The pipeline results, or None when recovery failed.
        """
        try:
            # Clear GPU memory
            self._release_gpu_cache()

            # Try CPU fallback
            # NOTE(review): the attacks may still be configured for "cuda"
            # by the caller - confirm the pipeline re-targets them when
            # device="cpu".
            logger.info("Trying CPU fallback...")
            return self._run_pipeline(1, "cpu", "./robust_results_cpu")

        except Exception as e:
            logger.exception("Recovery failed: %s", e)
            return None

# Use robust pipeline. Pass the list of metric callables (metric_functions),
# as the other pipeline examples do, rather than the MetricsCollection object.
robust_pipeline = RobustPipeline(model, dataset, attacks, metric_functions)
results = robust_pipeline.run_with_error_handling()

🔗 Integration with External Tools

Integrate with popular ML tools:

# Integration with Weights & Biases
import wandb
from segmentation_robustness_framework.pipeline import SegmentationRobustnessPipeline

def run_with_wandb():
    """Run evaluation with Weights & Biases logging.

    Returns:
        The results returned by ``pipeline.run()``.
    """

    # Initialize wandb
    wandb.init(project="segmentation-robustness", name="advanced-evaluation")

    # finish() must run even if the evaluation raises, so the run is not
    # left open.
    try:
        # Log configuration
        config = {
            "model": "deeplabv3_resnet50",
            "dataset": "voc",
            "attacks": ["fgsm", "pgd"],
            "batch_size": 4,
            "device": "cuda"
        }
        wandb.config.update(config)

        # Run evaluation; batch size and device are read from the logged
        # config so the two cannot drift apart. Metrics are passed as the
        # list of callables, as in the other pipeline examples.
        pipeline = SegmentationRobustnessPipeline(
            model=model,
            dataset=dataset,
            attacks=attacks,
            metrics=metric_functions,
            batch_size=config["batch_size"],
            device=config["device"]
        )

        results = pipeline.run()

        # Gather everything into one payload: each wandb.log call advances
        # the step by default, so logging metric-by-metric would spread one
        # evaluation across many steps.
        payload = {}

        # Attack results - skip missing (None) metrics
        for attack_name, attack_results in results.items():
            if attack_name.startswith('attack_'):
                for metric_name, metric_value in attack_results.items():
                    if metric_value is not None:
                        payload[f"{attack_name}/{metric_name}"] = metric_value

        # Clean performance
        if "clean" in results:
            for metric_name, metric_value in results["clean"].items():
                if metric_value is not None:
                    payload[f"clean/{metric_name}"] = metric_value

        # Summary statistics
        summary = pipeline.get_summary()
        if "robustness_analysis" in summary:
            for attack_name, robustness in summary["robustness_analysis"].items():
                for metric_name, degradation in robustness.items():
                    if degradation is not None:
                        payload[f"robustness/{attack_name}/{metric_name}"] = degradation

        if payload:
            wandb.log(payload)

        return results
    finally:
        wandb.finish()

# Integration with MLflow
import mlflow
import mlflow.pytorch

def run_with_mlflow():
    """Run evaluation with MLflow tracking.

    Returns:
        The results returned by ``pipeline.run()``.
    """

    mlflow.set_experiment("segmentation-robustness")

    with mlflow.start_run():
        # Log parameters
        params = {
            "model": "deeplabv3_resnet50",
            "dataset": "voc",
            "batch_size": 4
        }
        mlflow.log_params(params)

        # Run evaluation; the batch size is read from the logged parameters.
        # Metrics are passed as the list of callables, as in the other
        # pipeline examples.
        pipeline = SegmentationRobustnessPipeline(
            model=model,
            dataset=dataset,
            attacks=attacks,
            metrics=metric_functions,
            batch_size=params["batch_size"],
            device="cuda"
        )

        results = pipeline.run()

        # Collect all metrics first so they are sent in one batched call.
        collected = {}

        # Attack results - skip missing (None) metrics
        for attack_name, attack_results in results.items():
            if attack_name.startswith('attack_'):
                for metric_name, metric_value in attack_results.items():
                    if metric_value is not None:
                        collected[f"{attack_name}_{metric_name}"] = metric_value

        # Clean performance
        if "clean" in results:
            for metric_name, metric_value in results["clean"].items():
                if metric_value is not None:
                    collected[f"clean_{metric_name}"] = metric_value

        if collected:
            mlflow.log_metrics(collected)

        # Log model. Adapters expose the wrapped network as `.model`; plain
        # modules are logged as-is. This is best effort: a failure here must
        # not discard the evaluation results.
        try:
            mlflow.pytorch.log_model(getattr(model, 'model', model), "segmentation_model")
        except Exception as e:
            print(f"Could not log model to MLflow: {e}")

        return results

# Integration with TensorBoard
import torch
from torch.utils.tensorboard import SummaryWriter
from segmentation_robustness_framework.pipeline import SegmentationRobustnessPipeline

def run_with_tensorboard():
    """Run evaluation with TensorBoard logging.

    Returns:
        The results returned by ``pipeline.run()``.
    """

    # Initialize TensorBoard writer
    writer = SummaryWriter('runs/segmentation_robustness')

    # Close the writer even if the evaluation raises, so buffered events are
    # flushed and the file handle is released.
    try:
        # Run evaluation. Metrics are passed as the list of callables, as in
        # the other pipeline examples.
        pipeline = SegmentationRobustnessPipeline(
            model=model,
            dataset=dataset,
            attacks=attacks,
            metrics=metric_functions,
            batch_size=4,
            device="cuda"
        )

        results = pipeline.run()

        # Log attack results at global step 0, skipping missing metrics
        for attack_name, attack_results in results.items():
            if not attack_name.startswith('attack_'):
                continue
            for metric_name, metric_value in attack_results.items():
                if metric_value is not None:
                    writer.add_scalar(f'{attack_name}/{metric_name}', metric_value, 0)

        # Log clean performance
        if "clean" in results:
            for metric_name, metric_value in results["clean"].items():
                if metric_value is not None:
                    writer.add_scalar(f'clean/{metric_name}', metric_value, 0)

        return results
    finally:
        writer.close()

# Integration with Neptune.ai
import neptune
from segmentation_robustness_framework.pipeline import SegmentationRobustnessPipeline

def run_with_neptune():
    """Run evaluation with Neptune.ai logging.

    Returns:
        The results returned by ``pipeline.run()``.
    """

    # Initialize Neptune
    run = neptune.init_run(project="your-workspace/segmentation-robustness")

    # Stop the run even if the evaluation raises, so it is not left open.
    try:
        # Log configuration
        run["config/model"] = "deeplabv3_resnet50"
        run["config/dataset"] = "voc"
        run["config/batch_size"] = 4

        # Run evaluation. Metrics are passed as the list of callables, as in
        # the other pipeline examples.
        pipeline = SegmentationRobustnessPipeline(
            model=model,
            dataset=dataset,
            attacks=attacks,
            metrics=metric_functions,
            batch_size=4,
            device="cuda"
        )

        results = pipeline.run()

        # Log attack results, skipping missing metrics
        for attack_name, attack_results in results.items():
            if not attack_name.startswith('attack_'):
                continue
            for metric_name, metric_value in attack_results.items():
                if metric_value is not None:
                    run[f"results/{attack_name}/{metric_name}"] = metric_value

        # Log clean performance
        if "clean" in results:
            for metric_name, metric_value in results["clean"].items():
                if metric_value is not None:
                    run[f"results/clean/{metric_name}"] = metric_value

        return results
    finally:
        run.stop()

🚀 Next Steps