Как я могу отправлять HTTP-запросы асинхронно при обработке ограничений скорости?
Отказ от ответственности: я новичок в sttp и Monix, и это моя попытка узнать больше об этих библиотеках. Моя цель - получить данные (на стороне клиента) из заданного API через запросы HTTP GET -> проанализировать ответы JSON -> записать эту информацию в базу данных. Мой вопрос касается только первой части. Моя цель - запускать запросы на получение асинхронным (надеюсь, быстрым) способом, имея возможность либо избежать, либо обработать ограничения скорости.
Ниже приведен фрагмент того, что я уже пробовал и, похоже, работает для одного запроса:
package com.github.client
import io.circe.{Decoder, HCursor}
import sttp.client._
import sttp.client.circe._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
case class Bla(paging: Int)
implicit val dataDecoder: Decoder[Bla] = (hCursor: HCursor) => {
for {
next_page <- hCursor.downField("foo").downArray.downField("bar").as[Int]
} yield Bla(next_page)
}
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r = basicRequest
.get(uri"https://foo.bar.io/v1/baz")
.header("accept", "application/json")
.header("Authorization", "hushh!")
.response(asJson[Bla])
r.send() // How can I instead of operating on a single request, operate on multiple
.flatMap { response =>
Task(response.body)
}
.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runSyncUnsafe() match {
case Left(error) => println(s"Error when executing request: $error")
case Right(data) => println(data)
}
}
Мои вопросы:
- Как я могу работать с несколькими запросами GET (вместо одного запроса) с помощью Monix, сохраняя при этом код асинхронным и компонуемым
- Как я могу избежать или справиться с ограничениями скорости, установленными сервером api
Кстати, я также могу гибко использовать другой сервер, если он будет поддерживать цель ограничения скорости.
1 ответ
Решение
Вы можете использовать monix.reactive.Observable вот так
Observable.repeatEval(postTask) // we generate infinite observable of the task
.throttle(1.second, 3) // set throttling
.mapParallelOrderedF(2)(_.runToFuture) // set execution parallelism and execute tasks
.subscribe() // start the pipline
while (true) {}