Как разделить поток Fs2 по ключу, чтобы преобразовать каждый раздел отдельно?
Чего я хочу добиться, например, с помощью данных:
time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5
и преобразование:
stream.keyBy(_.part).scan(0)((s, d) => s + d)
получить:
0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12
Я попытался разделить его с помощью groupAdjacentBy
, но это становится слишком сложным, потому что мне нужно сохранять сложное состояние между каждым чанком с ключом. Интересно, есть ли что-то похожее на Flink DataStream. KeyBy? Или более простой способ реализовать это?
3 ответа
Проблема, как было сказано, может быть решена путем "разбиения" в самой операции сканирования:
import cats.implicits._
import cats.effect.IO
import fs2._
case class Element(time: Long, part: Symbol, value: Int)
val elements = Stream(
Element(0, 'a, 3),
Element(1, 'a, 4),
Element(2, 'b, 10),
Element(3, 'b, 20),
Element(3, 'a, 5)
)
val runningSumsByPart = elements
.scan(Map.empty[Symbol, Int] -> none[Element]) {
case ((sums, _), el@Element(_, part, value)) =>
val sum = sums.getOrElse(part, 0) + value
(sums + (part -> sum), el.copy(value = sum).some)
}
.collect { case (_, Some(el)) => el }
runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()
Выходы:
Элемент (0,'а,3)
Элемент (1,'а,7)
Элемент (2,'б,10)
Элемент (3,'б,30)
Элемент (3,'а,12)
Я сделал что-то вроде этого. Сначала разделить, потом объединить. Я еще не знаю, как вернуть 2 потока. Я просто знаю, как обрабатывать их в одном месте, а затем объединять вместе.
val notEqualS = in
.filter(_.isInstanceOf[NotEqual])
.map(_.asInstanceOf[NotEqual])
...
val invalidS = in
.filter(_.isInstanceOf[Invalid])
.map(_.asInstanceOf[Invalid])
...
notEqualS.merge(invalidS)