Как регулировать все исходящие асинхронные вызовы HttpClient между несколькими потоками в проекте.net Core API

Я разрабатываю веб-API ядра.net, который использует внешний API, который я не контролирую. Я нашел несколько отличных ответов о переполнении стека, которые позволили мне регулировать мои запросы к этому внешнему API, находясь в том же потоке, используя семафорлис. Мне интересно, как лучше расширить это регулирование, чтобы оно распространялось на приложения, а не просто регулирование для определенного списка задач. Я узнал о HttpMessageHandlers, и это, кажется, возможный способ перехвата всех исходящих сообщений и применения регулирования. Но я беспокоюсь о безопасности потоков и проблемах блокировки, которые я не могу понять. Я включаю свой текущий код регулирования и надеюсь, что это может помочь понять, что я пытаюсь сделать, но в нескольких потоках, и с задачами, которые постоянно добавляются вместо заранее определенного списка задач.

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}

1 ответ

Концептуальные вопросы

Простая реализация

Так что ThrottlingDelegatingHandler может выглядеть так:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}

Создайте и поддерживайте экземпляр как одиночный:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 

Примените это DelegatingHandler для всех случаев HttpClient через которые вы хотите параллельно-дросселировать звонки:

HttpClient throttledClient = new HttpClient(throttle);

Тот HttpClient не должен быть одиноким: только throttle экземпляр делает.

Для краткости я опустил код DI Net Core DI, но вы бы зарегистрировали синглтон ThrottlingDelegatingHandler экземпляр с контейнером.Net Core, получить этот синглтон по DI в точке использования и использовать его в HttpClient Вы строите, как показано выше.

Но:

Лучшая реализация с HttpClientFactory (.NET Core 2.1)

Выше все еще возникает вопрос, как вы собираетесь управлять HttpClient время жизни:

  • Синглтон (в приложении) HttpClients не получать обновления DNS. Ваше приложение не будет знать об обновлениях DNS, если вы не убьете и не перезапустите его (возможно, нежелательно).
  • Часто создаваемый и утилизируемый шаблон, using (HttpClient client = ) { } с другой стороны, может вызвать истощение сокета.

Одна из целей дизайна HttpClientFactory должен был управлять жизненными циклами HttpClient экземпляры и их делегирующие обработчики, чтобы избежать этих проблем.

В.NET Core 2.1 вы можете использовать HttpClientFactory подключить все это в ConfigureServices(IServiceCollection services) в Startup класс, как это:

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();

("MyThrottledClient" здесь представляет собой подход именованного клиента, просто чтобы этот пример был коротким; типизированные клиенты избегают именования строк.)

В точке использования получите IHttpClientFactory по DI ( ссылка), затем позвоните

var client = _clientFactory.CreateClient("MyThrottledClient");

чтобы получить HttpClient экземпляр предварительно настроен с помощью синглтона ThrottlingDelegatingHandler,

Все звонки через HttpClient полученный таким образом экземпляр будет регулироваться (как правило, через приложение) до первоначально настроенного int maxParallelism,

И HttpClientFactory волшебным образом справляется со всеми HttpClient жизненные проблемы.

Использование Polly с IHttpClientFactory, чтобы получить все это "из коробки"

Polly глубоко интегрирован с IHttpClientFactory, а Polly также предоставляет политику Bulkhead, которая работает как дроссель параллелизма с помощью идентичного механизма SemaphoreSlim.

Таким образом, в качестве альтернативы ThrottlingDelegatingHandler Вы также можете просто использовать политику Polly Bulkhead с IHttpClientFactory из коробки. В вашем Startup класс, просто:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);

Получить предварительно настроенный HttpClient экземпляр из HttpClientFactory, как и раньше. Как и раньше, все звонки через такой "MyThrottledClient" HttpClient Экземпляр будет параллельно настроен на настроенный maxParallelism,

Политика Polly Bulkhead дополнительно предлагает возможность настроить, сколько операций вы хотите разрешить одновременно "ставить в очередь" для слота выполнения в главном семафоре. Так, например:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);

когда настроено как указано выше в HttpClient, разрешил бы 10 параллельных http-вызовов и до 100 http-вызовов в "очередь" для слота выполнения. Это может обеспечить дополнительную устойчивость для систем с высокой пропускной способностью за счет предотвращения сбоя в нисходящей системе, вызывающего чрезмерное увеличение ресурсов при вызовах в очереди в восходящем направлении.

Чтобы использовать параметры Polly с HttpClientFactory, потяните Microsoft.Extensions.Http.Polly а также Polly пакеты nuget.

Ссылки: Полли глубокая документация на Полли и IHttpClientFactory; Переборка политики.


Приложение к Задачам

Вопрос использует Task.Run(...) и упоминает:

веб-интерфейс.net core, который использует внешний API

а также:

с задачами, которые постоянно добавляются вместо предварительно определенного списка задач.

Если ваш веб-интерфейс.net core использует внешний API только один раз для каждого запроса, обрабатывает веб-интерфейс.net core, и вы применяете подходы, обсуждаемые в оставшейся части этого ответа, перекладывая внешний http-вызов ниже по потоку на новый Task с Task.Run(...) будет ненужным и только создаст дополнительные затраты Task экземпляры и переключение потоков. Ядро Dot Net уже будет выполнять входящие запросы в нескольких потоках в пуле потоков.

Другие вопросы по тегам