Смешанные данные в одном пакете на DataLoader PyTorch с несколькими работниками

Я хочу знать, как использовать torch.utils.data.DataLoader в PyTorch, особенно в случае нескольких рабочих.

Я обнаружил, что один пакетный вывод из DataLoader всегда исходит от одного работника. Я ожидал, что в DataLoader есть очередь, в которой хранятся данные всех рабочих, а DataLoader перемешивает их в очереди для вывода случайных пакетных данных. Я думаю, что это путь в tf.data.Dataset в Tensorflow. Можем ли мы реализовать подобную функцию в PyTorch? Я хочу загрузить набор данных из больших сериализованных файлов (например, Tfrecord) с использованием нескольких работников. В этом случае важно смешивать исходный файл в одном пакете, что означает смешивание исходного источника.

Пожалуйста, обратитесь к следующему коду:

import random
import time

import torch


class MyDataset(torch.utils.data.Dataset):
    def __len__(self):
        return 50

    def __getitem__(self, idx):
        info = torch.utils.data.get_worker_info()

        time.sleep(random.uniform(0, 1))
        print("[{}]:{}".format(info.id, idx))
        return idx, info.id


if __name__ == '__main__':
    dataset = MyDataset()
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=5, shuffle=False, num_workers=2)
    for batch in dataloader:
        print(batch)

Выход:

[0]:0
[1]:5
[0]:1
[1]:6
[0]:2
[0]:3
[1]:7
[0]:4
[tensor([0, 1, 2, 3, 4]), tensor([0, 0, 0, 0, 0])]
[1]:8
[1]:9
[tensor([5, 6, 7, 8, 9]), tensor([1, 1, 1, 1, 1])]
[0]:10
[0]:11
[1]:15
[1]:16
[0]:12
[1]:17
...

Вот, [0, 1, 2, 3, 4] а также [0, 0, 0, 0, 0] в [tensor([0, 1, 2, 3, 4]), tensor([0, 0, 0, 0, 0])] означает, что в этот пакет входят индексы с 0-го по 4-й данные, полученные из идентификатора работника 0, Обратите внимание, что shuffle=True не решает эту проблему, которая только меняет индексы данных.

В этом случае я хочу получить партию как: [tensor([0, 5, 1, 6, 2]), tensor([0, 1, 0, 1, 0])],

1 ответ

Я реализовал кое-что простое для решения аналогичной проблемы, когда у меня есть большие видеофайлы в качестве обучающих данных, и каждый рабочий отвечает за загрузку и предварительную обработку одного файла, а затем получение из него образцов. Проблема в том, что, как описывает OP, с механизмом загрузки данных Pytorch по умолчанию каждый пакет содержит образцы только из одного видеофайла.

Во-первых, давайте рассмотрим проблему. В этом упрощенном примере кода каждый рабочий возвращает один тензор, содержащий его идентификатор рабочего с нулевым индексом. При размере партии 32 и 4 рабочих процесса мы хотим, чтобы каждая партия содержала 8 нулей, 8 единиц, 8 двоек и 8 троек.

from collections import defaultdict

import torch as T
import torch.utils.data as tdata


class Dataset(tdata.IterableDataset):
    def __init__(self, batch_size: int):
        self._bs = batch_size

    def __iter__(self):
        worker_info = tdata.get_worker_info()
        if not worker_info:
            raise NotImplementedError('Not implemented for num_workers=0')
        for _ in range(self._bs):
            yield T.tensor([worker_info.id])


batch_size = 32
num_workers = 4
dataset = Dataset(batch_size)
loader = tdata.DataLoader(dataset,
                          batch_size=batch_size,
                          num_workers=num_workers)


for batch in loader:
    counts = defaultdict(int)
    for n in batch.numpy().flatten():
        counts[n] += 1
    print(dict(counts))

Вместо этого код печатает:

{0: 32}
{1: 32}
{2: 32}
{3: 32}

Это означает, что первая партия содержит образцы только от рабочего 0, вторая - только от рабочего 1 и т. Д. Чтобы исправить это, мы установим размер партии в DataLoader к batch_size // num_workers и используйте простую оболочку над DataLoader объединить образцы от каждого воркера для нашей партии:

def pooled_batches(loader):
    loader_it = iter(loader)
    while True:
        samples = []
        for _ in range(loader.num_workers):
            try:
                samples.append(next(loader_it))
            except StopIteration:
                pass
        if len(samples) == 0:
            break
        else:
            yield T.cat(samples, dim=0)


batch_size = 32
num_workers = 4
dataset = Dataset(batch_size)
per_worker = batch_size // num_workers
loader = tdata.DataLoader(dataset,
                          batch_size=per_worker,
                          num_workers=num_workers)

for batch in pooled_batches(loader):
    counts = defaultdict(int)
    for n in batch.numpy().flatten():
        counts[n] += 1
    print(dict(counts))

И код теперь печатает

{0: 8, 1: 8, 2: 8, 3: 8}
{0: 8, 1: 8, 2: 8, 3: 8}
{0: 8, 1: 8, 2: 8, 3: 8}
{0: 8, 1: 8, 2: 8, 3: 8}

как и ожидалось.

Обратите внимание, что мульти-работник DataLoader с указанным размером batch_size будет загружать несколько пакетов параллельно, поэтому, по сути, один пакет всегда приходит от рабочего. Тем не менее, я достиг чего-то близкого к тому, что вам нужно, сделав следующее:

  1. Сделайте размер партии равным 1, чтобы каждый работник получал только один образец за раз

  2. Напишите фоновый процесс, который перебирает DataLoader, выбирает 1 образец за раз и вставляет его в очередь. Это позволяет размещать образцы в другом порядке в очереди, а не иметь рабочие партии

  3. Иметь механизм дозирования, например collate_fn который берет количество выборок, равное размеру вашей партии, из очереди и передает его в модель

Если вы хотите быть более конкретным в создании партии, например, выбирая конкретные образцы от конкретных работников, у вас может быть несколько очередей. Ваш порядок сортировки должен быть изменен для учета нескольких очередей и выбора из них. Но я сомневаюсь, нужна ли такая специфика.

Другие вопросы по тегам