Java - одновременная очистка списка
Я пытаюсь найти хороший способ для достижения следующего API:
void add(Object o);
void processAndClear();
Класс будет хранить объекты и при вызове processAndClear будет перебирать текущие сохраненные объекты, каким-то образом обрабатывать их, а затем очищать хранилище. Этот класс должен быть потокобезопасным.
очевидный подход заключается в использовании блокировки, но я хотел быть более "параллельным". Это подход, который я бы использовал:
class Store{
private AtomicReference<CopyOnWriteArrayList<Object>> store = new AtomicReference<>(new CopyOnWriteArrayList <>());
void add(Object o){
store.get().add(o);
}
void processAndClear(){
CopyOnWriteArrayList<Object> objects = store.get();
store.compareAndSet(objects, new CopyOnWriteArrayList<>());
for (Object object : objects) {
//do sth
}
}
}
Это позволило бы потокам, которые пытаются добавить объекты, работать почти немедленно, без какой-либо блокировки / ожидания завершения xlearing. Это более или менее правильный подход?
2 ответа
Ваш код выше не является потокобезопасным. Представьте себе следующее:
- Поток А удерживается в
add()
сразу послеstore.get()
- Тема B находится в
processAndClear()
, заменяет список, обрабатывает все элементы старого, затем возвращает. - Тема A возобновляет работу и добавляет новый элемент в устаревший список, который никогда не будет обработан.
Вероятно, самое простое решение здесь - использовать LinkedBlockingQueue, что также значительно упростит задачу:
class Store{
final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
void add(final Object o){
queue.put(o); // blocks until there is free space in the optionally bounded queue
}
void processAndClear(){
Object element;
while ((element = queue.poll()) != null) { // does not block on empty list but returns null instead
doSomething(element);
}
}
}
Редактировать: Как это сделать с synchronized
:
class Store{
final LinkedList<Object> queue = new LinkedList<>(); // has to be final for synchronized to work
void add(final Object o){
synchronized(queue) { // on the queue as this is the shared object in question
queue.add(o);
}
}
void processAndClear() {
final LinkedList<Object> elements = new LinkedList<>(); // temporary local list
synchronized(queue) { // here as well, as every access needs to be properly synchronized
elements.addAll(queue);
queue.clear();
}
for (Object e : elements) {
doSomething(e); // this is thread-safe as only this thread can access these now local elements
}
}
}
Почему это не очень хорошая идея
Хотя это поточно-ориентированный процесс, он намного медленнее по сравнению с параллельной версией. Предположим, что у вас есть система с 100 потоками, которые часто вызывают add
пока один поток вызывает processAndClear
, Тогда будут возникать следующие узкие места производительности:
- Если один поток вызывает
add
остальные 99 тем временем приостановлены. - Во время первой части
processAndClear
все 100 потоков приостановлены.
Если вы предполагаете, что этим 100 потокам добавления больше нечего делать, вы можете легко показать, что приложение работает с той же скоростью, что и однопоточное приложение, за вычетом затрат на синхронизацию. Это означает, что при 100 потоках добавление будет выполняться медленнее, чем при 1. Это не тот случай, если вы используете параллельный список, как в первом примере.
Тем не менее, будет незначительный прирост производительности с потоком обработки, так как doSomething
можно запустить на старых элементах, пока добавляются новые. Но опять-таки параллельный пример может быть быстрее, так как вы можете одновременно обрабатывать несколько потоков.
фактически synchronized
может также использоваться, но вы автоматически введете узкие места в производительности, что может привести к тому, что приложение будет работать медленнее, чем однопоточное, что заставит вас выполнять сложные тесты производительности. Кроме того, расширение функциональности всегда сопряжено с риском возникновения проблем с многопоточностью, поскольку блокировку необходимо выполнять вручную.
Параллельный список, напротив, решает все эти проблемы без дополнительного кода, и код можно легко изменить или расширить позже.
Класс будет хранить объекты и при вызове processAndClear будет перебирать текущие сохраненные объекты, каким-то образом обрабатывать их, а затем очищать хранилище.
Кажется, вы должны использовать BlockingQueue
для этой задачи. Ваш add(...)
метод добавит в очередь, и ваш потребитель будет вызывать take()
который блокирует ожидание следующего элемента. BlockingQueue
(ArrayBlockingQueue
это типичная реализация) заботится обо всей синхронизации и сигнализации для вас.
Это означает, что вам не нужно иметь CopyOnWriteArrayList
ни AtomicReference
, То, что вы потеряли бы, - это коллекция, и вы можете выполнять итерацию по другим причинам, отличным от того, что написано в вашей публикации.