Как получить доступ к основной очереди ThreadpoolExecutor потокобезопасным способом

Метод getQueue() обеспечивает доступ к базовой очереди блокировки в ThreadPoolExecutor, но это не кажется безопасным.

Обход через очередь, возвращаемую этой функцией, может пропустить обновления, внесенные в очередь ThreadPoolExecutor.

"Метод getQueue() разрешает доступ к рабочей очереди для целей мониторинга и отладки. Использование этого метода для любых других целей настоятельно не рекомендуется".

Что бы вы сделали, если бы вы хотели пересмотреть workQueue, используемый ThreadPoolExecutor? Или есть альтернативный подход?

Это продолжение.. Выбор структуры данных для варианта задачи производителя-потребителя.

Сейчас я пытаюсь использовать несколько производителей для нескольких потребителей, но я хочу использовать существующий пул потоков, так как я не хочу сам управлять пулом потоков, а также я хочу получить обратный вызов, когда ThreadPoolExecutor завершил выполнение некоторой задачи вместе со способностью исследовать потокобезопасным способом структура данных "транзакции в процессе".

3 ответа

Решение

Вы можете переопределить методы beforeExecute и afterExecute, чтобы сообщить, что задача запущена и завершена. Вы можете переопределить execute(), чтобы знать, когда добавляется задача.

Проблема, с которой вы столкнулись, заключается в том, что очередь не предназначена для запроса, и задача может быть использована до того, как вы ее увидите. Одним из способов решения этой проблемы является создание собственной реализации очереди (возможно, переопределение / обертывание ConcurrentLinkedQueue)

Кстати: очередь потокобезопасна, однако не гарантируется, что вы увидите каждую запись.

ConcurrentLinkedQueue.iterator() документируется как

Возвращает итератор для элементов в этой очереди в правильной последовательности. Возвращенный итератор является "слабосогласованным" итератором, который никогда не вызовет исключение ConcurrentModificationException, и гарантирует прохождение элементов в том виде, в каком они существовали при создании итератора, и может (но не гарантируется) отражать любые модификации, следующие после построения.

Если вы хотите скопировать элементы в очереди и убедиться, что то, что у вас есть в очереди, не было выполнено, вы можете попробовать это:

а) Представить возможность приостановить и возобновить выполнение. См. http://download.oracle.com/javase/1,5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html

б) сначала приостановить очередь, затем скопировать очередь, затем возобновить очередь.

И тогда у меня есть свой вопрос. Проблема, которую я вижу, заключается в том, что пока вы выполняете свой "Runnable", этот "Runnable" не помещается в очередь, а является "оболочкой" FutureTask, и я не могу найти какой-либо способ определить, какой именно из моих runnables я ищу в. Таким образом, захват и изучение очереди довольно бесполезны. Кто-нибудь знает, что я там пропустил?

Если вы будете следовать совету Джона Скита в своем принятом ответе на предыдущий вопрос, то вы будете контролировать доступ к своим очередям с помощью блокировок. Если вы получите блокировку в очереди, находящейся в процессе выполнения, вы можете гарантировать, что при прохождении через нее ни один элемент не будет пропущен.

Проблема с этим, конечно, заключается в том, что во время выполнения обхода все другие операции в очереди (другие производители и потребители, пытающиеся получить к ней доступ) будут блокироваться, что может очень сильно повлиять на производительность.

Другие вопросы по тегам