Asynchronous data loader

In PyTorch, when you build your own dataset and use a PyTorch DataLoader to load it, the data loading procedure and the network training procedure are executed sequentially. The execution is shown in Fig. 1.

Fig. 1. Sequential execution

So for one training batch we need time T = T1 + T2, where T1 is the data loading time and T2 is the training time. Both T1 and T2 can be expensive, e.g. when the input data needs heavy pre-processing. Without adding computation resources, can we reduce T? The answer is yes. Because we execute the loading part and the training part sequentially here, T = T1 + T2. If we execute the two procedures in parallel, the total time becomes roughly T = max(T1, T2). For example, if T1 = 0.5 s and T2 = 0.8 s, sequential execution takes 1.3 s per batch while overlapped execution takes about 0.8 s. The question is how to do that in PyTorch.

The basic idea is to use a queue to store pre-processed data that will be trained on soon, so the trainer simply takes data from the queue instead of pre-processing it first. Here we create a separate thread for the pre-processing task, as in the sketch below.
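
To make the idea concrete, here is a tiny standalone sketch (my own toy illustration, not the CudaDataLoader class shown later): one background thread "loads" batches into a Queue while the main thread "trains" on them, so the two costs overlap. The timings T1, T2 and the helper names are made up for the demo.

import time
from queue import Queue
from threading import Thread

T1, T2 = 0.5, 0.8           # hypothetical per-batch loading / training times
NUM_BATCHES = 5

def load_loop(q):
    for i in range(NUM_BATCHES):
        time.sleep(T1)      # simulate loading + pre-processing one batch
        q.put(i)            # hand the "batch" to the trainer

q = Queue(maxsize=2)        # small buffer, like queue_size=2 below
worker = Thread(target=load_loop, args=(q,), daemon=True)

start = time.perf_counter()
worker.start()
for _ in range(NUM_BATCHES):
    batch = q.get()         # usually ready, so almost no waiting here
    time.sleep(T2)          # simulate one training step
print(f"overlapped wall time: {time.perf_counter() - start:.2f}s, "
      f"sequential would be ~{NUM_BATCHES * (T1 + T2):.2f}s")

Running it, the overlapped wall time is roughly one initial T1 plus NUM_BATCHES * max(T1, T2), instead of NUM_BATCHES * (T1 + T2) for the sequential version.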

Refer to the code below: pass your DataLoader and the target device to the CudaDataLoader class. I use the default queue size of 2 here. If you increase the queue size, it will take more GPU memory. If you use a queue of size 1, it may hurt the parallel execution (sometimes the trainer will have to wait for the loader to load data).

The load_loop function endlessly loads the data, pre-processes it and puts it into the queue. The load_instance function moves the data to the specified device. __iter__ and __next__ are also implemented to make the data loader iterable.

import torch
from queue import Queue
from threading import Thread


class CudaDataLoader:
    def __init__(self, loader, device, queue_size=2):
        self.device = device
        self.queue_size = queue_size
        self.loader = loader

        self.load_stream = torch.cuda.Stream(device=device)
        self.queue = Queue(maxsize=self.queue_size)

        self.idx = 0
        self.worker = Thread(target=self.load_loop)
        self.worker.daemon = True  # daemon thread, so it won't block program exit
        self.worker.start()

    def load_loop(self):
        """ Append the data into the queue in a while-true loop"""
        # The loop that will load into the queue in the background
        while True:
            for i, sample in enumerate(self.loader):
                self.queue.put(self.load_instance(sample))

    def load_instance(self, sample):
        """ Load the data to specified device """
        if torch.is_tensor(sample):
            with torch.cuda.stream(self.load_stream):
                return sample.to(self.device, non_blocking=True)
        elif sample is None or type(sample) in (list, str):
            return sample
        elif isinstance(sample, dict):
            return {k: self.load_instance(v) for k, v in sample.items()}
        else:
            return [self.load_instance(s) for s in sample]

    def __iter__(self):
        self.idx = 0
        return self

    def __next__(self):
        # The worker loops forever, so an epoch boundary is detected by
        # counting len(self.loader) batches rather than by the queue state.
        if not self.worker.is_alive() and self.queue.empty():
            self.idx = 0
            self.queue.join()
            self.worker.join()
            raise StopIteration
        elif self.idx >= len(self.loader):
            self.idx = 0
            raise StopIteration
        else:
            out = self.queue.get()
            self.queue.task_done()
            self.idx += 1
        return out

    def next(self):
        return self.__next__()

    def __len__(self):
        return len(self.loader)

    @property
    def sampler(self):
        return self.loader.sampler

    @property
    def dataset(self):
        return self.loader.dataset
    """ 一直repeat的sampler """

When you use this in your code, just wrap your data loader with the CudaDataLoader class, like the code below.

import torch
from torch.utils.data import DataLoader

from myDataLoader import CudaDataLoader
from config import get_config

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

config = get_config()

# tr_dt is your own Dataset instance
tr_data_loader = DataLoader(
    tr_dt, config.batch_size, shuffle=True,
    num_workers=0, collate_fn=lambda x: tuple(zip(*x))
)

my_data_loader = CudaDataLoader(loader=tr_data_loader, device=device)

for epoch in range(config.epochs):
    for images, labels in my_data_loader:
        pass  # your training step here
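
One practical note (my own addition, not part of the original code): the sample.to(self.device, non_blocking=True) call inside load_instance generally only becomes a truly asynchronous host-to-device copy when the source tensors live in pinned (page-locked) memory. If your dataset returns plain CPU tensors, you can let the inner DataLoader pin them, as in this sketch (same names as above, only pin_memory=True added):

tr_data_loader = DataLoader(
    tr_dt, config.batch_size, shuffle=True,
    num_workers=0, pin_memory=True,  # pin host memory so non_blocking copies can overlap
    collate_fn=lambda x: tuple(zip(*x))
)
my_data_loader = CudaDataLoader(loader=tr_data_loader, device=device)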

On a Ryzen 5 3600 CPU and GTX 1060 GPU platform, we tested the performance of Mask R-CNN data loading and training. The results are shown in Table 1.

Table 1. Time per sample, sequential vs. asynchronous data loading.

          Total time      Loading time    Total time        Loading time
          (Sequential)    (Sequential)    (Asynchronous)    (Asynchronous)
Batch 1   1.3863          0.6390          0.8858            0.0002
Batch 2   1.1073          0.4363          0.7982            0.0001
Batch 3   1.1226          0.4501          0.7707            0.0000
Batch 4   1.2118          0.5326          0.7777            0.0000
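
For reference, here is a rough sketch of how such per-batch timings could be collected (my own illustration, not the benchmark script behind Table 1; my_data_loader and a train_step function are assumed, and you would divide by the batch size to get per-sample numbers):

import time
import torch

load_time, total_time, n = 0.0, 0.0, 0
end = time.perf_counter()
for images, labels in my_data_loader:
    load_time += time.perf_counter() - end   # time spent waiting for the next batch
    train_step(images, labels)               # assumed training function
    torch.cuda.synchronize()                 # make GPU work visible to the host timer
    total_time += time.perf_counter() - end  # loading + training for this batch
    n += 1
    end = time.perf_counter()
print(f"avg total {total_time / n:.4f}s, avg loading {load_time / n:.4f}s per batch")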

Original article: https://blog.csdn.net/ywqqqqqq/article/details/109960299