Как я могу вернуть два DStreams в функцию после использования преобразования фильтра в потоковой передаче искры?

В функции есть ли способ вернуть два DStreams после использования filter? Например, когда я фильтрую DStreamотфильтрованные будут сохранены в DStream а нефильтрованные будут храниться в другом DStream,

1 ответ

Это можно было бы сделать более эффективно, если бы оно было встроенным, но

def partition[A](stream: DStream[A])(pred: A => Boolean) {
  val stream1 = stream.map(x => (x, pred(x)).cache()
  val good = stream1.filter(_._2).map(_._1)
  val bad = stream1.filter(!_._2).map(_._1)
  (good, bad)
}

Заметка cache() требуется, чтобы убедиться, stream1 рассчитывается только один раз; если pred достаточно просто, и stream уже кешируется, просто (stream.filter(pred), stream.filter(x => !pred(x))) должно быть быстрее.

Другие вопросы по тегам