Проблема с RDD пользовательских объектов в Spark

Я передаю тип в плоскую карту, как это;

 val customData: RDD[(Text/String, Custom)] = origRDD.flatMap { case(x) => parse(x)}

это возвращает пару ключ-значение String и Custom (я также использовал Text на месте на String, но безрезультатно, это объяснение 'Text/String'). Этот класс расширяет Serializable и зарегистрирован в Kryo.

Когда я пытаюсь запустить программу, она просто запускается и никогда не заканчивается. Под бесконечностью я подразумеваю, что я оставил это бегущим в течение 18 часов, и это не закончилось. Если я изменю его на Text(hadoop io) с Int (счетчик) вместо пользовательского объекта, он очень быстро завершится. Когда я говорю быстро, я имею в виду 30 минут. Данные, через которые он проходит, - это одни и те же данные, и в обоих случаях используется метод синтаксического анализа (так же как и плоская карта), поэтому он работает по одной и той же логике. Метод, который он использует в плоской карте, является тем же самым методом разбора. Поведение снижается, когда я изменяю его на (Text/String, Custom) с (Text, Int).

Я хотел бы знать, что мне нужно добавить, чтобы сделать эту работу. Это должно быть доступно для записи?

Пример реализации класса пользовательских объектов (очевидно, не точный, но очень хорошо имитирует);

class Custom(dateAsLong: java.lang.Long, typeOfTransaction: util.HashSet[java.lang.Long], isCustomer: Boolean, amount: String, customerMatch: ObjectMatch) extends Serializable {
//has getters and setters here 

 val startDate = dateAsLong
 val transType = typeOfTransaction
 val customer = isCustomer
 val cost = amount
 val matchedData = customerMatch

 def getStartDate(): java.lang.Long = startDate
 def getTransType(): util.HashSet[java.lang.Long] = transType
 def getCustomer(): Boolean = customer
 def getCost(): String = amount
 def getMatchedData(): ObjectMatch = matchedData
}

Пример метода разбора внутри объекта, расширяющего Java Serializable;

object Paser extends Serializable { 
    def parse(transaction: Transaction, customerList: util.HashMap[String, String], storeList: util.HashMap[String, String]): List[(Text, Custom)] ={ //list because flatmap emits 0, 1 or 2 
//adds items to the list depending on conditions
    var list:List[(Text, Custom)] = List()
    val typeOfTransaction = getType(transaction)
    val dateAsLong = getDate(transaction)
    val amount = getAmount(transaction)
    val customerMatch = getCustomerMatch(transaction, customerList)
    val storeMatch = getStoreMatch(transaction, storeList)
     //more fields parsed

    if (customerMatch != Some(null)){
       isCustomer = true
       val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction,      isCustomer, amount, customerMatch)
       val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
       list = list :+ (new Text(transactionId), transaction)

    }  
    if (storeMatch != Some(null)){
       isCustomer = false
       val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction,      isCustomer, typeOfTransaction, storeMatch)
       val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
       list = list :+ (new Text(transactionId), transaction)
    }
  }
 list
}

Сериализация крио такова;

 conf.registerKryoClasses(Array(classOf[Custom]))

Любая помощь приветствуется с примерами кода или ссылками на пример.

Пользовательский интерфейс Spark для (Text/String, Custom)

Spark UI Total Job

Нижняя часть задачи 1/11 - это плоская карта, верхняя часть - saveAsNewHadoopAPIFile

Нижняя часть задачи 1/11 - это плоская карта, верхняя часть - saveAsNewHadoopAPIFile

Этап 0 плоской карты - saveAsNewHadoopAPIFile -> filter x7 -> flatmap

Сводные метрики

Задачи

Выполнить с (Текст, Int)

Главная страница вакансий

Завершенные этапы

Сводные метрики

Задачи

Медленный запуск (Text/String, Custom) говорит о 1.1h, однако я позволил ему работать 18 часов. Когда он работает в течение 18 часов, он медленно продвигается, однако не идеально, чтобы он работал в течение дня. Что-то не так, очень неправильно. Опять же, метод parse используется в обоих случаях, поэтому он работает по точно такой же логике, даже несмотря на то, что при более быстром запуске не выводится пользовательское значение, вместо этого он выводит ключи Text и int.

Не уверен, что это полезно, но что-то не так заставляет сканирование в Accumulo также выглядеть по-другому. При выполнении Text, Int запускается нормальное увеличение количества сканирований, при этом количество сканирований остается примерно таким же в течение 30 минут, а затем падает. Когда я бегу с кастомом, он увеличивается, а затем сразу падает. подобным образом, затем тянется с меньшей частотой сканирования в течение нескольких часов.

Версия Spark: 1.6.2, Scala 2.11

1 ответ

Решение

Вы не должны использовать var list:List[(Text, Custom)] = List(), Каждое выполнение кода list = list :+ (new Text(transactionId), transaction) создает новый список (это не просто добавление к существующему списку). List в Скале неизменна. Вы должны заменить его на val myListBuffer = ListBuffer[(Text, Custom)](), Я не уверен, что это единственная проблема - но это изменение должно помочь, если ваш список огромен.

Кроме того, пара комментариев по написанию кода в Scala - нет необходимости иметь геттеры и сеттеры в классе Scala. Все члены являются неизменными в любом случае. Вы должны серьезно подумать, прежде чем использовать var в Скале. Неизменность сделает ваш код устойчивым, улучшит читабельность и облегчит изменение.

Другие вопросы по тегам