Java - одновременная очистка списка

Я пытаюсь найти хороший способ для достижения следующего API:

void add(Object o);
void processAndClear();

Класс будет хранить объекты и при вызове processAndClear будет перебирать текущие сохраненные объекты, каким-то образом обрабатывать их, а затем очищать хранилище. Этот класс должен быть потокобезопасным.

очевидный подход заключается в использовании блокировки, но я хотел быть более "параллельным". Это подход, который я бы использовал:

class Store{
    private AtomicReference<CopyOnWriteArrayList<Object>> store = new AtomicReference<>(new CopyOnWriteArrayList <>());

    void add(Object o){
        store.get().add(o);
    }

    void processAndClear(){
        CopyOnWriteArrayList<Object> objects = store.get();
        store.compareAndSet(objects, new CopyOnWriteArrayList<>());
        for (Object object : objects) {
            //do sth
        }
    }
}

Это позволило бы потокам, которые пытаются добавить объекты, работать почти немедленно, без какой-либо блокировки / ожидания завершения xlearing. Это более или менее правильный подход?

2 ответа

Решение

Ваш код выше не является потокобезопасным. Представьте себе следующее:

  1. Поток А удерживается в add() сразу после store.get()
  2. Тема B находится в processAndClear(), заменяет список, обрабатывает все элементы старого, затем возвращает.
  3. Тема A возобновляет работу и добавляет новый элемент в устаревший список, который никогда не будет обработан.

Вероятно, самое простое решение здесь - использовать LinkedBlockingQueue, что также значительно упростит задачу:

class Store{
  final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

  void add(final Object o){
    queue.put(o); // blocks until there is free space in the optionally bounded queue
  }

  void processAndClear(){
    Object element;
    while ((element = queue.poll()) != null) { // does not block on empty list but returns null instead
      doSomething(element);
    }
  }
}

Редактировать: Как это сделать с synchronized:

class Store{
  final LinkedList<Object> queue = new LinkedList<>(); // has to be final for synchronized to work

  void add(final Object o){
    synchronized(queue) { // on the queue as this is the shared object in question
      queue.add(o);
    }
  }

  void processAndClear() {
    final LinkedList<Object> elements = new LinkedList<>(); // temporary local list
    synchronized(queue) { // here as well, as every access needs to be properly synchronized
      elements.addAll(queue);
      queue.clear();
    }

    for (Object e : elements) { 
      doSomething(e); // this is thread-safe as only this thread can access these now local elements
    }
  }
}

Почему это не очень хорошая идея

Хотя это поточно-ориентированный процесс, он намного медленнее по сравнению с параллельной версией. Предположим, что у вас есть система с 100 потоками, которые часто вызывают addпока один поток вызывает processAndClear, Тогда будут возникать следующие узкие места производительности:

  • Если один поток вызывает add остальные 99 тем временем приостановлены.
  • Во время первой части processAndClear все 100 потоков приостановлены.

Если вы предполагаете, что этим 100 потокам добавления больше нечего делать, вы можете легко показать, что приложение работает с той же скоростью, что и однопоточное приложение, за вычетом затрат на синхронизацию. Это означает, что при 100 потоках добавление будет выполняться медленнее, чем при 1. Это не тот случай, если вы используете параллельный список, как в первом примере.

Тем не менее, будет незначительный прирост производительности с потоком обработки, так как doSomething можно запустить на старых элементах, пока добавляются новые. Но опять-таки параллельный пример может быть быстрее, так как вы можете одновременно обрабатывать несколько потоков.

фактически synchronized может также использоваться, но вы автоматически введете узкие места в производительности, что может привести к тому, что приложение будет работать медленнее, чем однопоточное, что заставит вас выполнять сложные тесты производительности. Кроме того, расширение функциональности всегда сопряжено с риском возникновения проблем с многопоточностью, поскольку блокировку необходимо выполнять вручную.
Параллельный список, напротив, решает все эти проблемы без дополнительного кода, и код можно легко изменить или расширить позже.

Класс будет хранить объекты и при вызове processAndClear будет перебирать текущие сохраненные объекты, каким-то образом обрабатывать их, а затем очищать хранилище.

Кажется, вы должны использовать BlockingQueue для этой задачи. Ваш add(...) метод добавит в очередь, и ваш потребитель будет вызывать take() который блокирует ожидание следующего элемента. BlockingQueue (ArrayBlockingQueue это типичная реализация) заботится обо всей синхронизации и сигнализации для вас.

Это означает, что вам не нужно иметь CopyOnWriteArrayList ни AtomicReference, То, что вы потеряли бы, - это коллекция, и вы можете выполнять итерацию по другим причинам, отличным от того, что написано в вашей публикации.

Другие вопросы по тегам