Разрешить SparkException: задача не сериализуется при импорте модели PMML

Я хочу импортировать модель PMML, чтобы вычислить оценку, используя Spark. Все отлично работает, когда я не использую искры, но я не могу использовать свой метод в картографе.

Проблема в том, что мне нужен объект Evaluation из org.jpmml.evaluator.Evaluator, который, по-видимому, не поддерживает сериализацию. Поэтому я попытался сделать это в Serialiazable со следующим классом:

package util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.jpmml.evaluator.Evaluator;

public class SerializableEvaluator implements Serializable {

    private static final long serialVersionUID = 6631604036553063657L;
    private Evaluator evaluator;

    public SerializableEvaluator(Evaluator evaluator) {
        this.evaluator = evaluator;
    }

    public Evaluator getEvaluator() {
        return evaluator;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(evaluator);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Evaluator eval = (Evaluator) in.readObject();
    }
}

Я также сделал все мои классы сериализуемыми.

Вот пример моего кода:

        logger.info("Print 5 first rows----------------------------");
        strTitanicRDD
                .take(5)
                .forEach(row -> logger.info(row));
        logger.info("Print 5 first Titatnic Obs---------------------");
        strTitanicRDD
                .map(row -> new TitanicObservation(row))
                .take(5)
                .forEach(titanic -> logger.info(titanic.toString()));
        logger.info("Print 5 first Scored Titatnic Obs---------------");

        try{strTitanicRDD
            .map(row -> new TitanicObservation(row))
            .map(
                new Function<TitanicObservation,String>(){

                    private static final long serialVersionUID = -2968122030659306400L;

                    @Override
                    public String call(TitanicObservation titanic) throws Exception {
                        String res = PmmlUtil.computeScoreTitanic(evaluator, titanic);
                        return res;
                    }

                })
        .take(5)
        .forEach(row -> logger.info(row));

Но я не думаю, что мой код поможет вам решить мою проблему, что очень ясно (см. Логи:)

org.apache.spark.SparkException: задача не сериализуется в org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) в org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala 15:) в org.apache.spark.SparkContext.clean(SparkContext.scala:1623) в org.apache.spark.rdd.RDD.map(RDD.scala:286) в org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89) в org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46) в Score.acv.AppWithSpark.main(AppWithSpark.java:117) в sun.reflect.NativeMethodAccessorImpl.invoke0(нативный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegavaMetho.rej.rej.rej.ref.jj.java:497) в org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577) в org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) в или g.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197) в org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) в org.apache.spark.deploy.SparkSubmit. Основной (SparkSubmit.scala)

Вызывается: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl Стек сериализации:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more

1 ответ

Решение

За org.jpmml.evaluator.Evaluator Интерфейс есть экземпляр некоторых org.jpmml.evaluator.ModelEvaluator подкласс. Класс ModelEvaluator и все его подклассы являются сериализуемыми по дизайну. Проблема относится к org.dmg.pmml.PMML экземпляр объекта, который вы предоставили ModelEvaluatorFactory#newModelManager(PMML) метод в начале.

Вкратце, каждый объект модели класса PMML может иметь прикрепленную к нему информацию SAX Locator. Это полезно на этапах разработки и тестирования для обнаружения нарушающего содержимого XML. Однако на стадии производства эту информацию больше не нужно хранить. Вы можете отключить информацию SAX-локатора, либо правильно настроив среду выполнения JAXB, либо просто очистив существующие экземпляры SAX-локатора, вызвав PMMLObject#setLocator(Locatable) с null аргумент. Последняя функциональность формализована org.jpmml.model.visitors.LocatorNullifier Класс посетителей.

Для полного примера, пожалуйста, смотрите org.jpmml.spark.EvaluatorUtil служебный класс (особенно в строках с 73 по 75) официального проекта JPMML-Spark. Почему бы вам не использовать JPMML-Spark?

Другие вопросы по тегам