Проблема преобразования данных Spark при прогнозировании с помощью Horovod KerasEstimator()
Я обучаю модель Keras созданию рекомендательной системы и запускаю ее в Spark с Хороводом иhvd.KerasEstimator()
.
Вот мой оценщик:
keras_estimator = hvd.KerasEstimator(
num_proc=2,
store=store,
model=model,
optimizer=optimizer,
loss='mse',
metrics=[tf.keras.metrics.RootMeanSquaredError()],
feature_cols=['userID','itemID'],
label_cols=['rating'],
batch_size=512,
epochs=5,
verbose=1)
keras_model = keras_estimator.fit(train_df).setOutputCols(['rating_prob'])
Функция прогнозирования просто:
pred_df = keras_model.transform(test_df)
Модель обучается без каких-либо проблем, и я могу получить потери для каждой эпохи, но у меня проблемы с прогнозами!
Функция прогнозирования не выводит никаких ошибок и, похоже, работает, но манипулировать pred_df невозможно.
Я пытался сделать:
pred_df.show()
илиpred_df.toPandas()
но все вызывает ту же ошибку ниже:
«org.apache.spark.api.python.PythonException: 'ValueError: невозможно преобразовать тип данных Spark <класс 'pyspark.sql.types.DecimalType'> в собственный тип Python'»
Я не понимаю, потому что мои train_df и test_df имеют одинаковые типы!
Я пробовал менять типы с помощью:
# reset data types to integer and float for tensorflow
train_df = train_df.withColumn("itemID",col("itemID").cast(IntegerType())) \
.withColumn("userID",col("userID").cast(IntegerType())) \
.withColumn("rating",col("rating").cast(FloatType()))
test_df = test_df.withColumn("itemID",col("itemID").cast(IntegerType())) \
.withColumn("userID",col("userID").cast(IntegerType())) \
.withColumn("rating",col("rating").cast(FloatType()))
Но ошибка все еще здесь..
Ниже приведен пример моих данных:
Можете ли вы помочь мне решить эту проблему, пожалуйста?
заранее спасибо
Я только что попытался изменить типы столбцов, но это ничего не меняет