Вызов внешней библиотеки из Spark mapPartitions: проблема повторного входа?

Я звоню во внешнюю библиотеку Scala ( Smile, никакой связи с Spark) из операции mapPartitions в Apache Spark. Мне нужно применить алгоритм для каждого раздела (и независимо от остальных разделов). Однако он выдает исключение ArrayIndexOutOfBoundsException из метода Smile (всегда в одной и той же строке, но каждый раз с разными данными, похоже на проблему параллелизма), поскольку элемент массива int загадочно изменился (до бессмысленного значения, но не супер большой или супер маленький) после того, как вектор был создан и заполнен.

Если я распечатаю содержимое раздела данных erroneus, а затем запускаю программу Scala (без Spark) для этих данных, выполняющих ту же обработку с Smile, все заканчивается нормально, и я не получаю исключения.

Smile сам по себе многоядерный, но я отключил его, передав параметры 'spark.executor.extraJavaOptions=-Dsmile.threads=1' --driver-java-options='-Dsmile.threads=1' к моей команде искры-подчинения.

У меня такой вопрос: есть ли проблемы с повторным входом при вызове внешней непараллельной библиотеки из операции mapPartitions, когда мои исполнители Spark используют несколько ядер каждое? Или, может быть, только если есть статические переменные или что-то в этом роде, но только если библиотека просто использует локальные переменные?

Я столкнулся с подобной проблемой при использовании WEKA точно для того же самого, но я еще не тестировал его в последовательной программе, как я делал это раньше с Smile.

РЕДАКТИРОВАТЬ: проблемный массив объявлен в Smile (Scala) как transient, Но в любом случае, это внутри mapPartitions, где я его называю, так что я думаю, что ничего не нужно сериализовать?

0 ответов

Другие вопросы по тегам