Phaser - как использовать его как CountDownLatch(1)?

Я понимаю, я мог бы просто использовать CountDownLatch непосредственно, однако, в качестве упражнения и понять Phaser лучше, я хотел бы использовать его вместо COuntDownLatch,

Таким образом, я бы создал N число ожидающих и один поток, который должен щелкнуть защелкой. Все ожидающие, если прибывают до переворота, блокируются, однако после обратного отсчета защелки все последующие await() возвращается мгновенно.

С Phaser Я не уверен, как этого добиться... Барьер - это легко, поскольку у нас есть N + 1 потоков, каждый из которых прибывает и ожидает. Тем не менее, чтобы убедиться, что ни один поток не будет ждать после первого этапа, как-то ускользает от меня.

Единственный способ, которым я мог придумать, что нехорошо, заключается в следующем:

Phaser p = new Phaser(1);
int phase = p.getPhase();
....
// thread that awaits()
new Thread(new Runnable() {
    ....
    p.awaitAdvance(phase)
}

А другой поток просто продвигает фазер к следующему этапу. Это не идеально, поэтому любые указатели будут оценены.

1 ответ

TL;DR: в этом контексте использовать Phaser.arriveAndDeregister() для неблокирующего сигнала официанту фазера, что соответствует работе CountDownLatch.countDown() ,

Phaser а также CountDownLatch , Первое, что нужно уточнить, это то, что Phaser не может вообще кодировать CountDownLatch, Задачи, синхронизирующиеся с Phaser все должны ждать друг друга (синхронизация "все ко всем"). В CountDownLatch, есть группа задач, которая ожидает какую-то другую задачу, чтобы открыть защелку.

Phaser а также CyclicBarrier , Оба эти механизма используются для всеобщей синхронизации. Два различия между ними: 1) Phaser набор задач с использованием фазера может расти в течение жизненного цикла фазера, тогда как в CyclicBarrier количество участников является фиксированным; 2) с Phasers задача может уведомлять других участников (участников) и не ждать, пока она отменяет регистрацию из этого фазера, тогда как все задачи, использующие циклический барьер, могут только ждать и уведомлять.

Кодирование CountDownLatch с Phaser , Для кодирования CountDownLatch(1) с помощью фазера необходимо помнить следующее:

  1. Количество сторон = количество официантов + 1: Количество зарегистрированных сторон, либо через new Phaser(PARTIES_COUNT) или через Phaser.register,
  2. CountDown.await() знак равно Phaser.arriveAndAwaitAdvance()
  3. CountDown.countDown() знак равно Phaser.arriveAndDeregister()

Пример. Предположим, вы хотите, чтобы дочерняя задача ожидала сигнала родительской задачи. С помощью CountDownLatch вы бы написали:

import java.util.concurrent.*;

class CountDownExample {
    public static void main(String[] args) throws Exception {
        CountDownLatch l = new CountDownLatch(1);
        new Thread(() -> {
            try {
                l.await();
                System.out.println("Child: running");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
        System.out.println("Parent: waiting a bit.");
        Thread.sleep(100);
        l.countDown();
    }
}

Используя Phaser вы бы написали:

import java.util.concurrent.*;

class PhaserExample {
    public static void main(String[] args) throws Exception {
        Phaser ph = new Phaser(2); // 2 parties = 1 signaler and 1 waiter
        new Thread(() -> {
            ph.arriveAndAwaitAdvance();
            System.out.println("Child: running");
        }).start();
        System.out.println("Parent: waiting a bit.");
        Thread.sleep(100);
        ph.arriveAndDeregister();
    }
}

Возможно, вы захотите посмотреть на этот пост для другого примера.

Я не согласен с ответом @tiago-cogumbreiro. По факту, Phaser может использоваться для моделирования поведения CountDownLatch.

Вы можете добиться этого, используя Phaser arrive()метод. Он не будет ждать прибытия других потоков. Следовательно, CountDownLatch поведение может быть достигнуто.

Phaser's arriveAndAwaitAdvance()заставляет поток, в котором он вызывается, ждать, пока не прибудут другие потоки / стороны. Следовательно, этот метод можно использовать для моделирования поведения CyclicBarrier.

Ниже приведен исходный код, имитирующий поведение CountDownLatch(3) с помощью Phaser. Если вы используете только один WorkerThread, вы можете смоделировать CountDownLatch(1)

Если вы замените, arrive() метод с arriveAndAwaitAdvance() метод, ниже код будет имитировать CyclicBarrier.

public class PhaserAsCountDownLatch {

    static Phaser phaser = new Phaser(3);

    public static void main(String[] args) {
     
        new WorkerThread2().start();
        new WorkerThread3().start();
        new WorkerThread().start();
        
        //waits till phase 0 is completed and phase is advanced to next.
        phaser.awaitAdvance(0); 
        
        System.out.println("\nFROM MAIN THREAD: PHASE 0 COMPLETED");
        System.out.println("FROM MAIN THREAD: PHASE ADVANCED TO 1");
        System.out.println("MAIN THREAD ENDS HERE\n");
    }

    static class WorkerThread extends Thread{
        @Override
        public void run() {
            for (int i = 1; i < 5; i++) {
                System.out.println("tid: " + Thread.currentThread().getId() +  ", BEFORE ARRIVING val is: " + i);
            }
        
            phaser.arrive();
        
            System.out.println("ARRIVED tid: " + Thread.currentThread().getId());
        }
    }

    static class WorkerThread2 extends Thread{
        @Override
        public void run() {
        
            //won't wait for other threads to arrive. Hence, CountDownLatch behaviour can be achieved
            phaser.arrive();
        
            System.out.println("ARRIVED tid: " + Thread.currentThread().getId());
            for (int i = 200; i < 231; i++) {
                System.out.println("tid: " + Thread.currentThread().getId() +  " AFTER ARRIVING. val is: " + i);
            }
        }
    }

    static class WorkerThread3 extends Thread{
        @Override
        public void run() {
        
            //won't wait for other threads to arrive. Hence, CountDownLatch behaviour can be achieved
            phaser.arrive();
        
            System.out.println("ARRIVED tid: " + Thread.currentThread().getId());
            for (int i = 300; i < 331; i++) {
                System.out.println("tid: " + Thread.currentThread().getId() +  " AFTER ARRIVING. val is: " + i);
            }
        }
    }
}
Другие вопросы по тегам