Проблема прогнозирования с использованием Keras и TransformSpec с PySpark - petastorm
Я пытаюсь получить прогнозы из модели Кераса с двумя входными данными: информация о последовательности и обычная ковариата.
С помощью функции TransformSpec я предварительно обрабатываю последовательности, чтобы они имели одинаковую длину и для маскирования значений.
Модель подходит хорошо, но у меня проблемы с получением прогнозов.
import pyspark.sql.functions as F
import pyspark.sql.types as T
import tensorflow as tf
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm import TransformSpec
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///dbfs/...')
import numpy as np
import pandas as pd
# create data
sequence = [[1, 1, 1, 1, 1], [2, 2, 2, 2], [3, 2, 2], [3, 3, 3], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2]]
y = [0, 1, 1, 2, 2, 1]
x = [0.3, 0.1, 0.3, 0.5, 0.5, 0.1]
df = pd.DataFrame({'y':y, 'x':x, 'sequence':sequence})
sdf = spark.createDataFrame(df)
target='y'
all_features = ['sequence', 'x']
# functions
def preprocess(v, max_length=5):
vv = list(v)
vv = [0] * (max_length - len(vv)) + vv
return np.array(vv)
def format_sequence_data(pd_batch):
pd_batch['sequence'] = pd_batch['sequence'].map(lambda x: preprocess(x))
return pd_batch.loc[:,['sequence', 'x', 'y']]
transform_spec_fn = TransformSpec(
format_sequence_data,
edit_fields=[
('sequence', np.float32, (5,), False),
('x', np.float32, (), False),
('y', np.int32, (), False)],
selected_fields=['sequence', 'x', 'y'])
# petastorm
df_converter = make_spark_converter(sdf)
# model
def createModel():
seq_vec = tf.keras.Input(shape=(5,), name='sequence')
e = tf.keras.layers.Embedding(input_dim=5, output_dim=5,
input_length=5, mask_zero=True, name='sequence_embedding')(seq_vec)
x = tf.keras.Input(shape=(1,), name='x', dtype='float')
x = tf.keras.layers.Normalization()(x)
ml = tf.keras.layers.LSTM(10, return_sequences=True)(e)
ml = tf.keras.layers.LSTM(5)(ml)
combined = tf.keras.layers.Concatenate()([ml, x])
mlp = tf.keras.layers.Dense(10)(combined)
mlp = tf.keras.layers.Dense(5)(mlp)
mlp = tf.keras.layers.Dense(3, activation='softmax')(mlp)
model = tf.keras.Model([seq_vec, x], mlp)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics='accuracy')
return model
model = createModel()
# training
batch_size=1
def train_and_evaluate():
with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=batch_size) as data:
data = data.map(lambda x: (tuple(getattr(x, col) for col in ['sequence', 'x']), getattr(x, target)))
steps_per_epoch = int(len(df_converter) / batch_size)
history = model.fit(data,
steps_per_epoch=steps_per_epoch,
epochs=10,
shuffle=False,
verbose=2)
return history
history = train_and_evaluate()
Для предсказания я использую:
# udf function for prediction (pyspark)
def model_prediction_prob_udf(model):
def predict(input_batch_iter):
for input_batch in input_batch_iter:
input_batch['sequence'] = input_batch['sequence'].map(lambda x: preprocess(x))
preds = model.predict([input_batch.loc[:,c] for c in all_features], batch_size=1000)
yield pd.Series(preds.tolist())
return_type = T.ArrayType(T.DoubleType())
return F.pandas_udf(return_type, F.PandasUDFType.SCALAR_ITER)(predict)
pred_prob_udf = model_prediction_prob_udf(model)
pred = sdf.withColumn('features', F.struct(all_features))
pred = pred.withColumn('prediction_prob', pred_prob_udf(F.col('features')))
display(pred)
Я получаю сообщение об ошибке:
'ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type numpy.ndarray).'
Если я использую что-то вроде кода ниже, предсказание длится вечно:
with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=1) as data:
data = data.map(lambda x: ((x.sequence, x.x),))
tt = model.predict(data)
Любые идеи или предложения о том, как это исправить? Спасибо!