Distributed training
When using multiple GPUs, you will most probably want to fit using distributed training.
Example use can be found:

- In the form of a script with examples/distrib.py
- Across all the App Examples with the Notebook Launcher
- At the bottom of this notebook for more examples with notebook_launcher
To use distributed training, there are only three required steps:

1. Add with learn.distrib_ctx(): before your learn.fit call
2. Either configure Accelerate yourself by running accelerate config from the command line, or run:

   ```python
   from accelerate.utils import write_basic_config
   write_basic_config()
   ```

3. Run your training script with accelerate launch scriptname.py ...args...
If you’re using untar_data, or may be downloading or uncompressing data or models as part of your script, you should wrap that code with rank0_first, which forces that step to occur first, just once, on the master process, before the remaining processes run it in parallel. E.g. instead of:

```python
path = untar_data(URLs.IMAGEWOOF_320)
```

…you instead use:

```python
path = rank0_first(untar_data, URLs.IMAGEWOOF_320)
```
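Putting these steps together, a complete script might look like the following sketch (the data pipeline and model here are illustrative choices, not the only way to structure a fastai training script):

```python
# train.py -- a minimal sketch of a distributed fastai script.
# Launch with: accelerate launch train.py
from fastai.vision.all import *
from fastai.distributed import *

# Download/extract once on rank 0, then let the other ranks proceed
path = rank0_first(untar_data, URLs.IMAGEWOOF_320)
dls = ImageDataLoaders.from_folder(path, valid='val', item_tfms=Resize(224))
learn = vision_learner(dls, resnet34, metrics=accuracy)

with learn.distrib_ctx():   # wrap model and dls for DDP while fitting
    learn.fine_tune(1)
```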
See below for details on the full API and underlying helper functions, if needed – however, note that you will not need anything except the above unless you need to change how the distributed training is implemented.
Parallel
DataParallel.reset
DataParallel.reset ()
Patch required reset call into DataParallel
ParallelTrainer
ParallelTrainer (device_ids)
Wrap a model in DataParallel automatically
Learner.to_parallel
Learner.to_parallel (device_ids=None)
Add ParallelTrainer callback to a Learner
Learner.detach_parallel
Learner.detach_parallel ()
Remove ParallelTrainer callback from a Learner
parallel_ctx
parallel_ctx (device_ids=None)
A context manager to adapt a learner to train in data parallel mode.
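For instance, assuming learn is an already-built Learner on a machine with two GPUs, a minimal sketch:

```python
# Sketch: run fit under nn.DataParallel on GPUs 0 and 1;
# the wrapper is removed again when the context exits.
with learn.parallel_ctx(device_ids=[0, 1]):
    learn.fit(1)
```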
Distributed
Helper functions
DistributedDataParallel.reset
DistributedDataParallel.reset ()
Patch required reset call into DistributedDataParallel
setup_distrib
setup_distrib (gpu=None)
Setup this process to participate in distributed training
teardown_distrib
teardown_distrib ()
Free distributed training resources
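These helpers are normally invoked for you (e.g. by distrib_ctx); a manual lifecycle would look roughly like this sketch:

```python
# Sketch: explicit setup/teardown, normally handled by distrib_ctx
setup_distrib(gpu=0)       # join the process group and select GPU 0
try:
    pass                   # ... per-rank training work goes here ...
finally:
    teardown_distrib()     # destroy the process group, free resources
```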
DataLoader
DistributedDL
DistributedDL (dl, rank=None, world_size=None, device=None)
A TfmdDL which splits a batch into equal size pieces for each worker
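For example, splitting 50 items across 4 processes with a batch size of 12 pads the total to 52 indices, so each rank receives 13 contiguous (wrapped) indices: one full batch of 12 plus a final batch of 1:

```python
dl = TfmdDL(list(range(50)), bs=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50, torch.tensor([i*13+12])%50))
```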
DistributedTrainer
DistributedTrainer (sync_bn=True, device_placement:bool=True, split_batches:bool=<object object at 0x7f95aa4539b0>, gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration|None=None, deepspeed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None, fsdp_plugin:FullyShardedDataParallelPlugin|None=None, megatron_lm_plugin:MegatronLMPlugin|None=None, rng_types:list[str|RNGType]|None=None, project_dir:str|os.PathLike|None=None, project_config:ProjectConfiguration|None=None, gradient_accumulation_plugin:GradientAccumulationPlugin|None=None, kwargs_handlers:list[KwargsHandler]|None=None, dynamo_backend:DynamoBackend|str|None=None, deepspeed_plugins:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None)
Wrap model in DistributedDataParallel and dls in DistributedDL
Argument | Type | Default | Details |
---|---|---|---|
sync_bn | bool | True | Whether to replace all batch norm with nn.SyncBatchNorm |
device_placement | bool | True | |
split_batches | bool | <object object at 0x7f95aa4539b0> | |
gradient_accumulation_steps | int | 1 | |
cpu | bool | False | |
dataloader_config | DataLoaderConfiguration | None | None | |
deepspeed_plugin | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None | |
fsdp_plugin | FullyShardedDataParallelPlugin | None | None | |
megatron_lm_plugin | MegatronLMPlugin | None | None | |
rng_types | list[str | RNGType] | None | None | |
project_dir | str | os.PathLike | None | None | |
project_config | ProjectConfiguration | None | None | |
gradient_accumulation_plugin | GradientAccumulationPlugin | None | None | |
kwargs_handlers | list[KwargsHandler] | None | None | |
dynamo_backend | DynamoBackend | str | None | None | |
deepspeed_plugins | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None |
Learner.to_distributed
Learner.to_distributed (sync_bn=True, device_placement:bool=True, split_batches:bool=<object object at 0x7f95aa4539b0>, gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration|None=None, deepspeed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None, fsdp_plugin:FullyShardedDataParallelPlugin|None=None, megatron_lm_plugin:MegatronLMPlugin|None=None, rng_types:list[str|RNGType]|None=None, project_dir:str|os.PathLike|None=None, project_config:ProjectConfiguration|None=None, gradient_accumulation_plugin:GradientAccumulationPlugin|None=None, kwargs_handlers:list[KwargsHandler]|None=None, dynamo_backend:DynamoBackend|str|None=None, deepspeed_plugins:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None)
Add AcceleratedTrainer to a learner, and configure an Accelerator
Argument | Type | Default | Details |
---|---|---|---|
sync_bn | bool | True | Whether to replace all batch norm with nn.SyncBatchNorm |
device_placement | bool | True | |
split_batches | bool | <object object at 0x7f95aa4539b0> | |
gradient_accumulation_steps | int | 1 | |
cpu | bool | False | |
dataloader_config | DataLoaderConfiguration | None | None | |
deepspeed_plugin | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None | |
fsdp_plugin | FullyShardedDataParallelPlugin | None | None | |
megatron_lm_plugin | MegatronLMPlugin | None | None | |
rng_types | list[str | RNGType] | None | None | |
project_dir | str | os.PathLike | None | None | |
project_config | ProjectConfiguration | None | None | |
gradient_accumulation_plugin | GradientAccumulationPlugin | None | None | |
kwargs_handlers | list[KwargsHandler] | None | None | |
dynamo_backend | DynamoBackend | str | None | None | |
deepspeed_plugins | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None |
Learner.detach_distributed
Learner.detach_distributed ()
Remove DistributedTrainer from a learner
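As a sketch, the callback-style alternative to the distrib_ctx context manager (assuming learn is an existing Learner and Accelerate is already configured):

```python
# Sketch: attach the DistributedTrainer callback explicitly,
# then remove it once training is done.
learn.to_distributed(sync_bn=True)
learn.fit(1)
learn.detach_distributed()
```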
distrib_ctx context manager
distrib_ctx
distrib_ctx (sync_bn=True, in_notebook=False, device_placement:bool=True, split_batches:bool=<object object at 0x7f95aa4539b0>, gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration|None=None, deepspeed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None, fsdp_plugin:FullyShardedDataParallelPlugin|None=None, megatron_lm_plugin:MegatronLMPlugin|None=None, rng_types:list[str|RNGType]|None=None, project_dir:str|os.PathLike|None=None, project_config:ProjectConfiguration|None=None, gradient_accumulation_plugin:GradientAccumulationPlugin|None=None, kwargs_handlers:list[KwargsHandler]|None=None, dynamo_backend:DynamoBackend|str|None=None, deepspeed_plugins:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None)
A context manager to adapt a learner to train in distributed data parallel mode.
Argument | Type | Default | Details |
---|---|---|---|
sync_bn | bool | True | Whether to replace all batch norm with nn.SyncBatchNorm |
in_notebook | bool | False | Whether we are launching from a notebook or not |
device_placement | bool | True | |
split_batches | bool | <object object at 0x7f95aa4539b0> | |
gradient_accumulation_steps | int | 1 | |
cpu | bool | False | |
dataloader_config | DataLoaderConfiguration | None | None | |
deepspeed_plugin | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None | |
fsdp_plugin | FullyShardedDataParallelPlugin | None | None | |
megatron_lm_plugin | MegatronLMPlugin | None | None | |
rng_types | list[str | RNGType] | None | None | |
project_dir | str | os.PathLike | None | None | |
project_config | ProjectConfiguration | None | None | |
gradient_accumulation_plugin | GradientAccumulationPlugin | None | None | |
kwargs_handlers | list[KwargsHandler] | None | None | |
dynamo_backend | DynamoBackend | str | None | None | |
deepspeed_plugins | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None |
distrib_ctx prepares a learner to train in distributed data parallel mode. It assumes the script/code will either be run through the command line via accelerate launch or through the notebook_launcher function from Accelerate. It also assumes that accelerate has been configured by either running write_basic_config() or calling accelerate config through the CLI and answering the prompts.
Typical usage:

```python
with learn.distrib_ctx():
    learn.fit(.....)
```

It attaches a DistributedTrainer callback and DistributedDL data loader to the learner, then executes learn.fit(.....). Upon exiting the context, it removes the DistributedTrainer and DistributedDL, and destroys any locally created distributed process group. The process is still attached to the GPU though.
rank0_first
rank0_first (func, *args, **kwargs)
Execute func in the Rank-0 process first, then in other ranks in parallel.
rank0_first calls func() in the rank-0 process first, then in parallel on the rest, in distributed training mode. In single-process, non-distributed training mode, func() is called only once, as expected.
One application of rank0_first() is to make fresh downloads via untar_data safe in distributed training scripts launched by python -m fastai.launch <script>:

```python
path = untar_data(URLs.IMDB)
```

becomes:

```python
path = rank0_first(lambda: untar_data(URLs.IMDB))
```
Some learner factory methods may use untar_data to download pretrained models:

```python
learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)
```

becomes:

```python
learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))
```
Otherwise, multiple processes will download at the same time and corrupt the data.
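Conceptually, rank0_first is a barrier pattern. The following is only an illustrative sketch in raw torch.distributed, not the library's actual implementation:

```python
import torch.distributed as dist

def rank0_first_sketch(func, *args, **kwargs):
    # Non-zero ranks wait here until rank 0 has finished running func
    if dist.is_initialized() and dist.get_rank() != 0:
        dist.barrier()
    res = func(*args, **kwargs)
    # Rank 0 only reaches the barrier after func, releasing the others
    if dist.is_initialized() and dist.get_rank() == 0:
        dist.barrier()
    return res
```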
Notebook Launcher
Accelerate provides a notebook_launcher function that lets you keep using your Jupyter Notebook as you would, while training in a distributed setup!
First, make sure accelerate is properly configured. You can either run accelerate config from the command line, or have an auto-filled configuration set up by running the following in the first cell of your notebook:

```python
from accelerate.utils import write_basic_config
write_basic_config()
```
After Accelerate is configured, to utilize the notebook_launcher functionality, migrate your training into a function and pass it to notebook_launcher, such as:
```python
from fastai.vision.all import *
from fastai.distributed import *

def train():
    set_seed(99, True)
    path = untar_data(URLs.PETS)/'images'
    dls = ImageDataLoaders.from_name_func(
        path, get_image_files(path), valid_pct=0.2,
        label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
    learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()
    with learn.distrib_ctx(in_notebook=True):
        learn.fine_tune(1)
```

```python
from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)
```