Как разделить поток Fs2 по ключу, чтобы преобразовать каждый раздел отдельно?

Чего я хочу добиться, например, с помощью данных:

time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5

и преобразование:

stream.keyBy(_.part).scan(0)((s, d) => s + d)

получить:

0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12

Я попытался разделить его с помощью groupAdjacentBy, но это становится слишком сложным, потому что мне нужно сохранять сложное состояние между каждым чанком с ключом. Интересно, есть ли что-то похожее на Flink DataStream. KeyBy? Или более простой способ реализовать это?

3 ответа

ОК, я нашел интересное решение (не может быть flatten, хоть)

Проблема, как было сказано, может быть решена путем "разбиения" в самой операции сканирования:

import cats.implicits._
import cats.effect.IO
import fs2._

case class Element(time: Long, part: Symbol, value: Int)

val elements = Stream(
  Element(0, 'a, 3),
  Element(1, 'a, 4),
  Element(2, 'b, 10),
  Element(3, 'b, 20),
  Element(3, 'a, 5)
)

val runningSumsByPart = elements
  .scan(Map.empty[Symbol, Int] -> none[Element]) {
    case ((sums, _), el@Element(_, part, value)) =>
      val sum = sums.getOrElse(part, 0) + value
      (sums + (part -> sum), el.copy(value = sum).some)
  }
  .collect { case (_, Some(el)) => el }

runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()

Выходы:

Элемент (0,'а,3)

Элемент (1,'а,7)

Элемент (2,'б,10)

Элемент (3,'б,30)

Элемент (3,'а,12)

Я сделал что-то вроде этого. Сначала разделить, потом объединить. Я еще не знаю, как вернуть 2 потока. Я просто знаю, как обрабатывать их в одном месте, а затем объединять вместе.

          val notEqualS = in
      .filter(_.isInstanceOf[NotEqual])
      .map(_.asInstanceOf[NotEqual])
      ...

    val invalidS = in
      .filter(_.isInstanceOf[Invalid])
      .map(_.asInstanceOf[Invalid])
      ...

    notEqualS.merge(invalidS)
Другие вопросы по тегам