Понимание Java ExecutorService

Я пытаюсь узнать, как использовать executorservice Java,

Я читал следующее обсуждение Java поток простой очереди

В этом есть пример

ExecutorService service = Executors.newFixedThreadPool(10);
// now submit our jobs
service.submit(new Runnable() {
    public void run() {
    do_some_work();
   }
});
// you can submit any number of jobs and the 10 threads will work on them
// in order
...
// when no more to submit, call shutdown
service.shutdown();
// now wait for the jobs to finish
service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

Я попытался реализовать это решение, для этого я создал одну форму и поместил кнопки "Пуск" и "Стоп", но проблема, с которой я сталкиваюсь, заключается в том, что если я вызову этот процесс при нажатии кнопки "Пуск", он повесит полную форму, и нам нужно подождать, пока весь процесс выполнен.

Я также попытался прочитать следующий https://www3.ntu.edu.sg/home/ehchua/programming/java/J5e_multithreading.html

но до сих пор я не могу понять, как заставить его работать, так как после нажатия кнопки "Пуск" я должен получить доступ обратно, предположим, что я хочу остановить процесс.

Может кто-нибудь, пожалуйста, направьте меня в правильном направлении.

Спасибо

Чтобы сделать мою ситуацию более понятной, я добавляю код, который я тестирую.

Проблемы

1) заполненная форма остается замороженной при выполнении программы. 2) Progressbar не работает, будет отображать статус только после завершения всего процесса.

private void btnStartActionPerformed(java.awt.event.ActionEvent evt) {                                         
  TestConneciton();

}                                        

private void btnStopActionPerformed(java.awt.event.ActionEvent evt) {                                        
    flgStop = true;
}   

   private static final int MYTHREADS = 30;
private boolean flgStop = false;
public  void TestConneciton() {
    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS);
    String[] hostList = { "http://crunchify.com", "http://yahoo.com",
            "http://www.ebay.com", "http://google.com",
            "http://www.example.co", "https://paypal.com",
            "http://bing.com/", "http://techcrunch.com/",
            "http://mashable.com/", "http://thenextweb.com/",
            "http://wordpress.com/", "http://wordpress.org/",
            "http://example.com/", "http://sjsu.edu/",
            "http://ebay.co.uk/", "http://google.co.uk/",
            "http://www.wikipedia.org/",
            "http://en.wikipedia.org/wiki/Main_Page" };

    pbarStatus.setMaximum(hostList.length-1);
    pbarStatus.setValue(0);
    for (int i = 0; i < hostList.length; i++) {

        String url = hostList[i];
        Runnable worker = new MyRunnable(url);
        executor.execute(worker);
    }
    executor.shutdown();
    // Wait until all threads are finish
//        while (!executor.isTerminated()) {
// 
//        }
    System.out.println("\nFinished all threads");
}

public  class MyRunnable implements Runnable {
    private final String url;

    MyRunnable(String url) {
        this.url = url;
    }

    @Override
    public void run() {

        String result = "";
        int code = 200;
        try {
            if(flgStop == true)
            {
                //Stop thread execution
            }
            URL siteURL = new URL(url);
            HttpURLConnection connection = (HttpURLConnection) siteURL
                    .openConnection();
            connection.setRequestMethod("GET");
            connection.connect();

            code = connection.getResponseCode();
            pbarStatus.setValue(pbarStatus.getValue()+1);
            if (code == 200) {
                result = "Green\t";
            }
        } catch (Exception e) {
            result = "->Red<-\t";
        }
        System.out.println(url + "\t\tStatus:" + result);
    }
}

3 ответа

Решение

Согласно API ExecutorService, это блокирует:

service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

API цитата:

Блокируется до тех пор, пока все задачи не завершат выполнение после запроса на выключение, или не истечет время ожидания, или текущий поток не прервется, в зависимости от того, что произойдет раньше.

Если вы не хотите, чтобы это блокировало ваш текущий поток, то, возможно, вам следует вызвать его в другом потоке. Кроме того, у вас есть приложение Swing, а затем подумайте об использовании SwingWorker, который, как я считаю, использует ExecutorServices "под капотом".


