Работа с BlockingQueue и Multithreads. Все темы застряли в ожидании
Я создаю систему, которая будет иметь несколько развертываний комплектов, и у каждого развертывания будет очередь тестовых комплектов. Поскольку я хочу, чтобы тестовые наборы запускались одновременно при развертывании их отдельных пакетов, мне нужно добавить параллелизм в код. Я создал упрощенную версию кода, который я использую, но часть параллелизма не работает, когда я пытаюсь закрыть ее.
Когда вызывается Runner.stopEverything(), получается, что очередь очищается и ожидает завершения потоков, но даже когда все тесты завершены, ожидание никогда не заканчивается даже с помощью notifyAll(). В результате процесс просто сидит и никогда не заканчивается. Я смотрю на это в режиме отладки, и в результате все 3 потока показывают ожидание.
Главный:
public static void main(String args[]) throws Exception {
Runner.queueTestSuites("SD1", Arrays.asList("A", "B", "C"));
Runner.queueTestSuites("SD2", Arrays.asList("D", "E", "F"));
Runner.queueTestSuites("SD3", Arrays.asList("G", "H", "I"));
Thread.sleep(5000);
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~");
Runner.stopEverything();
}
Второе место:
public class Runner {
private static Map<String, TestQueue> runnerQueueMap = new ConcurrentHashMap<>();
public synchronized static void queueTestSuites(String suiteDeployment, List<String> testSuiteQueueAsJSON) throws Exception {
TestQueue queue;
if(runnerQueueMap.containsKey(suiteDeployment)) {
queue = runnerQueueMap.get(suiteDeployment);
} else {
queue = new TestQueue(suiteDeployment);
}
for (int i = 0; i < testSuiteQueueAsJSON.size(); i++) {
String name = testSuiteQueueAsJSON.get(i);
queue.addToQueue(name);
}
runnerQueueMap.put(suiteDeployment,queue);
}
public synchronized static void stopEverything() throws InterruptedException {
for (String s : runnerQueueMap.keySet()) {
TestQueue q = runnerQueueMap.get(s);
q.saveAndClearQueue();
}
for (String s : runnerQueueMap.keySet()) {
TestQueue q = runnerQueueMap.get(s);
q.waitForThread();
}
System.out.println("All done at " + new Date());
}
}
TestQueue:
public class TestQueue {
private Consumer consumer;
private Thread consumerThread;
private java.util.concurrent.BlockingQueue<String> queue;
private String suiteDeployment;
public TestQueue(String suiteDeployment) {
this.suiteDeployment = suiteDeployment;
queue = new ArrayBlockingQueue<>(100);
startConsumer();
}
public void addToQueue(String testSuite) {
try {
queue.put(testSuite);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void waitForThread() {
try {
if (consumer.running.get()) {
synchronized (consumerThread) {
System.out.println("Waiting for " + consumerThread.getName());
consumerThread.wait();
}
}
System.out.println("Thread complete at " + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void saveAndClearQueue() {
List<String> suiteNames = new ArrayList<>();
for (String suite : queue) {
suiteNames.add(suite);
}
queue.clear();
}
private void startConsumer() {
consumer = new Consumer(queue,suiteDeployment);
consumerThread = new Thread(consumer);
consumerThread.start();
}
private class Consumer implements Runnable{
private BlockingQueue<String> queue;
private String suiteDeployment;
public AtomicBoolean running;
public Consumer(BlockingQueue<String> queue, String suiteDeployment){
this.queue = queue;
this.suiteDeployment = suiteDeployment;
this.running = new AtomicBoolean(false);
}
@Override
public void run() {
try{
while(!Thread.currentThread().isInterrupted()) {
String testSuite = queue.take();
this.running.set(true);
new Test(testSuite, suiteDeployment).run();
this.running.set(false);
}
notifyAll();
}catch(Exception e) {
e.printStackTrace();
}
}
}
}
Тестовое задание:
public class Test {
String testSuite = "";
String suiteDeployment = "";
public Test(String testSuite, String suiteDeployment) {
this.testSuite = testSuite;
this.suiteDeployment = suiteDeployment;
}
public void run() {
int time = new Random().nextInt() % 10000;
time = Math.max(time, 3000);
System.out.println("Test Started: " + testSuite + " on " + suiteDeployment + " at " + new Date() + " running for " + time + " on thread " + Thread.currentThread().getName());
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Test Completed: " + testSuite + " on " + suiteDeployment + " at " + new Date());
}
}
1 ответ
Внутри метода run вашего потребителя у вас есть блокирующий вызов queue.take(), что означает, что он будет блокироваться до тех пор, пока внутри вашей очереди не появится элемент. В конце концов у вас заканчиваются элементы в очереди, и все ваши потоки блокируются вызовом queue.take(), ожидающим, пока больше элементов не станет доступным для обработки.
Хотя ваш вызов находится в цикле while, где он проверяет, прерван ли поток, вы фактически никогда не прерываете потоки, поэтому он никогда не попадает в оценку цикла while и не блокируется при вызове queue.take()
Таким образом, ваши потоки ждут, пока они ожидают ввода, чтобы стать доступными в вашей очереди блокировки
Также ваш метод saveAndClear должен заблокировать правильный объект, который является самой очередью, как показано ниже:
public void saveAndClearQueue() {
List<String> suiteNames = new ArrayList<String>();
synchronized (queue) {
for (String suite : queue) {
suiteNames.add(suite);
}
queue.clear();
}
System.out.println("Saved(not executed) : "+suiteNames);
}
И ваш метод waitForThread должен выполнить следующее:
public void waitForThread() {
synchronized (consumerThread) {
while (consumer.running.get()) {
try {
consumerThread.wait(100);
} catch (InterruptedException e) {
break;
}
}
}
if (!consumer.running.get()) {
consumerThread.interrupt();
}
System.out.println("Thread complete at " + new Date());
}