Прервать обработку ZStream mapMPar
У меня есть следующий код, который из-за ограничений максимального количества строк Excel ограничен ~ 1 миллионом строк:
ZStream.unwrap(generateStreamData).mapMPar(32) {m =>
streamDataToCsvExcel
}
Все достаточно просто и работает отлично. Я отслеживаю количество переданных строк, а затем прекращаю запись данных. Однако я хочу прервать все дочерние волокна, созданные в mapMPar, примерно так:
ZStream.unwrap(generateStreamData).interruptWhen(effect.true).mapMPar(32) {m =>
streamDataToCsvExcel
}
К сожалению, здесь процесс немедленно прерывается. Я, наверное, упускаю что-то очевидное...
1 ответ
Вот что я могу придумать после вашего разъяснения. Версия ZIO 1.x немного уродливее из-за отсутствия
.dropRight
По сути, мы можем использовать для подсчета размера элементов, которые мы должны остановить, как только мы достигнем максимального размера (а затем использовать .dropRight или дополнительный фильтр, чтобы отбросить последний элемент, который превысит лимит)
Это гарантирует, что оба
- Вы только бежите
streamDataToCsvExcel
до последнего возможного сообщения перед достижением предела размера - Потому что потоки ленивы
expensiveQuery
запускается только для такого количества сообщений, которое вы можете уместить в пределах ограничения (или N+1, если последнее значение отбрасывается, потому что оно превысит лимит)
import zio._
import zio.stream._
object Main extends zio.App {
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
val expensiveQuery = ZIO.succeed(Chunk(1, 2))
val generateStreamData = ZIO.succeed(ZStream.repeatEffect(expensiveQuery))
def streamDataToCsvExcel = ZIO.unit
def count(ref: Ref[Int], size: Int): UIO[Boolean] =
ref.updateAndGet(_ + size).map(_ > 10)
for {
counter <- Ref.make(0)
_ <- ZStream
.unwrap(generateStreamData)
.takeUntilM(next => count(counter, next.size)) // Count size of messages and stop when it's reached
.filterM(_ => counter.get.map(_ <= 10)) // Filter last message from `takeUntilM`. Ideally should be .dropRight(1) with ZIO 2
.mapMPar(32)(_ => streamDataToCsvExcel)
.runDrain
} yield ExitCode.success
}
}
Если полагаться на ленивость потоков не работает для вашего варианта использования, вы можете вызвать какое-либо прерывание из
takeUntilM
условие. Например, вы можете обновить функцию подсчета до
def count(ref: Ref[Int], size: Int): UIO[Boolean] =
ref.updateAndGet(_ + size).map(_ > 10)
.tapSome { case true => someFiber.interrupt }