BufferBlock<T> постоянно
Я хочу реализовать шаблон потребитель / производитель, используя BufferBlock<T>
в контексте запроса ASP.NET, но мне также нужно:
- буферизировать "произведенные" сообщения, чтобы предотвратить заполнение очереди доступной памяти (скажем, 50 элементов в буфере за раз)
- Производитель будет "производить" непрерывно, и поток, на котором он работает, будет остановлен вскоре после того, как он выдаст кучу сообщений. (когда заканчивается запрос ASP.NET)
- Потребитель будет "потреблять" непрерывно, так как несколько производителей (запросы ASP.NET) создают кучу сообщений каждое, поэтому мы думаем, что использование отдельного потока, потребляющего, - это путь.
[Компонент, который мне в конечном счете нужен, является промежуточным звеном для регистрации вызовов внутри долго работающей службы REST, размещенной на ASP.NET, где каждый запрос "генерирует" много сообщений регистрации по мере выполнения, и потребитель в конечном итоге "потребляет" их, записывая их в какое-то хранилище сохраняемости (которое сравнительно медленнее, по сети).]
Мы следовали указаниям Стивена Клири по использованию BufferBlock<T>
здесь, но кажется, что все эти примеры называют BufferBlock<T>.Complete()
метод, который прекращает производство сообщений. (т.е. один раз Complete()
называется, зовет SendAsync()
игнорируется блоком.)
Так как мы можем использовать BufferBlock<T>
в "непрерывном" режиме, когда потребитель непрерывно потребляет, а производитель непрерывно производит? Думаю без звонка Complete()
,
Мы используем BoundedCapacity
определить буфер:
var _queue = new BufferBlock<TMessage>(new DataflowBlockOptions
{
BoundedCapacity = 50
});
а потом позже... для начала потребителю
Task.Run(async () =>
{
while (true)
{
try
{
while (await _queue.OutputAvailableAsync())
{
var message = await _queue.ReceiveAsync();
try
{
_storage.Store(message)
}
catch (Exception)
{
//Log and Ignore exception and continue
}
}
}
catch (Exception)
{
//Log and Ignore exception and continue
}
}
});
Итак, когда потребитель работает бесконечно, мы теперь хотим непрерывно звонить SendAsync()
(каждым потоком запросов ASP.NET) и очередь сообщений, в то время как потребитель (поток пула потоков) непрерывно потребляет их.
Как вы используете BufferBlock<T>
или другие типы TPL для достижения этой цели?
ОБНОВЛЕНИЕ (22/7/2016): я закончил тем, что связал ActionBlock с BufferBlock и просто передал делегата _storage.Store(message)
к блоку ActionBlock для упрощения обслуживания, а также для избежания цикла Complete()
вещь.