Защелка, которая может быть увеличена
Кто-нибудь знает, есть ли реализация защелки, которая делает следующее:
- есть метод для уменьшения значения защелки или ожидания, если значение равно нулю
- есть метод ожидания нулевого значения защелки
- есть метод для добавления числа к значению защелки
7 ответов
Вместо того, чтобы начинать с AQS, вы можете использовать простую реализацию, как показано ниже. Он несколько наивен (он синхронизирован с алгоритмами без блокировки AQS), но если вы не планируете использовать его в довольном сценарии, он может быть достаточно хорошим.
public class CountUpAndDownLatch {
private CountDownLatch latch;
private final Object lock = new Object();
public CountUpAndDownLatch(int count) {
this.latch = new CountDownLatch(count);
}
public void countDownOrWaitIfZero() throws InterruptedException {
synchronized(lock) {
while(latch.getCount() == 0) {
lock.wait();
}
latch.countDown();
lock.notifyAll();
}
}
public void waitUntilZero() throws InterruptedException {
synchronized(lock) {
while(latch.getCount() != 0) {
lock.wait();
}
}
}
public void countUp() { //should probably check for Integer.MAX_VALUE
synchronized(lock) {
latch = new CountDownLatch((int) latch.getCount() + 1);
lock.notifyAll();
}
}
public int getCount() {
synchronized(lock) {
return (int) latch.getCount();
}
}
}
Примечание: я не проверял это подробно, но, похоже, он ведет себя как ожидалось:
public static void main(String[] args) throws InterruptedException {
final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
Runnable up = new Runnable() {
@Override
public void run() {
try {
System.out.println("IN UP " + latch.getCount());
latch.countUp();
System.out.println("UP " + latch.getCount());
} catch (InterruptedException ex) {
}
}
};
Runnable downOrWait = new Runnable() {
@Override
public void run() {
try {
System.out.println("IN DOWN " + latch.getCount());
latch.countDownOrWaitIfZero();
System.out.println("DOWN " + latch.getCount());
} catch (InterruptedException ex) {
}
}
};
Runnable waitFor0 = new Runnable() {
@Override
public void run() {
try {
System.out.println("WAIT FOR ZERO " + latch.getCount());
latch.waitUntilZero();
System.out.println("ZERO " + latch.getCount());
} catch (InterruptedException ex) {
}
}
};
new Thread(waitFor0).start();
up.run();
downOrWait.run();
Thread.sleep(100);
downOrWait.run();
new Thread(up).start();
downOrWait.run();
}
Выход:
IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0
Вы также можете использовать Phaser (java.util.concurrent.Phaser)
final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
phaser.register(); // Equivalent to countUp
// do some work asynchronously, invoking
// phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
Надеюсь, это поможет.
java.util.concurrent.Semaphore, кажется, отвечает всем требованиям.
- приобрести () или приобрести (п)
также приобрести () (не уверен, что я понимаю, в чем здесь разница)(*)- выпуск () или выпуск (n)
(*) Хорошо, нет встроенного метода для ожидания, пока семафор не станет недоступным. Я полагаю, вы бы написали свою собственную обертку для acquire
это делает tryAcquire
сначала, и если это не удается, запускает ваше "занятое событие" (и продолжает использовать обычный acquire
). Каждый должен был бы вызвать вашу обертку. Может быть, подкласс семафор?
Для тех, кто нуждается в решении на основе AQS, вот то, которое работает для меня:
public class CountLatch {
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int arg) {
return count.get() == releaseValue ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
return true;
}
}
private final Sync sync;
private final AtomicLong count;
private volatile long releaseValue;
public CountLatch(final long initial, final long releaseValue) {
this.releaseValue = releaseValue;
this.count = new AtomicLong(initial);
this.sync = new Sync();
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public long countUp() {
final long current = count.incrementAndGet();
if (current == releaseValue) {
sync.releaseShared(0);
}
return current;
}
public long countDown() {
final long current = count.decrementAndGet();
if (current == releaseValue) {
sync.releaseShared(0);
}
return current;
}
public long getCount() {
return count.get();
}
}
Вы инициализируете синхронизатор с начальным и целевым значением. Как только будет достигнуто целевое значение (путем подсчета вверх и / или вниз), ожидающие потоки будут освобождены.
Это вариация на CounterLatch
доступно с сайта Apache.
Их версия, по причинам, наиболее известным самим себе, блокирует поток вызывающего, в то время как переменная (AtomicInteger
) по заданному значению.
Но проще всего настроить этот код, чтобы вы могли выбрать либо то, что делает версия Apache, либо... сказать: "подождите здесь, пока счетчик не достигнет определенного значения". Возможно, у последнего будет больше применимости. В моем конкретном случае я пошутил, потому что хотел проверить, что все "куски" были опубликованы в SwingWorker.process()
... но с тех пор я нашел для него другое применение.
Здесь написано на Jython, официально лучшем языке в мире (TM). Я собираюсь шуметь версию Java в свое время.
class CounterLatch():
def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
self.count = java.util.concurrent.atomic.AtomicLong( initial )
self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )
class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
def tryAcquireShared( sync_self, arg ):
if lift_on_reached:
return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
else:
return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
def tryReleaseShared( self, args ):
return True
self.sync = Sync()
self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False
def await( self, *args ):
if args:
assert len( args ) == 2
assert type( args[ 0 ] ) is int
timeout = args[ 0 ]
assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
unit = args[ 1 ]
return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
else:
self.sync.acquireSharedInterruptibly( 1 )
def count_relative( self, n ):
previous = self.count.addAndGet( n )
if previous == self.signal.get():
self.sync.releaseShared( 0 )
return previous
NB версия Apache использует ключевое слово volatile
за signal
а также released
, В Jython я не думаю, что это существует как таковое, но используя AtomicInteger
а также AtomicBoolean
Следует убедиться, что никакие значения не являются "устаревшими" в каком-либо потоке.
Пример использования:
В конструкторе SwingWorker:
self.publication_counter_latch = CounterLatch()
В SW.publish:
# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )
В SW.process:
# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )
В потоке, ожидающем остановки обработки чанка:
worker.publication_counter_latch.await()
Мне нужен был один, и я построил его, используя ту же стратегию, что и CountDownLatch, которая использует AQS (неблокирующую), этот класс также очень похож (если не точен) на класс, созданный для Apache Camel, я думаю, что он также легче, чем JDK Phaser, это будет действовать так же, как CountDownLact из JDK, он не позволит вам вести обратный отсчет ниже нуля и позволит вам считать вниз и вверх:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class CountingLatch
{
/**
* Synchronization control for CountingLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer
{
private Sync()
{
}
private Sync(final int initialState)
{
setState(initialState);
}
int getCount()
{
return getState();
}
protected int tryAcquireShared(final int acquires)
{
return getState()==0 ? 1 : -1;
}
protected boolean tryReleaseShared(final int delta)
{
// Decrement count; signal when transition to zero
for(; ; ){
final int c=getState();
final int nextc=c+delta;
if(nextc<0){
return false;
}
if(compareAndSetState(c,nextc)){
return nextc==0;
}
}
}
}
private final Sync sync;
public CountingLatch()
{
sync=new Sync();
}
public CountingLatch(final int initialCount)
{
sync=new Sync(initialCount);
}
public void increment()
{
sync.releaseShared(1);
}
public int getCount()
{
return sync.getCount();
}
public void decrement()
{
sync.releaseShared(-1);
}
public void await() throws InterruptedException
{
sync.acquireSharedInterruptibly(1);
}
public boolean await(final long timeout) throws InterruptedException
{
return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
}
}
Кажется CountDownLatch
буду делать как хочешь
CountDownLatch инициализируется с заданным количеством. Методы await блокируются до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов метода countDown(), после чего все ожидающие потоки освобождаются и любые последующие вызовы await немедленно возвращаются. Это одноразовое явление - счет не может быть сброшен. Если вам нужна версия, которая сбрасывает счет, рассмотрите возможность использования CyclicBarrier.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html