Потребитель, производитель-мьютекс, синхронно-критическая секция

Я попробовал типичный пример многопоточности, после чего я хотел бы попробовать типичную проблему производитель-потребитель.

(Производитель может производить, если есть место, а также, если потребитель не потребляет, и наоборот)

Но у меня есть проблема с shared-ресурсами, в Java что-то вроде семафоров в C? (использование с функциями ожидания и отправки)

  • Я нашел синхронизированный пример
  • но я хотел бы попробовать что-то, что я могу контролировать вручную (например, семафоры в C)

Я имею:

  • class MyThread implements Runnable- базовый класс для моих тем
  • class Producer extends MyThread- продюсерская нить
  • class Consumer extends MyThread- потребительский класс
  • class ThreadContainer- общие ресурсы (акции)

В ThreadContainer Я подготовил некоторый замок, который я нашел и попробовал, но он не работает как следует:

java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)Running Consumer 0 [1]
java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
(etc.)

Я буду благодарен, если кто-нибудь объяснит мне, "как".

Классы ниже

MyThread:

public class MyThread implements Runnable {
    private Thread t;
    private String threadName;

    private ThreadContainer container;

    MyThread(String name, ThreadContainer cont) {
        threadName = name;
        this.container = cont;
        System.out.println("Creating " + threadName);
    }

    public void run() {

    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (t == null) {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    public ThreadContainer getContainer() {
        return container;
    }

    public String getThreadName() {
        return threadName;
    }
}

Режиссер:

 public class Producer extends MyThread {

    Producer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void produce(int amount) {
        super.getContainer().produce(amount);
    }

    @Override
    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {

                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to produce: " + i);
                    while (!super.getContainer().canProduce(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " producing: "
                            + i);
                    super.getContainer().produce(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                }

            }

            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }

        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }

}

Потребитель:

 public class Consumer extends MyThread {

    Consumer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void consume(int am) {
        super.getContainer().consume(am);
    }

    @Override
    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {
                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to consume: " + i);
                    while (!super.getContainer().canConsume(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " consuming: "
                            + i);
                    super.getContainer().consume(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                }
            }

            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }

        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }

}

Контейнер:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadContainer {
    private int capacity;
    private int value;

    private Lock locky = new ReentrantLock(true);

    public ThreadContainer(int capacity) {
        this.capacity = capacity;
        this.value = 0; 
    }

    public void produce(int amount){
        if(this.value + amount <= this.capacity){
            this.value += amount;
        }else{
            this.value = capacity;
        }
    }

    public void consume(int amount){
        if(this.value - amount >= 0 ){
            this.value -= amount;
        }else{
            this.value =0;
        } 
    }

    public boolean canProduce(int am){
        return (this.value + am) <= this.capacity;
    }

    public boolean canConsume(int am){
        return (this.value - am) >= 0;
    }

    public boolean tryLock(){
        if(this.locky.tryLock()){
            this.locky.lock();
            return true;
        }else{
            return false;
        }
    }

    public void unlock(){
        this.locky.unlock();
        this.locky.notify();
    }

    public void waitLock() throws InterruptedException{
        this.locky.wait();
    }


    @Override
    public String toString() {
        return "capacity: " + this.capacity + ", value: " + this.value;
    }

}

MainClass:

public class RunFrom {
    public static void main(String args[]) {
        ThreadContainer container = new ThreadContainer(25);

        /*
        Producer prod = new Producer("Producer", container);
        prod.start();

        Consumer cons = new Consumer("Consumer", container);
        cons.start();
        */

        int prodCount =0;
        int conCount =0;
        for (int i = 0; i < 5; i++) {
            if(i%2 == 0){
                Producer prod = new Producer("Producer " + prodCount + " [" + i + "]", container);
                prodCount++;
                prod.start();
            }else{
                Consumer cons = new Consumer("Consumer " + conCount + " [" + i + "]", container);
                conCount++;
                cons.start();
            }
        }
    }
}

Итак, я сделал модификацию следующим образом: ссылка @fildor в посте. Похоже, что она работает для 2 потоков, нормально (1 потребитель и 1 производитель), но все же есть проблема, пока я создаю больше потоков..

  • MyThread так же, как оригинал
  • потребитель просто потребляет
  • производитель просто производит
  • блокировка решена в складе контейнера

потребитель

