Parallel Runner#

The parallel runner creates a pool of worker processes whose size is set by the number of jobs specified when creating the runner.

Each process in the pool is assigned a process id between 0 and n_jobs - 1. When a job is run using the parallel runner, this id is passed to the job, which can use it for process-specific logic. For example, if the pool size equals the number of GPUs, the job can use the process id to pin itself to a specific GPU.

Parallel execution of pipeline jobs using process pools.

This module provides the ParallelRunner class for executing pipeline jobs in parallel across multiple processes. It uses Python’s ProcessPoolExecutor to manage a pool of worker processes.

Example

>>> from anomalib.pipelines.components.runners import ParallelRunner
>>> from anomalib.pipelines.components.base import JobGenerator
>>> generator = JobGenerator()
>>> runner = ParallelRunner(generator, n_jobs=4)
>>> results = runner.run({"param": "value"})

The parallel runner handles:

  • Creating and managing a pool of worker processes

  • Distributing jobs across available workers

  • Collecting and combining results from parallel executions

  • Error handling for failed jobs

The number of parallel jobs can be configured based on available compute resources like CPU cores or GPUs.
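A minimal sketch of sizing the pool from the hardware at hand (this assumes torch may or may not be installed and falls back to CPU cores when no GPUs are visible):

```python
import os

# Pick a pool size from available compute resources: prefer one worker
# per GPU when torch reports any, otherwise one per CPU core.
try:
    import torch

    n_jobs = torch.cuda.device_count() or (os.cpu_count() or 1)
except ImportError:
    n_jobs = os.cpu_count() or 1
```

The resulting value can be passed directly as the `n_jobs` argument of `ParallelRunner`.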

exception anomalib.pipelines.components.runners.parallel.ParallelExecutionError#

Bases: Exception

Raised when one or more jobs fail in the process pool.

class anomalib.pipelines.components.runners.parallel.ParallelRunner(generator, n_jobs)#

Bases: Runner

Run jobs in parallel using a process pool.

This runner executes jobs concurrently using a pool of worker processes. It manages process creation, job distribution, and result collection.

Parameters:
  • generator (JobGenerator) – Generator that creates jobs to be executed.

  • n_jobs (int) – Number of parallel processes to use.

Example

Create a pool with size matching available GPUs and submit jobs:

>>> from anomalib.pipelines.components.runners import ParallelRunner
>>> from anomalib.pipelines.components.base import JobGenerator
>>> import torch
>>> generator = JobGenerator()
>>> runner = ParallelRunner(generator, n_jobs=torch.cuda.device_count())
>>> results = runner.run({"param": "value"})

Notes

When a job is submitted to the pool, a task_id parameter is passed to the job’s run() method. Jobs can use this ID to manage device assignment:

import torch
from torch import nn

def run(self, arg1: int, arg2: nn.Module, task_id: int) -> None:
    # Use the worker's task_id to select a dedicated GPU.
    device = torch.device(f"cuda:{task_id}")
    model = arg2.to(device)
    # ... rest of job logic
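As a self-contained illustration of the device-assignment pattern above (pick_device is a hypothetical helper for this sketch, not part of anomalib; the modulo keeps the mapping valid even if task ids exceed the GPU count):

```python
def pick_device(task_id: int, n_gpus: int) -> str:
    # With pool size == n_gpus, each worker's task_id selects one GPU.
    return f"cuda:{task_id % n_gpus}"

print(pick_device(2, 4))  # → cuda:2
```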

The runner handles:
  • Creating and managing worker processes

  • Distributing jobs to available workers

  • Collecting and combining results

  • Error handling for failed jobs

  • Resource cleanup
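The distribute/collect/error-handling loop listed above can be sketched as follows. This is a simplified stand-in, not the actual ParallelRunner implementation: ThreadPoolExecutor is used only so the snippet runs anywhere, while the real runner uses ProcessPoolExecutor, which shares the same Executor API.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def job(task_id: int, payload: int) -> int:
    # Stand-in for a pipeline job; task_id could select a device.
    if payload < 0:
        raise ValueError(f"bad payload in task {task_id}")
    return payload * 2

def run_pool(payloads: list[int], n_jobs: int = 4) -> list[int]:
    results, failures = [], []
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        futures = [
            pool.submit(job, i % n_jobs, p) for i, p in enumerate(payloads)
        ]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:  # collect failures instead of aborting
                failures.append(exc)
    if failures:
        # Mirrors the role of ParallelExecutionError in the real runner.
        raise RuntimeError(f"{len(failures)} job(s) failed")
    return results

print(sorted(run_pool([1, 2, 3])))  # → [2, 4, 6]
```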

run(args, prev_stage_results=None)#

Run the generated jobs in parallel.

Return type:

Any