Создание Случая Перед Отношением с AtomicBoolean

Чтение этого кода AsyncSubscriber.java: кодер использует AtomicBoolean для создания отношений Happens Before, я хочу знать:

1_ Это эквивалентно использовать синхронизированный блок? похоже, что линии if (on.get()) не гарантирует, что блок

try {
            final Signal s = inboundSignals.poll(); // We take a signal off the queue


 if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
          // Below we simply unpack the `Signal`s and invoke the corresponding methods
          if (s instanceof OnNext<?>)
            handleOnNext(((OnNext<T>)s).next);
          else if (s instanceof OnSubscribe)
            handleOnSubscribe(((OnSubscribe)s).subscription);
          else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
            handleOnError(((OnError)s).error);
          else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
            handleOnComplete();
        }
      }

будет выполняться 1 потоком одновременно.

Действительно, когда on.get() вернуть true, что мешает другому потоку войти в критическую секцию?!

2_ Является ли это более эффективным, чем синхронизированный блок? (учитывая, что AtomicBoolean использует Volatile переменная)

здесь часть кода:

    // We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
      // obeying rule 2.7 and 2.11
      private final AtomicBoolean on = new AtomicBoolean(false);

       @SuppressWarnings("unchecked")
       @Override public final void run() {
        if(on.get()) { // establishes a happens-before relationship with the end of the previous run
          try {
            final Signal s = inboundSignals.poll(); // We take a signal off the queue
            if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
              // Below we simply unpack the `Signal`s and invoke the corresponding methods
              if (s instanceof OnNext<?>)
                handleOnNext(((OnNext<T>)s).next);
              else if (s instanceof OnSubscribe)
                handleOnSubscribe(((OnSubscribe)s).subscription);
              else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
                handleOnError(((OnError)s).error);
              else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
                handleOnComplete();
            }
          } finally {
            on.set(false); // establishes a happens-before relationship with the beginning of the next run
            if(!inboundSignals.isEmpty()) // If we still have signals to process
              tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
          }
        }
      }
// What `signal` does is that it sends signals to the `Subscription` asynchronously
  private void signal(final Signal signal) {
    if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
      tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
  }

  // This method makes sure that this `Subscriber` is only executing on one Thread at a time
  private final void tryScheduleToExecute() {
    if(on.compareAndSet(false, true)) {
      try {
        executor.execute(this);
      } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
        if (!done) {
          try {
            done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
          } finally {
            inboundSignals.clear(); // We're not going to need these anymore
            // This subscription is cancelled by now, but letting the Subscriber become schedulable again means
            // that we can drain the inboundSignals queue if anything arrives after clearing
            on.set(false);
          }
        }
      }
    }

3_ Это безопасно?

4_ Обычно ли это используется для этой цели (Создание Бывает до Отношений)?

1 ответ

Решение

Да, запись / чтение в AtomicBolean устанавливает произойдет до того, как отношения:

compareAndSet и все другие операции чтения и обновления, такие как getAndIncrement, имеют эффект памяти как чтения, так и записи изменяемых переменных.

Поскольку вы не опубликовали весь код, и мы не знаем, как именно он используется, трудно сказать, является ли он потокобезопасным или нет, но:

объявление 1. это не эквивалентно синхронизированному блоку - потоки не ждут

Объявление 2. Да, это может быть более эффективным, но CompareAndSwap не обязан поддерживать volatile переменная - это дата реализации.

объявление 3. Трудно сказать, но факт того, что run является публичным методом, который предоставляет возможность возникновения ошибок, например, если два потока будут вызывать run непосредственно когда go будет иметь значение true, С моей точки зрения было бы лучше сделать сравнение AndSwap непосредственно в run метод, но я не знаю всех требований, так что это всего лишь предложение.

Объявление 4. Да, AtomicBoolean обычно используется.

Другие вопросы по тегам