Проблема с RDD пользовательских объектов в Spark
Я передаю тип в плоскую карту, как это;
val customData: RDD[(Text/String, Custom)] = origRDD.flatMap { case(x) => parse(x)}
это возвращает пару ключ-значение String и Custom (я также использовал Text на месте на String, но безрезультатно, это объяснение 'Text/String'). Этот класс расширяет Serializable и зарегистрирован в Kryo.
Когда я пытаюсь запустить программу, она просто запускается и никогда не заканчивается. Под бесконечностью я подразумеваю, что я оставил это бегущим в течение 18 часов, и это не закончилось. Если я изменю его на Text(hadoop io) с Int (счетчик) вместо пользовательского объекта, он очень быстро завершится. Когда я говорю быстро, я имею в виду 30 минут. Данные, через которые он проходит, - это одни и те же данные, и в обоих случаях используется метод синтаксического анализа (так же как и плоская карта), поэтому он работает по одной и той же логике. Метод, который он использует в плоской карте, является тем же самым методом разбора. Поведение снижается, когда я изменяю его на (Text/String, Custom) с (Text, Int).
Я хотел бы знать, что мне нужно добавить, чтобы сделать эту работу. Это должно быть доступно для записи?
Пример реализации класса пользовательских объектов (очевидно, не точный, но очень хорошо имитирует);
class Custom(dateAsLong: java.lang.Long, typeOfTransaction: util.HashSet[java.lang.Long], isCustomer: Boolean, amount: String, customerMatch: ObjectMatch) extends Serializable {
//has getters and setters here
val startDate = dateAsLong
val transType = typeOfTransaction
val customer = isCustomer
val cost = amount
val matchedData = customerMatch
def getStartDate(): java.lang.Long = startDate
def getTransType(): util.HashSet[java.lang.Long] = transType
def getCustomer(): Boolean = customer
def getCost(): String = amount
def getMatchedData(): ObjectMatch = matchedData
}
Пример метода разбора внутри объекта, расширяющего Java Serializable;
object Paser extends Serializable {
def parse(transaction: Transaction, customerList: util.HashMap[String, String], storeList: util.HashMap[String, String]): List[(Text, Custom)] ={ //list because flatmap emits 0, 1 or 2
//adds items to the list depending on conditions
var list:List[(Text, Custom)] = List()
val typeOfTransaction = getType(transaction)
val dateAsLong = getDate(transaction)
val amount = getAmount(transaction)
val customerMatch = getCustomerMatch(transaction, customerList)
val storeMatch = getStoreMatch(transaction, storeList)
//more fields parsed
if (customerMatch != Some(null)){
isCustomer = true
val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction, isCustomer, amount, customerMatch)
val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
list = list :+ (new Text(transactionId), transaction)
}
if (storeMatch != Some(null)){
isCustomer = false
val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction, isCustomer, typeOfTransaction, storeMatch)
val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
list = list :+ (new Text(transactionId), transaction)
}
}
list
}
Сериализация крио такова;
conf.registerKryoClasses(Array(classOf[Custom]))
Любая помощь приветствуется с примерами кода или ссылками на пример.
Пользовательский интерфейс Spark для (Text/String, Custom)
Нижняя часть задачи 1/11 - это плоская карта, верхняя часть - saveAsNewHadoopAPIFile
Этап 0 плоской карты - saveAsNewHadoopAPIFile -> filter x7 -> flatmap
Выполнить с (Текст, Int)
Медленный запуск (Text/String, Custom) говорит о 1.1h, однако я позволил ему работать 18 часов. Когда он работает в течение 18 часов, он медленно продвигается, однако не идеально, чтобы он работал в течение дня. Что-то не так, очень неправильно. Опять же, метод parse используется в обоих случаях, поэтому он работает по точно такой же логике, даже несмотря на то, что при более быстром запуске не выводится пользовательское значение, вместо этого он выводит ключи Text и int.
Не уверен, что это полезно, но что-то не так заставляет сканирование в Accumulo также выглядеть по-другому. При выполнении Text, Int запускается нормальное увеличение количества сканирований, при этом количество сканирований остается примерно таким же в течение 30 минут, а затем падает. Когда я бегу с кастомом, он увеличивается, а затем сразу падает. подобным образом, затем тянется с меньшей частотой сканирования в течение нескольких часов.
Версия Spark: 1.6.2, Scala 2.11
1 ответ
Вы не должны использовать var list:List[(Text, Custom)] = List()
, Каждое выполнение кода list = list :+ (new Text(transactionId), transaction)
создает новый список (это не просто добавление к существующему списку). List
в Скале неизменна. Вы должны заменить его на val myListBuffer = ListBuffer[(Text, Custom)]()
, Я не уверен, что это единственная проблема - но это изменение должно помочь, если ваш список огромен.
Кроме того, пара комментариев по написанию кода в Scala - нет необходимости иметь геттеры и сеттеры в классе Scala. Все члены являются неизменными в любом случае. Вы должны серьезно подумать, прежде чем использовать var
в Скале. Неизменность сделает ваш код устойчивым, улучшит читабельность и облегчит изменение.