Использование jar-файлов с PySpark
Я пытаюсь использовать Scala-версию распространяемого XGBoost на spark в python. Для этого я поместил файлы jar - xgboost4j-spark-0.80.jar и xgboost4j-0.80.jar в spark/jars
,
Это реализация, которую я использую для доступа к классам XGBoost
class JavaParamsOverrides(object):
"""
Mixin for overriding methods derived from JavaParams.
"""
# Define a fix similar to SPARK-10931 (For Spark <2.3)
def _create_params_from_java(self):
"""
Create params that are defined in the Java obj but not here
"""
java_params = list(self._java_obj.params())
from pyspark.ml.param import Param
for java_param in java_params:
java_param_name = java_param.name()
if not hasattr(self, java_param_name):
param = Param(self, java_param_name, java_param.doc())
setattr(param, "created_from_java_param", True)
setattr(self, java_param_name, param)
self._params = None # need to reset so self.params will discover new params
# Backport SPARK-10931 (For Spark <2.3)
def _transfer_params_from_java(self):
"""
Transforms the embedded params from the companion Java object.
"""
sc = SparkContext._active_spark_context
for param in self.params:
if self._java_obj.hasParam(param.name):
java_param = self._java_obj.getParam(param.name)
# SPARK-14931: Only check set params back to avoid default params mismatch.
if self._java_obj.isSet(java_param):
value = _java2py(sc, self._java_obj.getOrDefault(java_param))
self._set(**{param.name: value})
# SPARK-10931: Temporary fix for params that have a default in Java
if self._java_obj.hasDefault(java_param) and not self.isDefined(param):
value = _java2py(sc, self._java_obj.getDefault(java_param)).get()
self._setDefault(**{param.name: value})
# Override the "_from_java" method, so we can read our objects.
@classmethod
def _from_java(cls, java_stage):
"""
Given a Java object, create and return a Python wrapper of it.
"""
# Create a new instance of this stage.
py_stage = cls()
# Load information from java_stage to the instance.
py_stage._java_obj = java_stage
py_stage._create_params_from_java()
py_stage._resetUid(java_stage.uid())
py_stage._transfer_params_from_java()
return py_stage
Класс выше должен переопределить методы из JavaParams
,
from pyspark import SparkContext, keyword_only
from pyspark.ml.common import _java2py
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasWeightCol, HasCheckpointInterval
from pyspark.ml.util import JavaMLWritable, JavaPredictionModel
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
class XGBoostRegressor(JavaParamsOverrides, JavaEstimator, HasCheckpointInterval, HasFeaturesCol, HasLabelCol,
HasPredictionCol, HasWeightCol, JavaMLWritable):
"""
A PySpark implementation of ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor.
"""
@keyword_only
def __init__(self, alpha=0.0, baseMarginCol="baseMargin", baseScore=0.5, checkpointInterval=-1,
checkpointPath="", colsampleBylevel=1.0, colsampleBytree=1.0, numEarlyStoppingRounds=10, eta=0.3, evalMetric="error",
featuresCol="features", gamma=0.0, growPolicy="depthwise", labelCol="label", regLambda=0.0,
lambdaBias=0.0, maxBin=256, maxDeltaStep=0.0, maxDepth=6, minChildWeight=1.0, normalizeType="tree",
numRound=10, numWorkers=1, objective="binary:logistic", predictionCol="prediction", rateDrop=0.0,
sampleType="uniform", scalePosWeight=1.0, seed=42, silent=1, sketchEps=0.03, skipDrop=0.0,
subsample=1.0, treeMethod="auto", useExternalMemory=False):
super(XGBoostRegressor, self).__init__()
self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor", self.uid)
self._create_params_from_java()
self._setDefault(
# Column Params
featuresCol="features", labelCol="label", predictionCol="prediction",
# Booster Params
objective="binary:logistic", evalMetric="error", numRound=2)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, alpha=0.0, baseScore=0.5, checkpointInterval=-1, checkpointPath="", colsampleBylevel=1.0,
colsampleBytree=1.0, numEarlyStoppingRounds=10, eta=0.3, evalMetric="error", featuresCol="features", gamma=0.0,
growPolicy="depthwise", labelCol="label", regLambda=0.0, lambdaBias=0.0, maxBin=256,
maxDeltaStep=0.0, maxDepth=6, minChildWeight=1.0, normalizeType="tree", numClass=2,
numRound=10, numWorkers=1, objective="regression:error", predictionCol="prediction",
probabilityCol="probability", rateDrop=0.0, rawPredictionCol="rawPrediction", sampleType="uniform",
scalePosWeight=1.0, seed=42, silent=1, sketchEps=0.03, skipDrop=0.0, subsample=1.0, treeMethod="auto",
useExternalMemory=False, weightCol="weightCol"):
kwargs = self._input_kwargs_processed()
return self._set(**kwargs)
Реализация выше была взята из этой темы. Я пытался использовать классы, но я получаю следующую ошибку
>>> xgb = XGBoostRegressor()
py4j.protocol.Py4JError: ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor does not exist in the JVM
Я пробовал инициализировать pyspark
определив флаг jars, но он по-прежнему выдает ту же ошибку
pyspark --jars xgboost4j-0.80.jar,xgboost4j-spark-0.80.jar
Из того, что я могу понять, JVM не загрузила необходимые фляги и следовательно ошибки. Но я также явно использовал --jars
опцию файла и поместили файл jars в spark/jars
Сделки РЕПО.
Я также пытался использовать XGBoost с помощью spark-submit
, Я тоже использовал опцию jars, но получаю ту же ошибку.
Из ветки здесь, в Stackru, я нашел это предложение в ответах
SUBMIT_ARGS = "--jars xgboost4j-0.80.jar xgboost4j-spark-0.80.jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
Но я снова оказался с той же ошибкой.
Почему это происходит?