Как использовать BlockingCollection<T>, чтобы заблокировать всех производителей, пока не будут использованы все элементы (пакетное потребление)?
Я прочитал несколько похожих ссылок, но не одинаковых, пытаясь найти ответы на некоторые вопросы: Как использовать BlockingCollection
Однако (в приведенной выше ссылке) неиспользование GetConsumingEnumerable выглядит подозрительно.
Каков правильный метод эффективной блокировки производителей, пока потребитель (должен быть единичным) опустошает коллекцию?
[Мы хотим выполнить пакетную обработку, потому что каждый пакет выполняет вызов веб-службы, который был бы узким местом, если бы каждое отдельное сообщение / элемент нуждался в своем собственном вызове. Пакетирование сообщений / элементов является решением этой проблемы.]
В идеале:
1) Получить сообщение
2) Новое задание продюсера в коллекцию
3) Когда коллекция "заполнена" (произвольное ограничение), заблокируйте всех производителей, новую потребительскую задачу для использования ВСЕХ коллекций, затем разблокируйте производителей.
Другими словами; Я хочу, чтобы (параллельные производители) xor (единый потребитель) действовал на коллекции в любое время.
Похоже, это должно было быть сделано раньше, но я не могу найти фрагмент кода, который действует именно так.
Спасибо за любую помощь.
2 ответа
При использовании этой модели вся работа полностью сериализуется, то есть вы никогда не работаете более чем с одной "вещью" одновременно. Либо производитель работает, либо потребитель работает. Из-за этого вам на самом деле не нужна коллекция, которой манипулируют как производители, так и потребители, вместо этого вы можете иметь производителя, который производит партии традиционной коллекции, которую потребитель потребляет, когда она готова. Это может выглядеть примерно так:
public Task<List<Thing>> Produce(Message message)
{
//...
}
public Task Consume(List<Thing> data)
{
//...
}
public async Task MessageReceived(Message message)
{
while(HaveMoreBatches(message))
{
await Consume(await Produce(message));
}
}
Это позволяет вам производить партию, затем потреблять ее, затем производить другую партию, затем потреблять ее и т. Д., Пока не останется больше партий для производства.
Согласно вашему расплывчатому описанию, я считаю, что вам нужен двойной буфер.
Просто создайте два буфера. Производители пишут в один пока. Когда он заполняется или срабатывает таймер, он "заменяется" на второй, и производители начинают запись в новый. Затем потребитель начинает читать первый, теперь полный буфер.
Это позволяет как производителям, так и потребителям работать одновременно. И убедитесь, что потребитель обрабатывает все ранее созданные работы в пакете, прежде чем повторять цикл снова.