Объедините список ZIO ZStreams в один

Мой старый код поддерживал использование одной очереди SQS с SqsStream. Мне нужно обновить его для поддержки нескольких очередей с учетом списка URL-адресов очереди.

Содержание метода:

for {
  sqs <- Sqs.>.async // async client
  urls <- Sqs.>.queueUrls // List[String] of multiple queues
  _ <- {
    urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          .mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
          .runDrain // ZIO[Env, Throwable, Unit]
          .forever // ZIO[Env, Throwable, Nothing]
      })
} yield ()

но компилятор жалуется, потому что ожидает (ZIO, ZIO, ZIO), тогда как я дал ему (ZIO, ZIO, List). Я предполагаю, что мне нужно свести все эффекты в этом списке в один эффект, который будет выполнятьсяhandleMessage параллельно для всех очередей, но я не уверен в синтаксисе, так как у меня нет опыта работы с ZIO.

По сути, к этому моменту

urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))

мой URL стал ZStream. Думаю мне нужно позвонитьZStream.flatMapParиспользуя этот элемент и следующий, и так далее, пока все они не станут плоскими. Как бы я это сделал?

1 ответ

Решение

runDrain вернет ZIO что вы можете выстрелить и забыть с foreachPar_.

for {
  sqs <- Sqs.>.async
  urls <- Sqs.>.queueUrls
  // Returns ZIO[R, E, Unit] and executes each effect in parallel while discarding the results
  _ <- ZIO.foreachPar_(urls) { url =>
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          // Handles up to 10 messages at a time in parallel.
          .mapMParUnordered(10)(handleMessage)
          // The stream is already unbounded so no need to have `.forever`
          .runDrain
      }
} yield ()

Я также поясню, что SqsStream уже должен быть неограниченным, поэтому вам не нужно использовать forever, а mapMParUnordered Параметр относится к максимальному параллелизму, а не к общему количеству обработанных событий.

Другие вопросы по тегам