Как регулировать все исходящие асинхронные вызовы HttpClient между несколькими потоками в проекте.net Core API
Я разрабатываю веб-API ядра.net, который использует внешний API, который я не контролирую. Я нашел несколько отличных ответов о переполнении стека, которые позволили мне регулировать мои запросы к этому внешнему API, находясь в том же потоке, используя семафорлис. Мне интересно, как лучше расширить это регулирование, чтобы оно распространялось на приложения, а не просто регулирование для определенного списка задач. Я узнал о HttpMessageHandlers, и это, кажется, возможный способ перехвата всех исходящих сообщений и применения регулирования. Но я беспокоюсь о безопасности потоков и проблемах блокировки, которые я не могу понять. Я включаю свой текущий код регулирования и надеюсь, что это может помочь понять, что я пытаюсь сделать, но в нескольких потоках, и с задачами, которые постоянно добавляются вместо заранее определенного списка задач.
private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
var rtn = new List<PagedResultResponse>();
var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: throttle);
foreach (var page in pages)
{
await throttler.WaitAsync();
allTasks.Add(
Task.Run(async () =>
{
try
{
var result = await GetPagedResult(client, url, page);
return result;
}
finally
{
throttler.Release();
}
}));
}
await Task.WhenAll(allTasks);
foreach (var task in allTasks)
{
var result = ((Task<PagedResultResponse>)task).Result;
rtn.Add(result);
}
return rtn;
}
1 ответ
Концептуальные вопросы
SemaphoreSlim
является потокобезопасным, поэтому нет проблем с безопасностью потоков или блокировками при использовании его в качестве дросселя параллелизма между несколькими потоками.HttpMessageHandler
Это действительно механизм промежуточного программного обеспечения для перехвата вызовов, осуществляемых черезHttpClient
, Таким образом, они являются идеальным способом применения регулирования параллелизма к вызовам Http с использованиемSemaphoreSlim
,
Простая реализация
Так что ThrottlingDelegatingHandler
может выглядеть так:
public class ThrottlingDelegatingHandler : DelegatingHandler
{
private SemaphoreSlim _throttler;
public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
{
_throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (request == null) throw new ArgumentNullException(nameof(request));
await _throttler.WaitAsync(cancellationToken);
try
{
return await base.SendAsync(request, cancellationToken);
}
finally
{
_throttler.Release();
}
}
}
Создайте и поддерживайте экземпляр как одиночный:
int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism));
Примените это DelegatingHandler
для всех случаев HttpClient
через которые вы хотите параллельно-дросселировать звонки:
HttpClient throttledClient = new HttpClient(throttle);
Тот HttpClient
не должен быть одиноким: только throttle
экземпляр делает.
Для краткости я опустил код DI Net Core DI, но вы бы зарегистрировали синглтон ThrottlingDelegatingHandler
экземпляр с контейнером.Net Core, получить этот синглтон по DI в точке использования и использовать его в HttpClient
Вы строите, как показано выше.
Но:
Лучшая реализация с HttpClientFactory (.NET Core 2.1)
Выше все еще возникает вопрос, как вы собираетесь управлять HttpClient
время жизни:
- Синглтон (в приложении)
HttpClient
s не получать обновления DNS. Ваше приложение не будет знать об обновлениях DNS, если вы не убьете и не перезапустите его (возможно, нежелательно). - Часто создаваемый и утилизируемый шаблон,
using (HttpClient client = ) { }
с другой стороны, может вызвать истощение сокета.
Одна из целей дизайна HttpClientFactory
должен был управлять жизненными циклами HttpClient
экземпляры и их делегирующие обработчики, чтобы избежать этих проблем.
В.NET Core 2.1 вы можете использовать HttpClientFactory
подключить все это в ConfigureServices(IServiceCollection services)
в Startup
класс, как это:
int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));
services.AddHttpClient("MyThrottledClient")
.AddHttpMessageHandler<ThrottlingDelegatingHandler>();
("MyThrottledClient" здесь представляет собой подход именованного клиента, просто чтобы этот пример был коротким; типизированные клиенты избегают именования строк.)
В точке использования получите IHttpClientFactory
по DI ( ссылка), затем позвоните
var client = _clientFactory.CreateClient("MyThrottledClient");
чтобы получить HttpClient
экземпляр предварительно настроен с помощью синглтона ThrottlingDelegatingHandler
,
Все звонки через HttpClient
полученный таким образом экземпляр будет регулироваться (как правило, через приложение) до первоначально настроенного int maxParallelism
,
И HttpClientFactory волшебным образом справляется со всеми HttpClient
жизненные проблемы.
Использование Polly с IHttpClientFactory, чтобы получить все это "из коробки"
Polly глубоко интегрирован с IHttpClientFactory, а Polly также предоставляет политику Bulkhead, которая работает как дроссель параллелизма с помощью идентичного механизма SemaphoreSlim.
Таким образом, в качестве альтернативы ThrottlingDelegatingHandler
Вы также можете просто использовать политику Polly Bulkhead с IHttpClientFactory из коробки. В вашем Startup
класс, просто:
int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);
services.AddHttpClient("MyThrottledClient")
.AddPolicyHandler(throttler);
Получить предварительно настроенный HttpClient
экземпляр из HttpClientFactory, как и раньше. Как и раньше, все звонки через такой "MyThrottledClient" HttpClient
Экземпляр будет параллельно настроен на настроенный maxParallelism
,
Политика Polly Bulkhead дополнительно предлагает возможность настроить, сколько операций вы хотите разрешить одновременно "ставить в очередь" для слота выполнения в главном семафоре. Так, например:
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);
когда настроено как указано выше в HttpClient
, разрешил бы 10 параллельных http-вызовов и до 100 http-вызовов в "очередь" для слота выполнения. Это может обеспечить дополнительную устойчивость для систем с высокой пропускной способностью за счет предотвращения сбоя в нисходящей системе, вызывающего чрезмерное увеличение ресурсов при вызовах в очереди в восходящем направлении.
Чтобы использовать параметры Polly с HttpClientFactory, потяните Microsoft.Extensions.Http.Polly
а также Polly
пакеты nuget.
Ссылки: Полли глубокая документация на Полли и IHttpClientFactory; Переборка политики.
Приложение к Задачам
Вопрос использует Task.Run(...)
и упоминает:
веб-интерфейс.net core, который использует внешний API
а также:
с задачами, которые постоянно добавляются вместо предварительно определенного списка задач.
Если ваш веб-интерфейс.net core использует внешний API только один раз для каждого запроса, обрабатывает веб-интерфейс.net core, и вы применяете подходы, обсуждаемые в оставшейся части этого ответа, перекладывая внешний http-вызов ниже по потоку на новый Task
с Task.Run(...)
будет ненужным и только создаст дополнительные затраты Task
экземпляры и переключение потоков. Ядро Dot Net уже будет выполнять входящие запросы в нескольких потоках в пуле потоков.