Как я могу запустить parSequenceUnordered из Monix и обработать результаты каждой задачи?
В настоящее время я работаю над реализацией клиентских http-запросов к API и решил изучить sttp и monix для этой задачи. Поскольку я новичок в Monix, я все еще не уверен, как запускать задачи и получать их результаты. Моя цель - получить последовательность результатов HTTP-запроса, которую я могу вызывать параллельно -> анализировать -> загружать.
Ниже приведен фрагмент того, что я пробовал до сих пор:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val tasks = Seq(r1).map(i => Task(i))
Task.parSequenceUnordered(tasks).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}
Моя путаница довольно проста (я предполагаю). Как я могу запуститьTask.parSequenceUnordered
которые я создал, и обрабатывать (анализировать результаты http) Задачи в последовательности?
Приятно иметь: из любопытства, можно ли наивно ввести ограничение скорости / дросселирование при обработке последовательности задач запросов? На самом деле я не ищу создания чего-то сложного. Это может быть так же просто, как разнесение пакетов запросов. Интересно, есть ли у Monix помощник для этого?
1 ответ
Спасибо Oleg Pyzhcov и сообществу monix gitter за то, что помогли мне разобраться в этом.
Цитирую Олега здесь:
Поскольку вы уже используете бэкэнд с поддержкой моникса, тип r1 -
Task[Response[Either[String,String]]]
. Итак, когда вы делаетеSeq(r1).map(i => Task(i))
, вы создаете последовательность задач, которые ничего не делают, кроме как дают вам другие задачи, которые дают вам результат (тип будетSeq[Task[Task[Response[...]]]]
). Затем ваш код распараллеливает внешний уровень, задачи-которые-задачи, и в результате вы получаете задачи, с которых начали. Вам нужно только обработать Seq(r1), чтобы он мог выполнять запросы параллельно.Если вы используете Intellij, вы можете нажать
Alt + =
чтобы увидеть тип выбора - это помогает, если вы не можете определить тип только по коду (но с опытом становится лучше).Что касается ограничения скорости, у нас есть parSequenceN, который позволяет вам установить предел параллелизма. Обратите внимание, что неупорядоченность означает лишь небольшое преимущество в производительности за счет того, что результаты выводятся в случайном порядке, в любом случае они выполняются недетерминированно.
В итоге я получил (упрощенную) реализацию, которая выглядит примерно так:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val items = Seq(r1.map(x => x.body))
Task.parSequenceN(1)(items).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println)
}