Как использовать BlockingCollection<T>, чтобы заблокировать всех производителей, пока не будут использованы все элементы (пакетное потребление)?

Я прочитал несколько похожих ссылок, но не одинаковых, пытаясь найти ответы на некоторые вопросы: Как использовать BlockingCollection в пакетном режиме

Однако (в приведенной выше ссылке) неиспользование GetConsumingEnumerable выглядит подозрительно.

Каков правильный метод эффективной блокировки производителей, пока потребитель (должен быть единичным) опустошает коллекцию?

[Мы хотим выполнить пакетную обработку, потому что каждый пакет выполняет вызов веб-службы, который был бы узким местом, если бы каждое отдельное сообщение / элемент нуждался в своем собственном вызове. Пакетирование сообщений / элементов является решением этой проблемы.]

В идеале:

1) Получить сообщение

2) Новое задание продюсера в коллекцию

3) Когда коллекция "заполнена" (произвольное ограничение), заблокируйте всех производителей, новую потребительскую задачу для использования ВСЕХ коллекций, затем разблокируйте производителей.

Другими словами; Я хочу, чтобы (параллельные производители) xor (единый потребитель) действовал на коллекции в любое время.

Похоже, это должно было быть сделано раньше, но я не могу найти фрагмент кода, который действует именно так.

Спасибо за любую помощь.

2 ответа

При использовании этой модели вся работа полностью сериализуется, то есть вы никогда не работаете более чем с одной "вещью" одновременно. Либо производитель работает, либо потребитель работает. Из-за этого вам на самом деле не нужна коллекция, которой манипулируют как производители, так и потребители, вместо этого вы можете иметь производителя, который производит партии традиционной коллекции, которую потребитель потребляет, когда она готова. Это может выглядеть примерно так:

public Task<List<Thing>> Produce(Message message)
{
    //...
}

public Task Consume(List<Thing> data)
{
    //...
}

public async Task MessageReceived(Message message)
{
    while(HaveMoreBatches(message))
    {
        await Consume(await Produce(message));
    }
}

Это позволяет вам производить партию, затем потреблять ее, затем производить другую партию, затем потреблять ее и т. Д., Пока не останется больше партий для производства.

Согласно вашему расплывчатому описанию, я считаю, что вам нужен двойной буфер.

Просто создайте два буфера. Производители пишут в один пока. Когда он заполняется или срабатывает таймер, он "заменяется" на второй, и производители начинают запись в новый. Затем потребитель начинает читать первый, теперь полный буфер.

Это позволяет как производителям, так и потребителям работать одновременно. И убедитесь, что потребитель обрабатывает все ранее созданные работы в пакете, прежде чем повторять цикл снова.

Другие вопросы по тегам