Объедините список ZIO ZStreams в один
Мой старый код поддерживал использование одной очереди SQS с SqsStream. Мне нужно обновить его для поддержки нескольких очередей с учетом списка URL-адресов очереди.
Содержание метода:
for {
sqs <- Sqs.>.async // async client
urls <- Sqs.>.queueUrls // List[String] of multiple queues
_ <- {
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
.mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
.runDrain // ZIO[Env, Throwable, Unit]
.forever // ZIO[Env, Throwable, Nothing]
})
} yield ()
но компилятор жалуется, потому что ожидает (ZIO, ZIO, ZIO), тогда как я дал ему (ZIO, ZIO, List). Я предполагаю, что мне нужно свести все эффекты в этом списке в один эффект, который будет выполнятьсяhandleMessage
параллельно для всех очередей, но я не уверен в синтаксисе, так как у меня нет опыта работы с ZIO.
По сути, к этому моменту
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
мой URL стал ZStream. Думаю мне нужно позвонитьZStream.flatMapPar
используя этот элемент и следующий, и так далее, пока все они не станут плоскими. Как бы я это сделал?
1 ответ
runDrain
вернет ZIO
что вы можете выстрелить и забыть с foreachPar_
.
for {
sqs <- Sqs.>.async
urls <- Sqs.>.queueUrls
// Returns ZIO[R, E, Unit] and executes each effect in parallel while discarding the results
_ <- ZIO.foreachPar_(urls) { url =>
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
// Handles up to 10 messages at a time in parallel.
.mapMParUnordered(10)(handleMessage)
// The stream is already unbounded so no need to have `.forever`
.runDrain
}
} yield ()
Я также поясню, что SqsStream
уже должен быть неограниченным, поэтому вам не нужно использовать forever
, а mapMParUnordered
Параметр относится к максимальному параллелизму, а не к общему количеству обработанных событий.