//...
    try {
                for (int i = 10; i > 0; i--) {
                    System.out.println(super.getThreadName() + " want to consume: "
                            + i);
                    System.out.println(super.getThreadName() + " consuming: " + i);
                    super.getContainer().consume(i);
                    System.out.println("Container state: " + super.getContainer());
                    Thread.sleep(100);
                }

            } catch (InterruptedException e) {
                System.out.println("Thread " + super.getThreadName()
                        + " interrupted.");
            }
//...

Режиссер

//...
try {
            for (int i = 10; i > 0; i--) {
                System.out.println(super.getThreadName() + " want to produce: "
                        + i);
                System.out.println(super.getThreadName() + " producing: " + i);
                super.getContainer().produce(i);
                System.out.println("Container state: " + super.getContainer());
                Thread.sleep(100);
            }

        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }
//...

Складской контейнер

//...
final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
//...

public void produce(int amount) {
        lock.lock();
        try {
            while (!canProduce(amount)) {
                notFull.wait();
            }

            if (this.value + amount <= this.capacity) {
                this.value += amount;
            } else {
                this.value = capacity;
            }
            notEmpty.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

    public void consume(int amount) {
        lock.lock();
        try {
            while (!canConsume(amount)) {
                notEmpty.wait();
            }

            if (this.value - amount >= 0) {
                this.value -= amount;
            } else {
                this.value = 0;
            }
            notFull.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

для 4 потоков (2 производителя и 2 потребителя) вывод выглядит так:

Creating Producer 0 [0]
Starting Producer 0 [0]
Running Producer 0 [0]
Producer 0 [0] want to produce: 10
Producer 0 [0] producing: 10
Container state: capacity: 25, value: 10
Creating Consumer 0 [1]
Starting Consumer 0 [1]
Creating Producer 1 [2]
Starting Producer 1 [2]
Creating Consumer 1 [3]
Running Consumer 0 [1]
Starting Consumer 1 [3]
Creating Producer 2 [4]
Starting Producer 2 [4]
Running Producer 1 [2]
Producer 1 [2] want to produce: 10
Producer 1 [2] producing: 10
Consumer 0 [1] want to consume: 10
Consumer 0 [1] consuming: 10
Container state: capacity: 25, value: 20
Container state: capacity: 25, value: 10
Running Consumer 1 [3]
Consumer 1 [3] want to consume: 10
Running Producer 2 [4]
Producer 2 [4] want to produce: 10
Producer 2 [4] producing: 10
Container state: capacity: 25, value: 20
Consumer 1 [3] consuming: 10
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 9
Producer 0 [0] producing: 9
Container state: capacity: 25, value: 19
Consumer 0 [1] want to consume: 9
Consumer 0 [1] consuming: 9
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 9
Producer 1 [2] producing: 9
Container state: capacity: 25, value: 19
Producer 2 [4] want to produce: 9
Producer 2 [4] producing: 9
Exception in thread "Producer 2 [4]" Consumer 1 [3] want to consume: 9
Consumer 1 [3] consuming: 9
Container state: capacity: 25, value: 10
java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
    at test.ThreadContainer.produce(ThreadContainer.java:24)
    at test.Producer.run(Producer.java:21)
    at java.lang.Thread.run(Unknown Source)
Producer 0 [0] want to produce: 8
Producer 0 [0] producing: 8
Container state: capacity: 25, value: 18
Consumer 0 [1] want to consume: 8
Consumer 0 [1] consuming: 8
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 8
Producer 1 [2] producing: 8
Container state: capacity: 25, value: 18
Consumer 1 [3] want to consume: 8
Consumer 1 [3] consuming: 8
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 7
Producer 0 [0] producing: 7
Container state: capacity: 25, value: 17
Consumer 0 [1] want to consume: 7
Consumer 0 [1] consuming: 7
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 7
Producer 1 [2] producing: 7
Container state: capacity: 25, value: 17
Consumer 1 [3] want to consume: 7
Consumer 1 [3] consuming: 7
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 6
Producer 0 [0] producing: 6
Container state: capacity: 25, value: 16
Consumer 0 [1] want to consume: 6
Consumer 0 [1] consuming: 6
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 6
Producer 1 [2] producing: 6
Container state: capacity: 25, value: 16
Consumer 1 [3] want to consume: 6
Consumer 1 [3] consuming: 6
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 5
Producer 0 [0] producing: 5
Container state: capacity: 25, value: 15
Consumer 0 [1] want to consume: 5
Consumer 0 [1] consuming: 5
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 5
Producer 1 [2] producing: 5
Container state: capacity: 25, value: 15
Consumer 1 [3] want to consume: 5
Consumer 1 [3] consuming: 5
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 4
Producer 0 [0] producing: 4
Container state: capacity: 25, value: 14
Consumer 0 [1] want to consume: 4
Consumer 0 [1] consuming: 4
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 4
Producer 1 [2] producing: 4
Container state: capacity: 25, value: 14
Consumer 1 [3] want to consume: 4
Consumer 1 [3] consuming: 4
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 3
Producer 0 [0] producing: 3
Container state: capacity: 25, value: 13
Consumer 0 [1] want to consume: 3
Consumer 0 [1] consuming: 3
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 3
Producer 1 [2] producing: 3
Container state: capacity: 25, value: 13
Consumer 1 [3] want to consume: 3
Consumer 1 [3] consuming: 3
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 2
Producer 0 [0] producing: 2
Container state: capacity: 25, value: 12
Consumer 0 [1] want to consume: 2
Consumer 0 [1] consuming: 2
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 2
Producer 1 [2] producing: 2
Container state: capacity: 25, value: 12
Consumer 1 [3] want to consume: 2
Consumer 1 [3] consuming: 2
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 1
Producer 0 [0] producing: 1
Container state: capacity: 25, value: 11
Consumer 0 [1] want to consume: 1
Consumer 0 [1] consuming: 1
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 1
Producer 1 [2] producing: 1
Container state: capacity: 25, value: 11
Consumer 1 [3] want to consume: 1
Consumer 1 [3] consuming: 1
Container state: capacity: 25, value: 10
Thread Producer 0 [0] exiting.
Thread Consumer 0 [1] exiting.
Thread Producer 1 [2] exiting.
Thread Consumer 1 [3] exiting.

Нет ли проблемы, потому что все больше потребителей ждут того же сигнала, что и другие производители?

2 ответа

Решение

Вы действительно должны где-нибудь прочитать учебник по потокам в Java 101. Исключение, которое вы получаете, состоит в том, что вы ожидаете на объекте, не приобретая встроенную блокировку. Учитывая любой объект, идентифицированный с lockИдиоматический код:

synchronized (lock) {
  while (!condition) {
    lock.wait();
  }
}

Почему так сложно?

В чем выгода MyThread учебный класс? Все, что я вижу, это код для запуска потока. Мне не нужно определять новый класс для этого. Я могу начать новый поток с одной строки кода:

new Thread(new Producer(...)).start;

Тогда есть твой ThreadContainer класс, который имеет методы, предназначенные только для вызова потоком потребителя, и другие методы, предназначенные только для потока производителя. Это нарушает основной принцип проектирования: каждый класс должен отвечать только за одну вещь.

Большая часть вашего кода - это клей, который связывает вещи с другими вещами. (например, методы получения и установки являются клеем, и ваши MyThread класс не что иное, как клей). Связывание вещей с другими вещами называется связью, а когда у вас их много, это называется тесной связью.

Чем более тесно связаны части вашей программы, тем сложнее будет понять и изменить программу. Сильно связанные системы, более вероятно, будут сломаны, и их будет сложнее диагностировать и ремонтировать, если они сломаны.

Слабая связь всегда лучше: не делайте классы и методы зависимыми друг от друга, когда это не является абсолютно необходимым.


Наконец, если бы я хотел продемонстрировать концепцию производителя / потребителя, я бы не загромождал демо с помощью wait()/notify(). Методы wait () и notify () являются низкоуровневыми примитивами, которые предназначены для использования в реализации объектов синхронизации более высокого уровня, и они должны быть скрыты от кода, который работает на более высоком уровне.

Если вы хотите продемонстрировать wait()/notify(), это одно. Если вы хотите продемонстрировать производителя / потребителя, это другое. Я бы не пытался втиснуть обе демонстрации в одну программу. Каждое понятие только мешает хвастаться другим.

Стандартная библиотека Java предоставляет множество готовых высокоуровневых объектов синхронизации, которые вы можете использовать. Вероятно, наиболее универсальным является BlockingQueue, У моей демонстрации производителя / потребителя была бы ветка производителя, которая вставляет "продукты" в ArrayBlockingQueueи потребительская нить, которая вытаскивает их и оперирует ими.

Другие вопросы по тегам