Как распечатать данные, которые поступают в keras model.fit , особенно при использовании набора данных petastorm

Обновлять

Хотя я оценил ответ AloneTogether, мне не понравилось, что я использовал take() и он был отделен от model.fit.

Я поставил еще один ответ здесь, если вы хотите посмотреть на него. Это включает в себя создание подкласса Model. Это не так уж плохо.

Конец обновления

У меня есть простой пример, файл паркета с 8 столбцами с именем feature_#, заполненный от 1 до 100 для каждого столбца.

          feature_1      feature_2     ...      feature_8
    1              1                      1
    2              2                      2
    ...            ...                    ...
    99             99                     99
    100            100                    100

моя модель:

      all_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7","feature_8"]
x_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7"]


inputs = [Input(shape=(1,),name=col) for col in x_cols]
merged = Concatenate(axis=1)(inputs)
x = Dense(50, activation="relu")(merged)
x = Dense(20,activation="relu")(x)
outputs = Dense(101,activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy",\
                      optimizer=opt,metrics=['accuracy'])

Я использую petastorm так:

      batch_size = 4

with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                                   schema_fields=all_cols) as train_reader:
    with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                                       schema_fields=all_cols) as val_reader:
train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map( 
                        lambda x: (tuple(getattr(x, col) for col in x_cols),getattr(x,"feature_8"))
                        ) \
                        .batch(batch_size) 
                                            

        val_ds = make_petastorm_dataset(val_reader) \
                        .unbatch() \
                        .map(
                        lambda x: (tuple(getattr(x, col) for col in x_cols), 
                                    getattr(x,"feature_8"))
                        ) \
                        .batch(batch_size) 

В этом простом примере я использую те же данные для поезда, что и для проверки. Я хочу подтвердить, что весь набор данных отправляется в model.fit(), поэтому я пишу пользовательский обратный вызов.

      class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])

# and I pass the dataset to the custom callback:
callbacks.append(MyCustomCallback(train_ds))

не печатает все значения... от 1 до 100. Если я перебираю набор данных (простой цикл for) без model.fit, то я получаю все от 1 до 100, поэтому я думаю, что take() конкурирует с model.fit, просто теория.

Я также пробовал:

      class MyCustomCallback(tf.keras.callbacks.Callback):

  def on_train_batch_begin(self, batch, logs=None):
    print(self.model.layers[0].input) # or .output
    #or
    #print(self.model.layers[0].get_weights())


Но это не дает мне реальных значений, а get_weights() выводит пустые массивы.

это то, что печатает ввод:

      KerasTensor(type_spec=TensorSpec(shape=(None, 1), dtype=tf.float32, name='feature_1'), name='feature_1', description="created by layer 'feature_1'")

Я попытался использовать K.eval() на входе и выходе слоя, и это закончилось проблемой с numpy, которая не исправлена ​​​​ни одной из нетерпеливых настроек.

Я действительно не думаю, что это должно быть так сложно. Я просто хочу изучить набор данных, прежде чем он перейдет к обучению.

Я дурачился с Repeat(), cache() и просто перебирал набор данных перед model.fit, но мне не нравится идея, что это происходит до model.fit и что, если он не кэширован, он перетасовывает его, и т.д...

Но я также хочу иметь возможность произвольно смотреть на модель любого значения, любого веса и в любое время. Я не чувствую, что могу получить доступ к этим вещам, но чувствую, что должен иметь возможность.

Любая помощь приветствуется.

о, и использование tensorflow 2.6.2 atm с tf.keras

2 ответа

Я думаю, все зависит от вашего размера, потому что take(1)принимает одну партию, и если batch_size< 100, вы не увидите все значения. Если, например, у вас есть batch_size=100, то вы обязательно увидите значения от 1 до 100:

      import pandas as pd
import tensorflow as tf
import numpy as np
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.reader import make_batch_reader

df = pd.DataFrame({'feature1':np.arange(1, 101), 
              'feature2':np.arange(1, 101),
              'feature3':np.arange(1, 101),
              'feature4':np.arange(1, 101),
              'feature5':np.arange(1, 101),
              'feature6':np.arange(1, 101),
              'feature7':np.arange(1, 101),
              'feature8':np.arange(1, 101)})
columns = list(df)
df.to_parquet('file.parquet')
x_cols = columns[:-1]
batch_size = 100

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    tf.print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])


