InvalidArgumentError when reading Parquet files into Keras via Petastorm
I'm trying to read Parquet data for a language model.
The Parquet file has two columns:
- target (an integer)
- feature_vec (an array of ints)
I'm adapting code from this post (which works for me as written). When I run the code below, I get an InvalidArgumentError as soon as model fitting starts.
import os
import random
from pyspark.sql import Row
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Embedding, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.etl.dataset_metadata import materialize_dataset
import pyarrow.parquet as pq
## build toy dataset
vocab_size = 250
seq_length = 100
parquet_path = '/dbfs/ml/langmod/petastorm/toy_dataset.parquet'
def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    # randint is inclusive, so cap at vocab_size - 1 to stay inside the embedding range
    return Row(target = random.randint(0, vocab_size - 1), feature_vec = [random.randint(0, vocab_size - 1) for i in range(seq_length)])
rows_count = 1000
rows_rdd = sc.parallelize(range(rows_count)).map(row_generator)
df = spark.createDataFrame(rows_rdd)
df.write.parquet(parquet_path, mode = 'overwrite')
# Tell pyarrow/petastorm to skip Spark's metadata files (_SUCCESS, _committed_..., _started_...).
# get_local_path is a small local helper (not shown) that returns the driver-local path of parquet_path.
underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)
## build model and read in data from parquet, converting to tf.Dataset format
with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    # make_batch_reader yields row-group-sized batches, hence the unbatch / re-batch to size 10
    train_dataset = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)

    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])

    model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)
The error:
InvalidArgumentError Traceback (most recent call last)
<command-2202319388737190> in <module>
10 model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
11 model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])
---> 12 model.fit(train_dataset, epochs = 2, steps_per_epoch = 10, verbose = 1)
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
725 max_queue_size=max_queue_size,
726 workers=workers,
--> 727 use_multiprocessing=use_multiprocessing)
728
729 def evaluate(self,
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
673 validation_steps=validation_steps,
674 validation_freq=validation_freq,
--> 675 steps_name='steps_per_epoch')
676
677 def evaluate(self,
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
298 else:
299 actual_inputs = ins()
--> 300 batch_outs = f(actual_inputs)
301 except errors.OutOfRangeError:
302 if is_dataset:
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/keras/backend.py in __call__(self, inputs)
3474
3475 fetched = self._callable_fn(*array_vals,
-> 3476 run_metadata=self.run_metadata)
3477 self._call_fetch_callbacks(fetched[-len(self._fetches):])
3478 output_structure = nest.pack_sequence_as(
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/client/session.py in __call__(self, *args, **kwargs)
1470 ret = tf_session.TF_SessionRunCallable(self._session._session,
1471 self._handle, args,
-> 1472 run_metadata_ptr)
1473 if run_metadata:
1474 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
InvalidArgumentError: 2 root error(s) found.
(0) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
[[{{node lstm1_3/transpose}}]]
[[lstm1_3/TensorArrayUnstack_1/range/_459]]
(1) Invalid argument: transpose expects a vector of size 4. But input(1) is a vector of size 3
[[{{node lstm1_3/transpose}}]]
0 successful operations.
0 derived errors ignored.
This error is surprising, since it appears to be complaining about the shape inside a middle layer of the model, whose input should simply be whatever the previous layer outputs. Reading the message literally, the tensor reaching the LSTM's internal transpose seems to be rank 4 at run time even though the layer was built for a rank-3 (batch, timesteps, embedding) input, which would mean the features coming out of the dataset have one more dimension than the expected (batch, seq_length).
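A quick way to check what rank tf.data actually infers for the pipeline would be something like this (sketch only, same pipeline as above; output_types / output_shapes are deprecated in TF 1.15 but still available on Dataset):

with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    ds = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)
    # What element types and shapes does tf.data believe this pipeline produces?
    print(ds.output_types)   # something like (tf.int64, tf.float32)
    print(ds.output_shapes)  # hoping for ((10, seq_length), (10, vocab_size))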
However, if I convert the dataset to an iterator and then feed the resulting X and Y outputs to the model separately, it trains as expected on that single batch:
with make_batch_reader('file:/dbfs/' + parquet_path, num_epochs = None) as train_reader:
    train_dataset = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
        .apply(tf.data.experimental.unbatch()) \
        .batch(10, drop_remainder = True)

    # pull a single batch out of the dataset and feed it to fit() as plain arrays
    iterator = train_dataset.make_one_shot_iterator()
    tensor = iterator.get_next()
    with tf.Session() as sess:
        features, target = sess.run(tensor)

    model = Sequential()
    model.add(Embedding(vocab_size, 20, mask_zero = True, input_length = None, name = 'embed'))
    model.add(LSTM(50, return_sequences = False, name = 'lstm1'))
    model.add(Dense(vocab_size, activation = 'softmax', name = 'dense_layer'))
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['categorical_accuracy'])

    model.fit(x = features, y = target, verbose = 1)
10/10 [==============================] - 1s 76ms/sample - loss: 5.5202 - categorical_accuracy: 0.1000
My guess is that there is some problem with how the integer-array column is read and converted into the tf.data.Dataset, but I can't see what would cause it. I suspect the problem is in this line from the block above:
train_dataset = make_petastorm_dataset(train_reader).map(lambda x: (tf.convert_to_tensor(x.feature_vec), tf.one_hot(x.target, depth = vocab_size))) \
    .apply(tf.data.experimental.unbatch()) \
    .batch(10, drop_remainder = True)
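If the root cause is that tf.data cannot infer a static shape for the array column, one thing I could try is pinning the shapes explicitly after batching, roughly like this (untested sketch; it assumes every feature_vec has exactly seq_length elements and a fixed batch size of 10):

def _pin_shapes(features, target):
    # assumption: every feature_vec has exactly seq_length entries and the batch size is 10
    features.set_shape([10, seq_length])
    target.set_shape([10, vocab_size])
    return features, target

train_dataset = train_dataset.map(_pin_shapes)

I'm not sure, though, whether that would address the underlying problem or just mask it.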
I'm using the Databricks ML runtime 6.2 with:
- tensorflow 1.15.0
- petastorm 0.8.0