Создание Случая Перед Отношением с AtomicBoolean
Чтение этого кода AsyncSubscriber.java: кодер использует AtomicBoolean для создания отношений Happens Before, я хочу знать:
1_ Это эквивалентно использовать синхронизированный блок? похоже, что линии if (on.get())
не гарантирует, что блок
try {
final Signal s = inboundSignals.poll(); // We take a signal off the queue
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof OnNext<?>)
handleOnNext(((OnNext<T>)s).next);
else if (s instanceof OnSubscribe)
handleOnSubscribe(((OnSubscribe)s).subscription);
else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
handleOnError(((OnError)s).error);
else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
handleOnComplete();
}
}
будет выполняться 1 потоком одновременно.
Действительно, когда on.get()
вернуть true, что мешает другому потоку войти в критическую секцию?!
2_ Является ли это более эффективным, чем синхронизированный блок? (учитывая, что AtomicBoolean использует Volatile
переменная)
здесь часть кода:
// We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
// obeying rule 2.7 and 2.11
private final AtomicBoolean on = new AtomicBoolean(false);
@SuppressWarnings("unchecked")
@Override public final void run() {
if(on.get()) { // establishes a happens-before relationship with the end of the previous run
try {
final Signal s = inboundSignals.poll(); // We take a signal off the queue
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof OnNext<?>)
handleOnNext(((OnNext<T>)s).next);
else if (s instanceof OnSubscribe)
handleOnSubscribe(((OnSubscribe)s).subscription);
else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
handleOnError(((OnError)s).error);
else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
handleOnComplete();
}
} finally {
on.set(false); // establishes a happens-before relationship with the beginning of the next run
if(!inboundSignals.isEmpty()) // If we still have signals to process
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
}
}
}
// What `signal` does is that it sends signals to the `Subscription` asynchronously
private void signal(final Signal signal) {
if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
}
// This method makes sure that this `Subscriber` is only executing on one Thread at a time
private final void tryScheduleToExecute() {
if(on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
if (!done) {
try {
done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
} finally {
inboundSignals.clear(); // We're not going to need these anymore
// This subscription is cancelled by now, but letting the Subscriber become schedulable again means
// that we can drain the inboundSignals queue if anything arrives after clearing
on.set(false);
}
}
}
}
3_ Это безопасно?
4_ Обычно ли это используется для этой цели (Создание Бывает до Отношений)?
1 ответ
Да, запись / чтение в AtomicBolean устанавливает произойдет до того, как отношения:
compareAndSet и все другие операции чтения и обновления, такие как getAndIncrement, имеют эффект памяти как чтения, так и записи изменяемых переменных.
Поскольку вы не опубликовали весь код, и мы не знаем, как именно он используется, трудно сказать, является ли он потокобезопасным или нет, но:
объявление 1. это не эквивалентно синхронизированному блоку - потоки не ждут
Объявление 2. Да, это может быть более эффективным, но CompareAndSwap не обязан поддерживать volatile
переменная - это дата реализации.
объявление 3. Трудно сказать, но факт того, что run
является публичным методом, который предоставляет возможность возникновения ошибок, например, если два потока будут вызывать run
непосредственно когда go
будет иметь значение true
, С моей точки зрения было бы лучше сделать сравнение AndSwap непосредственно в run
метод, но я не знаю всех требований, так что это всего лишь предложение.
Объявление 4. Да, AtomicBoolean обычно используется.