Потоковая передача больших обучающих и тестовых файлов в DNNClassifier Tensorflow

У меня есть огромный обучающий файл CSV (709M) и большой тестовый файл CSV (125M), который я хочу отправить в DNNClassifier в контексте использования высокоуровневого API Tensorflow.

Похоже, что input_fn параметр принят fit а также evaluate должен хранить все данные функций и меток в памяти, но в настоящее время я хотел бы запустить их на своем локальном компьютере и, таким образом, ожидать, что они исчерпают память довольно быстро, если я прочту эти файлы в память и затем обработаю их.

Я просмотрел документацию по потоковому чтению данных, но пример кода для чтения CSV, похоже, предназначен для низкоуровневого API Tensorflow.

И - если вы простите немного ныть - это кажется слишком сложным для тривиального варианта использования отправки хорошо подготовленных файлов обучающих и тестовых данных в Estimator... хотя, возможно, такой уровень сложности действительно необходим для обучения и тестирования больших объемов данных в Tensorflow?

В любом случае, я был бы очень признателен за пример использования этого подхода с высокоуровневым API, если это вообще возможно, в чем я начинаю сомневаться.

Покопавшись, мне удалось найти DNNClassifier#partial_fit и будет пытаться использовать его для обучения.

Примеры использования этого метода сэкономили бы мне время, хотя, надеюсь, я найду правильное использование в следующие несколько часов.

Тем не менее, похоже, нет соответствующего DNNClassifier#partial_evaluate... хотя я подозреваю, что могу разбить данные тестирования на более мелкие части и запустить DNNClassifier#evaluate последовательно для каждой партии, что может быть отличным способом сделать это, так как я могу сегментировать данные тестирования в когорты и, таким образом, получать точность для каждой когорты.

==== Обновление ====

Укороченная версия:

  1. Рекомендация DomJack должна быть принятым ответом.

  2. Однако 16 ГБ оперативной памяти моего Mac достаточно для того, чтобы в нем можно было хранить все тренировочные данные 709 МБ без сбоев. Поэтому, хотя я буду использовать функцию DataSets при развертывании приложения, я пока не использую ее для локальной разработки.

Более длинная версия:

Я начал с использования partial_fit API, как описано выше, но при каждом использовании выдается предупреждение.

Итак, я пошел посмотреть на источник для метода здесь и обнаружил, что его полная реализация выглядит следующим образом:

logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)

... который напоминает мне об этой сцене из путеводителя автостопом:

Артур Дент: Что произойдет, если я нажму эту кнопку?

Префект Форд: я бы не

Артур Дент: О

Префект Форд: Что случилось?

Артур Дент: загорелась табличка с надписью "Пожалуйста, не нажимайте эту кнопку снова".

Что сказать: partial_fit кажется, существует с единственной целью сказать вам, чтобы не использовать его.

Кроме того, модель, созданная с помощью partial_fit итеративно в обучающих файлах куски были намного меньше, чем сгенерированные с помощью fit на весь учебный файл, который настоятельно рекомендует, чтобы только последний partial_fit Тренировочный кусок фактически "взял".

2 ответа

Проверьте tf.data.Dataset API. Есть несколько способов создать набор данных. Я обрисую три, но вам нужно будет реализовать только один.

Я предполагаю, что каждый ряд вашего csv файлы n_features значения с плавающей запятой, сопровождаемые одним int значение.

Создание tf.data.Dataset

Оберните генератор питона Dataset.from_generator

Самый простой способ - обернуть собственный генератор Python. Я рекомендую вам попробовать это в первую очередь и изменить только в случае серьезных проблем с производительностью.

def read_csv(filename):
    with open(filename, 'r') as f:
        for line in f.readlines():
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label

def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))

Этот подход очень универсален и позволяет вам протестировать функцию генератора (read_csv) независимо от TensorFlow.

Обернуть основанную на индексе функцию Python

Одним из недостатков вышесказанного является перетасовка результирующего набора данных с помощью случайного буфера размера n требует n примеры для загрузки. Это будет либо создавать периодические паузы в вашем конвейере (большой n) или привести к потенциально плохой перетасовке n).

def get_record(i):
    # load the ith record using standard python, return numpy arrays
    return features, labels

def get_inputs(batch_size, is_training):

    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()
    dataset = tf.data.Dataset.from_tensor_slices((tf.range(epoch_size,))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, (tf.float32, tf.int32), num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels

Короче говоря, мы создаем набор данных только из индексов записей (или любого небольшого идентификатора записи, который мы можем полностью загрузить в память). Затем мы выполняем операции тасования / повторения этого минимального набора данных, затем map индекс к фактическим данным через tf.data.Dataset.map а также tf.py_func, Увидеть Using with Estimators а также Testing in isolation разделы ниже для использования. Обратите внимание, что это требует, чтобы ваши данные были доступны по строкам, поэтому вам может потребоваться преобразовать из csv в какой-то другой формат.

TextLineDataset

Вы также можете прочитать csv файл непосредственно с помощью tf.data.TextLineDataset,

def get_record_defaults():
  zf = tf.zeros(shape=(1,), dtype=tf.float32)
  zi = tf.ones(shape=(1,), dtype=tf.int32)
  return [zf]*n_features + [zi]

def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label

def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)

parse_row функция немного запутанная, так как tf.decode_csv ожидает партии. Вы можете сделать это немного проще, если пакетируете набор данных перед анализом.

def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels

def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset

TFRecordDataset

В качестве альтернативы вы можете конвертировать csv файлы в файлы TFRecord и использовать TFRecordDataset. Здесь есть подробное руководство.

