Выбор структуры данных для варианта задачи потребителя производителя

Прямо сейчас у меня есть очередь с несколькими производителями и одним потребителем.

Потребительская резьба работает медленно. Кроме того, потребитель принимает элемент из очереди посредством операции просмотра, и пока операция потребления не будет завершена, элемент не может быть удален из очереди. Это связано с тем, что поток производителя как побочная операция также делает снимок всех элементов, которые не были полностью обработаны в этот момент времени.

Теперь я хочу изменить свой код для поддержки нескольких потребителей. Итак, допустим, у меня есть три потока, один поток возьмет первый элемент, который может быть прочитан через операцию просмотра. Второй потребительский поток может перейти ко второму элементу, но у меня нет способа получить его, поскольку очередь не поддерживает извлечение второго элемента.

Итак, возможность использовать стандартную версию ConcurrentLinkedQueue (которую я сейчас использую) отсутствует.

Я думаю об использовании очереди приоритетов, но тогда мне нужно будет связать с каждым элементом флаг, который сообщает мне, используется ли этот элемент каким-либо потоком или нет.

Какая структура данных больше всего подходит для этой проблемы?

2 ответа

Решение

Похоже, у вас должно быть две очереди:

  • Необработанные
  • В ходе выполнения

Потребитель атомарно (через блокировку) извлекает из необработанной очереди и добавляет в текущую очередь. Таким образом, несколько потребителей могут работать одновременно... но производитель все еще может сделать снимок обеих очередей, когда это необходимо. Когда потребитель завершает задачу, он удаляет ее из очереди выполнения. (Это не обязательно должна быть очередь, так как ничто не "вытягивает" ее как таковое. Просто некоторая коллекция, которую вы легко можете добавить и удалить из нее.)

Учитывая, что вам нужна блокировка, чтобы сделать передачу атомарной, вам, вероятно, не нужны базовые очереди, чтобы быть параллельными - вы уже будете защищать весь общий доступ.

Я согласен с Джоном Скитом (+1) в том, что вам нужно два магазина для записи ожидающих и незавершенных предметов. Я бы использовал LinkedBlockingQueue и пусть каждый из ваших потребителей звонит take() в теме. Когда элемент поступает в очередь, его принимает один из потребителей.

Запись того, что выполняется и что завершено, будет отдельной операцией. Я бы поддержал HashSet из всех элементов, которые еще не были завершены, и мой продюсер сначала (атомарно) добавил элемент в HashSet из незавершенных элементов, а затем вставил элемент в очередь. Как только потребитель закончил свою работу, он удаляет элемент из HashSet.

Ваш производитель может отсканировать HashSet, чтобы определить, что является выдающимся.

Другие вопросы по тегам