Distributed training

Callbacks and helper functions to train in parallel or use distributed training

When using multiple GPUs, you will most probably want to fit using distributed training.

Example use can be found:

To use distributed training, there are only three required steps:

  1. Add with learn.distrib_ctx(): before your learn.fit call
  2. Either config Accelerate yourself by running accelerate config from the command line, or run:
from accelerate.utils import write_basic_config
write_basic_config()
  1. Run your training script with accelerate launch scriptname.py ...args...

If you’re using untar_data, or may be downloading or uncompressing data or models as part of your script, you should wrap that code with rank0_first, which forces that step to occur first just once on the master process, prior to the remaining processes running it in parallel. E.g. instead of:

path = untar_data(URLs.IMAGEWOOF_320)

…you instead use:

path = rank0_first(untar_data, URLs.IMAGEWOOF_320)

See below for details on the full API and underlying helper functions, if needed – however, note that you will not need anything except the above unless you need to change how the distributed training is implemented.

Parallel


source

DataParallel.reset

 DataParallel.reset ()

Patch required reset call into DataParallel


source

ParallelTrainer

 ParallelTrainer (device_ids)

Wrap a model DataParallel automatically


source

Learner.to_parallel

 Learner.to_parallel (device_ids=None)

Add ParallelTrainer callback to a Learner


source

Learner.detach_parallel

 Learner.detach_parallel ()

Remove ParallelTrainer callback from a Learner


parallel_ctx

 parallel_ctx (device_ids=None)

A context manager to adapt a learner to train in data parallel mode.

Distributed

Helper functions


source

DistributedDataParallel.reset

 DistributedDataParallel.reset ()

Patch required reset call into DistributedDataParallel


source

setup_distrib

 setup_distrib (gpu=None)

Setup this process to participate in distributed training


source

teardown_distrib

 teardown_distrib ()

Free distributed training resources

DataLoader


source

DistributedDL

 DistributedDL (dl, rank=None, world_size=None, device=None)

A TfmdDL which splits a batch into equal size pieces for each worker

dl = TfmdDL(list(range(50)), bs=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50))

source

DistributedTrainer

 DistributedTrainer (sync_bn=True, device_placement:bool=True,
                     split_batches:bool=<object object at 0x7f95aa4539b0>,
                     gradient_accumulation_steps:int=1, cpu:bool=False,
                     dataloader_config:DataLoaderConfiguration|None=None, 
                     deepspeed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPl
                     ugin]|None=None,
                     fsdp_plugin:FullyShardedDataParallelPlugin|None=None,
                     megatron_lm_plugin:MegatronLMPlugin|None=None,
                     rng_types:list[str|RNGType]|None=None,
                     project_dir:str|os.PathLike|None=None,
                     project_config:ProjectConfiguration|None=None, gradie
                     nt_accumulation_plugin:GradientAccumulationPlugin|Non
                     e=None,
                     kwargs_handlers:list[KwargsHandler]|None=None,
                     dynamo_backend:DynamoBackend|str|None=None, deepspeed
                     _plugins:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|No
                     ne=None)

Wrap model in DistributedDataParallel and dls in DistributedDL

Type Default Details
sync_bn bool True Whether to replace all batch norm with nn.SyncBatchNorm
device_placement bool True
split_batches bool <object object at 0x7f95aa4539b0>
gradient_accumulation_steps int 1
cpu bool False
dataloader_config DataLoaderConfiguration | None None
deepspeed_plugin DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None None
fsdp_plugin FullyShardedDataParallelPlugin | None None
megatron_lm_plugin MegatronLMPlugin | None None
rng_types list[str | RNGType] | None None
project_dir str | os.PathLike | None None
project_config ProjectConfiguration | None None
gradient_accumulation_plugin GradientAccumulationPlugin | None None
kwargs_handlers list[KwargsHandler] | None None
dynamo_backend DynamoBackend | str | None None
deepspeed_plugins DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None None

source

Learner.to_distributed

 Learner.to_distributed (sync_bn=True, device_placement:bool=True,
                         split_batches:bool=<object object at
                         0x7f95aa4539b0>,
                         gradient_accumulation_steps:int=1,
                         cpu:bool=False, dataloader_config:DataLoaderConfi
                         guration|None=None, deepspeed_plugin:DeepSpeedPlu
                         gin|dict[str,DeepSpeedPlugin]|None=None, fsdp_plu
                         gin:FullyShardedDataParallelPlugin|None=None,
                         megatron_lm_plugin:MegatronLMPlugin|None=None,
                         rng_types:list[str|RNGType]|None=None,
                         project_dir:str|os.PathLike|None=None,
                         project_config:ProjectConfiguration|None=None, gr
                         adient_accumulation_plugin:GradientAccumulationPl
                         ugin|None=None,
                         kwargs_handlers:list[KwargsHandler]|None=None,
                         dynamo_backend:DynamoBackend|str|None=None, deeps
                         peed_plugins:DeepSpeedPlugin|dict[str,DeepSpeedPl
                         ugin]|None=None)

