Использование jar-файлов с PySpark

Я пытаюсь использовать Scala-версию распространяемого XGBoost на spark в python. Для этого я поместил файлы jar - xgboost4j-spark-0.80.jar и xgboost4j-0.80.jar в spark/jars,

Это реализация, которую я использую для доступа к классам XGBoost

class JavaParamsOverrides(object):
    """
    Mixin for overriding methods derived from JavaParams.
    """
    # Define a fix similar to SPARK-10931 (For Spark <2.3)
    def _create_params_from_java(self):
        """
        Create params that are defined in the Java obj but not here
        """
        java_params = list(self._java_obj.params())
        from pyspark.ml.param import Param
        for java_param in java_params:
            java_param_name = java_param.name()
            if not hasattr(self, java_param_name):
                param = Param(self, java_param_name, java_param.doc())
                setattr(param, "created_from_java_param", True)
                setattr(self, java_param_name, param)
                self._params = None  # need to reset so self.params will discover new params
    # Backport SPARK-10931 (For Spark <2.3)
    def _transfer_params_from_java(self):
        """
        Transforms the embedded params from the companion Java object.
        """
        sc = SparkContext._active_spark_context
        for param in self.params:
            if self._java_obj.hasParam(param.name):
                java_param = self._java_obj.getParam(param.name)
                # SPARK-14931: Only check set params back to avoid default params mismatch.
                if self._java_obj.isSet(java_param):
                    value = _java2py(sc, self._java_obj.getOrDefault(java_param))
                    self._set(**{param.name: value})
                # SPARK-10931: Temporary fix for params that have a default in Java
                if self._java_obj.hasDefault(java_param) and not self.isDefined(param):
                    value = _java2py(sc, self._java_obj.getDefault(java_param)).get()
                    self._setDefault(**{param.name: value})
    # Override the "_from_java" method, so we can read our objects.
    @classmethod
    def _from_java(cls, java_stage):
        """
        Given a Java object, create and return a Python wrapper of it.
        """
        # Create a new instance of this stage.
        py_stage = cls()
        # Load information from java_stage to the instance.
        py_stage._java_obj = java_stage
        py_stage._create_params_from_java()
        py_stage._resetUid(java_stage.uid())
        py_stage._transfer_params_from_java()
        return py_stage

Класс выше должен переопределить методы из JavaParams,

from pyspark import SparkContext, keyword_only
from pyspark.ml.common import _java2py
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasWeightCol, HasCheckpointInterval
from pyspark.ml.util import JavaMLWritable, JavaPredictionModel
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper

class XGBoostRegressor(JavaParamsOverrides, JavaEstimator, HasCheckpointInterval, HasFeaturesCol, HasLabelCol,
                       HasPredictionCol, HasWeightCol, JavaMLWritable):
    """
    A PySpark implementation of ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor.
    """
    @keyword_only
    def __init__(self, alpha=0.0, baseMarginCol="baseMargin", baseScore=0.5, checkpointInterval=-1,
                 checkpointPath="", colsampleBylevel=1.0, colsampleBytree=1.0, numEarlyStoppingRounds=10, eta=0.3, evalMetric="error",
                 featuresCol="features", gamma=0.0, growPolicy="depthwise", labelCol="label", regLambda=0.0,
                 lambdaBias=0.0, maxBin=256, maxDeltaStep=0.0, maxDepth=6, minChildWeight=1.0, normalizeType="tree",
                 numRound=10, numWorkers=1, objective="binary:logistic", predictionCol="prediction", rateDrop=0.0,
                 sampleType="uniform", scalePosWeight=1.0, seed=42, silent=1, sketchEps=0.03, skipDrop=0.0,
                 subsample=1.0, treeMethod="auto", useExternalMemory=False):
        super(XGBoostRegressor, self).__init__()
        self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor", self.uid)
        self._create_params_from_java()
        self._setDefault(
            # Column Params
            featuresCol="features", labelCol="label", predictionCol="prediction",
            # Booster Params
            objective="binary:logistic", evalMetric="error", numRound=2)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    @keyword_only
    def setParams(self, alpha=0.0, baseScore=0.5, checkpointInterval=-1, checkpointPath="", colsampleBylevel=1.0,
                  colsampleBytree=1.0, numEarlyStoppingRounds=10, eta=0.3, evalMetric="error", featuresCol="features", gamma=0.0,
                  growPolicy="depthwise", labelCol="label", regLambda=0.0, lambdaBias=0.0, maxBin=256,
                  maxDeltaStep=0.0, maxDepth=6, minChildWeight=1.0, normalizeType="tree", numClass=2,
                  numRound=10, numWorkers=1, objective="regression:error", predictionCol="prediction",
                  probabilityCol="probability", rateDrop=0.0, rawPredictionCol="rawPrediction", sampleType="uniform",
                  scalePosWeight=1.0, seed=42, silent=1, sketchEps=0.03, skipDrop=0.0, subsample=1.0, treeMethod="auto",
                  useExternalMemory=False, weightCol="weightCol"):
        kwargs = self._input_kwargs_processed()
        return self._set(**kwargs)

Реализация выше была взята из этой темы. Я пытался использовать классы, но я получаю следующую ошибку

>>> xgb = XGBoostRegressor()
py4j.protocol.Py4JError: ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor does not exist in the JVM

Я пробовал инициализировать pyspark определив флаг jars, но он по-прежнему выдает ту же ошибку

pyspark --jars xgboost4j-0.80.jar,xgboost4j-spark-0.80.jar

Из того, что я могу понять, JVM не загрузила необходимые фляги и следовательно ошибки. Но я также явно использовал --jars опцию файла и поместили файл jars в spark/jars Сделки РЕПО.

Я также пытался использовать XGBoost с помощью spark-submit, Я тоже использовал опцию jars, но получаю ту же ошибку.

Из ветки здесь, в Stackru, я нашел это предложение в ответах

SUBMIT_ARGS = "--jars xgboost4j-0.80.jar xgboost4j-spark-0.80.jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

Но я снова оказался с той же ошибкой.

Почему это происходит?

0 ответов

Другие вопросы по тегам