Как читать сообщения с ZHub через ZStream?
Я новичок в ZHub и ZStream и хотел ознакомиться с их API.
К сожалению, я не смог заставить работать этот простой пример:
for
hub <- Hub.bounded[String](4)
stream = ZStream.fromHub(hub)
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- stream.runCollect
_ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
()
Я подозреваю, что эта программа не завершается, потому что я пытаюсь собрать бесконечный поток. Я также попытался распечатать сообщения, используя
stream.tap(...)
или выключить хаб. Ничего не помогло.
Что мне здесь не хватает? Любая помощь приветствуется, спасибо.
1 ответ
@adamgfraser любезно предоставил рабочий пример на GitHub:
import zio._
import zio.stream._
object Example extends App {
def run(args: List[String]): URIO[ZEnv, ExitCode] =
for {
promise <- Promise.make[Nothing, Unit]
hub <- Hub.bounded[String](2)
stream = ZStream.managed(hub.subscribe).flatMap { queue =>
ZStream.fromEffect(promise.succeed(())) *>
ZStream.fromQueue(queue)
}
fiber <- stream.take(2).runCollect.fork
_ <- promise.await
_ <- hub.publish("Hello")
_ <- hub.publish("World")
collected <- fiber.join
_ <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
} yield ExitCode.success
}
Моя ошибка заключалась в публикации значений в хабе до завершения подписки.