Основываясь на вашем последнем кусочке кода, я бы использовал

  • SwingWorker для управления всеми фоновыми потоками.
  • Я бы дал SwingWorker ExecutorService
  • а также ExecutorCompletionService, который был инициализирован с ExecutorService, поскольку это позволило бы мне получать результаты задачи по мере их завершения
  • Я бы заполнил его Callables, а не Runnables, так как это позволило бы задаче что-то вернуть, возможно, String, чтобы показать прогресс.
  • Я бы установил свойство прогресса SwingWorker в (100 * taskCount) / totalTaskCount, и мой JProgressBar изменился бы с 0 на 100.
  • Затем я бы использовал пары методов публикации / процесса SwingWorker для извлечения строк, возвращаемых вызываемым объектом.
  • Я бы слушал прогресс SwingWorker в моем GUI с помощью PropertyChangeListener
  • Затем внесите изменения в графический интерфейс этого слушателя.
  • Я бы поменял if (code == 200) { в if (code == HttpURLConnection.HTTP_OK) { избегать магических чисел.
  • Действие JButton отключит само себя, затем создаст новый объект SwingWorker, добавит ProperChangeListener работника к работнику, а затем выполнит работника.

Например

import java.awt.Font;
import java.awt.event.ActionEvent;
import java.awt.event.KeyEvent;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.DefaultListModel;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JList;
import javax.swing.JPanel;
import javax.swing.JProgressBar;
import javax.swing.JScrollPane;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

@SuppressWarnings("serial")
public class SwingExecutorCompletionService extends JPanel {
   public static final int MYTHREADS = 10;
   private static final int LIST_PROTOTYPE_SIZE = 120;
   private static final String LIST_PROTOTYPE_STRING = "%" + LIST_PROTOTYPE_SIZE + "s";
   public static final String[] HOST_LIST = { 
      "http://crunchify.com",
      "http://yahoo.com", 
      "http://www.ebay.com", 
      "http://google.com",
      "http://www.example.co", 
      "https://paypal.com", 
      "http://bing.com/",
      "http://techcrunch.com/", 
      "http://mashable.com/",
      "http://thenextweb.com/", 
      "http://wordpress.com/",
      "http://wordpress.org/", 
      "http://example.com/", 
      "http://sjsu.edu/",
      "http://ebay.co.uk/", 
      "http://google.co.uk/",
      "http://www.wikipedia.org/", 
      "http://en.wikipedia.org/wiki/Main_Page" };

   private JProgressBar pbarStatus = new JProgressBar(0, 100);
   private JButton doItButton = new JButton(new DoItAction("Do It", KeyEvent.VK_D));
   private DefaultListModel<String> listModel = new DefaultListModel<>();
   private JList<String> resultList = new JList<>(listModel);

   public SwingExecutorCompletionService() {
      resultList.setVisibleRowCount(10);
      resultList.setPrototypeCellValue(String.format(LIST_PROTOTYPE_STRING, ""));
      resultList.setFont(new Font(Font.MONOSPACED, Font.PLAIN, 12));

      add(pbarStatus);
      add(doItButton);
      add(new JScrollPane(resultList, JScrollPane.VERTICAL_SCROLLBAR_ALWAYS, 
            JScrollPane.HORIZONTAL_SCROLLBAR_AS_NEEDED));
   }

   public void addToCompletionList(String element) {
      listModel.addElement(element);
   }

   public void setStatusValue(int progress) {
      pbarStatus.setValue(progress);
   }

   class DoItAction extends AbstractAction {
      public DoItAction(String name, int mnemonic) {
         super(name);
         putValue(MNEMONIC_KEY, mnemonic);
      }

      @Override
      public void actionPerformed(ActionEvent e) {
         setEnabled(false);
         DoItWorker worker = new DoItWorker(HOST_LIST, MYTHREADS);
         SwingExecutorCompletionService gui = SwingExecutorCompletionService.this;
         PropertyChangeListener workerListener = new WorkerChangeListener(gui, this);
         worker.addPropertyChangeListener(workerListener);
         worker.execute();
      }
   }

   private static void createAndShowGui() {
      SwingExecutorCompletionService mainPanel = new SwingExecutorCompletionService();

      JFrame frame = new JFrame("Swing ExecutorCompletionService");
      frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
      frame.getContentPane().add(mainPanel);
      frame.pack();
      frame.setLocationByPlatform(true);
      frame.setVisible(true);
   }

   public static void main(String[] args) {
      SwingUtilities.invokeLater(new Runnable() {
         public void run() {
            createAndShowGui();
         }
      });
   }
}

class MyCallable implements Callable<String> {
   private static final String RED = "->Red<-";
   private static final String GREEN = "Green";
   private final String url;
   private volatile boolean flgStop;

   MyCallable(String url) {
      this.url = url;
   }

   @Override
   public String call() throws Exception {
      String result = "";
      int code = HttpURLConnection.HTTP_OK;
      try {
         // if(flgStop == true)
         if (flgStop) {
            // Stop thread execution
         }
         URL siteURL = new URL(url);
         HttpURLConnection connection = (HttpURLConnection) siteURL
               .openConnection();
         connection.setRequestMethod("GET");
         connection.connect();

         code = connection.getResponseCode();

         // No don't set the prog bar in a background thread!
         // !! pbarStatus.setValue(pbarStatus.getValue()+1); 
         // avoid magic numbers
         if (code == HttpURLConnection.HTTP_OK) {
            result = GREEN;
         }
      } catch (Exception e) {
         result = RED;
      }
      return String.format("%-40s %s", url + ":", result);
   }

}

class WorkerChangeListener implements PropertyChangeListener {
   private Action action;
   private SwingExecutorCompletionService gui;

   public WorkerChangeListener(SwingExecutorCompletionService gui, Action button) {
      this.gui = gui;
      this.action = button;
   }

   @Override
   public void propertyChange(PropertyChangeEvent evt) {
      DoItWorker worker = (DoItWorker)evt.getSource();
      if (evt.getNewValue() == SwingWorker.StateValue.DONE) {
         action.setEnabled(true);
         try {
            worker.get();
         } catch (InterruptedException e) {
            e.printStackTrace();
         } catch (ExecutionException e) {
            e.printStackTrace();
         }
      } else if (DoItWorker.INTERMEDIATE_RESULT.equals(evt.getPropertyName())) {
         gui.addToCompletionList(evt.getNewValue().toString());
      } else if ("progress".equals(evt.getPropertyName())) {
         gui.setStatusValue(worker.getProgress());
      }
   }
}

class DoItWorker extends SwingWorker<Void, String> {
   public static final String INTERMEDIATE_RESULT = "intermediate result";
   private static final long TIME_OUT = 5;
   private static final TimeUnit UNIT = TimeUnit.MINUTES;

   private String intermediateResult;
   private ExecutorService executor;
   private CompletionService<String> completionService;
   private String[] hostList;

   public DoItWorker(String[] hostList, int myThreads) {
      this.hostList = hostList;
      executor = Executors.newFixedThreadPool(myThreads);
      completionService = new ExecutorCompletionService<>(executor);
   }

   @Override
   protected Void doInBackground() throws Exception {
      for (int i = 0; i < hostList.length; i++) {

         String url = hostList[i];
         Callable<String> callable = new MyCallable(url);
         completionService.submit(callable);
      }
      executor.shutdown();
      for (int i = 0; i < hostList.length; i++) {
         String result = completionService.take().get();
         publish(result);
         int progress = (100 * i) / hostList.length;
         setProgress(progress);
      }
      executor.awaitTermination(TIME_OUT, UNIT);
      setProgress(100);
      return null;
   }

   @Override
   protected void process(List<String> chunks) {
      for (String chunk : chunks) {
         setIntermediateResult(chunk);
      }
   }

   private void setIntermediateResult(String intermediateResult) {
      String oldValue = this.intermediateResult;
      String newValue = intermediateResult;
      this.intermediateResult = intermediateResult;
      firePropertyChange(INTERMEDIATE_RESULT, oldValue, newValue);
   }

}

Который будет выглядеть и работать так:

Если вы хотите отменить уже запущенные вакансии, вам придется использовать Callable вместо Runnable, Когда вы отправляете работу, вы получаете Future который вы можете вызвать отменить () на.

public static void main(String[] args) {
  ExecutorService es = Executors.newFixedThreadPool(10);
  Callable<Integer> callable1 = new CallableImpl();
  Future<Integer> future1 = es.submit(callable1);

  // if you decide to cancel your task gracefully
  future1.cancel()
  ...

Тогда вам решать, ThreadInterrupt в вашей реализации Callable.

public class CallableImpl implements Callable<Integer> {
@Override
public Integer call() throws Exception {
     try {
        while(true) {               
            // do something

            if(Thread.currentThread().isInterrupted()) {
                System.out.println("detected interrupt flag");
                break;
            }
        }
    }
    catch(InterruptedException ie) {
        System.out.println("interrupted");
    }

@Hovercraft, вероятно, прав, если вы кодируете приложение Swing, то SwingWorker - это то, что вы хотите использовать.

Если вы используете JButton, с вашим ExecutorServiceтогда вам, вероятно, следует создать новый поток и выпустить поток обработки событий (EDT):

button.setActionListener(new Action() {
  public void actionPerformed(ActionEvent e) {
    button.setEnabled(false); // disable the button 
    new Thread(new Runnable() {
      public void run() {
        ... your code ...
        button.setEnabled(true);
      }
    }).start();
  } 
});

Как сказал Ховеркрафт, полный угрей, awaitTermination это операция блокировки

А в случае с Swing вы, вероятно, делаете это в действии (как в моем примере), и вы блокируете EDT от выполнения различных операций, таких как реагирование на ввод пользователя:)

Примечание: ExecutorService имеет хороший invokeAll, который может оказаться немного полезным, чем при использовании awaitTermination, Это также блокирует, и вам все равно придется делать свои вещи в другом потоке, если это требуется.

Другие вопросы по тегам