FS2: возможно ли завершить очередь изящно?

Предположим, что я хочу преобразовать некоторые устаревшие асинхронные API в потоки FS2. API предоставляет интерфейс с 3 обратными вызовами: следующий элемент, успех, ошибка. Я бы хотел, чтобы Stream испускал все элементы, а затем выполнял их при получении обратного вызова в случае успеха или ошибки.

Руководство для FS2 ( https://functional-streams-for-scala.github.io/fs2/guide.html) предлагает использовать fs2.Queue для таких ситуаций, и это прекрасно работает для постановки в очередь, но все примеры, которые я видел до сих пор, ожидают, что поток, который queue.dequeue возвраты никогда не завершатся - в моей ситуации нет очевидного способа обработки обратного вызова "успех / ошибка". Я пытался использовать queue.dequeue.interruptWhen(...here goes the signal...), но если обратный вызов "успех / ошибка" поступает до того, как клиент прочитает данные из потока, поток преждевременно завершается - все еще остаются непрочитанные элементы. Я бы хотел, чтобы потребитель закончил читать их, прежде чем завершить поток.

Возможно ли это сделать с помощью FS2? С Akka Streams это тривиально - SourceQueueWithComplete имеет complete а также fail методы.

ОБНОВЛЕНИЕ: мне удалось получить достаточно хороший результат, обернув элементы в Option и рассматривая None как сигнал для прекращения чтения потока, а также используя Promise для распространения ошибок:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

Тем не менее, я упустил более естественный способ делать такие вещи?

2 ответа

Решение

Один идиоматический способ сделать это - создать Queue[Option[A]] вместо Queue[A], При постановке в очередь, заверните в Someи вы можете явно поставить в очередь None сигнализировать о завершении. На стороне вытеснения сделать q.dequeue.unNoneTerminate, который дает вам Stream[F, A] который заканчивается, как только выходит очередь None

Ответ на ваше обновление: Объединить unNoneTerminate с rethrow, который занимает Stream[F, Either[Throwable, A]] и возвращает Stream[F, A] что ошибки с Stream.raiseError когда он принимает бросаемый.

Ваш полный стек будет тогда Stream[F, Either[Throwable, Option[A]]] и ты разворачиваешься в Stream[F,A] позвонив .rethrow.unNoneTerminate

Другие вопросы по тегам