Как читать сообщения с ZHub через ZStream?

Я новичок в ZHub и ZStream и хотел ознакомиться с их API.

К сожалению, я не смог заставить работать этот простой пример:

      for
    hub <- Hub.bounded[String](4)
    stream = ZStream.fromHub(hub)
    _ <- hub.publish("Hello")
    _ <- hub.publish("World")
    collected <- stream.runCollect
    _ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
    ()

Я подозреваю, что эта программа не завершается, потому что я пытаюсь собрать бесконечный поток. Я также попытался распечатать сообщения, используя stream.tap(...)или выключить хаб. Ничего не помогло.

Что мне здесь не хватает? Любая помощь приветствуется, спасибо.

1 ответ

@adamgfraser любезно предоставил рабочий пример на GitHub:

      import zio._
import zio.stream._

object Example extends App {

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    for {
      promise <- Promise.make[Nothing, Unit]
      hub     <- Hub.bounded[String](2)
      stream = ZStream.managed(hub.subscribe).flatMap { queue =>
                 ZStream.fromEffect(promise.succeed(())) *>
                   ZStream.fromQueue(queue)
               }
      fiber     <- stream.take(2).runCollect.fork
      _         <- promise.await
      _         <- hub.publish("Hello")
      _         <- hub.publish("World")
      collected <- fiber.join
      _         <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
    } yield ExitCode.success
}

Моя ошибка заключалась в публикации значений в хабе до завершения подписки.

Другие вопросы по тегам