Пользовательский генератор данных Tensorflow2.x с многопроцессорностью
Я только что обновился до tenorflow 2.3. Я хочу сделать собственный генератор данных для обучения. С tenorflow 1.x я сделал следующее:
def get_data_generator(test_flag):
item_list = load_item_list(test_flag)
print('data loaded')
while True:
X = []
Y = []
for _ in range(BATCH_SIZE):
x, y = get_random_augmented_sample(item_list)
X.append(x)
Y.append(y)
yield np.asarray(X), np.asarray(Y)
data_generator_train = get_data_generator(False)
data_generator_test = get_data_generator(True)
model.fit_generator(data_generator_train, validation_data=data_generator_test,
epochs=10000, verbose=2,
use_multiprocessing=True,
workers=8,
validation_steps=100,
steps_per_epoch=500,
)
Этот код отлично работал с tensorflow 1.x. В системе создано 8 процессов. Процессор и видеокарта загрузились отлично. "данные загружены" было напечатано 8 раз.
С tenorflow 2.3 я получил предупреждение:
ВНИМАНИЕ: тензорный поток: многопроцессорность может плохо взаимодействовать с TensorFlow, вызывая недетерминированные взаимоблокировки. Для высокопроизводительных конвейеров данных рекомендуется tf.data.
"данные загружены" было напечатано один раз (должно 8 раз). GPU загружен не полностью. У него также есть утечка памяти каждую эпоху, поэтому обучение будет остановлено через несколько эпох. Флаг use_multiprocessing не помог.
Как сделать генератор / итератор в tenorflow(keras) 2.x, который можно легко распараллелить между несколькими процессами ЦП? Тупики и порядок данных не важны.
1 ответ
С
tf.data
конвейера, есть несколько мест, где можно распараллелить. В зависимости от того, как ваши данные хранятся и читаются, вы можете распараллелить чтение. Вы также можете распараллелить расширение и предварительно получить данные во время обучения, чтобы ваш графический процессор (или другое оборудование) никогда не нуждался в данных.
В приведенном ниже коде я продемонстрировал, как вы можете распараллелить расширение и добавить предварительную выборку.
import numpy as np
import tensorflow as tf
x_shape = (32, 32, 3)
y_shape = () # A single item (not array).
classes = 10
def generator_fn(n_samples):
"""Return a function that takes no arguments and returns a generator."""
def generator():
for i in range(n_samples):
# Synthesize an image and a class label.
x = np.random.random_sample(x_shape).astype(np.float32)
y = np.random.randint(0, classes, size=y_shape, dtype=np.int32)
yield x, y
return generator
def augment(x, y):
return x * tf.random.normal(shape=x_shape), y
samples = 10
batch_size = 5
epochs = 2
# Create dataset.
gen = generator_fn(n_samples=samples)
dataset = tf.data.Dataset.from_generator(
generator=gen,
output_types=(np.float32, np.int32),
output_shapes=(x_shape, y_shape)
)
# Parallelize the augmentation.
dataset = dataset.map(
augment,
num_parallel_calls=tf.data.experimental.AUTOTUNE,
# Order does not matter.
deterministic=False
)
dataset = dataset.batch(batch_size, drop_remainder=True)
# Prefetch some batches.
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
# Prepare model.
model = tf.keras.applications.VGG16(weights=None, input_shape=x_shape, classes=classes)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")
# Train. Do not specify batch size because the dataset takes care of that.
model.fit(dataset, epochs=epochs)