Как я могу закрыть серверную часть STTP после выполнения моих запросов?
В настоящее время я изучаю STTP и играю с ним, используя бэкэнд Monix. Я в основном застрял в закрытии серверной части после того, как все мои запросы (каждый запрос - это задача) были обработаны.
Я создал образец / макет кода, чтобы он напоминал мою проблему (насколько я понимаю, моя проблема носит более общий характер, а не специфична для моего кода):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
И моя функция fetch (api call maker) выглядит так:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
Поскольку моя основная задача содержит другие задачи, которые мне позже нужно будет обработать, мне нужно найти альтернативный способ закрыть бэкэнд Monix извне. Есть ли чистый способ закрыть серверную часть после того, как я использую запросы вList[Task[Response[Either[String, String]]]]
?
1 ответ
Проблемы возникают из-за того, что при открытом бэкэнде sttp вы вычисляете список задач, которые нужно выполнить - List[Task[Response[Either[String, String]]]]
, но вы их не запускаете. Следовательно, нам нужно упорядочить выполнение этих задач до закрытия серверной части.
Ключевым моментом здесь является создание единого описания задачи, которая выполняет все эти запросы, пока серверная часть все еще открыта.
Как только вы вычислите data
(что само по себе является задачей - описание вычисления, которое при запуске дает список задач, а также описания вычислений), нам нужно преобразовать это в единый, не вложенный Task
. Это можно сделать разными способами (например, используя простую последовательность), но в вашем случае это будет использоватьObservable
:
AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] =
ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
val activities = Observable
.fromTask(data)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.mapEval(identity)
.completedL
activities.guarantee(
backend.close()
)
}
Прежде всего отметим, что Observable.fromTask(...)
находится внутри самого внешнего flatMap
, поэтому создается, когда серверная часть еще открыта. Мы создаем наблюдаемый объект, дросселируем его и т. Д., А затем приходит решающий факт: как только у нас есть дросселируемый поток, мы оцениваем каждый элемент (каждый элемент являетсяTask[...]
- описание того, как отправить http-запрос) с помощью mapEval
. Мы получаем потокEither[String, String]
, которые являются результатами запросов.
Наконец, мы конвертируем поток в Task
с помощью .completedL
(отбрасывая результаты), который ожидает завершения всего потока.
Эта последняя задача затем выполняется с закрытием серверной части. Последняя последовательность побочных эффектов, которые произойдут, как описано выше, такова:
- открыть бэкэнд
- создать список задач (
data
) - создать поток, который регулирует элементы из списка, рассчитанного
data
- оценить каждый элемент в потоке (отправить запросы)
- закрыть серверную часть