Правильно ли выполнена блокировка
Я читаю фид UDP, затем декодирую его и пишу в MSMQ(очередь сообщений).
Я создаю новый поток, который вызывает UDPReader. В свою очередь UDPReader создает пул потоков и вызывает класс ipaddrConnection. Запуск внутри ipaddrConnection содержит цикл while, который непрерывно читает пакеты из многоадресного сокета и отправляет его в класс parseUDP. Из parseUDP он декодируется и, наконец, передается в класс, который пишет в MSMQ. Я считаю, что я не блокирую потоки должным образом, когда дело доходит до цикла while в ipaddrConnection, потому что потоки пытаются записать в ту же область памяти в MSMQ. Я думал, что, поместив мою блокировку в цикл while, каждый поток в пуле будет иметь свое собственное время в "Критическом разделе": 1. получить пакет, затем 2. расшифровать и записать в MSMQ. Я все еще изучаю параллелизм и ищу некоторую помощь. Я предоставил файл аварийного дампа, который я не понимаю, как правильно читать, и мои классы UDPReader и ipaddrConnection. parseUDP вызывает класс для декодирования пакета, и этот класс вызывает класс MSMQ для записи в память. Все это в моем критическом разделе.
class UDPReader implements Runnable
{
private final String ip, socket, queue, threadName;
private final JTextArea screen;
UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen)
{
this.ip = ip;
this.socket = socket;
this.queue = queue;
this.threadName = threadName;
this.screen = screen;
}
public void run()
{
screen.append("Thread " + threadName + " running\n\n");
ExecutorService executor = Executors.newFixedThreadPool(5);
Runnable reader = new ipaddrConnection(ip, socket, queue);
executor.execute(reader);
}
}
public final class ipaddrConnection implements Runnable
{
private final ReentrantLock lock = new ReentrantLock();
byte[] bytes = new byte[(int)100000];
InetAddress group;
MulticastSocket s;
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
private String queue;
public ipaddrConnection(String ip, String socket, String queue) {
try {
this.s = new MulticastSocket(Integer.parseInt(socket));
this.group = InetAddress.getByName(ip);
this.queue = queue;
} catch (IOException ex) {
Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
try {
parseUDP p = new parseUDP(queue);
s.joinGroup(group);
s.setSoTimeout(95000);
try{
while(true){
lock.lock();
s.receive(packet);
p.parseUDP(packet.getData());
}
}finally {
lock.unlock();
}
} catch (SocketException ex) {
Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
Отчет о https://drive.google.com/file/d/0B4GWNCU6_CBlM2tJNGJqNzRVazg/view?usp=sharing
3 ответа
В вашем коде ваши блокировки не делают ничего полезного.
Каждый поток имеет свою собственную блокировку, поэтому одновременно может быть несколько потоков, использующих очередь (потому что поток 1 заблокировал блокировку 1, а поток 2 заблокировал блокировку 2, и ничто не мешает им использовать очередь одновременно).
Если вы сделаете lock
поле static
в вашем коде все потоки будут использовать одну и ту же блокировку.
У вас все еще могут быть проблемы, потому что потоки никогда не снимают блокировку (если они не встречают исключение), поэтому только одному потоку будет разрешено выполнять работу:
try{
while(true){
lock.lock();
s.receive(packet);
p.parseUDP(packet.getData());
}
}finally {
lock.unlock();
}
Заметьте, как единственный способ, которым поток может разблокировать блокировку, - это исключение?
Возможно, вы хотели что-то более похожее на это:
while(true) {
s.receive(packet);
try {
lock.lock();
s.parseUDP(packet.getData());
} finally {
lock.unlock();
}
}
- с этой структурой потоки будут удерживать блокировку только во время синтаксического анализа пакетов, а не во время приема пакетов. (Я не знаю, действительно ли вы этого хотите)
ExecutorService executor = Executors.newFixedThreadPool(5);
Runnable reader = new ipaddrConnection(ip, socket, queue);
executor.execute(reader);
Этот код, по сути, является однопоточным, поскольку, хотя в пуле пять потоков, вы используете только один.
- Использование UDPReader
Runnable
И егоrun()
реализация по меньшей мере не идиоматическая. - Как уже упоминалось в immibis, ваши блокирующие объекты не разделяются между потоками и не обеспечивают защиту, которую вы ищете.
Вы разблокируете только при выходе
while (true) { ... }
что сказать никогда. Имея это в виду, вы можете рассмотреть что-то вроде:public class UDPReader { ... UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen, numberOfThreads) { ... this.numberOfThreads = numberOfThreads; this.lock = new ReentrantLock(); } public void run() { ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++){ executor.execute(new ipaddrConnection(ip, socket, queue, lock)); } } } public final class ipaddrConnection implements Runnable { private lock ; ... public ipaddrConnection(String ip, String socket, String queue, ReentrantLock lock) { ... this.lock = lock; } @Override public void run() { ... while (true) { try { lock.lock(); s.receive(packet); p.parseUDP(packet.getData()); } finally { lock.unlock(); } } .... } } }