Как реализовать карту fastutils в UDAF Spark?

Я создаю Spark UDAF, где я храню промежуточные данные в карте fastutils. Схема выглядит так:

def bufferSchema = new StructType().add("my_map_col", MapType(StringType, IntegerType))

Я инициализирую без проблем:

def initialize(buffer: MutableAggregationBuffer) = {
   buffer(0) = new Object2IntOpenHashMap[String]()
}

Проблема возникает, когда я пытаюсь обновить:

def update(buffer: MutableAggregationBuffer, input: Row) = { 
  val myMap = buffer.getAs[Object2IntOpenHashMap[String]](0)
  myMap.put(input.getAs[String](0), 1)
  buffer(0) = myMap
}

Получение следующей ошибки:

Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap

В любом случае, я могу заставить эту работу?

1 ответ

Решение

В любом случае, я могу заставить эту работу?

На самом деле, нет. это

buffer.getAs[Object2IntOpenHashMap[String]](0)

эквивалентно

buffer.get(0).asInstanceOf[Object2IntOpenHashMap[String]]]

и внешний тип для MapType является scala.collection.Map,

На практике это тупик в любом случае - UserDefinedAggregate функции делают полную копию данных о каждом вызове. Возможно, вам повезет больше с Aggregator (как в связанном вопросе).

Другие вопросы по тегам