Параллельная коллекция происходит до отношений
Сейчас я изучаю параллелизм, и я попытался написать программу, которая должна демонстрировать отношения "до и при использовании" при одновременном сборе. Как указано в пакете java.concurrent:
Методы всех классов в java.util.concurrent и его подпакетах расширяют эти гарантии для синхронизации более высокого уровня. В частности: Действия в потоке до помещения объекта в любую параллельную коллекцию выполняются до выполнения действий после доступа или удаления этого элемента из коллекции в другом потоке.
Я написал следующий класс:
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class HappensBefore {
static int checks = 0;
static int shouldNotHappen = 0;
static Deque<Integer> syncList = new LinkedBlockingDeque<>();
public static boolean varToHappenBefore = false; //this var must be false when new element added to collection
static AtomicBoolean flag = new AtomicBoolean();
static class SyncTask implements Runnable {
int localTemp = -1;
private final Thread t = new Thread(new Counter());
@Override
public void run() {
t.start();
while (syncList.isEmpty()) { //just skip
}
while (true) {
if (!Thread.interrupted()) {
if (flag.get()) {
int r = syncList.peekLast();
if (r != localTemp) {
if (varToHappenBefore) {
shouldNotHappen++;
}
varToHappenBefore = true;
localTemp = r;
checks++;
}
flag.set(false);
}
} else {
t.interrupt();
break;
}
}
}
}
static class Counter implements Runnable {
int ctr = 0;
@Override
public void run() {
while (!Thread.interrupted()) {
if (!flag.get()) {
flag.set(true);
varToHappenBefore = false;
syncList.add(ctr++);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
SyncTask st = new SyncTask();
Thread s = new Thread(st);
s.start();
Thread.sleep(10000);//runtime ms
s.interrupt();
// s1.interrupt();
System.out.println("Elems times added: " + checks);
System.out
.println("Happens-before violated times: " + shouldNotHappen);
}
}
То, что я делаю, запускает thread1, который запускает thread2. Thread1 проверяет общедоступный логический тип varToHappenBefore, для которого изначально установлено значение false. Когда thread1 сохраняет новый элемент из коллекции, он устанавливает для этого логического значения значение true. На следующем новом элем. получение, если этот логический аргумент все еще имеет значение true, произойдет до того, как имело место нарушение и должен быть увеличен notHappen.
Thread1 проверяет, есть ли в параллельной коллекции новый элемент, если это так, сохраняет его в temp var и увеличивает общий счетчик проверок. Затем он переключает атомное логическое значение, чтобы позволить thread2 добавить новый элемент. Перед добавлением нового элемента для varToHappenBefore устанавливается значение false. Из-за атомарного логического флага, thread2 не выполняет код, прежде чем делает thread1. Но переключение флага в thread2 выполняется перед добавлением элемента и проверкой varToHappenBefore, потому что эти две операции (elem add и boolean toggle) синхронизируются с помощью атомарного логического значения в противном случае. Я гарантирую, что thread2 запускается только один раз после запуска thread1. И если varToHappenBefore произойдет до добавления элем. в коллекцию в потоке 2, которая затем читается в потоке 1(поток 1 проверяет varToHappenBefore, только если новый элемент читается из коллекции), тогда varToHappenBefore должен оставаться равным 0 после завершения программы. Но я получаю следующие результаты:
Добавлено раз: ~10 000 000
Бывает-раньше нарушено раз: 0-10
Вероятно, я делаю что-то не так, многопоточность тонкая и сложная. Надеюсь на вашу помощь.
Изменить: мне нужно выйти из ситуации, когда thread1 setFlag(false)
сразу после истины и до получения элем. из коллекции, потому что тогда thread2 может работать между получением элемента из коллекций и настройкой varToHappenBefore = true;
в теме 1. Если я сделаю AtomicBoolean.compareAndSet()
проверяя блок, я получаю 80 тысяч неудач за 8 мил. И это предсказуемо и понятно. Но когда я не делаю такого окна для thread2, чтобы добавить дополнительные элементы между чтением из коллекции и установкой логического значения, я все равно получаю несколько последовательных чтений, когда логическое значение равно true и появляется новый элемент.
3 ответа
Ваш SyncTask
рассеется flag
даже если он обнаружит, что список еще не был изменен. Таким образом, вы можете просто пропустить события добавления списка. Это делает ваши предположения неверными:
- Следующий раунд после пропущенного события,
SyncTask
список находок изменен, предполагаетсяCounter
завершил свою задачу, проверки (успешно) и задним ходомvarToHappenBefore
, НоCounter
фактически добавляет элемент в список после этого. - Следующий раунд
SyncTask
список оснований снова изменился, он предполагаетCounter
выполнил свою задачу, проверяетvarToHappenBefore
и находит, что это не очищено. Это только потому, чтоCounter
еще не очистил это.
Просто двигайся flag.set(false);
в if (r != localTemp)
блок, и все будет работать.
Ваш пример немного сложен для анализа и исправления. Но если вы просто делаете это, чтобы понять контракт "происходит до" в случае одновременных сборов, попробуйте эту программу -
public class HappensBeforeTest {
private static boolean producerStopped;
private static final Queue<String> queue = new LinkedBlockingQueue<>();
//private static final Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
private static class Producer implements Runnable {
public void run() {
int count = 0;
while (count++ < 10) {
producerStopped = (count == 10);
queue.add(String.valueOf(count));
}
System.out.println("Producer finished");
}
}
private static class Consumer implements Runnable {
public void run() {
while (!producerStopped) {
queue.poll();
}
System.out.println("Consumer finished");
}
}
}
когда queue
является LinkedBlockingQueue
обе темы Producer
а также Consumer
Конец.
Но когда queue
является LinkedList
иногда Consumer
не заканчивается и продолжает работать, потому что producerStopped
флаг все еще виден как ложный для Consumer
даже если Producer
обновил его до истины.
Самая чистая демонстрация была бы очень простой. Пусть поток А создаст экземпляр некоторого изменяемого класса, обновит его и put(...)
это в очередь. Затем попросите поток B удалить экземпляр из очереди и убедиться, что состояние объекта соответствует ожидаемому.