Add AcceleratedTrainer to a learner, and configures an Accelerator

Type Default Details
sync_bn bool True Whether to replace all batch norm with nn.SyncBatchNorm
device_placement bool True
split_batches bool <object object at 0x7f95aa4539b0>
gradient_accumulation_steps int 1
cpu bool False
dataloader_config DataLoaderConfiguration | None None
deepspeed_plugin DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None None
fsdp_plugin FullyShardedDataParallelPlugin | None None
megatron_lm_plugin MegatronLMPlugin | None None
rng_types list[str | RNGType] | None None
project_dir str | os.PathLike | None None
project_config ProjectConfiguration | None None
gradient_accumulation_plugin GradientAccumulationPlugin | None None
kwargs_handlers list[KwargsHandler] | None None
dynamo_backend DynamoBackend | str | None None
deepspeed_plugins DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None None

source

Learner.detach_distributed

 Learner.detach_distributed ()

Remove DistributedTrainer from a learner

distrib_ctx context manager


distrib_ctx

 distrib_ctx (sync_bn=True, in_notebook=False, device_placement:bool=True,
              split_batches:bool=<object object at 0x7f95aa4539b0>,
              gradient_accumulation_steps:int=1, cpu:bool=False,
              dataloader_config:DataLoaderConfiguration|None=None, deepspe
              ed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=Non
              e, fsdp_plugin:FullyShardedDataParallelPlugin|None=None,
              megatron_lm_plugin:MegatronLMPlugin|None=None,
              rng_types:list[str|RNGType]|None=None,
              project_dir:str|os.PathLike|None=None,
              project_config:ProjectConfiguration|None=None, gradient_accu
              mulation_plugin:GradientAccumulationPlugin|None=None,
              kwargs_handlers:list[KwargsHandler]|None=None,
              dynamo_backend:DynamoBackend|str|None=None, deepspeed_plugin
              s:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None)

A context manager to adapt a learner to train in distributed data parallel mode.

Type Default Details
sync_bn bool True Whether to replace all batch norm with nn.SyncBatchNorm
in_notebook bool False Whether we are launching from a notebook or not
device_placement bool True
split_batches bool <object object at 0x7f95aa4539b0>
gradient_accumulation_steps int 1
cpu bool False
dataloader_config DataLoaderConfiguration | None None
deepspeed_plugin DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None None
fsdp_plugin FullyShardedDataParallelPlugin | None None
megatron_lm_plugin MegatronLMPlugin | None None
rng_types list[str | RNGType] | None None
project_dir str | os.PathLike | None None
project_config ProjectConfiguration | None None
gradient_accumulation_plugin GradientAccumulationPlugin | None None
kwargs_handlers list[KwargsHandler] | None None
dynamo_backend DynamoBackend | str | None None
deepspeed_plugins DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None None

distrib_ctx prepares a learner to train in distributed data parallel mode. It assumes the script/code will either be ran through the command line via accelerate launch or through the notebook_launcher function from Accelerate. It also assumes that accelerate has been configured through either running write_basic_config() or calling accelerate config through the CLI and answering the prompts.

Typical usage:

with learn.distrib_ctx(): learn.fit(.....)

It attaches a DistributedTrainer callback and DistributedDL data loader to the learner, then executes learn.fit(.....). Upon exiting the context, it removes the DistributedTrainer and DistributedDL, and destroys any locally created distributed process group. The process is still attached to the GPU though.


source

rank0_first

 rank0_first (func, *args, **kwargs)

Execute func in the Rank-0 process first, then in other ranks in parallel.

rank0_first calls f() in rank-0 process first, then in parallel on the rest, in distributed training mode. In single process, non-distributed training mode, f() is called only once as expected.

One application of rank0_first() is to make fresh downloads via untar_data safe in distributed training scripts launched by python -m fastai.launch <script>:

path = untar_data(URLs.IMDB)

becomes:

path = rank0_first(lambda: untar_data(URLs.IMDB))

Some learner factory methods may use untar_data to download pretrained models:

learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)

becomes:

learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))

Otherwise, multiple processes will download at the same time and corrupt the data.

Notebook Launcher

Accelerate provides a notebook_launcher functionality to let you keep using your Jupyter Notebook as you would, but train in a distributed setup!

First, make sure accelerate is properly configured. You can either run accelerate config from the command line, or have an autofilled configuration setup by running in the first cell of your notebook:

from accelerate.utils import write_basic_config
write_basic_config()

After Accelerate is configured, to utilize the notebook_launcher functionality migrate your training into a function, and pass this to notebook_launcher, such as:

---
from fastai.vision.all import *
from fastai.distributed import *

def train():
    set_seed(99, True)
    path = untar_data(URLs.PETS)/'images'
    dls = ImageDataLoaders.from_name_func(
        path, get_image_files(path), valid_pct=0.2,
        label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
    
    learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()
    with learn.distrib_ctx(in_notebook=True):
        learn.fine_tune(1)
---
from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)
---