Проблема прогнозирования с использованием Keras и TransformSpec с PySpark - petastorm

Я пытаюсь получить прогнозы из модели Кераса с двумя входными данными: информация о последовательности и обычная ковариата.

С помощью функции TransformSpec я предварительно обрабатываю последовательности, чтобы они имели одинаковую длину и для маскирования значений.

Модель подходит хорошо, но у меня проблемы с получением прогнозов.

      import pyspark.sql.functions as F
import pyspark.sql.types as T

import tensorflow as tf

from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm import TransformSpec
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///dbfs/...')

import numpy as np
import pandas as pd

# create data
sequence = [[1, 1, 1, 1, 1], [2, 2, 2, 2], [3, 2, 2], [3, 3, 3], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2]]
y = [0, 1, 1, 2, 2, 1]
x = [0.3, 0.1, 0.3, 0.5, 0.5, 0.1]
df = pd.DataFrame({'y':y, 'x':x, 'sequence':sequence})
sdf = spark.createDataFrame(df)
target='y'
all_features = ['sequence', 'x']

# functions
def preprocess(v, max_length=5):
    vv = list(v)
    vv = [0] * (max_length - len(vv)) + vv
    return np.array(vv)

def format_sequence_data(pd_batch):
    pd_batch['sequence'] = pd_batch['sequence'].map(lambda x: preprocess(x))
    return pd_batch.loc[:,['sequence', 'x', 'y']]
    
transform_spec_fn = TransformSpec(
  format_sequence_data, 
  edit_fields=[
        ('sequence', np.float32, (5,), False), 
        ('x', np.float32, (), False),
        ('y', np.int32, (), False)], 
  selected_fields=['sequence', 'x', 'y'])

# petastorm
df_converter = make_spark_converter(sdf)

# model
def createModel():

    seq_vec = tf.keras.Input(shape=(5,), name='sequence')
    e = tf.keras.layers.Embedding(input_dim=5, output_dim=5, 
        input_length=5, mask_zero=True, name='sequence_embedding')(seq_vec)

    x = tf.keras.Input(shape=(1,), name='x', dtype='float')
    x = tf.keras.layers.Normalization()(x)

    ml = tf.keras.layers.LSTM(10, return_sequences=True)(e)
    ml = tf.keras.layers.LSTM(5)(ml)

    combined = tf.keras.layers.Concatenate()([ml, x])

    mlp = tf.keras.layers.Dense(10)(combined)
    mlp = tf.keras.layers.Dense(5)(mlp)
    mlp = tf.keras.layers.Dense(3, activation='softmax')(mlp)

    model = tf.keras.Model([seq_vec, x], mlp)
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics='accuracy')
    return model

model = createModel()

# training
batch_size=1
def train_and_evaluate(): 
    with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=batch_size) as data:
        data = data.map(lambda x: (tuple(getattr(x, col) for col in ['sequence', 'x']), getattr(x, target)))
        steps_per_epoch = int(len(df_converter) / batch_size)

        history = model.fit(data, 
            steps_per_epoch=steps_per_epoch,
            epochs=10,
            shuffle=False,
            verbose=2)

    return history

history = train_and_evaluate()

Для предсказания я использую:

      # udf function for prediction (pyspark)
def model_prediction_prob_udf(model):
  def predict(input_batch_iter):
    for input_batch in input_batch_iter:
        input_batch['sequence'] = input_batch['sequence'].map(lambda x: preprocess(x))
        preds = model.predict([input_batch.loc[:,c] for c in all_features], batch_size=1000)
        yield pd.Series(preds.tolist())
  return_type = T.ArrayType(T.DoubleType())
  return F.pandas_udf(return_type, F.PandasUDFType.SCALAR_ITER)(predict) 

pred_prob_udf = model_prediction_prob_udf(model)

pred = sdf.withColumn('features', F.struct(all_features))
pred = pred.withColumn('prediction_prob', pred_prob_udf(F.col('features')))
display(pred)

Я получаю сообщение об ошибке:

'ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type numpy.ndarray).'

Если я использую что-то вроде кода ниже, предсказание длится вечно:

      with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=1) as data:
        data = data.map(lambda x: ((x.sequence, x.x),))
        tt = model.predict(data)

Любые идеи или предложения о том, как это исправить? Спасибо!

0 ответов

Другие вопросы по тегам