Parallel Runner#
The parallel runner creates a pool of worker processes whose size equals the number specified when the runner is created. Each process in the pool is assigned a process id in the range 0 to n_jobs - 1. When a job is run using the parallel runner, this id is passed to the job, which can use it for process-specific logic. For example, if the pool size equals the number of GPUs, the job can use the process id to assign a specific GPU to the process.
Parallel execution of pipeline jobs using process pools.
This module provides the ParallelRunner class for executing pipeline jobs in parallel across multiple processes. It uses Python's ProcessPoolExecutor to manage a pool of worker processes.
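As a rough illustration of that mechanism (not the runner's actual implementation), the sketch below distributes work across a fixed pool with ProcessPoolExecutor and forwards a slot index to each call, similar in spirit to the task_id the runner passes to jobs. The job function, its arguments, and the simple i % n_jobs slot assignment are invented for this example:

import concurrent.futures

def job(task_id: int, item: str) -> str:
    # The pool-slot index plays the role of the task_id the runner
    # forwards to each job; here it merely tags the result.
    return f"slot {task_id} processed {item}"

if __name__ == "__main__":
    n_jobs = 4  # hypothetical pool size
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(job, i % n_jobs, f"item-{i}") for i in range(8)]
        print([f.result() for f in futures])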
Example
>>> from anomalib.pipelines.components.runners import ParallelRunner
>>> from anomalib.pipelines.components.base import JobGenerator
>>> generator = JobGenerator()
>>> runner = ParallelRunner(generator, n_jobs=4)
>>> results = runner.run({"param": "value"})
The parallel runner handles:
- Creating and managing a pool of worker processes
- Distributing jobs across available workers
- Collecting and combining results from parallel executions
- Error handling for failed jobs
The number of parallel jobs can be configured based on available compute resources like CPU cores or GPUs.
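For instance, one plausible way to size the pool (an illustrative choice, not a recommendation from the library) is to use the GPU count when GPUs are available and fall back to the CPU core count otherwise:

>>> import os
>>> import torch
>>> from anomalib.pipelines.components.runners import ParallelRunner
>>> from anomalib.pipelines.components.base import JobGenerator
>>> n_jobs = torch.cuda.device_count() or os.cpu_count() or 1
>>> runner = ParallelRunner(JobGenerator(), n_jobs=n_jobs)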
- exception anomalib.pipelines.components.runners.parallel.ParallelExecutionError#
Bases: Exception
Raised when one or more jobs fail in the pool.
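Assuming the runner surfaces this exception from run() when any job fails, a caller could guard the call as follows (reusing the runner from the example above):

>>> from anomalib.pipelines.components.runners.parallel import ParallelExecutionError
>>> try:
...     results = runner.run({"param": "value"})
... except ParallelExecutionError:
...     print("one or more jobs failed in the pool")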
- class anomalib.pipelines.components.runners.parallel.ParallelRunner(generator, n_jobs)#
Bases: Runner
Run jobs in parallel using a process pool.
This runner executes jobs concurrently using a pool of worker processes. It manages process creation, job distribution, and result collection.
- Parameters:
generator (JobGenerator) – Generator that creates jobs to be executed.
n_jobs (int) – Number of parallel processes to use.
Example
Create a pool with size matching available GPUs and submit jobs:
>>> from anomalib.pipelines.components.runners import ParallelRunner
>>> from anomalib.pipelines.components.base import JobGenerator
>>> import torch
>>> generator = JobGenerator()
>>> runner = ParallelRunner(generator, n_jobs=torch.cuda.device_count())
>>> results = runner.run({"param": "value"})
Notes
When a job is submitted to the pool, a task_id parameter is passed to the job's run() method. Jobs can use this ID to manage device assignment:

def run(self, arg1: int, arg2: nn.Module, task_id: int) -> None:
    device = torch.device(f"cuda:{task_id}")
    model = arg2.to(device)
    # ... rest of job logic
The runner handles:
- Creating and managing worker processes
- Distributing jobs to available workers
- Collecting and combining results
- Error handling for failed jobs (illustrated in the sketch after this list)
- Resource cleanup
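The sketch below illustrates, in standard-library terms only, how such result collection and error handling might look. It is not the runner's actual implementation; the PoolError class, job function, and run_all helper are invented for this example:

import concurrent.futures

class PoolError(Exception):
    """Stand-in for ParallelExecutionError in this sketch."""

def job(task_id: int, value: int) -> int:
    # A failing input simulates a job that raises inside a worker process.
    if value < 0:
        raise ValueError(f"bad input {value}")
    return value * 2

def run_all(inputs: list[int], n_jobs: int = 2) -> list[int]:
    results: list[int] = []
    failures: list[Exception] = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(job, i % n_jobs, v) for i, v in enumerate(inputs)]
        for future in concurrent.futures.as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:  # collect per-job failures instead of stopping
                failures.append(exc)
    # Surface failures once all jobs have finished, mirroring a pool-level error.
    if failures:
        raise PoolError(f"{len(failures)} job(s) failed in the pool")
    return results

if __name__ == "__main__":
    print(run_all([1, 2, 3]))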