bs = 4
letters = list(string.ascii_lowercase)

DataLoader helpers

fastai includes a replacement for PyTorch's DataLoader which is largely API-compatible, and adds a lot of useful functionality and flexibility. Before we look at the class, there are a couple of helpers we'll need to define.

fa_collate[source]

fa_collate(t)

A replacement for PyTorch's default_collate that maintains types and handles Sequences

t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])

t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])

fa_convert[source]

fa_convert(t)

A replacement for PyTorch's default_convert that maintains types and handles Sequences

t0 = array([1,2])
t = [t0,(t0,t0)]

test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])

class SkipItemException[source]

SkipItemException() :: Exception

Raised to notify DataLoader to skip an item
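
For instance, raising SkipItemException from create_item drops that item from the output. A minimal sketch (the SkipOddDL subclass is hypothetical, made up for illustration):

class SkipOddDL(DataLoader):
    def create_item(self, s):
        # Skip odd indices; only even items make it into the output
        if s % 2: raise SkipItemException()
        return s

test_eq(L(SkipOddDL(list(range(10)))), [0,2,4,6,8])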

class DataLoader[source]

DataLoader(dataset=None, bs=None, num_workers=0, pin_memory=False, timeout=0, batch_size=None, shuffle=False, drop_last=False, indexed=None, n=None, device=None, persistent_workers=False, wif=None, before_iter=None, after_item=None, before_batch=None, after_batch=None, after_iter=None, create_batches=None, create_item=None, create_batch=None, retain=None, get_idxs=None, sample=None, shuffle_fn=None, do_batch=None) :: GetAttr

API compatible with PyTorch DataLoader, with a lot more callbacks and flexibility

Arguments to DataLoader:

  • dataset: dataset from which to load the data. Can be either a map-style or an iterable-style dataset.
  • bs (int): how many samples per batch to load (if batch_size is provided, it will override bs). If bs=None, then it is assumed that dataset.__getitem__ returns a batch.
  • num_workers (int): how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process.
  • pin_memory (bool): If True, the data loader will copy Tensors into CUDA pinned memory before returning them.
  • timeout (float>0): the timeout value in seconds for collecting a batch from workers.
  • batch_size (int): only provided for PyTorch compatibility; use bs instead.
  • shuffle (bool): If True, then data is shuffled every time the dataloader is fully read/iterated.
  • drop_last (bool): If True, then the last incomplete batch is dropped.
  • indexed (bool): Set to False if you are using an iterable-style dataset; otherwise it defaults to True.
  • n (int): Defaults to len(dataset). If you are using an iterable-style dataset, you can use n to tell the DataLoader how many items it holds (see the sketch after this list).
  • device (torch.device): Defaults to default_device(), which is CUDA if it is available. You can specify the device explicitly, e.g. torch.device('cpu').
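
For example, a generator has no len(), so passing n is the only way for the DataLoader to know how many items there are. A minimal sketch (the gen helper is made up for illustration):

def gen(): yield from range(10)  # iterable-style: no len(), no __getitem__

# n=10 tells the DataLoader how many items to expect, so len(dl) works
dl = DataLoader(gen(), bs=4, n=10)
test_eq(len(dl), 3)
test_eq(L(dl).map(len), [4,4,2])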

Override create_item and use the default infinite sampler to get a stream of unknown length (call stop() when you want to end the stream).

class RandDL(DataLoader):
    def create_item(self, s):
        # Yield random floats; end the stream once one reaches 0.95 or more
        r = random.random()
        return r if r<0.95 else stop()

L(RandDL())
(#3) [0.24909400651331803,0.07084655323087252,0.8581901930117967]
L(RandDL(bs=4, drop_last=True)).map(len)
(#7) [4,4,4,4,4,4,4]
dl = RandDL(bs=4, num_workers=4, drop_last=True)
L(dl).map(len)
(#19) [4,4,4,4,4,4,4,4,4,4...]
test_eq(dl.fake_l.num_workers, 4)
with dl.fake_l.no_multiproc(): 
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, 4)
def _rand_item(s):
    r = random.random()
    return r if r<0.95 else stop()

L(DataLoader(create_item=_rand_item))
(#17) [0.019170518381322443,0.21415073448249933,0.8631370535605363,0.04114785661614451,0.18284294588892835,0.3199549378089168,0.659532020456991,0.47433462825872874,0.10437650093263617,0.49773892711181855...]

If you don't set bs, then dataset is assumed to provide an iterator or a __getitem__ that returns a batch.

ds1 = DataLoader(letters)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters)

ds1 = DataLoader(letters, indexed=False)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)

t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))

ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)

If you do set bs, then dataset is assumed to provide an iterator or a __getitem__ that returns a single item of a batch.

def twoepochs(d): return ' '.join(''.join(list(o)) for _ in range(2) for o in d)
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

ds1 = DataLoader(letters, bs=4, num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')

ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)
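
The other callbacks hook in similarly: after_item runs on each item as it is fetched, and before_batch runs on the list of items just before collation. A minimal sketch (the lambdas are made up for illustration):

dl = DataLoader(range(8), bs=4,
                after_item=lambda o: o*2,                       # applied to each item
                before_batch=lambda b: sorted(b, reverse=True)) # applied to each batch list
test_eq(L(dl), L(tensor([6,4,2,0]),tensor([14,12,10,8])))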

it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])
class SleepyDL(list):
    "Simulate slow item access with a random per-item delay"
    def __getitem__(self,i):
        time.sleep(random.random()/50)
        return super().__getitem__(i)

t = SleepyDL(letters)

%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)

dl = DataLoader(t, shuffle=True, num_workers=1)
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl))
CPU times: user 4.34 ms, sys: 1.11 ms, total: 5.45 ms
Wall time: 273 ms
CPU times: user 11.4 ms, sys: 17 ms, total: 28.4 ms
Wall time: 200 ms
CPU times: user 14.4 ms, sys: 26 ms, total: 40.4 ms
Wall time: 98.8 ms
class SleepyQueue():
    "Simulate a queue with varying latency"
    def __init__(self, q): self.q=q
    def __iter__(self):
        while True:
            time.sleep(random.random()/100)
            try: yield self.q.get_nowait()
            except queues.Empty: return

q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)

%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30))
CPU times: user 21.4 ms, sys: 17.8 ms, total: 39.2 ms
Wall time: 116 ms
# Types are retained through collation: a TensorBase subclass comes back as itself
class A(TensorBase): pass

for nw in (0,2):
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b), A)

    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b[0]), A)
list(DataLoader(list(range(50)),bs=32,shuffle=True,num_workers=3))
[tensor([ 3, 28, 30, 12, 16, 40, 10,  6, 22, 46, 17, 18,  2, 41, 14,  8, 42, 13,
         20, 24,  5, 32, 44, 15, 11, 21, 38, 47, 27,  7, 33, 49]),
 tensor([48, 36, 35, 29, 37, 45,  1, 23,  0,  9, 26, 43, 25, 34, 31,  4, 19, 39])]
class A(TensorBase): pass
t = A(tensor(1,2))

tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = first(tdl)
test_eq(type(b), A)

# Unknown attributes are delegated to `dataset`
test_eq(tdl.pop(), tensor(1,2))

Override get_idxs to return the same index until the DL is consumed. This is intended to test consistent sampling behavior when num_workers>1.

class AdamantDL(DataLoader):
    def get_idxs(self):
        # Pick a single random index and return it for every position
        r=random.randint(0,self.n-1)
        return [r] * self.n

# Every batch should therefore contain exactly one unique value
test_eq(torch.cat(tuple(AdamantDL((list(range(50))),bs=16,num_workers=4))).unique().numel(),1)