Пользовательский генератор данных Tensorflow2.x с многопроцессорностью

Я только что обновился до tenorflow 2.3. Я хочу сделать собственный генератор данных для обучения. С tenorflow 1.x я сделал следующее:

def get_data_generator(test_flag):
  item_list = load_item_list(test_flag)
  print('data loaded')
  while True:
    X = []
    Y = []
    for _ in range(BATCH_SIZE):
      x, y = get_random_augmented_sample(item_list)
      X.append(x)
      Y.append(y)
    yield np.asarray(X), np.asarray(Y)

data_generator_train = get_data_generator(False)
data_generator_test = get_data_generator(True)
model.fit_generator(data_generator_train, validation_data=data_generator_test, 
                    epochs=10000, verbose=2,
                    use_multiprocessing=True,
                    workers=8,
                    validation_steps=100,
                    steps_per_epoch=500,
                    )

Этот код отлично работал с tensorflow 1.x. В системе создано 8 процессов. Процессор и видеокарта загрузились отлично. "данные загружены" было напечатано 8 раз.

С tenorflow 2.3 я получил предупреждение:

ВНИМАНИЕ: тензорный поток: многопроцессорность может плохо взаимодействовать с TensorFlow, вызывая недетерминированные взаимоблокировки. Для высокопроизводительных конвейеров данных рекомендуется tf.data.

"данные загружены" было напечатано один раз (должно 8 раз). GPU загружен не полностью. У него также есть утечка памяти каждую эпоху, поэтому обучение будет остановлено через несколько эпох. Флаг use_multiprocessing не помог.

Как сделать генератор / итератор в tenorflow(keras) 2.x, который можно легко распараллелить между несколькими процессами ЦП? Тупики и порядок данных не важны.

1 ответ

С tf.dataконвейера, есть несколько мест, где можно распараллелить. В зависимости от того, как ваши данные хранятся и читаются, вы можете распараллелить чтение. Вы также можете распараллелить расширение и предварительно получить данные во время обучения, чтобы ваш графический процессор (или другое оборудование) никогда не нуждался в данных.

В приведенном ниже коде я продемонстрировал, как вы можете распараллелить расширение и добавить предварительную выборку.

import numpy as np
import tensorflow as tf

x_shape = (32, 32, 3)
y_shape = ()  # A single item (not array).
classes = 10

def generator_fn(n_samples):
    """Return a function that takes no arguments and returns a generator."""
    def generator():
        for i in range(n_samples):
            # Synthesize an image and a class label.
            x = np.random.random_sample(x_shape).astype(np.float32)
            y = np.random.randint(0, classes, size=y_shape, dtype=np.int32)
            yield x, y
    return generator

def augment(x, y):
    return x * tf.random.normal(shape=x_shape), y

samples = 10
batch_size = 5
epochs = 2

# Create dataset.
gen = generator_fn(n_samples=samples)
dataset = tf.data.Dataset.from_generator(
    generator=gen, 
    output_types=(np.float32, np.int32), 
    output_shapes=(x_shape, y_shape)
)
# Parallelize the augmentation.
dataset = dataset.map(
    augment, 
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
    # Order does not matter.
    deterministic=False
)
dataset = dataset.batch(batch_size, drop_remainder=True)
# Prefetch some batches.
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

# Prepare model.
model = tf.keras.applications.VGG16(weights=None, input_shape=x_shape, classes=classes)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# Train. Do not specify batch size because the dataset takes care of that.
model.fit(dataset, epochs=epochs)
Другие вопросы по тегам