Как я могу вернуть два DStreams в функцию после использования преобразования фильтра в потоковой передаче искры?
В функции есть ли способ вернуть два DStreams после использования filter
? Например, когда я фильтрую DStream
отфильтрованные будут сохранены в DStream
а нефильтрованные будут храниться в другом DStream
,
1 ответ
Это можно было бы сделать более эффективно, если бы оно было встроенным, но
def partition[A](stream: DStream[A])(pred: A => Boolean) {
val stream1 = stream.map(x => (x, pred(x)).cache()
val good = stream1.filter(_._2).map(_._1)
val bad = stream1.filter(!_._2).map(_._1)
(good, bad)
}
Заметка cache()
требуется, чтобы убедиться, stream1
рассчитывается только один раз; если pred
достаточно просто, и stream
уже кешируется, просто (stream.filter(pred), stream.filter(x => !pred(x)))
должно быть быстрее.