Работа с BlockingQueue и Multithreads. Все темы застряли в ожидании

Я создаю систему, которая будет иметь несколько развертываний комплектов, и у каждого развертывания будет очередь тестовых комплектов. Поскольку я хочу, чтобы тестовые наборы запускались одновременно при развертывании их отдельных пакетов, мне нужно добавить параллелизм в код. Я создал упрощенную версию кода, который я использую, но часть параллелизма не работает, когда я пытаюсь закрыть ее.

Когда вызывается Runner.stopEverything(), получается, что очередь очищается и ожидает завершения потоков, но даже когда все тесты завершены, ожидание никогда не заканчивается даже с помощью notifyAll(). В результате процесс просто сидит и никогда не заканчивается. Я смотрю на это в режиме отладки, и в результате все 3 потока показывают ожидание.

Главный:

public static void main(String args[]) throws Exception {
  Runner.queueTestSuites("SD1", Arrays.asList("A", "B", "C"));
  Runner.queueTestSuites("SD2", Arrays.asList("D", "E", "F"));
  Runner.queueTestSuites("SD3", Arrays.asList("G", "H", "I"));

  Thread.sleep(5000);

  System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~");
  Runner.stopEverything();
}

Второе место:

public class Runner {
  private static Map<String, TestQueue> runnerQueueMap = new ConcurrentHashMap<>();

  public synchronized static void queueTestSuites(String suiteDeployment, List<String> testSuiteQueueAsJSON) throws Exception {
    TestQueue queue;
    if(runnerQueueMap.containsKey(suiteDeployment)) {
      queue = runnerQueueMap.get(suiteDeployment);
    } else {
      queue = new TestQueue(suiteDeployment);
    }
    for (int i = 0; i < testSuiteQueueAsJSON.size(); i++) {
      String name = testSuiteQueueAsJSON.get(i);
      queue.addToQueue(name);
    }
    runnerQueueMap.put(suiteDeployment,queue);
  }

  public synchronized static void stopEverything() throws InterruptedException {
    for (String s : runnerQueueMap.keySet()) {
      TestQueue q = runnerQueueMap.get(s);
      q.saveAndClearQueue();
    }

    for (String s : runnerQueueMap.keySet()) {
      TestQueue q = runnerQueueMap.get(s);
      q.waitForThread();
    }

    System.out.println("All done at " + new Date());
  }
}

TestQueue:

public class TestQueue {

  private Consumer consumer;
  private Thread consumerThread;
  private java.util.concurrent.BlockingQueue<String> queue;
  private String suiteDeployment;

  public TestQueue(String suiteDeployment) {
    this.suiteDeployment = suiteDeployment;
    queue = new ArrayBlockingQueue<>(100);
    startConsumer();
  }

  public void addToQueue(String testSuite) {
    try {
      queue.put(testSuite);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public synchronized void waitForThread() {
    try {
      if (consumer.running.get()) {
        synchronized (consumerThread) {
          System.out.println("Waiting for " + consumerThread.getName());
          consumerThread.wait();
        }
      }
      System.out.println("Thread complete at " + new Date());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void saveAndClearQueue() {
    List<String> suiteNames = new ArrayList<>();
    for (String suite : queue) {
      suiteNames.add(suite);
    }
    queue.clear();
  }

  private void startConsumer() {
    consumer = new Consumer(queue,suiteDeployment);
    consumerThread = new Thread(consumer);
    consumerThread.start();
  }

  private class Consumer implements Runnable{
    private BlockingQueue<String> queue;
    private String suiteDeployment;
    public AtomicBoolean running;

    public Consumer(BlockingQueue<String> queue, String suiteDeployment){
      this.queue = queue;
      this.suiteDeployment = suiteDeployment;
      this.running = new AtomicBoolean(false);
    }

    @Override
    public void run() {
      try{
        while(!Thread.currentThread().isInterrupted()) {
          String testSuite = queue.take();
          this.running.set(true);
          new Test(testSuite, suiteDeployment).run();
          this.running.set(false);
        }
        notifyAll();
      }catch(Exception e) {
        e.printStackTrace();
      }
    }
  }
}

Тестовое задание:

public class Test {

  String testSuite = "";
  String suiteDeployment = "";

  public Test(String testSuite, String suiteDeployment) {
    this.testSuite = testSuite;
    this.suiteDeployment = suiteDeployment;
  }

  public void run() {
    int time = new Random().nextInt() % 10000;
    time = Math.max(time, 3000);
    System.out.println("Test Started: " + testSuite + " on " + suiteDeployment + " at " + new Date() + " running for " + time + " on thread " + Thread.currentThread().getName());
    try {
      Thread.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Test Completed: " + testSuite + " on " + suiteDeployment + " at " + new Date());
  }
}

1 ответ

Решение

Внутри метода run вашего потребителя у вас есть блокирующий вызов queue.take(), что означает, что он будет блокироваться до тех пор, пока внутри вашей очереди не появится элемент. В конце концов у вас заканчиваются элементы в очереди, и все ваши потоки блокируются вызовом queue.take(), ожидающим, пока больше элементов не станет доступным для обработки.

Хотя ваш вызов находится в цикле while, где он проверяет, прерван ли поток, вы фактически никогда не прерываете потоки, поэтому он никогда не попадает в оценку цикла while и не блокируется при вызове queue.take()

Таким образом, ваши потоки ждут, пока они ожидают ввода, чтобы стать доступными в вашей очереди блокировки

Также ваш метод saveAndClear должен заблокировать правильный объект, который является самой очередью, как показано ниже:

public void saveAndClearQueue() {
    List<String> suiteNames = new ArrayList<String>();
    synchronized (queue) {
        for (String suite : queue) {
            suiteNames.add(suite);
        }
        queue.clear();
    }
    System.out.println("Saved(not executed) : "+suiteNames);
}

И ваш метод waitForThread должен выполнить следующее:

public void waitForThread() {
    synchronized (consumerThread) {
        while (consumer.running.get()) {
            try {
                consumerThread.wait(100);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    if (!consumer.running.get()) {
        consumerThread.interrupt();
    }

    System.out.println("Thread complete at " + new Date());
}
Другие вопросы по тегам