Callbacks and helper functions to train in parallel or use distributed training

When using multiple GPUs, you will most probably want to fit using distributed training.

To use distributed training, there are only three required steps:

  1. Add with learn.distrib_ctx(): before your learn.fit call
  2. Either configure Accelerate yourself by running accelerate config from the command line, or run:
    from accelerate.utils import write_basic_config
    write_basic_config()

  3. Run your training script with accelerate launch scriptname.py ...args... (a minimal example script is sketched just below)
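
Putting those steps together, a minimal training script might look like the sketch below (the dataset, architecture, and file name are illustrative, not prescribed by the library):

from fastai.vision.all import *
from fastai.distributed import *

# download/extract the data once, on the master process only
path = rank0_first(untar_data, URLs.IMAGEWOOF_320)

dls = ImageDataLoaders.from_folder(path, valid='val', item_tfms=Resize(224))
learn = vision_learner(dls, resnet34, metrics=accuracy)

# everything inside this context is trained with distributed data parallel
with learn.distrib_ctx():
    learn.fine_tune(1)

Saved as e.g. train.py, this would be launched with accelerate launch train.py.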

If you're using untar_data, or are otherwise downloading or uncompressing data or models as part of your script, you should wrap that code with rank0_first, which forces that step to occur just once, on the master process, before the remaining processes run it in parallel. E.g. instead of:

path = untar_data(URLs.IMAGEWOOF_320)

...you instead use:

path = rank0_first(untar_data, URLs.IMAGEWOOF_320)

See below for details on the full API and underlying helper functions, if needed -- however, note that you will not need anything except the above unless you need to change how the distributed training is implemented.

Parallel

DataParallel.reset[source]

DataParallel.reset()

Patch required reset call into DataParallel

class ParallelTrainer[source]

ParallelTrainer(device_ids) :: Callback

Wrap a model in DataParallel automatically

Learner.to_parallel[source]

Learner.to_parallel(device_ids=None)

Add ParallelTrainer callback to a Learner

Learner.detach_parallel[source]

Learner.detach_parallel()

Remove ParallelTrainer callback from a Learner

Learner.parallel_ctx[source]

Learner.parallel_ctx(device_ids=None)

A context manager to adapt a learner to train in data parallel mode.
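
For example, on a single machine with multiple GPUs you could use either form below to train with DataParallel for the duration of a fit (a minimal sketch; learn is assumed to be an existing Learner):

# context-manager form: the callback is removed automatically on exit
with learn.parallel_ctx(device_ids=[0, 1]):
    learn.fit(1)

# or attach and detach explicitly
learn.to_parallel(device_ids=[0, 1])
learn.fit(1)
learn.detach_parallel()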

Distributed

Helper functions

DistributedDataParallel.reset[source]

DistributedDataParallel.reset()

Patch required reset call into DistributedDataParallel

setup_distrib[source]

setup_distrib(gpu=None)

Setup this process to participate in distributed training

teardown_distrib[source]

teardown_distrib()

Free distributed training resources

DataLoader

class DistributedDL[source]

DistributedDL(dl, rank=None, world_size=None) :: TfmdDL

A TfmdDL which splits a batch into equal size pieces for each worker

# 50 items across 4 ranks are padded to 52, so each rank sees 13 items; with
# bs=12 that is one full batch of 12 plus a final batch of 1, wrapping modulo 50
dl = TfmdDL(list(range(50)), bs=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50, torch.tensor([i*13+12])%50))

class DistributedTrainer[source]

DistributedTrainer(sync_bn=True, device_placement:bool=True, split_batches:bool=False, cpu:bool=False, deepspeed_plugin:DeepSpeedPlugin=None, fsdp_plugin:FullyShardedDataParallelPlugin=None, rng_types:Optional[List[typing.Union[str, accelerate.utils.dataclasses.RNGType]]]=None, dispatch_batches:Optional[bool]=None, kwargs_handlers:Optional[List[KwargsHandler]]=None) :: Callback

Wrap model in DistributedDataParallel and dls in DistributedDL

sync_bn: bool, default True. Whether to replace all batch norm with nn.SyncBatchNorm.
Valid keyword arguments, all passed through to Accelerator.__init__:
  device_placement: bool, default True
  split_batches: bool, default False
  cpu: bool, default False
  deepspeed_plugin: DeepSpeedPlugin, default None
  fsdp_plugin: FullyShardedDataParallelPlugin, default None
  rng_types: Optional[List[Union[str, accelerate.utils.dataclasses.RNGType]]], default None
  dispatch_batches: Optional[bool], default None
  kwargs_handlers: Optional[List[accelerate.utils.dataclasses.KwargsHandler]], default None

Learner.to_distributed[source]

Learner.to_distributed(sync_bn=True, device_placement:bool=True, split_batches:bool=False, cpu:bool=False, deepspeed_plugin:DeepSpeedPlugin=None, fsdp_plugin:FullyShardedDataParallelPlugin=None, rng_types:Optional[List[typing.Union[str, accelerate.utils.dataclasses.RNGType]]]=None, dispatch_batches:Optional[bool]=None, kwargs_handlers:Optional[List[KwargsHandler]]=None)

Add DistributedTrainer to a Learner and configure an Accelerator

sync_bn: bool, default True. Whether to replace all batch norm with nn.SyncBatchNorm.
Valid keyword arguments, all passed through to Accelerator.__init__:
  device_placement: bool, default True
  split_batches: bool, default False
  cpu: bool, default False
  deepspeed_plugin: DeepSpeedPlugin, default None
  fsdp_plugin: FullyShardedDataParallelPlugin, default None
  rng_types: Optional[List[Union[str, accelerate.utils.dataclasses.RNGType]]], default None
  dispatch_batches: Optional[bool], default None
  kwargs_handlers: Optional[List[accelerate.utils.dataclasses.KwargsHandler]], default None

Learner.detach_distributed[source]

Learner.detach_distributed()

Remove DistributedTrainer from a learner
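
If you prefer to manage the callback yourself rather than use the distrib_ctx context manager described below, the pattern is roughly as follows (a sketch; learn is an existing Learner, and the script still needs to be launched and configured through Accelerate as described above):

learn.to_distributed(sync_bn=True)   # attach DistributedTrainer and set up an Accelerator
learn.fit(1)
learn.detach_distributed()           # remove the callback once training is done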

distrib_ctx context manager

Learner.distrib_ctx[source]

Learner.distrib_ctx(sync_bn=True, in_notebook=False, device_placement:bool=True, split_batches:bool=False, cpu:bool=False, deepspeed_plugin:DeepSpeedPlugin=None, fsdp_plugin:FullyShardedDataParallelPlugin=None, rng_types:Optional[List[typing.Union[str, accelerate.utils.dataclasses.RNGType]]]=None, dispatch_batches:Optional[bool]=None, kwargs_handlers:Optional[List[KwargsHandler]]=None)

A context manager to adapt a learner to train in distributed data parallel mode.

sync_bn: bool, default True. Whether to replace all batch norm with nn.SyncBatchNorm.
in_notebook: bool, default False. Whether we are launching from a notebook or not.
Valid keyword arguments, all passed through to Accelerator.__init__:
  device_placement: bool, default True
  split_batches: bool, default False
  cpu: bool, default False
  deepspeed_plugin: DeepSpeedPlugin, default None
  fsdp_plugin: FullyShardedDataParallelPlugin, default None
  rng_types: Optional[List[Union[str, accelerate.utils.dataclasses.RNGType]]], default None
  dispatch_batches: Optional[bool], default None
  kwargs_handlers: Optional[List[accelerate.utils.dataclasses.KwargsHandler]], default None

distrib_ctx prepares a learner to train in distributed data parallel mode. It assumes the script/code will either be run from the command line via accelerate launch or through the notebook_launcher function from Accelerate. It also assumes that Accelerate has been configured, either by running write_basic_config() or by calling accelerate config from the CLI and answering the prompts.

Typical usage:

with learn.distrib_ctx(): learn.fit(.....)

It attaches a DistributedTrainer callback and DistributedDL data loaders to the learner, then executes learn.fit(.....). Upon exiting the context, it removes the DistributedTrainer and DistributedDL and destroys any locally created distributed process group. Note that the process stays attached to its GPU afterwards.
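
The keyword arguments documented above are passed straight through, so you could, for instance, keep plain batch norm layers rather than converting them to nn.SyncBatchNorm (a sketch):

with learn.distrib_ctx(sync_bn=False):
    learn.fine_tune(1)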

rank0_first[source]

rank0_first(func, *args, **kwargs)

Execute func in the Rank-0 process first, then in other ranks in parallel.

rank0_first calls func in the rank-0 process first, then in parallel on the remaining ranks, when running in distributed training mode. In single-process, non-distributed mode, func is called only once, as expected.

One application of rank0_first() is to make fresh downloads via untar_data safe in distributed training scripts launched by python -m fastai.launch <script>:

path = untar_data(URLs.IMDB)

becomes:

path = rank0_first(lambda: untar_data(URLs.IMDB))

Some learner factory methods may use untar_data to download pretrained models:

learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)

becomes:

learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))

Otherwise, multiple processes will download at the same time and corrupt the data.
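
Conceptually, rank0_first is just a barrier pattern wrapped around the call. A rough sketch of the idea in terms of torch.distributed (illustrative only, not fastai's exact implementation):

import torch.distributed as dist

def rank0_first_sketch(func, *args, **kwargs):
    distributed = dist.is_available() and dist.is_initialized()
    if distributed and dist.get_rank() != 0:
        dist.barrier()              # non-zero ranks block until rank 0 has run func
    res = func(*args, **kwargs)     # rank 0 runs func first; other ranks run it after release
    if distributed and dist.get_rank() == 0:
        dist.barrier()              # rank 0 joins the barrier, releasing the waiting ranks
    return res

After rank 0 has downloaded and extracted the data, the remaining ranks run the same call and typically just find the already-populated cache on disk.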

Notebook Launcher

Accelerate provides a notebook_launcher function that lets you keep using your Jupyter Notebook as you would, but train in a distributed setup!

First, make sure Accelerate is properly configured. You can either run accelerate config from the command line, or write a default configuration automatically by running the following in the first cell of your notebook:

from accelerate.utils import write_basic_config
write_basic_config()

After Accelerate is configured, to use the notebook_launcher functionality move your training into a function and pass it to notebook_launcher, such as:

from fastai.vision.all import *
from fastai.distributed import *

set_seed(99, True)
path = untar_data(URLs.PETS)/'images'
dls = ImageDataLoaders.from_name_func(
    path, get_image_files(path), valid_pct=0.2,
    label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))

learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()

def train():
    with learn.distrib_ctx(in_notebook=True):
        learn.fine_tune(1)

from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)   # num_processes = number of GPUs to train on