Потребитель, производитель-мьютекс, синхронно-критическая секция
Я попробовал типичный пример многопоточности, после чего я хотел бы попробовать типичную проблему производитель-потребитель.
(Производитель может производить, если есть место, а также, если потребитель не потребляет, и наоборот)
Но у меня есть проблема с shared-ресурсами, в Java что-то вроде семафоров в C? (использование с функциями ожидания и отправки)
- Я нашел синхронизированный пример
- но я хотел бы попробовать что-то, что я могу контролировать вручную (например, семафоры в C)
Я имею:
class MyThread implements Runnable
- базовый класс для моих темclass Producer extends MyThread
- продюсерская нитьclass Consumer extends MyThread
- потребительский классclass ThreadContainer
- общие ресурсы (акции)
В ThreadContainer
Я подготовил некоторый замок, который я нашел и попробовал, но он не работает как следует:
java.lang.IllegalMonitorStateException
at java.lang.Object.notify(Native Method)Running Consumer 0 [1]
java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
(etc.)
Я буду благодарен, если кто-нибудь объяснит мне, "как".
Классы ниже
MyThread:
public class MyThread implements Runnable {
private Thread t;
private String threadName;
private ThreadContainer container;
MyThread(String name, ThreadContainer cont) {
threadName = name;
this.container = cont;
System.out.println("Creating " + threadName);
}
public void run() {
}
public void start() {
System.out.println("Starting " + threadName);
if (t == null) {
t = new Thread(this, threadName);
t.start();
}
}
public ThreadContainer getContainer() {
return container;
}
public String getThreadName() {
return threadName;
}
}
Режиссер:
public class Producer extends MyThread {
Producer(String name, ThreadContainer cont) {
super(name, cont);
}
public void produce(int amount) {
super.getContainer().produce(amount);
}
@Override
public void run() {
System.out.println("Running " + super.getThreadName());
try {
for (int i = 10; i > 0; i--) {
synchronized (super.getContainer().lock) {
System.out.println(super.getThreadName()
+ " want to produce: " + i);
while (!super.getContainer().canProduce(i)) {
super.getContainer().lock.wait();
}
System.out.println(super.getThreadName() + " producing: "
+ i);
super.getContainer().produce(i);
System.out.println("Container state: "
+ super.getContainer());
}
}
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
System.out.println("Thread " + super.getThreadName() + " exiting.");
}
}
Потребитель:
public class Consumer extends MyThread {
Consumer(String name, ThreadContainer cont) {
super(name, cont);
}
public void consume(int am) {
super.getContainer().consume(am);
}
@Override
public void run() {
System.out.println("Running " + super.getThreadName());
try {
for (int i = 10; i > 0; i--) {
synchronized (super.getContainer().lock) {
System.out.println(super.getThreadName()
+ " want to consume: " + i);
while (!super.getContainer().canConsume(i)) {
super.getContainer().lock.wait();
}
System.out.println(super.getThreadName() + " consuming: "
+ i);
super.getContainer().consume(i);
System.out.println("Container state: "
+ super.getContainer());
}
}
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
System.out.println("Thread " + super.getThreadName() + " exiting.");
}
}
Контейнер:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadContainer {
private int capacity;
private int value;
private Lock locky = new ReentrantLock(true);
public ThreadContainer(int capacity) {
this.capacity = capacity;
this.value = 0;
}
public void produce(int amount){
if(this.value + amount <= this.capacity){
this.value += amount;
}else{
this.value = capacity;
}
}
public void consume(int amount){
if(this.value - amount >= 0 ){
this.value -= amount;
}else{
this.value =0;
}
}
public boolean canProduce(int am){
return (this.value + am) <= this.capacity;
}
public boolean canConsume(int am){
return (this.value - am) >= 0;
}
public boolean tryLock(){
if(this.locky.tryLock()){
this.locky.lock();
return true;
}else{
return false;
}
}
public void unlock(){
this.locky.unlock();
this.locky.notify();
}
public void waitLock() throws InterruptedException{
this.locky.wait();
}
@Override
public String toString() {
return "capacity: " + this.capacity + ", value: " + this.value;
}
}
MainClass:
public class RunFrom {
public static void main(String args[]) {
ThreadContainer container = new ThreadContainer(25);
/*
Producer prod = new Producer("Producer", container);
prod.start();
Consumer cons = new Consumer("Consumer", container);
cons.start();
*/
int prodCount =0;
int conCount =0;
for (int i = 0; i < 5; i++) {
if(i%2 == 0){
Producer prod = new Producer("Producer " + prodCount + " [" + i + "]", container);
prodCount++;
prod.start();
}else{
Consumer cons = new Consumer("Consumer " + conCount + " [" + i + "]", container);
conCount++;
cons.start();
}
}
}
}
Итак, я сделал модификацию следующим образом: ссылка @fildor в посте. Похоже, что она работает для 2 потоков, нормально (1 потребитель и 1 производитель), но все же есть проблема, пока я создаю больше потоков..
- MyThread так же, как оригинал
- потребитель просто потребляет
- производитель просто производит
- блокировка решена в складе контейнера
потребитель
//...
try {
for (int i = 10; i > 0; i--) {
System.out.println(super.getThreadName() + " want to consume: "
+ i);
System.out.println(super.getThreadName() + " consuming: " + i);
super.getContainer().consume(i);
System.out.println("Container state: " + super.getContainer());
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
//...
Режиссер
//...
try {
for (int i = 10; i > 0; i--) {
System.out.println(super.getThreadName() + " want to produce: "
+ i);
System.out.println(super.getThreadName() + " producing: " + i);
super.getContainer().produce(i);
System.out.println("Container state: " + super.getContainer());
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
//...
Складской контейнер
//...
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
//...
public void produce(int amount) {
lock.lock();
try {
while (!canProduce(amount)) {
notFull.wait();
}
if (this.value + amount <= this.capacity) {
this.value += amount;
} else {
this.value = capacity;
}
notEmpty.signal();
} catch (InterruptedException e) {
System.out.println("InterruptedException" + e);
} finally {
lock.unlock();
}
}
public void consume(int amount) {
lock.lock();
try {
while (!canConsume(amount)) {
notEmpty.wait();
}
if (this.value - amount >= 0) {
this.value -= amount;
} else {
this.value = 0;
}
notFull.signal();
} catch (InterruptedException e) {
System.out.println("InterruptedException" + e);
} finally {
lock.unlock();
}
}
для 4 потоков (2 производителя и 2 потребителя) вывод выглядит так:
Creating Producer 0 [0]
Starting Producer 0 [0]
Running Producer 0 [0]
Producer 0 [0] want to produce: 10
Producer 0 [0] producing: 10
Container state: capacity: 25, value: 10
Creating Consumer 0 [1]
Starting Consumer 0 [1]
Creating Producer 1 [2]
Starting Producer 1 [2]
Creating Consumer 1 [3]
Running Consumer 0 [1]
Starting Consumer 1 [3]
Creating Producer 2 [4]
Starting Producer 2 [4]
Running Producer 1 [2]
Producer 1 [2] want to produce: 10
Producer 1 [2] producing: 10
Consumer 0 [1] want to consume: 10
Consumer 0 [1] consuming: 10
Container state: capacity: 25, value: 20
Container state: capacity: 25, value: 10
Running Consumer 1 [3]
Consumer 1 [3] want to consume: 10
Running Producer 2 [4]
Producer 2 [4] want to produce: 10
Producer 2 [4] producing: 10
Container state: capacity: 25, value: 20
Consumer 1 [3] consuming: 10
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 9
Producer 0 [0] producing: 9
Container state: capacity: 25, value: 19
Consumer 0 [1] want to consume: 9
Consumer 0 [1] consuming: 9
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 9
Producer 1 [2] producing: 9
Container state: capacity: 25, value: 19
Producer 2 [4] want to produce: 9
Producer 2 [4] producing: 9
Exception in thread "Producer 2 [4]" Consumer 1 [3] want to consume: 9
Consumer 1 [3] consuming: 9
Container state: capacity: 25, value: 10
java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
at test.ThreadContainer.produce(ThreadContainer.java:24)
at test.Producer.run(Producer.java:21)
at java.lang.Thread.run(Unknown Source)
Producer 0 [0] want to produce: 8
Producer 0 [0] producing: 8
Container state: capacity: 25, value: 18
Consumer 0 [1] want to consume: 8
Consumer 0 [1] consuming: 8
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 8
Producer 1 [2] producing: 8
Container state: capacity: 25, value: 18
Consumer 1 [3] want to consume: 8
Consumer 1 [3] consuming: 8
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 7
Producer 0 [0] producing: 7
Container state: capacity: 25, value: 17
Consumer 0 [1] want to consume: 7
Consumer 0 [1] consuming: 7
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 7
Producer 1 [2] producing: 7
Container state: capacity: 25, value: 17
Consumer 1 [3] want to consume: 7
Consumer 1 [3] consuming: 7
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 6
Producer 0 [0] producing: 6
Container state: capacity: 25, value: 16
Consumer 0 [1] want to consume: 6
Consumer 0 [1] consuming: 6
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 6
Producer 1 [2] producing: 6
Container state: capacity: 25, value: 16
Consumer 1 [3] want to consume: 6
Consumer 1 [3] consuming: 6
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 5
Producer 0 [0] producing: 5
Container state: capacity: 25, value: 15
Consumer 0 [1] want to consume: 5
Consumer 0 [1] consuming: 5
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 5
Producer 1 [2] producing: 5
Container state: capacity: 25, value: 15
Consumer 1 [3] want to consume: 5
Consumer 1 [3] consuming: 5
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 4
Producer 0 [0] producing: 4
Container state: capacity: 25, value: 14
Consumer 0 [1] want to consume: 4
Consumer 0 [1] consuming: 4
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 4
Producer 1 [2] producing: 4
Container state: capacity: 25, value: 14
Consumer 1 [3] want to consume: 4
Consumer 1 [3] consuming: 4
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 3
Producer 0 [0] producing: 3
Container state: capacity: 25, value: 13
Consumer 0 [1] want to consume: 3
Consumer 0 [1] consuming: 3
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 3
Producer 1 [2] producing: 3
Container state: capacity: 25, value: 13
Consumer 1 [3] want to consume: 3
Consumer 1 [3] consuming: 3
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 2
Producer 0 [0] producing: 2
Container state: capacity: 25, value: 12
Consumer 0 [1] want to consume: 2
Consumer 0 [1] consuming: 2
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 2
Producer 1 [2] producing: 2
Container state: capacity: 25, value: 12
Consumer 1 [3] want to consume: 2
Consumer 1 [3] consuming: 2
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 1
Producer 0 [0] producing: 1
Container state: capacity: 25, value: 11
Consumer 0 [1] want to consume: 1
Consumer 0 [1] consuming: 1
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 1
Producer 1 [2] producing: 1
Container state: capacity: 25, value: 11
Consumer 1 [3] want to consume: 1
Consumer 1 [3] consuming: 1
Container state: capacity: 25, value: 10
Thread Producer 0 [0] exiting.
Thread Consumer 0 [1] exiting.
Thread Producer 1 [2] exiting.
Thread Consumer 1 [3] exiting.
Нет ли проблемы, потому что все больше потребителей ждут того же сигнала, что и другие производители?
2 ответа
Вы действительно должны где-нибудь прочитать учебник по потокам в Java 101. Исключение, которое вы получаете, состоит в том, что вы ожидаете на объекте, не приобретая встроенную блокировку. Учитывая любой объект, идентифицированный с lock
Идиоматический код:
synchronized (lock) {
while (!condition) {
lock.wait();
}
}
Почему так сложно?
В чем выгода MyThread
учебный класс? Все, что я вижу, это код для запуска потока. Мне не нужно определять новый класс для этого. Я могу начать новый поток с одной строки кода:
new Thread(new Producer(...)).start;
Тогда есть твой ThreadContainer
класс, который имеет методы, предназначенные только для вызова потоком потребителя, и другие методы, предназначенные только для потока производителя. Это нарушает основной принцип проектирования: каждый класс должен отвечать только за одну вещь.
Большая часть вашего кода - это клей, который связывает вещи с другими вещами. (например, методы получения и установки являются клеем, и ваши MyThread
класс не что иное, как клей). Связывание вещей с другими вещами называется связью, а когда у вас их много, это называется тесной связью.
Чем более тесно связаны части вашей программы, тем сложнее будет понять и изменить программу. Сильно связанные системы, более вероятно, будут сломаны, и их будет сложнее диагностировать и ремонтировать, если они сломаны.
Слабая связь всегда лучше: не делайте классы и методы зависимыми друг от друга, когда это не является абсолютно необходимым.
Наконец, если бы я хотел продемонстрировать концепцию производителя / потребителя, я бы не загромождал демо с помощью wait()/notify(). Методы wait () и notify () являются низкоуровневыми примитивами, которые предназначены для использования в реализации объектов синхронизации более высокого уровня, и они должны быть скрыты от кода, который работает на более высоком уровне.
Если вы хотите продемонстрировать wait()/notify(), это одно. Если вы хотите продемонстрировать производителя / потребителя, это другое. Я бы не пытался втиснуть обе демонстрации в одну программу. Каждое понятие только мешает хвастаться другим.
Стандартная библиотека Java предоставляет множество готовых высокоуровневых объектов синхронизации, которые вы можете использовать. Вероятно, наиболее универсальным является BlockingQueue
, У моей демонстрации производителя / потребителя была бы ветка производителя, которая вставляет "продукты" в ArrayBlockingQueue
и потребительская нить, которая вытаскивает их и оперирует ими.