Distributed training

Callbacks and helper functions to train in parallel or use distributed training

When using multiple GPUs, you will most probably want to fit using distributed training.

Example use can be found:

To use distributed training, there are only three required steps:

  1. Add with learn.distrib_ctx(): before your learn.fit call
  2. Either configure Accelerate yourself by running accelerate config from the command line, or run:
     from accelerate.utils import write_basic_config
     write_basic_config()
  3. Run your training script with accelerate launch scriptname.py ...args...

If your script uses untar_data, or otherwise downloads or uncompresses data or models, wrap that code with rank0_first. This forces the step to run once on the master process first; the remaining processes then run it in parallel. E.g. instead of:

path = untar_data(URLs.IMAGEWOOF_320)

…you instead use:

path = rank0_first(untar_data, URLs.IMAGEWOOF_320)

See below for details on the full API and underlying helper functions. Note, however, that you will not need anything beyond the above unless you need to change how distributed training is implemented.

Parallel


DataParallel.reset


def reset(
    
):

Patch required reset call into DataParallel


ParallelTrainer


def ParallelTrainer(
    device_ids
):

Wrap a model in DataParallel automatically


Learner.to_parallel


def to_parallel(
    device_ids:NoneType=None
):

Add ParallelTrainer callback to a Learner


Learner.detach_parallel


def detach_parallel(
    
):

Remove ParallelTrainer callback from a Learner


parallel_ctx


def parallel_ctx(
    device_ids:NoneType=None
):

A context manager to adapt a learner to train in data parallel mode.

Distributed

Helper functions


DistributedDataParallel.reset


def reset(
    
):

Patch required reset call into DistributedDataParallel


setup_distrib


def setup_distrib(
    gpu:NoneType=None
):

Setup this process to participate in distributed training


teardown_distrib


def teardown_distrib(
    
):

Free distributed training resources

DataLoader


DistributedDL


def DistributedDL(
    dl, rank:NoneType=None, world_size:NoneType=None, device:NoneType=None
):

A TfmdDL which splits a batch into equal size pieces for each worker

dl = TfmdDL(list(range(50)), bs=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50))
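
The test above implies a simple sharding rule: with 50 items and 4 workers, each worker receives ceil(50/4) = 13 consecutive indices, wrapping around modulo 50, and then batches them with bs=12. The following is a minimal pure-Python sketch of that arithmetic only. The helper names shard_indices and batches are hypothetical and are not part of fastai, and the real DistributedDL may handle shuffling and padding differently.

```python
import math

def shard_indices(n_items, rank, world_size):
    """Indices assigned to `rank`, assuming equal-size shards that wrap around."""
    per_rank = math.ceil(n_items / world_size)   # 13 for 50 items over 4 ranks
    start = rank * per_rank
    return [i % n_items for i in range(start, start + per_rank)]

def batches(indices, bs):
    """Split a shard into consecutive batches of at most `bs` items."""
    return [indices[i:i + bs] for i in range(0, len(indices), bs)]

# Show the batches each of the 4 simulated ranks would receive.
for rank in range(4):
    print(rank, batches(shard_indices(50, rank, 4), bs=12))
```

In this sketch the last rank wraps past the end of the dataset (indices 0 and 1 are reused), so every rank sees the same number of items.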

DistributedTrainer


def DistributedTrainer(
    sync_bn:bool=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
    device_placement:bool=True, split_batches:bool=<object object at 0x7fb5e5d9d0e0>,
    gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration | None=None,
    deepspeed_plugin:DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None=None,
    fsdp_plugin:FullyShardedDataParallelPlugin | None=None,
    torch_tp_plugin:TorchTensorParallelPlugin | None=None, # Deprecate later, warning in `post_init`
    megatron_lm_plugin:MegatronLMPlugin | None=None, rng_types:list[str | RNGType] | None=None,
    project_dir:str | os.PathLike | None=None, project_config:ProjectConfiguration | None=None,
    gradient_accumulation_plugin:GradientAccumulationPlugin | None=None,
    kwargs_handlers:list[KwargsHandler] | None=None, dynamo_backend:DynamoBackend | str | None=None,
    dynamo_plugin:TorchDynamoPlugin | None=None,
    deepspeed_plugins:DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None=None,
    parallelism_config:ParallelismConfig | None=None
):

Wrap model in DistributedDataParallel and dls in DistributedDL


Learner.to_distributed


def to_distributed(
    sync_bn:bool=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
    device_placement:bool=True, split_batches:bool=<object object at 0x7fb5e5d9d0e0>,
    gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration | None=None,
    deepspeed_plugin:DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None=None,
    fsdp_plugin:FullyShardedDataParallelPlugin | None=None,
    torch_tp_plugin:TorchTensorParallelPlugin | None=None, # Deprecate later, warning in `post_init`
    megatron_lm_plugin:MegatronLMPlugin | None=None, rng_types:list[str | RNGType] | None=None,
    project_dir:str | os.PathLike | None=None, project_config:ProjectConfiguration | None=None,
    gradient_accumulation_plugin:GradientAccumulationPlugin | None=None,
    kwargs_handlers:list[KwargsHandler] | None=None, dynamo_backend:DynamoBackend | str | None=None,
    dynamo_plugin:TorchDynamoPlugin | None=None,
    deepspeed_plugins:DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None=None,
    parallelism_config:ParallelismConfig | None=None
):

Add DistributedTrainer to a learner and configure an Accelerator


Learner.detach_distributed


def detach_distributed(
    
):

Remove DistributedTrainer from a learner

distrib_ctx context manager


Learner.distrib_ctx


def distrib_ctx(
    sync_bn:bool=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
    in_notebook:bool=False, # Whether we are launching from a notebook or not
    device_placement:bool=True, split_batches:bool=<object object at 0x7fb5e5d9d0e0>,
    gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration | None=None,
    deepspeed_plugin:DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None=None,
    fsdp_plugin:FullyShardedDataParallelPlugin | None=None,
    torch_tp_plugin:TorchTensorParallelPlugin | None=None, # Deprecate later, warning in `post_init`
    megatron_lm_plugin:MegatronLMPlugin | None=None, rng_types:list[str | RNGType] | None=None,
    project_dir:str | os.PathLike | None=None, project_config:ProjectConfiguration | None=None,
    gradient_accumulation_plugin:GradientAccumulationPlugin | None=None,
    kwargs_handlers:list[KwargsHandler] | None=None, dynamo_backend:DynamoBackend | str | None=None,
    dynamo_plugin:TorchDynamoPlugin | None=None,
    deepspeed_plugins:DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None=None,
    parallelism_config:ParallelismConfig | None=None
):

A context manager to adapt a learner to train in distributed data parallel mode.

distrib_ctx prepares a learner to train in distributed data parallel mode. It assumes the script/code will be run either from the command line via accelerate launch or through the notebook_launcher function from Accelerate. It also assumes that Accelerate has been configured, either by running write_basic_config() or by calling accelerate config through the CLI and answering the prompts.

Typical usage:

with learn.distrib_ctx(): learn.fit(.....)

It attaches a DistributedTrainer callback and DistributedDL data loader to the learner, then executes learn.fit(.....). Upon exiting the context, it removes the DistributedTrainer and DistributedDL, and destroys any locally created distributed process group. The process is still attached to the GPU though.
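
The attach-on-entry, detach-on-exit behaviour described above follows the usual context-manager pattern. The sketch below models it with a toy class. ToyLearner and its string "callbacks" are hypothetical stand-ins, not fastai's Learner, and no process group or GPU is involved.

```python
from contextlib import contextmanager

class ToyLearner:
    """Hypothetical stand-in for a Learner that only tracks callback names."""
    def __init__(self):
        self.cbs = []

    @contextmanager
    def distrib_ctx(self):
        self.cbs.append("DistributedTrainer")      # attach on entry
        try:
            yield self
        finally:
            self.cbs.remove("DistributedTrainer")  # detach on exit, even on error

learn = ToyLearner()
with learn.distrib_ctx():
    inside = list(learn.cbs)   # callbacks present while training would run
after = list(learn.cbs)        # callbacks present once the context has exited
```

Using try/finally means the learner is restored to its original state even if training raises an exception inside the with block.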


rank0_first


def rank0_first(
    func, *args, **kwargs
):

Execute func in the Rank-0 process first, then in other ranks in parallel.

In distributed training mode, rank0_first calls func in the rank-0 process first, then in parallel on the remaining ranks. In single-process, non-distributed training mode, func is called only once, as expected.
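
The ordering described above can be illustrated with a small simulation. This is a toy model under stated assumptions, not fastai's implementation: threads stand in for ranks, threading.Barrier stands in for the distributed barrier, and the names rank0_first_sim, run_simulation, and fetch are hypothetical.

```python
import threading

def rank0_first_sim(rank, barrier, func, *args, **kwargs):
    """Toy model: rank 0 runs func, all ranks sync, then the other ranks run it."""
    result = None
    if rank == 0:
        result = func(*args, **kwargs)
    barrier.wait()   # no rank passes this point until rank 0 has finished func
    if rank != 0:
        result = func(*args, **kwargs)
    return result

def run_simulation(world_size=4):
    cache = {}                # stands in for the on-disk download cache
    lock = threading.Lock()
    events = []               # (rank, "download" or "cache hit"), in call order

    def fetch(rank):
        with lock:
            if "data" not in cache:
                cache["data"] = "dataset"
                events.append((rank, "download"))
            else:
                events.append((rank, "cache hit"))
        return cache["data"]

    barrier = threading.Barrier(world_size)
    threads = [threading.Thread(target=rank0_first_sim, args=(r, barrier, fetch, r))
               for r in range(world_size)]
    for t in threads: t.start()
    for t in threads: t.join()
    return events

events = run_simulation()
```

In this model only rank 0 ever performs the download; every other rank finds the cache already populated when its turn comes.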

One application of rank0_first() is to make fresh downloads via untar_data safe in distributed training scripts launched by python -m fastai.launch <script>:

path = untar_data(URLs.IMDB)

becomes:

path = rank0_first(lambda: untar_data(URLs.IMDB))

Some learner factory methods may use untar_data to download pretrained models:

learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)

becomes:

learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))

Otherwise, multiple processes will download at the same time and corrupt the data.
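
The examples in this document call rank0_first in two ways: with the function and its arguments passed separately, as in rank0_first(untar_data, URLs.IMAGEWOOF_320), and with a zero-argument lambda. Since rank0_first accepts positional and keyword arguments after func, both forms should defer the same call. The sketch below shows that equivalence in isolation. call_deferred and fake_untar are hypothetical names used only for illustration, and the URL is a placeholder.

```python
from functools import partial

def call_deferred(func, *args, **kwargs):
    """Bind any extra arguments to func, then call the zero-argument result."""
    if args or kwargs:
        func = partial(func, *args, **kwargs)
    return func()

def fake_untar(url, dest="data"):
    """Hypothetical stand-in for a download helper; returns the target path."""
    return f"{dest}/{url.rsplit('/', 1)[-1]}"

# Form 1: pass the function and its arguments separately.
by_args = call_deferred(fake_untar, "https://example.com/imdb", dest="cache")
# Form 2: wrap the whole call in a zero-argument lambda.
by_lambda = call_deferred(lambda: fake_untar("https://example.com/imdb", dest="cache"))
```

The lambda form is convenient when the wrapped expression is more than a single call, such as a learner factory with many keyword arguments.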

Notebook Launcher

Accelerate provides a notebook_launcher function that lets you keep working in your Jupyter Notebook as usual while training in a distributed setup.

First, make sure Accelerate is properly configured. You can either run accelerate config from the command line, or generate a basic configuration automatically by running the following in the first cell of your notebook:

from accelerate.utils import write_basic_config
write_basic_config()

After Accelerate is configured, move your training code into a function and pass that function to notebook_launcher, for example:

---
from fastai.vision.all import *
from fastai.distributed import *

def train():
    set_seed(99, True)
    path = untar_data(URLs.PETS)/'images'
    dls = ImageDataLoaders.from_name_func(
        path, get_image_files(path), valid_pct=0.2,
        label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
    
    learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()
    with learn.distrib_ctx(in_notebook=True):
        learn.fine_tune(1)
---
from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)
---