Pipeline API

This page documents the pipeline components of the Segmentation Robustness Framework.

segmentation_robustness_framework.pipeline.core

Classes

SegmentationRobustnessPipeline(model: SegmentationModelProtocol, dataset: torch.utils.data.Dataset, attacks: list, metrics: list[Callable], batch_size: int = 8, device: str = 'cpu', output_dir: Optional[str] = None, auto_resize_masks: bool = True, metric_names: Optional[list[str]] = None, output_formats: list[str] = ['json', 'csv'], metric_precision: int = 4, num_workers: int = 0, pin_memory: bool = False, persistent_workers: bool = False)

Pipeline for evaluating segmentation models under adversarial attacks.

This pipeline evaluates a segmentation model on clean and adversarial images, computes metrics, and provides hooks for saving results.

Attributes:

model (SegmentationModelProtocol): Adapter-wrapped segmentation model, moved to device during initialization.
dataset (Dataset): Dataset for evaluation.
attacks (list): List of attack instances; each must implement __call__(images, targets) and return adversarial images.
metrics (list[Callable]): List of metric functions or classes, each called as metric(targets, preds).
batch_size (int): Batch size for evaluation.
device (str): Device to use for computation.
output_dir (Path): Run-specific results directory: the base output directory (the output_dir argument, or ./runs if None) joined with run_<run_id>.
auto_resize_masks (bool): Whether to automatically resize masks to the model output size.

Initialize the segmentation robustness pipeline.

Parameters:

model (SegmentationModelProtocol): Segmentation model (adapter-wrapped). Required.
dataset (Dataset): Dataset object. Required.
attacks (list): List of attack instances. Required.
metrics (list[Callable]): List of metric functions or classes. Required.
batch_size (int): Batch size for evaluation. Defaults to 8.
device (str): Device to use. Defaults to 'cpu'.
output_dir (str, optional): Base directory for results; each run writes to a run_<run_id> subdirectory inside it. Defaults to None, which uses ./runs.
auto_resize_masks (bool): Whether to automatically resize masks to the model output size. Defaults to True.
metric_names (list[str], optional): Custom names for metrics. If None, names are generated automatically. Defaults to None.
output_formats (list[str]): List of output formats to save. Options: ["json", "csv"]. Defaults to ['json', 'csv'] (save both).
metric_precision (int): Number of decimal places for metric values. Defaults to 4.
num_workers (int): Number of DataLoader workers. Defaults to 0 (data is loaded in the main process) to prevent hanging.
pin_memory (bool): Whether the DataLoader pins memory. Defaults to False.
persistent_workers (bool): Whether the DataLoader keeps its worker processes alive between passes over the dataset. Requires num_workers > 0. Defaults to False.
Source code in segmentation_robustness_framework/pipeline/core.py
def __init__(
    self,
    model: SegmentationModelProtocol,
    dataset: torch.utils.data.Dataset,
    attacks: list,
    metrics: list[Callable],
    batch_size: int = 8,
    device: str = "cpu",
    output_dir: Optional[str] = None,
    auto_resize_masks: bool = True,
    metric_names: Optional[list[str]] = None,
    output_formats: list[str] = ["json", "csv"],
    metric_precision: int = 4,
    num_workers: int = 0,
    pin_memory: bool = False,
    persistent_workers: bool = False,
):
    """Initialize the segmentation robustness pipeline.

    Args:
        model (SegmentationModelProtocol): Segmentation model (adapter-wrapped).
        dataset (Dataset): Dataset object.
        attacks (list): List of attack instances.
        metrics (list[Callable]): List of metric functions or classes.
        batch_size (int): Batch size for evaluation.
        device (str): Device to use.
        output_dir (str, optional): Directory to save results.
        auto_resize_masks (bool): Whether to automatically resize masks to model output size.
        metric_names (list[str], optional): Custom names for metrics. If None, auto-generate.
        output_formats (list[str]): List of output formats to save. Options: ["json", "csv"].
            Defaults to ["json", "csv"] (save both).
        metric_precision (int): Number of decimal places for metric values. Defaults to 4.
        num_workers (int): Number of workers for DataLoader. Defaults to 0 to prevent hanging.
        pin_memory (bool): Whether to pin memory in DataLoader. Defaults to False.
        persistent_workers (bool): Whether to use persistent workers. Defaults to False.
    """
    self.model = model.to(device)
    self.dataset = dataset
    self.attacks = attacks
    self.metrics = metrics
    self.batch_size = batch_size
    self.device = device
    self.base_output_dir = output_dir or "./runs"
    self.auto_resize_masks = auto_resize_masks
    self.metric_precision = metric_precision
    self.num_workers = num_workers
    self.pin_memory = pin_memory
    self.persistent_workers = persistent_workers

    self.metric_names = self._setup_metric_names(metric_names)

    self.output_formats = self._setup_output_formats(output_formats)

    self.run_id = self._generate_run_id()
    self.output_dir = Path(self.base_output_dir) / f"run_{self.run_id}"
    os.makedirs(self.output_dir, exist_ok=True)

    logger.info(f"Pipeline initialized. Run ID: {self.run_id}")
    logger.info(f"Output directory: {self.output_dir}")

    if self.auto_resize_masks:
        self._setup_automatic_mask_resizing()

    self.results = {}
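The constructor takes attacks and metrics as plain callables: each attack is called as attack(images, targets) and must return adversarial images, and each metric is called as metric(targets, preds) and returns a scalar or None. The sketch below shows two hypothetical callables with those shapes. pixel_accuracy and IdentityAttack are illustrative names, not part of the framework, and they operate on flat Python lists instead of batched tensors purely to stay self-contained.

```python
from typing import Optional, Sequence


def pixel_accuracy(targets: Sequence[int], preds: Sequence[int], ignore_index: int = -1) -> Optional[float]:
    """Hypothetical metric following the (targets, preds) calling convention.

    Pixels whose target equals ignore_index are skipped. Returns None when no
    valid pixels remain, which the pipeline records as a missing value.
    """
    valid = [(t, p) for t, p in zip(targets, preds) if t != ignore_index]
    if not valid:
        return None
    correct = sum(1 for t, p in valid if t == p)
    return correct / len(valid)


class IdentityAttack:
    """Hypothetical no-op attack following the attack(images, targets) convention."""

    def __call__(self, images: list[float], targets: list[int]) -> list[float]:
        # A real attack would perturb `images`; this one returns an unchanged copy.
        return list(images)

    def __repr__(self) -> str:
        # The pipeline logs attacks with f"{attack}", so a readable repr helps.
        return "IdentityAttack()"
```

Inside the pipeline, both callables would receive batched tensors already moved to device, so a real metric or attack must handle that type.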

Functions

run(save: bool = False, show: bool = False) -> dict[str, Any]

Run the evaluation pipeline: clean and adversarial evaluation.

Parameters:

save (bool): Whether to save results (images, metrics, etc.). Defaults to False.
show (bool): Whether to create visualizations. Only takes effect when save is True. Defaults to False.

Returns:

dict[str, Any]: Dictionary containing all evaluation results.

Source code in segmentation_robustness_framework/pipeline/core.py
def run(self, save: bool = False, show: bool = False) -> dict[str, Any]:
    """Run the evaluation pipeline: clean and adversarial evaluation.

    Args:
        save (bool): Whether to save results (images, metrics, etc.).
        show (bool): Whether to show visualizations.

    Returns:
        dict[str, Any]: Dictionary containing all evaluation results.
    """
    # Create DataLoader with explicit parameters to prevent hanging
    loader = DataLoader(
        self.dataset,
        batch_size=self.batch_size,
        shuffle=False,
        num_workers=self.num_workers,
        pin_memory=self.pin_memory,
        persistent_workers=self.persistent_workers,
    )
    self.model.eval()

    logger.info("Starting clean evaluation...")
    clean_metrics = self.evaluate_clean(loader)
    self.results["clean"] = self._aggregate_metrics(clean_metrics)
    if save:
        self.save_results(clean_metrics, "clean")

    for attack in self.attacks:
        logger.info(f"Starting evaluation for attack: {attack}")
        adv_metrics = self.evaluate_attack(loader, attack)
        attack_name = self._generate_attack_name(attack)
        self.results[f"attack_{attack_name}"] = self._aggregate_metrics(adv_metrics)
        if save:
            self.save_results(adv_metrics, f"attack_{attack_name}")

    if save:
        self._save_summary_results()
        if show:
            self._create_visualizations()

    return self.results
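The control flow of run() can be mirrored with stand-in functions to make two details explicit: results are keyed "clean" first and then attack_<name> for each attack, and visualizations are created only when save and show are both True. In this sketch, run_sketch, evaluate, and name_of are hypothetical placeholders for the pipeline's internal evaluation and naming methods.

```python
from typing import Any, Callable


def run_sketch(
    attacks: list[Any],
    evaluate: Callable[[Any], dict[str, float]],
    name_of: Callable[[Any], str],
    save: bool = False,
    show: bool = False,
) -> tuple[dict[str, dict[str, float]], list[str]]:
    """Mirror run(): one clean pass first, then one pass per attack.

    evaluate(None) stands in for clean evaluation and evaluate(attack) for
    adversarial evaluation. Returns the results and a log of side effects.
    """
    results: dict[str, dict[str, float]] = {}
    actions: list[str] = []

    results["clean"] = evaluate(None)
    if save:
        actions.append("save clean")

    for attack in attacks:
        key = f"attack_{name_of(attack)}"
        results[key] = evaluate(attack)
        if save:
            actions.append(f"save {key}")

    if save:
        actions.append("save summary")
        if show:  # show is only honored inside the save branch
            actions.append("visualize")
    return results, actions
```

Calling run(show=True) without save=True therefore produces no visualizations.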
get_summary() -> dict[str, Any]

Get a summary of the evaluation results.

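Returns:

dict[str, Any]: Summary with the keys total_evaluations, evaluations, clean_performance, and attack_performance. When clean results exist, it also contains robustness_analysis, which gives the percentage degradation of each metric relative to the clean result for every attack. If the pipeline has not been run yet, the dictionary contains a single "error" key.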

Source code in segmentation_robustness_framework/pipeline/core.py
def get_summary(self) -> dict[str, Any]:
    """Get a summary of the evaluation results.

    Returns:
        dict[str, Any]: Summary containing key statistics and comparisons.
    """
    if not self.results:
        return {"error": "No results available. Run the pipeline first."}

    summary = {
        "total_evaluations": len(self.results),
        "evaluations": list(self.results.keys()),
        "clean_performance": self.results.get("clean", {}),
        "attack_performance": {k: v for k, v in self.results.items() if k.startswith("attack_")},
    }

    if "clean" in self.results:
        clean_metrics = self.results["clean"]
        attack_results = summary["attack_performance"]

        robustness_analysis = {}
        for attack_name, attack_metrics in attack_results.items():
            robustness = {}
            for metric in clean_metrics.keys():
                if metric in attack_metrics and attack_metrics[metric] is not None:
                    clean_val = clean_metrics[metric]
                    attack_val = attack_metrics[metric]
                    if clean_val is not None and clean_val != 0:
                        robustness[f"{metric}_degradation"] = (clean_val - attack_val) / clean_val * 100
                    else:
                        robustness[f"{metric}_degradation"] = None
            robustness_analysis[attack_name] = robustness

        summary["robustness_analysis"] = robustness_analysis

    return summary
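get_summary() reports each metric's relative drop from its clean score as a percentage, (clean - attack) / clean * 100. The value is None when the clean value is None or 0, and no entry is produced when the metric is missing or None under attack. The standalone version below follows those rules. degradation_table is a hypothetical name. Under this formula, a lower-is-better metric that worsens under attack would show a negative degradation.

```python
from typing import Optional


def degradation_table(
    clean: dict[str, Optional[float]],
    attacked: dict[str, dict[str, Optional[float]]],
) -> dict[str, dict[str, Optional[float]]]:
    """Per-attack percentage degradation, following get_summary()'s rules."""
    analysis: dict[str, dict[str, Optional[float]]] = {}
    for attack_name, attack_metrics in attacked.items():
        robustness: dict[str, Optional[float]] = {}
        for metric, clean_val in clean.items():
            attack_val = attack_metrics.get(metric)
            if attack_val is None:
                continue  # metric missing or None under attack: no entry at all
            if clean_val is not None and clean_val != 0:
                robustness[f"{metric}_degradation"] = (clean_val - attack_val) / clean_val * 100
            else:
                robustness[f"{metric}_degradation"] = None  # relative change undefined
        analysis[attack_name] = robustness
    return analysis
```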
print_summary() -> None

Print a formatted summary of the evaluation results.

Source code in segmentation_robustness_framework/pipeline/core.py
def print_summary(self) -> None:
    """Print a formatted summary of the evaluation results."""
    summary = self.get_summary()

    print("\n" + "=" * 60)
    print("SEGMENTATION ROBUSTNESS EVALUATION SUMMARY")
    print("=" * 60)

    if "error" in summary:
        print(f"Error: {summary['error']}")
        return

    print(f"Total evaluations: {summary['total_evaluations']}")
    print(f"Evaluations: {', '.join(summary['evaluations'])}")

    if "clean" in self.results:
        print("\n" + "-" * 40)
        print("CLEAN PERFORMANCE")
        print("-" * 40)
        for metric, value in self.results["clean"].items():
            if value is not None:
                print(f"{metric}: {value:.4f}")

    if "robustness_analysis" in summary:
        print("\n" + "-" * 40)
        print("ROBUSTNESS ANALYSIS")
        print("-" * 40)
        for attack_name, robustness in summary["robustness_analysis"].items():
            print(f"\n{attack_name}:")
            for metric, degradation in robustness.items():
                if degradation is not None:
                    print(f"  {metric}: {degradation:.2f}%")
                else:
                    print(f"  {metric}: N/A")

    print("\n" + "=" * 60)
get_run_info() -> dict[str, Any]

Get information about the current run.

Returns:

dict[str, Any]: Dictionary containing run information.

Source code in segmentation_robustness_framework/pipeline/core.py
def get_run_info(self) -> dict[str, Any]:
    """Get information about the current run.

    Returns:
        dict[str, Any]: Dictionary containing run information.
    """
    return {
        "run_id": self.run_id,
        "output_directory": str(self.output_dir),
        "base_output_directory": str(self.base_output_dir),
        "device": self.device,
        "batch_size": self.batch_size,
        "auto_resize_masks": self.auto_resize_masks,
        "model_num_classes": self.model.num_classes,
        "dataset_size": len(self.dataset),
        "num_attacks": len(self.attacks),
        "num_metrics": len(self.metrics),
        "output_formats": self.output_formats,
    }
evaluate_clean(loader: DataLoader) -> list[dict[str, Any]]

Evaluate model on clean images.

Parameters:

loader (DataLoader): DataLoader for the dataset. Required.

Returns:

list[dict[str, Any]]: List of metric results for each batch.

Source code in segmentation_robustness_framework/pipeline/core.py
def evaluate_clean(self, loader: DataLoader) -> list[dict[str, Any]]:
    """Evaluate model on clean images.

    Args:
        loader (DataLoader): DataLoader for the dataset.

    Returns:
        list[dict[str, Any]]: List of metric results for each batch.
    """
    all_metrics = []
    for images, targets in tqdm(loader, desc="Clean Evaluation"):
        images = images.to(self.device, non_blocking=True)
        targets = targets.to(self.device, non_blocking=True)

        valid_mask = targets >= 0  # Exclude ignore_index (-1)
        if torch.any(valid_mask):
            max_valid_value = torch.max(targets[valid_mask])
            if max_valid_value >= self.model.num_classes:
                targets = torch.clamp(targets, -1, self.model.num_classes - 1)
                logger.debug("Clamped mask values to valid range")

        with torch.no_grad():
            preds = self.model.predictions(images)
        batch_metrics = self.compute_metrics(targets, preds)
        all_metrics.append(batch_metrics)

        # Memory cleanup
        del preds
        if self.device == "cuda":
            torch.cuda.empty_cache()

    logger.info("Clean evaluation complete.")
    return all_metrics
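Before each forward pass, evaluate_clean() and evaluate_attack() clamp label values into [-1, num_classes - 1] whenever any non-negative label reaches num_classes. The pure-Python sketch below mirrors that rule on a flat list of labels. clamp_targets is a hypothetical name. One consequence worth checking for your dataset: as written, a 255 ignore label that has not already been remapped to -1 would be turned into class num_classes - 1 before the metrics see it.

```python
def clamp_targets(targets: list[int], num_classes: int) -> list[int]:
    """Mirror the pipeline's label clamping on a flat list of labels.

    Only non-negative labels count as valid (-1 is the ignore value). If any
    valid label is >= num_classes, every label is clamped to [-1, num_classes - 1].
    """
    valid = [t for t in targets if t >= 0]
    if valid and max(valid) >= num_classes:
        return [min(max(t, -1), num_classes - 1) for t in targets]
    return list(targets)
```

For example, with 21 classes, [0, 5, 21, 255, -1] becomes [0, 5, 20, 20, -1].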
evaluate_attack(loader: DataLoader, attack: Callable) -> list[dict[str, Any]]

Evaluate model on adversarial images for a given attack.

Parameters:

loader (DataLoader): DataLoader for the dataset. Required.
attack (Callable): Attack instance, called as attack(images, targets). Required.

Returns:

list[dict[str, Any]]: List of metric results for each batch.

Source code in segmentation_robustness_framework/pipeline/core.py
def evaluate_attack(self, loader: DataLoader, attack: Callable) -> list[dict[str, Any]]:
    """Evaluate model on adversarial images for a given attack.

    Args:
        loader (DataLoader): DataLoader for the dataset.
        attack (Callable): Attack instance.

    Returns:
        list[dict[str, Any]]: List of metric results for each batch.
    """
    all_metrics = []
    for images, targets in tqdm(loader, desc=f"Attack: {attack}"):
        images = images.to(self.device, non_blocking=True)
        targets = targets.to(self.device, non_blocking=True)

        valid_mask = targets >= 0  # Exclude ignore_index (-1)
        if torch.any(valid_mask):
            max_valid_value = torch.max(targets[valid_mask])
            if max_valid_value >= self.model.num_classes:
                targets = torch.clamp(targets, -1, self.model.num_classes - 1)
                logger.debug("Clamped mask values to valid range")

        adv_images = attack(images, targets)
        if adv_images.device != self.device:
            adv_images = adv_images.to(self.device)

        with torch.no_grad():
            adv_preds = self.model.predictions(adv_images)

        batch_metrics = self.compute_metrics(targets, adv_preds)
        all_metrics.append(batch_metrics)

        # Memory cleanup
        del adv_images, adv_preds
        if self.device == "cuda":
            torch.cuda.empty_cache()

    logger.info(f"Evaluation for attack {attack} complete.")
    return all_metrics
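evaluate_attack() accepts any object that can be called as attack(images, targets) and returns a batch of adversarial images. The sketch below is a hypothetical illustration of that contract. It is not FGSM or PGD, which use model gradients; it adds seeded uniform noise bounded by eps. It works on a flat list of floats and assumes pixel values lie in [0, 1].

```python
import random


class UniformNoiseAttack:
    """Hypothetical gradient-free attack: L-infinity bounded random noise."""

    def __init__(self, eps: float = 0.02, seed: int = 0) -> None:
        self.eps = eps
        self.rng = random.Random(seed)  # seeded for reproducible perturbations

    def __call__(self, images: list[float], targets: list[int]) -> list[float]:
        # targets are unused here; gradient-based attacks use them to build a loss.
        perturbed = []
        for x in images:
            delta = self.rng.uniform(-self.eps, self.eps)
            perturbed.append(min(max(x + delta, 0.0), 1.0))  # keep the assumed [0, 1] range
        return perturbed

    def __repr__(self) -> str:
        # Shown in the pipeline's progress bar and log messages.
        return f"UniformNoiseAttack(eps={self.eps})"
```

Clipping can only move a perturbed pixel back toward its original value, so every output stays within eps of its input.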
compute_metrics(targets: torch.Tensor | np.ndarray, preds: torch.Tensor | np.ndarray) -> dict[str, Any]

Compute all metrics for a batch.

Parameters:

targets (Tensor | ndarray): Ground truth labels. Required.
preds (Tensor | ndarray): Predicted labels. Required.

Returns:

dict[str, Any]: Metric results keyed by metric name. Values are rounded to metric_precision decimal places. A metric that returns None or raises an exception is recorded as None.

Source code in segmentation_robustness_framework/pipeline/core.py
def compute_metrics(self, targets: torch.Tensor | np.ndarray, preds: torch.Tensor | np.ndarray) -> dict[str, Any]:
    """Compute all metrics for a batch.

    Args:
        targets (torch.Tensor | np.ndarray): Ground truth labels.
        preds (torch.Tensor | np.ndarray): Predicted labels.

    Returns:
        dict[str, Any]: Dictionary of metric results.
    """
    results = {}
    for i, metric in enumerate(self.metrics):
        metric_name = self.metric_names[i]
        try:
            metric_value = metric(targets, preds)
            if metric_value is not None:
                results[metric_name] = round(float(metric_value), self.metric_precision)
            else:
                results[metric_name] = None
        except Exception as e:
            logger.error(f"Metric {metric_name} failed: {e}")
            results[metric_name] = None
    return results
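compute_metrics() pairs each metric with its name by position and rounds numeric results to metric_precision decimals. It records both None results and exceptions as None, so one failing metric does not abort the batch. The dependency-free mirror below uses a hypothetical name. It differs in one detail: zip silently stops at the shorter list, whereas the original indexing raises IndexError if there are fewer names than metrics.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


def compute_metrics_sketch(
    metrics: list[Callable[[Any, Any], Any]],
    metric_names: list[str],
    targets: Any,
    preds: Any,
    precision: int = 4,
) -> dict[str, Optional[float]]:
    results: dict[str, Optional[float]] = {}
    for name, metric in zip(metric_names, metrics):
        try:
            value = metric(targets, preds)
            results[name] = round(float(value), precision) if value is not None else None
        except Exception as exc:  # mirrors the pipeline: log the failure and record None
            logger.error("Metric %s failed: %s", name, exc)
            results[name] = None
    return results
```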
save_results(metrics: list[dict[str, Any]], name: str) -> None

Save detailed batch metrics to disk.

Parameters:

metrics (list[dict[str, Any]]): List of metric results for each batch. Required.
name (str): Name for the result set (e.g., 'clean', 'attack_FGSM'). Depending on output_formats, files are written as <name>_detailed.json and <name>_detailed.csv in the run directory. Required.
Source code in segmentation_robustness_framework/pipeline/core.py
def save_results(self, metrics: list[dict[str, Any]], name: str) -> None:
    """Save detailed batch metrics to disk.

    Args:
        metrics (list[dict[str, Any]]): List of metric results for each batch.
        name (str): Name for the result set (e.g., 'clean', 'attack_FGSM').
    """
    saved_files = []

    if "json" in self.output_formats:
        results_file = Path(self.output_dir) / f"{name}_detailed.json"
        with open(results_file, "w") as f:
            json.dump(metrics, f, indent=2, default=str)
        saved_files.append(str(results_file))

    if "csv" in self.output_formats:
        csv_file = Path(self.output_dir) / f"{name}_detailed.csv"
        df = pd.DataFrame(metrics)
        df.to_csv(csv_file, index=False)
        saved_files.append(str(csv_file))

    logger.info(f"Detailed results for {name} saved to: {', '.join(saved_files)}")
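save_results() writes one file per enabled format, named <name>_detailed.json and <name>_detailed.csv, with one entry or row per batch. The sketch below reproduces that layout. To stay dependency-free, it uses the standard library's csv.DictWriter in place of pandas. save_results_sketch is a hypothetical name.

```python
import csv
import json
from pathlib import Path
from typing import Any


def save_results_sketch(
    metrics: list[dict[str, Any]], name: str, output_dir: Path, output_formats: list[str]
) -> list[str]:
    """Write per-batch metrics as <name>_detailed.json and/or <name>_detailed.csv."""
    saved: list[str] = []
    if "json" in output_formats:
        json_path = output_dir / f"{name}_detailed.json"
        with open(json_path, "w") as f:
            json.dump(metrics, f, indent=2, default=str)
        saved.append(str(json_path))
    if "csv" in output_formats:
        csv_path = output_dir / f"{name}_detailed.csv"
        # Collect columns in first-seen order so batches with missing keys still fit.
        fieldnames: list[str] = []
        for row in metrics:
            for key in row:
                if key not in fieldnames:
                    fieldnames.append(key)
        with open(csv_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(metrics)  # None values become empty cells
        saved.append(str(csv_path))
    return saved
```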


segmentation_robustness_framework.pipeline.config

Classes

PipelineConfig(config: dict[str, Any])

Configuration parser and pipeline factory for segmentation robustness evaluation.

This class loads configuration from YAML/JSON files and creates ready-to-run segmentation robustness pipelines.

Configuration File Structure:

# Model configuration
model:
  type: "torchvision"  # torchvision, smp, huggingface, custom
  config:
    name: "deeplabv3_resnet50"
    num_classes: 21
  weights_path: null  # optional
  weight_type: "full"  # full or encoder
  adapter: null  # optional custom adapter class

# Dataset configuration
dataset:
  name: "ade20k"
  root: null  # will use cache directory
  split: "val"
  image_shape: [256, 256]
  download: true

# Attack configurations
attacks:
  - name: "fgsm"
    eps: 0.02
  - name: "pgd"
    eps: 0.02
    alpha: 0.01
    iters: 10
    targeted: false

# Pipeline configuration
pipeline:
  batch_size: 8
  device: "cuda"
  output_dir: "./runs"
  auto_resize_masks: true
  output_formats: ["json", "csv"]

# Metrics configuration
metrics:
  ignore_index: 255
  selected_metrics:
    - "mean_iou"
    - "pixel_accuracy"
    - {"name": "dice_score", "average": "micro"}
    - "name_of_custom_metric"

Usage Examples:

from segmentation_robustness_framework.pipeline.config import PipelineConfig

# From YAML file
config = PipelineConfig.from_yaml("config.yaml")
pipeline = config.create_pipeline()
results = pipeline.run(save=True)

# From dictionary (use Python's True/False, not YAML/JSON true/false)
config_dict = {
    "model": {
        "type": "torchvision",
        "config": {"name": "deeplabv3_resnet50", "num_classes": 21}
    },
    "dataset": {
        "name": "voc",
        "root": "./data/VOCdevkit/VOC2012",
        "split": "val",
        "image_shape": [256, 256],
        "download": False
    },
    "attacks": [{"name": "fgsm", "eps": 0.02}],
    "pipeline": {"batch_size": 4, "device": "cuda", "auto_resize_masks": True}
}
config = PipelineConfig.from_dict(config_dict)
pipeline = config.create_pipeline()
results = pipeline.run()

Initialize configuration parser.

Parameters:

config (dict[str, Any]): Configuration dictionary. Required.
Source code in segmentation_robustness_framework/pipeline/config.py
def __init__(self, config: dict[str, Any]):
    """Initialize configuration parser.

    Args:
        config (dict[str, Any]): Configuration dictionary.
    """
    self.config = config
    self._validate_config()
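The checks performed by _validate_config() are not shown on this page. As a hedged pre-flight check, the hypothetical helper below lists which keys are missing among those that create_pipeline() and get_config_summary() index directly with [] rather than .get(). A config lacking any of them would raise KeyError in those methods.

```python
from typing import Any

# Keys accessed with [] in create_pipeline() and get_config_summary() on this page.
REQUIRED_PATHS = [
    ("model", "type"),
    ("model", "config"),
    ("dataset", "name"),
    ("dataset", "image_shape"),
    ("attacks",),
    ("pipeline",),
]


def missing_keys(config: dict[str, Any]) -> list[str]:
    """Return dotted paths of directly indexed keys that are absent from config."""
    missing = []
    for path in REQUIRED_PATHS:
        node: Any = config
        for key in path:
            if not isinstance(node, dict) or key not in node:
                missing.append(".".join(path))
                break
            node = node[key]
    # get_config_summary() also reads attack["name"] for every attack entry.
    for i, attack in enumerate(config.get("attacks") or []):
        if not isinstance(attack, dict) or "name" not in attack:
            missing.append(f"attacks[{i}].name")
    return missing
```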

Functions

from_yaml(config_path: Union[str, Path]) -> PipelineConfig  (classmethod)

Create configuration from YAML file.

Parameters:

config_path (str | Path): Path to YAML configuration file. Required.

Returns:

PipelineConfig: Configuration instance.

Raises:

FileNotFoundError: If config file doesn't exist.
yaml.YAMLError: If YAML is malformed.

Source code in segmentation_robustness_framework/pipeline/config.py
@classmethod
def from_yaml(cls, config_path: Union[str, Path]) -> "PipelineConfig":
    """Create configuration from YAML file.

    Args:
        config_path (str | Path): Path to YAML configuration file.

    Returns:
        PipelineConfig: Configuration instance.

    Raises:
        FileNotFoundError: If config file doesn't exist.
        yaml.YAMLError: If YAML is malformed.
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    with open(config_path) as f:
        config = yaml.safe_load(f)

    return cls(config)
from_json(config_path: Union[str, Path]) -> PipelineConfig  (classmethod)

Create configuration from JSON file.

Parameters:

config_path (str | Path): Path to JSON configuration file. Required.

Returns:

PipelineConfig: Configuration instance.

Raises:

FileNotFoundError: If config file doesn't exist.
json.JSONDecodeError: If JSON is malformed.

Source code in segmentation_robustness_framework/pipeline/config.py
@classmethod
def from_json(cls, config_path: Union[str, Path]) -> "PipelineConfig":
    """Create configuration from JSON file.

    Args:
        config_path (str | Path): Path to JSON configuration file.

    Returns:
        PipelineConfig: Configuration instance.

    Raises:
        FileNotFoundError: If config file doesn't exist.
        json.JSONDecodeError: If JSON is malformed.
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    with open(config_path) as f:
        config = json.load(f)

    return cls(config)
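from_json() first checks that the file exists and then parses it with json.load. The sketch below follows the same two steps. load_json_config is a hypothetical stand-in that returns the raw dictionary instead of a PipelineConfig, so it runs without the framework installed.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Union


def load_json_config(config_path: Union[str, Path]) -> dict[str, Any]:
    """Mirror from_json(): fail fast on a missing file, then parse JSON."""
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    with open(config_path) as f:
        return json.load(f)  # raises json.JSONDecodeError on malformed input


if __name__ == "__main__":
    config = {"attacks": [{"name": "fgsm", "eps": 0.02}], "pipeline": {"batch_size": 4}}
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "config.json"
        path.write_text(json.dumps(config, indent=2))
        print(load_json_config(path)["attacks"][0]["name"])  # fgsm
```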
from_dict(config: dict[str, Any]) -> PipelineConfig  (classmethod)

Create configuration from dictionary.

Parameters:

config (dict[str, Any]): Configuration dictionary. Required.

Returns:

PipelineConfig: Configuration instance.

Source code in segmentation_robustness_framework/pipeline/config.py
@classmethod
def from_dict(cls, config: dict[str, Any]) -> "PipelineConfig":
    """Create configuration from dictionary.

    Args:
        config (dict[str, Any]): Configuration dictionary.

    Returns:
        PipelineConfig: Configuration instance.
    """
    return cls(config)
create_pipeline() -> SegmentationRobustnessPipeline

Create and configure a segmentation robustness pipeline.

Returns:

SegmentationRobustnessPipeline: Configured pipeline ready to run.

Source code in segmentation_robustness_framework/pipeline/config.py
def create_pipeline(self) -> SegmentationRobustnessPipeline:
    """Create and configure a segmentation robustness pipeline.

    Returns:
        SegmentationRobustnessPipeline: Configured pipeline ready to run.
    """
    logger.info("Creating pipeline from configuration...")

    model = self._load_model()
    logger.info(f"Loaded model: {type(model).__name__}")

    device = self.config["pipeline"].get("device", "cpu")
    model.to(device)
    model.eval()

    dataset = self._load_dataset()
    logger.info(f"Loaded dataset: {type(dataset).__name__}")

    attacks = self._load_attacks(model)
    logger.info(f"Loaded {len(attacks)} attack instances")

    metrics, metric_names = self._load_metrics(dataset)
    logger.info(f"Loaded {len(metrics)} metrics: {', '.join(metric_names)}")

    pipeline_config = self.config["pipeline"]

    pipeline = SegmentationRobustnessPipeline(
        model=model,
        dataset=dataset,
        attacks=attacks,
        metrics=metrics,
        batch_size=pipeline_config.get("batch_size", 8),
        device=pipeline_config.get("device", "cpu"),
        output_dir=pipeline_config.get("output_dir", "./runs"),
        auto_resize_masks=pipeline_config.get("auto_resize_masks", True),
        metric_names=metric_names,
        output_formats=pipeline_config.get("output_formats", ["json"]),
        metric_precision=pipeline_config.get("metric_precision", 4),
        num_workers=pipeline_config.get("num_workers", 0),
        pin_memory=pipeline_config.get("pin_memory", False),
        persistent_workers=pipeline_config.get("persistent_workers", False),
    )

    logger.info("Pipeline created successfully")
    return pipeline
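When a key is missing from the pipeline section, create_pipeline() falls back to a default via .get(). Two of these fallbacks differ from elsewhere on this page. output_formats falls back to ["json"], although the constructor's own default is ["json", "csv"]. device falls back to "cpu", whereas get_config_summary() reports "cuda" when the same key is missing. The hypothetical helper below gathers the create_pipeline() fallbacks in one place so you can inspect the effective options before building a pipeline.

```python
import copy
from typing import Any

# Fallbacks used by create_pipeline() when a key is absent from the "pipeline" section.
CREATE_PIPELINE_DEFAULTS: dict[str, Any] = {
    "batch_size": 8,
    "device": "cpu",
    "output_dir": "./runs",
    "auto_resize_masks": True,
    "output_formats": ["json"],  # the constructor itself defaults to ["json", "csv"]
    "metric_precision": 4,
    "num_workers": 0,
    "pin_memory": False,
    "persistent_workers": False,
}


def effective_pipeline_options(pipeline_config: dict[str, Any]) -> dict[str, Any]:
    """Return the options create_pipeline() would pass; user values take priority."""
    # copy.copy prevents callers from mutating the shared default list.
    return {key: pipeline_config.get(key, copy.copy(default)) for key, default in CREATE_PIPELINE_DEFAULTS.items()}
```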
run_pipeline(save: bool = True, show: bool = False) -> dict[str, Any]

Create and run the pipeline.

Parameters:

save (bool): Whether to save results. Defaults to True.
show (bool): Whether to show visualizations. Only takes effect when save is True. Defaults to False.

Returns:

dict[str, Any]: Pipeline results.

Source code in segmentation_robustness_framework/pipeline/config.py
def run_pipeline(self, save: bool = True, show: bool = False) -> dict[str, Any]:
    """Create and run the pipeline.

    Args:
        save (bool): Whether to save results. Defaults to True.
        show (bool): Whether to show visualizations. Defaults to False.

    Returns:
        dict[str, Any]: Pipeline results.
    """
    pipeline = self.create_pipeline()
    return pipeline.run(save=save, show=show)
get_config_summary() -> dict[str, Any]

Get a summary of the configuration.

Returns:

dict[str, Any]: Configuration summary.

Source code in segmentation_robustness_framework/pipeline/config.py
def get_config_summary(self) -> dict[str, Any]:
    """Get a summary of the configuration.

    Returns:
        dict[str, Any]: Configuration summary.
    """
    return {
        "model": {
            "type": self.config["model"]["type"],
            "config": self.config["model"]["config"],
        },
        "dataset": {
            "name": self.config["dataset"]["name"],
            "split": self.config["dataset"].get("split"),
            "image_shape": self.config["dataset"]["image_shape"],
        },
        "attacks": [attack["name"] for attack in self.config["attacks"]],
        "pipeline": {
            "batch_size": self.config["pipeline"].get("batch_size", 8),
            "device": self.config["pipeline"].get("device", "cuda"),
            "output_dir": self.config["pipeline"].get("output_dir", "./runs"),
        },
    }

Pipeline Configuration

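The pipeline configuration system lets you define a complete experiment in a single YAML configuration file. The same structure can also be loaded from JSON with PipelineConfig.from_json() or from a Python dictionary with PipelineConfig.from_dict().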

Configuration Structure

pipeline:
  device: cuda
  batch_size: 4
  output_dir: results
  auto_resize_masks: true
  output_formats: ["json"]

model:
  type: torchvision
  config:
    name: deeplabv3_resnet50
    num_classes: 21

dataset:
  name: voc
  split: val
  root: ./data
  image_shape: [512, 512]
  download: true

attacks:
  - name: fgsm
    eps: 0.02
  - name: pgd
    eps: 0.02
    alpha: 0.02
    iters: 10
    targeted: false

metrics:
  ignore_index: 255
  selected_metrics:
    - mean_iou
    - pixel_accuracy
    - precision
    - recall
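Because PipelineConfig.from_dict() and from_json() accept the same structure, the YAML above can also be written as a Python dictionary and, if needed, serialized to JSON for from_json(). The sketch uses only the standard library. Note that Python uses True/False where YAML uses true/false.

```python
import json

config = {
    "pipeline": {
        "device": "cuda",
        "batch_size": 4,
        "output_dir": "results",
        "auto_resize_masks": True,
        "output_formats": ["json"],
    },
    "model": {"type": "torchvision", "config": {"name": "deeplabv3_resnet50", "num_classes": 21}},
    "dataset": {"name": "voc", "split": "val", "root": "./data", "image_shape": [512, 512], "download": True},
    "attacks": [
        {"name": "fgsm", "eps": 0.02},
        {"name": "pgd", "eps": 0.02, "alpha": 0.02, "iters": 10, "targeted": False},
    ],
    "metrics": {
        "ignore_index": 255,
        "selected_metrics": ["mean_iou", "pixel_accuracy", "precision", "recall"],
    },
}

# json.dumps writes True/False as true/false, matching the YAML above.
# Save this string to a .json file to load it with PipelineConfig.from_json().
config_json = json.dumps(config, indent=2)
```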

Pipeline Execution

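Running a configuration with PipelineConfig.run_pipeline() orchestrates the entire evaluation process:

  1. Model Loading: Loads the specified model with the appropriate adapter, moves it to the configured device, and switches it to evaluation mode
  2. Dataset Loading: Loads and preprocesses the dataset
  3. Attack and Metric Setup: Instantiates the configured attacks for the loaded model and loads the selected metrics
  4. Evaluation: Evaluates on clean images, then, for each attack, generates adversarial examples batch by batch and evaluates on them
  5. Reporting: Aggregates per-batch metrics into the returned results; with save=True it also writes detailed and summary files, and with show=True as well it creates visualizations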

Results Structure

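run() returns a dictionary keyed by evaluation name: "clean" for the clean pass and attack_<name> for each attack, where <name> comes from the pipeline's _generate_attack_name() helper. Each value maps metric names to aggregated scores, for example: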

{
    'clean': {
        'mean_iou': 0.823,
        'pixel_accuracy': 0.956,
        'precision': 0.891,
        'recall': 0.845
    },
    'attack_fgsm': {
        'mean_iou': 0.452,
        'pixel_accuracy': 0.723,
        'precision': 0.567,
        'recall': 0.489
    },
    'attack_pgd': {
        'mean_iou': 0.231,
        'pixel_accuracy': 0.456,
        'precision': 0.234,
        'recall': 0.198
    }
}
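Because each value in the results dictionary maps metric names to numbers, attacks can be compared directly. The sketch below uses the example values above. It orders attack entries by a chosen metric, lowest first (the most damaging attack for a higher-is-better metric), and skips entries where that metric is None. rank_attacks is a hypothetical helper name.

```python
from typing import Optional

results = {
    "clean": {"mean_iou": 0.823, "pixel_accuracy": 0.956, "precision": 0.891, "recall": 0.845},
    "attack_fgsm": {"mean_iou": 0.452, "pixel_accuracy": 0.723, "precision": 0.567, "recall": 0.489},
    "attack_pgd": {"mean_iou": 0.231, "pixel_accuracy": 0.456, "precision": 0.234, "recall": 0.198},
}


def rank_attacks(results: dict[str, dict[str, Optional[float]]], metric: str) -> list[tuple[str, float]]:
    """Return (attack_key, score) pairs sorted from lowest to highest score."""
    scored = [
        (key, values[metric])
        for key, values in results.items()
        if key.startswith("attack_") and values.get(metric) is not None
    ]
    return sorted(scored, key=lambda pair: pair[1])


for key, score in rank_attacks(results, "mean_iou"):
    print(f"{key}: {score:.3f}")  # attack_pgd: 0.231, then attack_fgsm: 0.452
```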