Как реализовать ограниченный вызов с помощью модификации клиента и сопрограмм

У меня есть следующий код:

val context = newFixedThreadPoolContext(nThreads = 10, name="myThreadPool")
val total = 1_000_000 //can be other number as well
val maxLimit = 1_000
return runBlocking {
  (0..total step maxLimit).map {
    async(context) {
      val offset = it
      val limit = it + maxLimit
      blockingHttpCall(offset, limit)
    }
  }.flatMap {
    it.await()
  }.associateBy {
    ...
  }.toMutableMap()
}

Я бы хотел, чтобы только 10 звонков происходили одновременно с API блокировки. Тем не менее, кажется, что приведенный выше код не делает это, как я ожидаю (я думаю, что все вызовы начинаются немедленно), или, по крайней мере, я не понимаю, если это так.
Как правильно это реализовать? Будет ли работать то же решение, если я буду использовать async api модернизации?

1 ответ

Решение

Я точно не знаю ваш случай, но самый простой способ - использовать API OkHttp для настройки уровня параллелизма, например, это стратегия параллелизма по умолчанию OkHttp

Но вы можете иметь собственную стратегию, если вы установите собственную Dispatcher экземпляр для OkHttpClient.Builder

Конечно, вы можете использовать также сопрограммы

Ваша текущая реализация неверна, потому что вы создаете диспетчер сопрограмм для каждого элемента, но чтобы иметь общий пул потоков, все сопрограммы должны использовать один и тот же диспетчер, просто переместите newFixedThreadPoolContext создание вне цикла (теперь у вас есть 1000 диспетчеров каждый с 10 потоками).

Но я не рекомендую вам использовать сопрограммы + блокирующие вызовы, лучше настроить параллелизм OkHttp (он более гибкий) и использовать сопрограммы с неблокирующими вызовами (вы можете написать собственный адаптер или использовать существующую библиотеку, такую ​​как https://github.com/gildor/kotlin-coroutines-retrofit), Это позволит вам смешивать ваши запросы HTTP и код пользовательского интерфейса или другие задачи.

Поэтому, если вы используете неблокирующий внутренний параллелизм API + OkHttp, вам не нужно иметь специальный код для управления параллелизмом, конечно, вы можете ограничить количество одновременных вызовов, как в примере выше (с фиксированной конструкцией диспетчера), но Я действительно не думаю, что это имеет большой смысл, потому что вы можете уменьшить уровень параллелизма, а не увеличить его.

После перехода к неблокирующему API вы можете просто запустить все сопрограммы в любом диспетчере сопрограмм параллельно (даже в потоке пользовательского интерфейса) и ожидать результатов без блокировки.

Кроме того, неявное управление параллелизмом с использованием конфигурации OkHttpClient выглядит как более правильный путь с точки зрения архитектуры (у вас может быть DI-код, который настраивает Retrofit + OkHttp и предоставляет его клиентскому коду с предварительно настроенной политикой параллелизма). Конечно, вы можете достичь этого, используя другие подходы, но этот выглядит более естественным для меня.

Другие вопросы по тегам