Гибкая CountDownLatch не может использовать Phaser из-за ограничения
Я получаю большой файл с N записями. Для каждой записи я создаю новую тему. Мне нужно дождаться завершения всех N потоков.
Сначала я использовал Phaser, но его реализация ограничена партиями в 65 тысяч человек. Итак, взрывается, потому что N может быть как 100K.
Затем я попробовал CountDownLatch. Это прекрасно работает, очень простая концепция и очень простая реализация. Но я не знаю число Н.
Phaser - это мое решение, но оно имеет этот предел.
Есть идеи?
Этот пост связан: Гибкий CountDownLatch?
3 ответа
Похоже, проблема, которую вы пытаетесь решить, заключается в том, чтобы как можно быстрее обрабатывать большое количество задач и ждать завершения обработки.
Проблема с одновременной обработкой большого количества задач заключается в том, что это может привести к слишком большому количеству переключений контекста и, по существу, нанести вред вашей машине и замедлить обработку выше определенного (аппаратно-зависимого) числа одновременных потоков. Это означает, что вам нужно иметь верхний предел одновременного выполнения рабочих потоков.
Phaser и CountDownLatch являются примитивами синхронизации, их целью является обеспечение контроля доступа к критическим блокам кода, а не управление параллельным выполнением.
Я бы использовал службу Executor в этом случае. Он поддерживает добавление задач (во многих формах, включая Runnable).
Вы можете легко создать ExecutorService
используя класс Executor s. Я бы порекомендовал использовать для этого пул потоков фиксированного размера с максимальным потоком 20-100 - в зависимости от того, насколько интенсивно загружаются ваши задачи. Чем больше вычислительных мощностей требуется для задачи, тем меньшее количество параллельных потоков может быть обработано без серьезного снижения производительности.
Есть несколько способов ожидания завершения всех задач:
- Собрать все
Future
экземпляры, возвращаемыеsubmit
метод и просто вызвать получить на всех них. Это гарантирует, что каждая из задач будет выполнена ко времени завершения цикла. - Завершите работу службы исполнителя и дождитесь завершения всех представленных задач. Недостатком этого метода является то, что вам нужно указать максимальное количество времени ожидания для завершения задач. Кроме того, это менее элегантно, вы не всегда хотите закрыть
Executor
вниз, это зависит от того, пишете ли вы одноразовое приложение или сервер, который продолжает работать после этого - в случае серверного приложения вам определенно придется придерживаться предыдущего подхода.
Наконец, вот фрагмент кода, иллюстрирующий все это:
List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);
for(TaskFromFile task : tasks) {
// createRunnable is not necessary in case your task implements Runnable
executor.submit(createRunnable(task));
}
// assuming single-shot batch job
executor.shutdown();
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
ReusableCountLatch
это CountDownLatch
альтернатива, которая позволяет приращения, а также.
Использование заключается в следующем:
ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch = new ReusableCountLatch(10); // creates latch with initial count 10
latch.increment(); // increments counter
latch.decrement(); // decrement counter
latch.waitTillZero(); // blocks until counts falls to zero
boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits for up to 200 milliseconds until count falls to zero
int count = latch.getCount(); // gets actual count
Чтобы использовать его, просто добавьте эту зависимость gradle/maven в ваш проект: 'com.github.matejtymes:javafixes:1.0'
Более подробную информацию можно найти здесь: https://github.com/MatejTymes/JavaFixes
С AtomicInteger вы можете легко достичь того же. Инициализируйте с 1 и увеличивайте с каждым новым потоком. Однажды сделанный и в рабочем и в продюсере, уменьшите и получите. Если ноль, запустите свою отделку Runnable.