Влияние на производительность параллельных потоков Java
Каковы лучшие практики использования .stream().parallel()
?
Например, если у вас есть куча блокирующих вызовов ввода / вывода, и вы хотите проверить, .anyMatch(...)
Делать это параллельно кажется разумной вещью.
Пример кода:
public boolean hasAnyRecentReference(JobId jobid) {
<...>
return pendingJobReferences.stream()
.parallel()
.anyMatch(pendingRef -> {
JobReference readReference = pendingRef.sync();
Duration referenceAge = timeService.timeSince(readReference.creationTime());
return referenceAge.lessThan(maxReferenceAge)
});
}
На первый взгляд это выглядит разумно, потому что мы можем выполнять несколько блокировочных чтений одновременно, так как мы заботимся только о том, что совпадает, вместо проверки одного за другим (поэтому, если каждое чтение занимает 50 мс, нам нужно только ждать (50 мс * Ожидаемый_номер OfNonRecentRefs) / numThreads).
Может ли внедрение этого кода в производственную среду иметь непредвиденные последствия для производительности других частей кодовой базы?
1 ответ
РЕДАКТИРОВАТЬ: как @edharned указывает .parallel()
сейчас использует CountedCompleter
вместо звонка .join()
что имеет свои проблемы, а также объяснил Эд в http://coopsoft.com/ar/Calamity2Article.html под What is currently being done?
раздел.
Я полагаю, что информация ниже все еще полезна, чтобы понять, почему структура fork-join является хитрой, и альтернативы, предложенные для .parallel()
среди выводов до сих пор актуальны.
Хотя дух кода верен, фактический код может оказывать влияние на всю систему, которая использует .parallel()
хотя это совсем не очевидно.
Некоторое время назад я нашел статью, которая рекомендовала не делать этого: https://dzone.com/articles/think-twice-using-java-8, но до недавнего времени я не копал глубже.
Это мои мысли после прочтения:
.parallel()
в Java используетForkJoinPool.commonPool()
который является единственнымForkJoinPool
общий для всех потоков (ForkJoinPool.commonPool()
это публичный статический метод, поэтому теоретически другие библиотеки / части кода могут его использовать)ForkJoinPool
реализует похищение работ и имеет очереди для каждого потока в дополнение к общей очереди- Воровство работы означает, что когда поток простаивает, он будет искать дополнительную работу
- Сначала я подумал: по этому определению не
cached
пул потоков также выполняет кражу работы (хотя некоторые ссылки называют это разделением работы для кэшированных пулов потоков)? Оказывается, что при использовании слова idle возникает некоторая неопределенность в терминологии:
- В
cached
Threadpool, поток простаивает только после того, как он завершил свою задачу. Это не становится бездействующим, если это заблокировано, ожидая на вызове блокировки В
forkjoin
поток пула, поток простаивает, когда он завершил свою задачу или когда он вызывает.join()
метод (который является специальным блокирующим вызовом) в подзадаче.когда
.join()
вызывается в подзадаче, поток становится бездействующим, ожидая завершения этой подзадачи. Во время простоя он попытается выполнить любую другую доступную задачу, даже если он находится в очереди другого потока (он крадет работу).[Это важный бит] Как только он нашел другую задачу для выполнения, он должен завершить ее, прежде чем возобновить свое первоначальное выполнение, даже если подзадача, которую он ожидал, завершается, пока поток все еще выполняет украденную задачу.
[Это также важно] Такое поведение при краже работы относится только к потокам, которые вызывают
.join()
, Если поток блокируется чем-то другим, например, вводом / выводом, он становится бездействующим (то есть не может похитить работу).
- В
Потоки Java не позволяют вам предоставлять пользовательский ForkJoinPool, но https://github.com/amaembo/streamex делает
Мне потребовалось некоторое время, чтобы понять последствия 2.3.2
Поэтому я приведу небольшой пример, чтобы проиллюстрировать проблему:
Примечание: это фиктивные примеры, но вы можете попасть в эквивалентные ситуации, не осознавая этого, используя потоки, которые внутренне выполняют функцию fork join.
Кроме того, я буду использовать чрезвычайно упрощенный псевдокод, который служит только для иллюстрации проблемы.parallel (), но не обязательно имеет смысл в противном случае.
Допустим, мы реализуем сортировку слиянием
merge_sort(list):
left, right = split(list)
leftTask = mergeSortTask(left).fork()
rightTask = mergeSortTaks(right).fork()
return merge(leftTask.join(), rightTask.join())
Давайте теперь скажем, у нас есть другой кусок кода, который выполняет следующее:
dummy_collect_results(queriesIds):
pending_results = []
for id in queriesIds:
pending_results += longBlockingIOTask(id).fork()
// do more stuff
Что здесь происходит?
Когда вы пишете код сортировки слиянием, вы думаете, что вызовы сортировки не выполняют никаких операций ввода-вывода, поэтому их производительность должна быть довольно детерминированной, верно?
Правильно. То, что вы не можете ожидать, это то, что, так как dummy_collect_results
метод создал кучу долго выполняющихся и блокирующих подзадач, когда потоки, выполняющие задачи слияния, блокируются на .join()
в ожидании завершения подзадач они могут начать выполнение одной из длинных подзадач блокировки.
Это плохо, потому что, как упомянуто выше, однажды длинная блокировка (на I/O, не .join()
вызов, так что поток больше не будет бездействовать) был украден, его необходимо выполнить независимо от того, ожидала ли подзадача, через которую поток ожидал .join()
завершено при блокировке ввода / вывода.
Это делает выполнение задач сортировки слиянием более не детерминированным, поскольку выполняющие их потоки могут в итоге украсть интенсивные задачи ввода-вывода, сгенерированные кодом, который живет где-то еще целиком.
Это также довольно страшно и трудно поймать, потому что вы могли бы использовать .parallel()
по всей вашей кодовой базе без каких-либо проблем, и все, что нужно, это один класс, который вводит длительные задачи при использовании .parallel()
и внезапно все остальные части вашей кодовой базы могут получить непоследовательную производительность.
Итак, мои выводы:
- Теоретически,
.parallel()
хорошо, если вы можете гарантировать, что все задачи, которые когда-либо будут создаваться где-либо в вашем коде, короткие .parallel()
может иметь общесистемные последствия для производительности, которые неочевидны, если вы не знаете (например, если позже вы добавите один фрагмент кода, который использует.parallel()
и имеет длинные задачи, вы можете повлиять на производительность всего кода, который использует.parallel()
)- Потому что
2.
тебе лучше избегать.parallel()
в целом и либо использоватьExecutorCompletionService
или используйте https://github.com/amaembo/streamex который позволяет вам предоставить свой собственныйForkJoinPool
(что позволяет немного больше изоляции). Более того, вы можете использовать https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53, что дает точный контроль над вашим механизмом параллелизма.