Использование синхронизации фазера

Общий вопрос

Хорошо известно, что Phaser можно использовать для синхронизации времени запуска всех задач, как указано в JavaDocs и этом блоге Никласа Шлимма.

Никлас нарисовал довольно понятное изображение синхронизации:

         |Phaser   |Phaser   |Phaser   |  
Task 1   | ------> | ------> | ------> | ...
Task 2   | ------> | ------> | ------> | ...
...

Теперь предполагается, что существует иерархия задач:

         |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |   ...
Master   |         |         | ------> |         |         | ------> |         |   ...
Task 1.1 | ----------------> |         | ----------------> |         | ----------> ...
Task 1.2 | ----------------> |         | ----------------> |         | ----------> ...
...      |         |         |         |         |         |         |         |   ...
Task 2.1 | ------> |         |         | ------> |         |         | ------> |   ...
Task 2.2 | ------> |         |         | ------> |         |         | ------> |   ...
...      |         |         |         |         |         |         |         |   ...
Task 3.1 |         | ------> |         |         | ------> |         |         |   ...
Task 3.2 |         | ------> |         |         | ------> |         |         |   ...
...      |         |         |         |         |         |         |         |   ...

Таким образом, дерево зависимостей выглядит так:

                      Master
           /-----------/  \-----------\
           |                        Task 2 
         Task 1                       |
           |                        Task 3
           \-----------\  /-----------/
                      Master'

В общем случае существует дерево зависимостей, которые нужно решить (скажем, в игровом конвейере, некоторые являются задачами AI / Game Logic / Render). К счастью, есть "большая" точка синхронизации, и дерево фиксировано (но не количество партий). Это тривиально решить с помощью нескольких фазеров. Но возможно ли использовать только один фазер?

Один особый случай

В частности, я сделал программу для решения следующей проблемы.

         |phasers[0]|phasers[1]|phasers[2]|phasers[0]|phasers[1]|phasers[2]| ...
Task 1   | -------> |          |          | -------> |          |          | ...
Task 2   | -------> |          |          | -------> |          |          | ...
Task 3   |          |          | -------> |          |          | -------> | ...
Task 4   |          | -------> |          |          | -------> |          | ...

Код здесь:

public class VolatileTester {

    private int a = 0, b = 0;       // change to volatile here
    private int c = 0;

    private final int TEST_COUNT = 100_000;
    private int[] testResult = new int[TEST_COUNT];

    private static void printResult(int[] result) {
        final Map<Integer, Integer> countMap = new HashMap<>();
        for (final int n : result) {
            countMap.put(n, countMap.getOrDefault(n, 0) + 1);
        }

        countMap.forEach((n, count) -> {
            System.out.format("%d -> %d%n", n, count);
        });
    }

    private void runTask1() {
        a = 5;
        b = 10;
    }

    private void runTask2() {
        if (b == 10) {
            if (a == 5) {
                c = 1;
            } else {
                c = 2;
            }
        } else {
            if (a == 5) {
                c = 3;
            } else {
                c = 4;
            }
        }
    }

    private void runTask3() {
        // "reset task"
        a = 0;
        b = 0;
        c = 0;
    }

    private static class PhaserRunner implements Runnable {
        private final Phaser loopStartPhaser;
        private final Phaser loopEndPhaser;
        private final Runnable runnable;

        public PhaserRunner(Phaser loopStartPhaser, Phaser loopEndPhaser, Runnable runnable) {
            this.loopStartPhaser = loopStartPhaser;
            this.loopEndPhaser = loopEndPhaser;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            while (loopStartPhaser.arriveAndAwaitAdvance() >= 0) {
                runnable.run();
                loopEndPhaser.arrive();
            }
        }
    }

    void runTest() throws InterruptedException {
        final Phaser[] phasers = new Phaser[]{new Phaser(3), new Phaser(3), new Phaser(2)};

        final Thread[] threads = new Thread[]{
                // build tree of dependencies here
                new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask1)),
                new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask2)),
                new Thread(new PhaserRunner(phasers[2], phasers[0], this::runTask3))
        };

        try {
            for (Thread thread : threads) {
                thread.start();
            }

            phasers[0].arrive();        // phaser of last round

            for (int i = 0; i < TEST_COUNT; i++) {
                phasers[1].arriveAndAwaitAdvance();

                // Task4 here
                testResult[i] = c;

                phasers[2].arrive();
            }
        } finally {
            for (Phaser phaser : phasers) {
                phaser.forceTermination();
            }
        }

        for (Thread thread : threads) {
            thread.join();
        }

        printResult(testResult);
    }
}

