Понимание Java ExecutorService
Я пытаюсь узнать, как использовать executorservice Java,
Я читал следующее обсуждение Java поток простой очереди
В этом есть пример
ExecutorService service = Executors.newFixedThreadPool(10);
// now submit our jobs
service.submit(new Runnable() {
public void run() {
do_some_work();
}
});
// you can submit any number of jobs and the 10 threads will work on them
// in order
...
// when no more to submit, call shutdown
service.shutdown();
// now wait for the jobs to finish
service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Я попытался реализовать это решение, для этого я создал одну форму и поместил кнопки "Пуск" и "Стоп", но проблема, с которой я сталкиваюсь, заключается в том, что если я вызову этот процесс при нажатии кнопки "Пуск", он повесит полную форму, и нам нужно подождать, пока весь процесс выполнен.
Я также попытался прочитать следующий https://www3.ntu.edu.sg/home/ehchua/programming/java/J5e_multithreading.html
но до сих пор я не могу понять, как заставить его работать, так как после нажатия кнопки "Пуск" я должен получить доступ обратно, предположим, что я хочу остановить процесс.
Может кто-нибудь, пожалуйста, направьте меня в правильном направлении.
Спасибо
Чтобы сделать мою ситуацию более понятной, я добавляю код, который я тестирую.
Проблемы
1) заполненная форма остается замороженной при выполнении программы. 2) Progressbar не работает, будет отображать статус только после завершения всего процесса.
private void btnStartActionPerformed(java.awt.event.ActionEvent evt) {
TestConneciton();
}
private void btnStopActionPerformed(java.awt.event.ActionEvent evt) {
flgStop = true;
}
private static final int MYTHREADS = 30;
private boolean flgStop = false;
public void TestConneciton() {
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS);
String[] hostList = { "http://crunchify.com", "http://yahoo.com",
"http://www.ebay.com", "http://google.com",
"http://www.example.co", "https://paypal.com",
"http://bing.com/", "http://techcrunch.com/",
"http://mashable.com/", "http://thenextweb.com/",
"http://wordpress.com/", "http://wordpress.org/",
"http://example.com/", "http://sjsu.edu/",
"http://ebay.co.uk/", "http://google.co.uk/",
"http://www.wikipedia.org/",
"http://en.wikipedia.org/wiki/Main_Page" };
pbarStatus.setMaximum(hostList.length-1);
pbarStatus.setValue(0);
for (int i = 0; i < hostList.length; i++) {
String url = hostList[i];
Runnable worker = new MyRunnable(url);
executor.execute(worker);
}
executor.shutdown();
// Wait until all threads are finish
// while (!executor.isTerminated()) {
//
// }
System.out.println("\nFinished all threads");
}
public class MyRunnable implements Runnable {
private final String url;
MyRunnable(String url) {
this.url = url;
}
@Override
public void run() {
String result = "";
int code = 200;
try {
if(flgStop == true)
{
//Stop thread execution
}
URL siteURL = new URL(url);
HttpURLConnection connection = (HttpURLConnection) siteURL
.openConnection();
connection.setRequestMethod("GET");
connection.connect();
code = connection.getResponseCode();
pbarStatus.setValue(pbarStatus.getValue()+1);
if (code == 200) {
result = "Green\t";
}
} catch (Exception e) {
result = "->Red<-\t";
}
System.out.println(url + "\t\tStatus:" + result);
}
}
3 ответа
Согласно API ExecutorService, это блокирует:
service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
API цитата:
Блокируется до тех пор, пока все задачи не завершат выполнение после запроса на выключение, или не истечет время ожидания, или текущий поток не прервется, в зависимости от того, что произойдет раньше.
Если вы не хотите, чтобы это блокировало ваш текущий поток, то, возможно, вам следует вызвать его в другом потоке. Кроме того, у вас есть приложение Swing, а затем подумайте об использовании SwingWorker, который, как я считаю, использует ExecutorServices "под капотом".
Основываясь на вашем последнем кусочке кода, я бы использовал
- SwingWorker для управления всеми фоновыми потоками.
- Я бы дал SwingWorker ExecutorService
- а также ExecutorCompletionService, который был инициализирован с ExecutorService, поскольку это позволило бы мне получать результаты задачи по мере их завершения
- Я бы заполнил его Callables, а не Runnables, так как это позволило бы задаче что-то вернуть, возможно, String, чтобы показать прогресс.
- Я бы установил свойство прогресса SwingWorker в (100 * taskCount) / totalTaskCount, и мой JProgressBar изменился бы с 0 на 100.
- Затем я бы использовал пары методов публикации / процесса SwingWorker для извлечения строк, возвращаемых вызываемым объектом.
- Я бы слушал прогресс SwingWorker в моем GUI с помощью PropertyChangeListener
- Затем внесите изменения в графический интерфейс этого слушателя.
- Я бы поменял
if (code == 200) {
вif (code == HttpURLConnection.HTTP_OK) {
избегать магических чисел. - Действие JButton отключит само себя, затем создаст новый объект SwingWorker, добавит ProperChangeListener работника к работнику, а затем выполнит работника.
Например
import java.awt.Font;
import java.awt.event.ActionEvent;
import java.awt.event.KeyEvent;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.DefaultListModel;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JList;
import javax.swing.JPanel;
import javax.swing.JProgressBar;
import javax.swing.JScrollPane;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
@SuppressWarnings("serial")
public class SwingExecutorCompletionService extends JPanel {
public static final int MYTHREADS = 10;
private static final int LIST_PROTOTYPE_SIZE = 120;
private static final String LIST_PROTOTYPE_STRING = "%" + LIST_PROTOTYPE_SIZE + "s";
public static final String[] HOST_LIST = {
"http://crunchify.com",
"http://yahoo.com",
"http://www.ebay.com",
"http://google.com",
"http://www.example.co",
"https://paypal.com",
"http://bing.com/",
"http://techcrunch.com/",
"http://mashable.com/",
"http://thenextweb.com/",
"http://wordpress.com/",
"http://wordpress.org/",
"http://example.com/",
"http://sjsu.edu/",
"http://ebay.co.uk/",
"http://google.co.uk/",
"http://www.wikipedia.org/",
"http://en.wikipedia.org/wiki/Main_Page" };
private JProgressBar pbarStatus = new JProgressBar(0, 100);
private JButton doItButton = new JButton(new DoItAction("Do It", KeyEvent.VK_D));
private DefaultListModel<String> listModel = new DefaultListModel<>();
private JList<String> resultList = new JList<>(listModel);
public SwingExecutorCompletionService() {
resultList.setVisibleRowCount(10);
resultList.setPrototypeCellValue(String.format(LIST_PROTOTYPE_STRING, ""));
resultList.setFont(new Font(Font.MONOSPACED, Font.PLAIN, 12));
add(pbarStatus);
add(doItButton);
add(new JScrollPane(resultList, JScrollPane.VERTICAL_SCROLLBAR_ALWAYS,
JScrollPane.HORIZONTAL_SCROLLBAR_AS_NEEDED));
}
public void addToCompletionList(String element) {
listModel.addElement(element);
}
public void setStatusValue(int progress) {
pbarStatus.setValue(progress);
}
class DoItAction extends AbstractAction {
public DoItAction(String name, int mnemonic) {
super(name);
putValue(MNEMONIC_KEY, mnemonic);
}
@Override
public void actionPerformed(ActionEvent e) {
setEnabled(false);
DoItWorker worker = new DoItWorker(HOST_LIST, MYTHREADS);
SwingExecutorCompletionService gui = SwingExecutorCompletionService.this;
PropertyChangeListener workerListener = new WorkerChangeListener(gui, this);
worker.addPropertyChangeListener(workerListener);
worker.execute();
}
}
private static void createAndShowGui() {
SwingExecutorCompletionService mainPanel = new SwingExecutorCompletionService();
JFrame frame = new JFrame("Swing ExecutorCompletionService");
frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
frame.getContentPane().add(mainPanel);
frame.pack();
frame.setLocationByPlatform(true);
frame.setVisible(true);
}
public static void main(String[] args) {
SwingUtilities.invokeLater(new Runnable() {
public void run() {
createAndShowGui();
}
});
}
}
class MyCallable implements Callable<String> {
private static final String RED = "->Red<-";
private static final String GREEN = "Green";
private final String url;
private volatile boolean flgStop;
MyCallable(String url) {
this.url = url;
}
@Override
public String call() throws Exception {
String result = "";
int code = HttpURLConnection.HTTP_OK;
try {
// if(flgStop == true)
if (flgStop) {
// Stop thread execution
}
URL siteURL = new URL(url);
HttpURLConnection connection = (HttpURLConnection) siteURL
.openConnection();
connection.setRequestMethod("GET");
connection.connect();
code = connection.getResponseCode();
// No don't set the prog bar in a background thread!
// !! pbarStatus.setValue(pbarStatus.getValue()+1);
// avoid magic numbers
if (code == HttpURLConnection.HTTP_OK) {
result = GREEN;
}
} catch (Exception e) {
result = RED;
}
return String.format("%-40s %s", url + ":", result);
}
}
class WorkerChangeListener implements PropertyChangeListener {
private Action action;
private SwingExecutorCompletionService gui;
public WorkerChangeListener(SwingExecutorCompletionService gui, Action button) {
this.gui = gui;
this.action = button;
}
@Override
public void propertyChange(PropertyChangeEvent evt) {
DoItWorker worker = (DoItWorker)evt.getSource();
if (evt.getNewValue() == SwingWorker.StateValue.DONE) {
action.setEnabled(true);
try {
worker.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} else if (DoItWorker.INTERMEDIATE_RESULT.equals(evt.getPropertyName())) {
gui.addToCompletionList(evt.getNewValue().toString());
} else if ("progress".equals(evt.getPropertyName())) {
gui.setStatusValue(worker.getProgress());
}
}
}
class DoItWorker extends SwingWorker<Void, String> {
public static final String INTERMEDIATE_RESULT = "intermediate result";
private static final long TIME_OUT = 5;
private static final TimeUnit UNIT = TimeUnit.MINUTES;
private String intermediateResult;
private ExecutorService executor;
private CompletionService<String> completionService;
private String[] hostList;
public DoItWorker(String[] hostList, int myThreads) {
this.hostList = hostList;
executor = Executors.newFixedThreadPool(myThreads);
completionService = new ExecutorCompletionService<>(executor);
}
@Override
protected Void doInBackground() throws Exception {
for (int i = 0; i < hostList.length; i++) {
String url = hostList[i];
Callable<String> callable = new MyCallable(url);
completionService.submit(callable);
}
executor.shutdown();
for (int i = 0; i < hostList.length; i++) {
String result = completionService.take().get();
publish(result);
int progress = (100 * i) / hostList.length;
setProgress(progress);
}
executor.awaitTermination(TIME_OUT, UNIT);
setProgress(100);
return null;
}
@Override
protected void process(List<String> chunks) {
for (String chunk : chunks) {
setIntermediateResult(chunk);
}
}
private void setIntermediateResult(String intermediateResult) {
String oldValue = this.intermediateResult;
String newValue = intermediateResult;
this.intermediateResult = intermediateResult;
firePropertyChange(INTERMEDIATE_RESULT, oldValue, newValue);
}
}
Который будет выглядеть и работать так:
Если вы хотите отменить уже запущенные вакансии, вам придется использовать Callable
вместо Runnable
, Когда вы отправляете работу, вы получаете Future
который вы можете вызвать отменить () на.
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
Callable<Integer> callable1 = new CallableImpl();
Future<Integer> future1 = es.submit(callable1);
// if you decide to cancel your task gracefully
future1.cancel()
...
Тогда вам решать, ThreadInterrupt
в вашей реализации Callable.
public class CallableImpl implements Callable<Integer> {
@Override
public Integer call() throws Exception {
try {
while(true) {
// do something
if(Thread.currentThread().isInterrupted()) {
System.out.println("detected interrupt flag");
break;
}
}
}
catch(InterruptedException ie) {
System.out.println("interrupted");
}
@Hovercraft, вероятно, прав, если вы кодируете приложение Swing, то SwingWorker - это то, что вы хотите использовать.
Если вы используете JButton
, с вашим ExecutorService
тогда вам, вероятно, следует создать новый поток и выпустить поток обработки событий (EDT):
button.setActionListener(new Action() {
public void actionPerformed(ActionEvent e) {
button.setEnabled(false); // disable the button
new Thread(new Runnable() {
public void run() {
... your code ...
button.setEnabled(true);
}
}).start();
}
});
Как сказал Ховеркрафт, полный угрей, awaitTermination
это операция блокировки
А в случае с Swing вы, вероятно, делаете это в действии (как в моем примере), и вы блокируете EDT от выполнения различных операций, таких как реагирование на ввод пользователя:)
Примечание: ExecutorService
имеет хороший invokeAll, который может оказаться немного полезным, чем при использовании awaitTermination
, Это также блокирует, и вам все равно придется делать свои вещи в другом потоке, если это требуется.