ForkJoinPool, Phaser и управляемая блокировка: в какой степени они работают против тупиков?
Этот небольшой фрагмент кода никогда не заканчивается на jdk8u45 и используется для правильного завершения на jdk8u20:
public class TestForkJoinPool {
final static ExecutorService pool = Executors.newWorkStealingPool(8);
private static volatile long consumedCPU = System.nanoTime();
public static void main(String[] args) throws InterruptedException {
final int numParties = 100;
final Phaser p = new Phaser(1);
final Runnable r = () -> {
p.register();
p.arriveAndAwaitAdvance();
p.arriveAndDeregister();
};
for (int i = 0; i < numParties; ++i) {
consumeCPU(1000000);
pool.submit(r);
}
while (p.getArrivedParties() != numParties) {}
}
static void consumeCPU(long tokens) {
// Taken from JMH blackhole
long t = consumedCPU;
for (long i = tokens; i > 0; i--) {
t += (t * 0x5DEECE66DL + 0xBL + i) & (0xFFFFFFFFFFFFL);
}
if (t == 42) {
consumedCPU += t;
}
}
}
Док Фазер утверждает, что
Фазеры также могут использоваться задачами, выполняющимися в ForkJoinPool, что обеспечит достаточный параллелизм для выполнения задач, когда другие блокируются в ожидании перехода фазы.
Однако в Javadoc ForkjoinPool # mangedBlock говорится:
При запуске в ForkJoinPool пул может быть сначала расширен для обеспечения достаточного параллелизма
Только мая там. Поэтому я не уверен, является ли это ошибкой или просто плохим кодом, который не зависит от контракта Phaser/ForkJoinPool: насколько сложно работает контракт комбинации Phaser / ForkJoinPool для предотвращения взаимоблокировок?
Мой конфиг:
- Linux adc 3.14.27-100.fc19.x86_64 # 1 SMP ср 17 дек 19:36:34 UTC 2014 x86_64 x86_64 x86_64 GNU / Linux
- 8 ядер i7
1 ответ
Похоже, ваша проблема связана с изменением кода ForkJoinPool между JDK 8u20 и 8u45.
В версии u20 потоки ForkJoin всегда были активны в течение не менее 200 миллисекунд (см. ForkJoinPool.FAST_IDLE_TIMEOUT) до их восстановления.
В u45, как только ForkJoinPool достигнет целевого параллелизма плюс 2 дополнительных потока, потоки умрут, как только у них закончится работа без ожидания. Вы можете увидеть это изменение в методе awaitWork в ForkJoinPool.java (строка 1810):
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false;
Ваша программа использует задачи Phasers для создания дополнительных работников. Каждая задача порождает нового компенсирующего работника, который предназначен для получения следующей отправленной задачи.
Однако, как только вы достигнете целевого параллелизма + 2, компенсирующий работник умирает немедленно, не дожидаясь, и не имеет возможности забрать задание, которое будет отправлено немедленно после этого.
Надеюсь, это поможет.