Простой сценарий с использованием wait() и notify() в Java
Могу ли я получить полный простой сценарий, то есть учебник, который предлагает, как это следует использовать, особенно с очередью?
7 ответов
wait()
а также notify()
методы предназначены для обеспечения механизма, позволяющего потоку блокироваться, пока не будет выполнено определенное условие. Для этого я предполагаю, что вы хотите написать реализацию очереди блокировки, где у вас есть резервное хранилище элементов фиксированного размера.
Первое, что вам нужно сделать, это определить условия, которые вы хотите, чтобы методы ждали. В этом случае вам понадобится put()
метод блокировки, пока в магазине не будет свободного места, и вы захотите take()
метод для блокировки до тех пор, пока не будет элемента для возврата.
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
while(queue.size() == capacity) {
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
}
public synchronized T take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
return item;
}
}
Следует отметить несколько моментов, касающихся того, как вы должны использовать механизмы ожидания и уведомления.
Во-первых, вы должны убедиться, что любые звонки на wait()
или же notify()
находятся в синхронизированной области кода (с wait()
а также notify()
синхронизация вызовов на одном и том же объекте). Причина этого (кроме стандартных проблем безопасности потока) связана с тем, что известно как пропущенный сигнал.
Примером этого является то, что поток может вызвать put()
когда очередь переполняется, она проверяет условие и видит, что очередь переполнена, однако, прежде чем она сможет заблокировать, запланирован другой поток. Эта вторая тема тогда take()
's элемент из очереди, и уведомляет ожидающие потоки, что очередь больше не заполнена. Поскольку первый поток уже проверил условие, он просто вызовет wait()
после перепланировки, хотя это может сделать успехи.
Синхронизируя на общем объекте, вы можете гарантировать, что эта проблема не возникает, так как второй поток take()
Вызов не сможет добиться прогресса, пока первый поток фактически не заблокирован.
Во-вторых, вам нужно поместить условие, которое вы проверяете, в цикл while, а не в оператор if, из-за проблемы, известной как ложные пробуждения. Это где ожидающий поток иногда может быть повторно активирован без notify()
будучи призванным Помещение этой проверки в цикл while гарантирует, что в случае ложного пробуждения условие будет перепроверено, и поток вызовет wait()
снова.
Как уже упоминалось в некоторых других ответах, Java 1.5 представила новую библиотеку параллелизма (в java.util.concurrent
пакет), который был разработан для обеспечения более высокого уровня абстракции по механизму ожидания / уведомления. Используя эти новые функции, вы можете переписать исходный пример следующим образом:
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public void put(T element) throws InterruptedException {
lock.lock();
try {
while(queue.size() == capacity) {
notFull.await();
}
queue.add(element);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while(queue.isEmpty()) {
notEmpty.await();
}
T item = queue.remove();
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
Конечно, если вам действительно нужна очередь блокировки, вам следует использовать реализацию интерфейса BlockingQueue.
Кроме того, для подобных вещей я настоятельно рекомендую Java Concurrency in Practice, поскольку он охватывает все, что вы хотели бы знать о проблемах и решениях, связанных с параллелизмом.
Не пример очереди, но очень просто:)
class MyHouse {
private boolean pizzaArrived = false;
public void eatPizza(){
synchronized(this){
while(!pizzaArrived){
wait();
}
}
System.out.println("yumyum..");
}
public void pizzaGuy(){
synchronized(this){
this.pizzaArrived = true;
notifyAll();
}
}
}
Некоторые важные моменты:
1) НИКОГДА не делай
if(!pizzaArrived){
wait();
}
Всегда используйте while(условие), потому что
- а) потоки могут время от времени выходить из состояния ожидания, не будучи никем уведомленными. (даже когда парень пиццы не звонил в колокольчик, кто-то решал попробовать съесть пиццу.)
- б) Вы должны проверить состояние снова после получения синхронизированной блокировки. Допустим, пицца не вечна. Вы просыпаетесь, очередь за пиццей, но этого недостаточно для всех. Если вы не проверяете, вы можете съесть бумагу!:)
(вероятно, лучшим примером будет
while(!pizzaExists){ wait(); }
,
2) Вы должны удерживать блокировку (синхронизированную) перед вызовом wait / nofity. Потоки также должны получить блокировку перед пробуждением.
3) Старайтесь избегать получения какой-либо блокировки в вашем синхронизированном блоке и старайтесь не вызывать инопланетные методы (методы, которые вы точно не знаете, что они делают). Если вам нужно, обязательно примите меры, чтобы избежать тупиков.
4) Будьте осторожны с уведомлением (). Придерживайтесь notifyAll (), пока не узнаете, что делаете.
5) И последнее, но не менее важное: прочитайте Java Concurrency на практике!
Даже если вы просили wait()
а также notify()
в частности, я чувствую, что эта цитата все еще достаточно важна:
Джош Блох, Effective Java 2nd Edition, Item 69: Предпочитают утилиты параллелизма wait
а также notify
(акцент его):
Учитывая сложность использования
wait
а такжеnotify
правильно, вы должны использовать высокоуровневые утилиты параллелизма вместо [...] использованияwait
а такжеnotify
напрямую напоминает программирование на "языке ассемблера параллелизма" по сравнению с языком более высокого уровня, предоставляемымjava.util.concurrent
, Существует редко, если вообще, причина использоватьwait
а такжеnotify
в новом коде.
Вы смотрели на этот учебник Java?
Кроме того, я бы посоветовал вам держаться подальше от игры с такими вещами в реальном программном обеспечении. Хорошо играть с ним, чтобы вы знали, что это такое, но параллелизм повсеместен. Лучше использовать абстракции более высокого уровня и синхронизированные коллекции или очереди JMS, если вы создаете программное обеспечение для других людей.
Это по крайней мере то, что я делаю. Я не эксперт по параллелизму, поэтому я стараюсь избегать обработки потоков вручную везде, где это возможно.
Пример
public class myThread extends Thread{
@override
public void run(){
while(true){
threadCondWait();// Circle waiting...
//bla bla bla bla
}
}
public synchronized void threadCondWait(){
while(myCondition){
wait();//Comminucate with notify()
}
}
}
public class myAnotherThread extends Thread{
@override
public void run(){
//Bla Bla bla
notify();//Trigger wait() Next Step
}
}
Вопрос запрашивает wait() + notify() с участием очереди (буфера). Первое, что приходит в голову, - это сценарий производитель-потребитель с использованием буфера.
Три компонента в нашей системе:
- Очередь [буфер] - очередь фиксированного размера, совместно используемая между потоками.
- Производитель - поток создает / вставляет значения в буфер
- Потребитель - поток потребляет / удаляет значения из буфера
PRODUCER THREAD: производитель вставляет значения в буфер до тех пор, пока буфер не заполнится. Если буфер заполнен, производитель вызывает wait() и переходит в стадию ожидания, пока потребитель не разбудит его.
static class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Producer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
if (queue.size() == maxSize) {
try {
System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println(" ^^^ Producing value : " + i);
queue.add(i);
queue.notify();
}
sleepRandom();
}
}
}
ПОТРЕБИТЕЛЬСКАЯ НИТЬ: Потребительский поток удаляет значение из буфера до тех пор, пока буфер не станет пустым. Если буфер пуст, потребитель вызывает метод wait() и входит в состояние ожидания, пока производитель не отправит сигнал уведомления.
static class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
Random random = new Random();
while (true) {
synchronized (queue) {
if (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println(" vvv Consuming value : " + queue.remove());
queue.notify();
}
sleepRandom();
}
}
}
ИСПОЛЬЗУЕМЫЙ МЕТОД:
public static void sleepRandom(){
Random random = new Random();
try {
Thread.sleep(random.nextInt(250));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Код приложения:
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start();
consumer.start();
}
Пример вывода:
^^^ Producing value : 1268801606
vvv Consuming value : 1268801606
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
^^^ Producing value : -191710046
vvv Consuming value : -191710046
^^^ Producing value : -1096119803
vvv Consuming value : -1096119803
^^^ Producing value : -1502054254
vvv Consuming value : -1502054254
Queue is empty,Consumer thread is waiting for producer thread to put something in queue
^^^ Producing value : 408960851
vvv Consuming value : 408960851
^^^ Producing value : 2140469519
vvv Consuming value : 65361724
^^^ Producing value : 1844915867
^^^ Producing value : 1551384069
^^^ Producing value : -2112162412
vvv Consuming value : -887946831
vvv Consuming value : 1427122528
^^^ Producing value : -181736500
^^^ Producing value : -1603239584
^^^ Producing value : 175404355
vvv Consuming value : 1356483172
^^^ Producing value : -1505603127
vvv Consuming value : 267333829
^^^ Producing value : 1986055041
Queue is full, Producer thread waiting for consumer to take something from queue
vvv Consuming value : -1289385327
^^^ Producing value : 58340504
vvv Consuming value : 1244183136
^^^ Producing value : 1582191907
Queue is full, Producer thread waiting for consumer to take something from queue
vvv Consuming value : 1401174346
^^^ Producing value : 1617821198
vvv Consuming value : -1827889861
vvv Consuming value : 2098088641
Пример для wait() и notifyall() в Threading.
Синхронизированный список статических массивов используется в качестве ресурса, и метод wait() вызывается, если список массивов пуст. Метод notify() вызывается после добавления элемента в список массивов.
public class PrinterResource extends Thread{
//resource
public static List<String> arrayList = new ArrayList<String>();
public void addElement(String a){
//System.out.println("Add element method "+this.getName());
synchronized (arrayList) {
arrayList.add(a);
arrayList.notifyAll();
}
}
public void removeElement(){
//System.out.println("Remove element method "+this.getName());
synchronized (arrayList) {
if(arrayList.size() == 0){
try {
arrayList.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
arrayList.remove(0);
}
}
}
public void run(){
System.out.println("Thread name -- "+this.getName());
if(!this.getName().equalsIgnoreCase("p4")){
this.removeElement();
}
this.addElement("threads");
}
public static void main(String[] args) {
PrinterResource p1 = new PrinterResource();
p1.setName("p1");
p1.start();
PrinterResource p2 = new PrinterResource();
p2.setName("p2");
p2.start();
PrinterResource p3 = new PrinterResource();
p3.setName("p3");
p3.start();
PrinterResource p4 = new PrinterResource();
p4.setName("p4");
p4.start();
try{
p1.join();
p2.join();
p3.join();
p4.join();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("Final size of arraylist "+arrayList.size());
}
}