Вы можете увидеть, что несколько Phasers используются. Лучше оставить несколько фазеров (как выше) или просто использовать один большой фазер? Или какие-либо другие методы синхронизации в Java рекомендуется?

2 ответа

Решение

Все задачи в вашем приложении выполняются пошагово, что означает, что одного фазера будет достаточно. Требование состоит в том, что задачи могут пропускать циклически, например, для каждых трех фаз данная задача должна проходить две фазы, а затем пропускает одну фазу (простаивает в течение одной фазы). В итоге,

  • Задача должна выполнить arriveAndAwaitAdvance() перед каждым шагом работы.
  • Задача должна просто вызвать arriveAndAwaitAdvance() пропустить фазу.

Для этого каждая задача может использовать логический массив, в примере enabled, который указывает, включен ли он для данного номера фазы. Используя модульную алгебру (enabled[phase % enabled.length]) мы можем определить циклические модели. Например, чтобы указать, что задача должна запускать один из трех тиков, мы объявляем enabled как new boolean[]{true, false, false},

Имейте в виду, что в задачах необходимо продвигать фазу независимо от того, выполняют ли они какую-либо реальную работу.

Я исправил ваш пример соответственно:

import java.util.concurrent.*;
import java.util.*;

public class VolatileTester {

    private int a = 0, b = 0;       // change to volatile here
    private int c = 0;

    private final int TEST_COUNT = 100;
    private int[] testResult = new int[TEST_COUNT];

    private static void printResult(int[] result) {
        final Map<Integer, Integer> countMap = new HashMap<>();
        for (final int n : result) {
            countMap.put(n, countMap.getOrDefault(n, 0) + 1);
        }

        countMap.forEach((n, count) -> {
            System.out.format("%d -> %d%n", n, count);
        });
    }

    private void runTask1() {
        a = 5;
        b = 10;
    }

    private void runTask2() {
        if (b == 10) {
            if (a == 5) {
                c = 1;
            } else {
                c = 2;
            }
        } else {
            if (a == 5) {
                c = 3;
            } else {
                c = 4;
            }
        }
    }

    private void runTask3() {
        // "reset task"
        a = 0;
        b = 0;
        c = 0;
    }

    private static class PhaserRunner implements Runnable {
        private final Phaser phaser;
        private final Runnable runnable;
        private boolean[] enabled;

        public PhaserRunner(Phaser phaser, boolean[] enabled, Runnable runnable) {
            this.phaser = phaser;
            this.runnable = runnable;
            this.enabled = enabled;
        }

        @Override
        public void run() {
            int phase;
            for (;;) {
                phase = phaser.arriveAndAwaitAdvance();
                if (phase < 0) {
                    break;
                } else if (enabled[phase % enabled.length]) {
                    System.out.println("I'm running: " + Thread.currentThread());
                    runnable.run();
                }
            }
        }
    }

    public void runTest() throws InterruptedException {
        final Phaser phaser = new Phaser(4);

        final Thread[] threads = new Thread[]{
                // build tree of dependencies here
                new Thread(new PhaserRunner(phaser, new boolean[]{true, false, false}, this::runTask1), "T1"),
                new Thread(new PhaserRunner(phaser, new boolean[]{false, false, true}, this::runTask2), "T2"),
                new Thread(new PhaserRunner(phaser, new boolean[]{false, true, false}, this::runTask3), "T3")
        };

        try {
            for (Thread thread : threads) {
                thread.start();
            }

            for (int i = 0; i < TEST_COUNT; i++) {
                testResult[i] = c;
                phaser.arriveAndAwaitAdvance();
            }
        } finally {
            phaser.forceTermination();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        printResult(testResult);
    }

    public static void main(String[]args) throws Exception {
        new VolatileTester().runTest();
    }
}

Да, вы можете обойтись с одним Phaser, CyclicBarrier имеет Runnable barrierAction который запускается после каждого цикла. Phaser поддерживает аналогичные функции с переопределением onAdvance,

Когда прибывает последняя сторона для данной фазы, выполняется дополнительное действие, и фаза продвигается. Эти действия выполняются стороной, инициирующей продвижение фазы, и упорядочиваются методом переопределения onAdvance(int, int), который также контролирует завершение. Переопределение этого метода аналогично, но более гибко, чем предоставление барьерного действия для CyclicBarrier.

Так по сути

Phaser phaser = new Phaser() {
    protected boolean onAdvance(int phase, int parties) {
        // Signal Master thread to perform its task and wait for it to finish
    }
};
Другие вопросы по тегам