FS2: возможно ли завершить очередь изящно?
Предположим, что я хочу преобразовать некоторые устаревшие асинхронные API в потоки FS2. API предоставляет интерфейс с 3 обратными вызовами: следующий элемент, успех, ошибка. Я бы хотел, чтобы Stream испускал все элементы, а затем выполнял их при получении обратного вызова в случае успеха или ошибки.
Руководство для FS2 ( https://functional-streams-for-scala.github.io/fs2/guide.html) предлагает использовать fs2.Queue
для таких ситуаций, и это прекрасно работает для постановки в очередь, но все примеры, которые я видел до сих пор, ожидают, что поток, который queue.dequeue
возвраты никогда не завершатся - в моей ситуации нет очевидного способа обработки обратного вызова "успех / ошибка". Я пытался использовать queue.dequeue.interruptWhen(...here goes the signal...)
, но если обратный вызов "успех / ошибка" поступает до того, как клиент прочитает данные из потока, поток преждевременно завершается - все еще остаются непрочитанные элементы. Я бы хотел, чтобы потребитель закончил читать их, прежде чем завершить поток.
Возможно ли это сделать с помощью FS2? С Akka Streams это тривиально - SourceQueueWithComplete
имеет complete
а также fail
методы.
ОБНОВЛЕНИЕ: мне удалось получить достаточно хороший результат, обернув элементы в Option и рассматривая None как сигнал для прекращения чтения потока, а также используя Promise для распространения ошибок:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
Тем не менее, я упустил более естественный способ делать такие вещи?
2 ответа
Один идиоматический способ сделать это - создать Queue[Option[A]]
вместо Queue[A]
, При постановке в очередь, заверните в Some
и вы можете явно поставить в очередь None
сигнализировать о завершении. На стороне вытеснения сделать q.dequeue.unNoneTerminate
, который дает вам Stream[F, A]
который заканчивается, как только выходит очередь None
Ответ на ваше обновление: Объединить unNoneTerminate
с rethrow
, который занимает Stream[F, Either[Throwable, A]]
и возвращает Stream[F, A]
что ошибки с Stream.raiseError
когда он принимает бросаемый.
Ваш полный стек будет тогда Stream[F, Either[Throwable, Option[A]]]
и ты разворачиваешься в Stream[F,A]
позвонив .rethrow.unNoneTerminate