ForkJoinPool, Phaser и управляемая блокировка: в какой степени они работают против тупиков?

Этот небольшой фрагмент кода никогда не заканчивается на jdk8u45 и используется для правильного завершения на jdk8u20:

public class TestForkJoinPool {

    final static ExecutorService pool = Executors.newWorkStealingPool(8);
    private static volatile long consumedCPU = System.nanoTime();

    public static void main(String[] args) throws InterruptedException {
        final int numParties = 100;
        final Phaser p = new Phaser(1);
        final Runnable r = () -> {
            p.register();
            p.arriveAndAwaitAdvance();
            p.arriveAndDeregister();
        };

        for (int i = 0; i < numParties; ++i) {
            consumeCPU(1000000);
            pool.submit(r);
        }

        while (p.getArrivedParties() != numParties) {}
    }

    static void consumeCPU(long tokens) {
        // Taken from JMH blackhole
        long t = consumedCPU;
        for (long i = tokens; i > 0; i--) {
            t += (t * 0x5DEECE66DL + 0xBL + i) & (0xFFFFFFFFFFFFL);
        }
        if (t == 42) {
            consumedCPU += t;
        }
    }
}

Док Фазер утверждает, что

Фазеры также могут использоваться задачами, выполняющимися в ForkJoinPool, что обеспечит достаточный параллелизм для выполнения задач, когда другие блокируются в ожидании перехода фазы.

Однако в Javadoc ForkjoinPool # mangedBlock говорится:

При запуске в ForkJoinPool пул может быть сначала расширен для обеспечения достаточного параллелизма

Только мая там. Поэтому я не уверен, является ли это ошибкой или просто плохим кодом, который не зависит от контракта Phaser/ForkJoinPool: насколько сложно работает контракт комбинации Phaser / ForkJoinPool для предотвращения взаимоблокировок?


Мой конфиг:

  1. Linux adc 3.14.27-100.fc19.x86_64 # 1 SMP ср 17 дек 19:36:34 UTC 2014 x86_64 x86_64 x86_64 GNU / Linux
  2. 8 ядер i7

1 ответ

Решение

Похоже, ваша проблема связана с изменением кода ForkJoinPool между JDK 8u20 и 8u45.

В версии u20 потоки ForkJoin всегда были активны в течение не менее 200 миллисекунд (см. ForkJoinPool.FAST_IDLE_TIMEOUT) до их восстановления.

В u45, как только ForkJoinPool достигнет целевого параллелизма плюс 2 дополнительных потока, потоки умрут, как только у них закончится работа без ожидания. Вы можете увидеть это изменение в методе awaitWork в ForkJoinPool.java (строка 1810):

    int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
    if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
        return false; 

Ваша программа использует задачи Phasers для создания дополнительных работников. Каждая задача порождает нового компенсирующего работника, который предназначен для получения следующей отправленной задачи.
Однако, как только вы достигнете целевого параллелизма + 2, компенсирующий работник умирает немедленно, не дожидаясь, и не имеет возможности забрать задание, которое будет отправлено немедленно после этого.

Надеюсь, это поможет.

Другие вопросы по тегам