with make_batch_reader('file:///content/file.parquet', num_epochs=1,
                                   schema_fields=columns) as train_reader:
  train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map( 
                        lambda x: (tuple(getattr(x, col) for col in x_cols),getattr(x,"feature8"))
                        ) \
                        .shuffle(buffer_size=1000).batch(batch_size)
                        
  inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
  merged = tf.keras.layers.Concatenate(axis=1)(inputs)
  x = tf.keras.layers.Dense(50, activation="relu")(merged)
  x = tf.keras.layers.Dense(20,activation="relu")(x)
  outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  opt = tf.keras.optimizers.Adam(learning_rate=.001)

  model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
  model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])
      Epoch 1/2
array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
        79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
        92,  93,  94,  95,  96,  97,  98,  99, 100])
      1/Unknown - 1s 777ms/step - loss: 19.3339 - accuracy: 0.0100array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
        79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
        92,  93,  94,  95,  96,  97,  98,  99, 100])
1/1 [==============================] - 1s 899ms/step - loss: 19.3339 - accuracy: 0.0100
...

Кроме того, я не уверен, в чем именно преимущества petastormесть, но если вы ищете альтернативу, вы можете попробовать tensorflow-io:

      import pandas as pd
import tensorflow_io as tfio
import tensorflow as tf
import numpy as np

df = pd.DataFrame({'feature1':np.arange(1, 101), 
              'feature2':np.arange(1, 101),
              'feature3':np.arange(1, 101),
              'feature4':np.arange(1, 101),
              'feature5':np.arange(1, 101),
              'feature6':np.arange(1, 101),
              'feature7':np.arange(1, 101),
              'feature8':np.arange(1, 101)})
columns = list(df)
df.to_parquet('file.parquet')
ds = tfio.IODataset.from_parquet('file.parquet', columns = columns)
x_cols = columns[:-1]
batch_size = 100

train_ds = ds.map(lambda x: (tuple(x[col] for col in x_cols),x["feature8"])).shuffle(buffer_size=1000).batch(batch_size)
inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
merged = tf.keras.layers.Concatenate(axis=1)(inputs)
x = tf.keras.layers.Dense(50, activation="relu")(merged)
x = tf.keras.layers.Dense(20,activation="relu")(x)
outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])

Обновление 1 : вы можете добавить каждую партию в массив в Callbackи в конце каждой эпохи вы можете распечатать значения и сбросить массив для следующей эпохи:

      class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)
    self.train_data = train_data

  def on_batch_end(self, batch, logs=None):
    self.mylist = self.mylist.write(self.mylist.size(), list(self.train_data.take(1).as_numpy_iterator())[0][0][0])
  
  def on_epoch_end(self, epoch, logs=None):
    arr = self.mylist.stack()
    tf.print(arr, summarize=-1)
    self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)

Итак, это мой собственный ответ после проб и ошибок. Надеюсь, это поможет вам, потому что я не мог легко найти ответ.

Первый подкласс модели

      class CustomModel(tf.keras.Model):

    #normally wouldn't have to define __init__ but creating a variable "mylist"

    def __init__(self,inputs,outputs):
        super().__init__(inputs,outputs)
        self.mylist = []   
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data
        self.mylist.append(x[0].numpy())  # <<----- Everything here is standard except this

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

затем убедитесь, что вы используете "run_eagerly=True" в model.compile()

      inputs = [Input(shape=(1,),name=col) for col in x_cols]
merged = Concatenate(axis=1)(inputs)
x = Dense(50, activation="relu")(merged)
x = Dense(20,activation="relu")(x)
outputs = Dense(101,activation="softmax")(x)
model = CustomModel(inputs=inputs, outputs=outputs)  # <<--- use custom model
opt = tf.keras.optimizers.Adam(learning_rate=.001)

#notice the run_eagerly, this must be done for keras(not just tensorflow)
#to process things like python would
model.compile(loss="sparse_categorical_crossentropy",\
                      optimizer=opt,metrics=['accuracy'],run_eagerly=True)

затем, наконец, сделайте кое-что в пользовательском обратном вызове

      class MyCustomCallback(tf.keras.callbacks.Callback):
   
  def on_epoch_end(self, epoch, logs=None):
    #I'm sure this could be written better but I got a listwrapper of 
    #np.ndarrays to be a normal list of arrays
    mylist = [item.tolist() for item in list(self.model.mylist)]
    

    #and then flatten the list to sort them
    # remember to import itertools
    flat_list = list(itertools.chain(*mylist))
    flat_list.sort()

    # if these are equal then we have 1-100 in our input
    print(list(range(1,101))==flat_list)
    # or just print the list out of course
    print(flat_list)

    # and finally remember to reset the model's mylist after the epoch
    self.model.mylist = []
Другие вопросы по тегам