BufferBlock<T> постоянно

Я хочу реализовать шаблон потребитель / производитель, используя BufferBlock<T>в контексте запроса ASP.NET, но мне также нужно:

  1. буферизировать "произведенные" сообщения, чтобы предотвратить заполнение очереди доступной памяти (скажем, 50 элементов в буфере за раз)
  2. Производитель будет "производить" непрерывно, и поток, на котором он работает, будет остановлен вскоре после того, как он выдаст кучу сообщений. (когда заканчивается запрос ASP.NET)
  3. Потребитель будет "потреблять" непрерывно, так как несколько производителей (запросы ASP.NET) создают кучу сообщений каждое, поэтому мы думаем, что использование отдельного потока, потребляющего, - это путь.

[Компонент, который мне в конечном счете нужен, является промежуточным звеном для регистрации вызовов внутри долго работающей службы REST, размещенной на ASP.NET, где каждый запрос "генерирует" много сообщений регистрации по мере выполнения, и потребитель в конечном итоге "потребляет" их, записывая их в какое-то хранилище сохраняемости (которое сравнительно медленнее, по сети).]

Мы следовали указаниям Стивена Клири по использованию BufferBlock<T> здесь, но кажется, что все эти примеры называют BufferBlock<T>.Complete() метод, который прекращает производство сообщений. (т.е. один раз Complete() называется, зовет SendAsync() игнорируется блоком.)

Так как мы можем использовать BufferBlock<T> в "непрерывном" режиме, когда потребитель непрерывно потребляет, а производитель непрерывно производит? Думаю без звонка Complete(),

Мы используем BoundedCapacity определить буфер:

var _queue = new BufferBlock<TMessage>(new DataflowBlockOptions
    {
        BoundedCapacity = 50
    });

а потом позже... для начала потребителю

Task.Run(async () =>
{
    while (true)
    {
        try
        {
            while (await _queue.OutputAvailableAsync())
            {
                var message = await _queue.ReceiveAsync();

                try
                {
                    _storage.Store(message)
                }
                catch (Exception)
                {
                    //Log and Ignore exception and continue
                }
            }
        }
        catch (Exception)
        {
            //Log and Ignore exception and continue
        }
    }
});

Итак, когда потребитель работает бесконечно, мы теперь хотим непрерывно звонить SendAsync() (каждым потоком запросов ASP.NET) и очередь сообщений, в то время как потребитель (поток пула потоков) непрерывно потребляет их.

Как вы используете BufferBlock<T> или другие типы TPL для достижения этой цели?

ОБНОВЛЕНИЕ (22/7/2016): я закончил тем, что связал ActionBlock с BufferBlock и просто передал делегата _storage.Store(message) к блоку ActionBlock для упрощения обслуживания, а также для избежания цикла Complete() вещь.

0 ответов

Другие вопросы по тегам