Шаг 1: конвертировать csv данные к данным TFRecords. Пример кода ниже (см. read_csv от from_generator пример выше).

with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, labels in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())

Это нужно запустить только один раз.

Шаг 2: Напишите набор данных, который декодирует эти файлы записей.

def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']

def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset

Использование набора данных с оценщиками

def get_inputs(batch_size, shuffle_size):
    dataset = get_dataset()  # one of the above implementations
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
            # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, label = dataset.make_one_shot_iterator().get_next()

estimator.train(lambda: get_inputs(32, 1000), max_steps=1e7)

Тестирование набора данных изолированно

Я настоятельно рекомендую вам проверить свой набор данных независимо от вашей оценки. Используя вышеупомянутое get_inputs, это должно быть так просто, как

batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
print(f_data, l_data)  # or some better visualization function

Спектакль

Предполагая, что вы используете графический процессор для запуска вашей сети, если каждая строка вашего csv Файл огромен, а сеть крошечная, вы, вероятно, не заметите разницы в производительности. Это потому что Estimator реализация заставляет загрузку / предварительную обработку данных выполняться на CPU, и prefetch означает, что следующая партия может быть подготовлена ​​на CPU, так как текущая партия обучается на GPU. Единственное исключение из этого - если у вас большой набор случайных чисел в наборе данных с большим объемом данных на запись, что займет некоторое время для загрузки ряда примеров, прежде чем что-либо пропустить через графический процессор.

Я согласен с DomJack об использовании Dataset API, кроме необходимости прочитать весь CSV-файл, а затем преобразовать в TfRecord, Настоящим я предлагаю emply TextLineDataset - подкласс Dataset API для прямой загрузки данных в программу TensorFlow. Интуитивный учебник можно найти здесь.

Приведенный ниже код используется для задачи классификации MNIST для иллюстрации и, надеюсь, ответит на вопрос OP. Файл csv содержит 784 столбца, а количество классов - 10. Классификатор, который я использовал в этом примере, представляет собой нейронную сеть с 1 скрытым слоем с 16 единицами измерения.

Во-первых, загрузите библиотеки и определите некоторые константы:

# load libraries
import tensorflow as tf
import os

# some constants
n_x = 784
n_h = 16
n_y = 10

# path to the folder containing the train and test csv files
# You only need to change PATH, rest is platform independent
PATH = os.getcwd() + '/' 

# create a list of feature names
feature_names = ['pixel' + str(i) for i in range(n_x)]

Во-вторых, мы создаем функцию ввода, считывающую файл с использованием API набора данных, а затем предоставляем результаты в API оценки. Возвращаемое значение должно быть двухэлементным кортежем, организованным следующим образом: первый элемент должен быть диктом, в котором каждый входной объект является ключом, а затем список значений для обучающего пакета, а второй элемент - список меток. для тренировочной партии.

def my_input_fn(file_path, batch_size=32, buffer_size=256,\
                perform_shuffle=False, repeat_count=1):
    '''
    Args:
        - file_path: the path of the input file
        - perform_shuffle: whether the data is shuffled or not
        - repeat_count: The number of times to iterate over the records in the dataset.
                    For example, if we specify 1, then each record is read once.
                    If we specify None, iteration will continue forever.
    Output is two-element tuple organized as follows:
        - The first element must be a dict in which each input feature is a key,
        and then a list of values for the training batch.
        - The second element is a list of labels for the training batch.
    '''
    def decode_csv(line):
        record_defaults = [[0.]]*n_x # n_x features
        record_defaults.insert(0, [0]) # the first element is the label (int)
        parsed_line = tf.decode_csv(records=line,\
                                    record_defaults=record_defaults)
        label = parsed_line[0]  # First element is the label
        del parsed_line[0]  # Delete first element
        features = parsed_line  # Everything but first elements are the features
        d = dict(zip(feature_names, features)), label
        return d

    dataset = (tf.data.TextLineDataset(file_path)  # Read text file
               .skip(1)  # Skip header row
               .map(decode_csv))  # Transform each elem by applying decode_csv fn
    if perform_shuffle:
        # Randomizes input using a window of 256 elements (read into memory)
        dataset = dataset.shuffle(buffer_size=buffer_size)
    dataset = dataset.repeat(repeat_count)  # Repeats dataset this # times
    dataset = dataset.batch(batch_size)  # Batch size to use
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()

    return batch_features, batch_labels

Затем мини-партия может быть вычислена как

next_batch = my_input_fn(file_path=PATH+'train1.csv',\
                         batch_size=batch_size,\
                         perform_shuffle=True) # return 512 random elements

Далее мы определяем особенности столбцов числовых

feature_columns = [tf.feature_column.numeric_column(k) for k in feature_names]

В-третьих, мы создаем оценщик DNNClassifier:

classifier = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,  # The input features to our model
    hidden_units=[n_h],  # One layer
    n_classes=n_y,
    model_dir=None)

Наконец, DNN обучается с использованием тестового CSV-файла, а оценка выполняется на тестовом файле. Пожалуйста, измените repeat_count а также steps чтобы убедиться, что обучение соответствует требуемому количеству эпох в вашем коде.

# train the DNN
classifier.train(
    input_fn=lambda: my_input_fn(file_path=PATH+'train1.csv',\
                                 perform_shuffle=True,\
                                 repeat_count=1),\
                                 steps=None)    

# evaluate using the test csv file
evaluate_result = classifier.evaluate(
    input_fn=lambda: my_input_fn(file_path=PATH+'test1.csv',\
                                 perform_shuffle=False))
print("Evaluation results")
for key in evaluate_result:
    print("   {}, was: {}".format(key, evaluate_result[key]))
Другие вопросы по тегам