Правильно ли выполнена блокировка

Я читаю фид UDP, затем декодирую его и пишу в MSMQ(очередь сообщений).

Я создаю новый поток, который вызывает UDPReader. В свою очередь UDPReader создает пул потоков и вызывает класс ipaddrConnection. Запуск внутри ipaddrConnection содержит цикл while, который непрерывно читает пакеты из многоадресного сокета и отправляет его в класс parseUDP. Из parseUDP он декодируется и, наконец, передается в класс, который пишет в MSMQ. Я считаю, что я не блокирую потоки должным образом, когда дело доходит до цикла while в ipaddrConnection, потому что потоки пытаются записать в ту же область памяти в MSMQ. Я думал, что, поместив мою блокировку в цикл while, каждый поток в пуле будет иметь свое собственное время в "Критическом разделе": 1. получить пакет, затем 2. расшифровать и записать в MSMQ. Я все еще изучаю параллелизм и ищу некоторую помощь. Я предоставил файл аварийного дампа, который я не понимаю, как правильно читать, и мои классы UDPReader и ipaddrConnection. parseUDP вызывает класс для декодирования пакета, и этот класс вызывает класс MSMQ для записи в память. Все это в моем критическом разделе.

class UDPReader implements Runnable
{
    private final String ip, socket, queue, threadName;
    private final JTextArea screen;

    UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen) 
    {
        this.ip = ip;
        this.socket = socket;
        this.queue = queue;
        this.threadName = threadName;
        this.screen = screen;
    }

    public void run()
    {
        screen.append("Thread " + threadName + " running\n\n");
        ExecutorService executor = Executors.newFixedThreadPool(5);
        Runnable reader = new ipaddrConnection(ip, socket, queue);
        executor.execute(reader);
    }

}

public final class ipaddrConnection implements Runnable
{
    private final ReentrantLock lock = new ReentrantLock();
    byte[] bytes = new byte[(int)100000];
    InetAddress group; 
    MulticastSocket s;
    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
    private String queue;

    public ipaddrConnection(String ip, String socket, String queue) {
        try {
            this.s = new MulticastSocket(Integer.parseInt(socket));
            this.group = InetAddress.getByName(ip);
            this.queue = queue;
        } catch (IOException ex) {
            Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

        @Override
        public void run() {
            try {
                parseUDP p = new parseUDP(queue);
                s.joinGroup(group);
                s.setSoTimeout(95000);

                try{
                    while(true){
                        lock.lock();
                        s.receive(packet);
                        p.parseUDP(packet.getData());
                    } 
                }finally {
                    lock.unlock();
                }


             } catch (SocketException ex) {
                Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

}

Отчет о https://drive.google.com/file/d/0B4GWNCU6_CBlM2tJNGJqNzRVazg/view?usp=sharing

3 ответа

Решение

В вашем коде ваши блокировки не делают ничего полезного.

Каждый поток имеет свою собственную блокировку, поэтому одновременно может быть несколько потоков, использующих очередь (потому что поток 1 заблокировал блокировку 1, а поток 2 заблокировал блокировку 2, и ничто не мешает им использовать очередь одновременно).

Если вы сделаете lock поле static в вашем коде все потоки будут использовать одну и ту же блокировку.

У вас все еще могут быть проблемы, потому что потоки никогда не снимают блокировку (если они не встречают исключение), поэтому только одному потоку будет разрешено выполнять работу:

try{
    while(true){
        lock.lock();
        s.receive(packet);
        p.parseUDP(packet.getData());
    } 
}finally {
    lock.unlock();
}

Заметьте, как единственный способ, которым поток может разблокировать блокировку, - это исключение?

Возможно, вы хотели что-то более похожее на это:

while(true) {
    s.receive(packet);
    try {
        lock.lock();
        s.parseUDP(packet.getData());
    } finally {
        lock.unlock();
    }
}

- с этой структурой потоки будут удерживать блокировку только во время синтаксического анализа пакетов, а не во время приема пакетов. (Я не знаю, действительно ли вы этого хотите)

ExecutorService executor = Executors.newFixedThreadPool(5);
Runnable reader = new ipaddrConnection(ip, socket, queue);
executor.execute(reader);

Этот код, по сути, является однопоточным, поскольку, хотя в пуле пять потоков, вы используете только один.

  1. Использование UDPReader Runnable И его run() реализация по меньшей мере не идиоматическая.
  2. Как уже упоминалось в immibis, ваши блокирующие объекты не разделяются между потоками и не обеспечивают защиту, которую вы ищете.
  3. Вы разблокируете только при выходе while (true) { ... } что сказать никогда. Имея это в виду, вы можете рассмотреть что-то вроде:

        public class UDPReader {
            ...
    
            UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen, numberOfThreads) {
                ...
                this.numberOfThreads = numberOfThreads;
                this.lock = new ReentrantLock();
            }
    
            public void run() {
                ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
                for (int i = 0; i < numberOfThreads; i++){
                    executor.execute(new ipaddrConnection(ip, socket, queue, lock));
                }
            }
         }
    
    
         public final class ipaddrConnection implements Runnable {
            private lock ;
            ...
    
            public ipaddrConnection(String ip, String socket, String queue, ReentrantLock lock) {
                ...
                this.lock = lock;
            }
    
            @Override
            public void run() {
                    ...
                    while (true) {
                        try {
                            lock.lock();
                            s.receive(packet);
                            p.parseUDP(packet.getData());
                        } finally {
                            lock.unlock();
                        }
                    }
                 ....
            }
        }
    }
    
Другие вопросы по тегам