Это хороший способ использовать java.util.concurrent.FutureTask?
Прежде всего, я должен сказать, что я совершенно новичок в API java.util.concurrent, поэтому, возможно, то, что я делаю, совершенно неправильно.
Что я хочу сделать?
У меня есть Java-приложение, которое в основном выполняет 2 отдельные обработки (называемые myFirstProcess, mySecondProcess), но эта обработка должна выполняться одновременно.
Итак, я попытался сделать это:
public void startMyApplication() {
ExecutorService executor = Executors.newFixedThreadPool(2);
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess);
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess);
executor.execute(futureOne);
executor.execute(futureTwo);
while (!(futureOne.isDone() && futureTwo.isDone())) {
try {
// I wait until both processes are finished.
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("Processing finished");
executor.shutdown();
// Do some processing on results
...
}
myFirstProcess и mySecondProcess являются классами, которые реализуют Callable<Object>
и где вся их обработка выполняется в методе call().
Это работает довольно хорошо, но я не уверен, что это правильный способ сделать это. Это хороший способ сделать то, что я хочу? Если нет, можете ли вы дать мне несколько советов по улучшению моего кода (и при этом сохранить его как можно более простым).
6 ответов
Вам лучше использовать get()
метод.
futureOne.get();
futureTwo.get();
Оба из них ожидают уведомления от потока о том, что он завершил обработку, это экономит вам занятое время ожидания с таймером, которое вы сейчас используете, что не эффективно и не элегантно.
В качестве бонуса у вас есть API get(long timeout, TimeUnit unit)
который позволяет вам определить максимальное время, в течение которого поток спит и ожидает ответа, а в противном случае продолжает работать.
Смотрите Java API для получения дополнительной информации.
Использование FutureTask
Выше, терпимо, но определенно не идиоматично. Вы на самом деле обертывание дополнительных FutureTask
вокруг того, что вы представили ExecutorService
, Ваш FutureTask
рассматривается как Runnable
посредством ExecutorService
, Внутренне это оборачивает FutureTask
-как- Runnable
в новом FutureTask
и возвращает его вам как Future<?>
,
Вместо этого вы должны представить свой Callable<Object>
экземпляры к CompletionService
, Вы бросаете два Callable
через submit(Callable<V>)
, затем развернись и позвони CompletionService#take()
дважды (один раз для каждого представленного Callable
). Эти вызовы будут блокироваться до тех пор, пока одна и затем другие отправленные задачи не будут выполнены.
Учитывая, что у вас уже есть Executor
в руки, построить новый ExecutorCompletionService
вокруг него и бросить свои задачи там. Не крутись и не спи в ожидании; CompletionService#take()
будет блокироваться до тех пор, пока одна из ваших задач не будет завершена (либо завершена, либо завершена), либо поток, ожидающий take()
прерывается
Решение Ювала в порядке. В качестве альтернативы вы также можете сделать это:
ExecutorService executor = Executors.newFixedThreadPool();
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess);
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess);
executor.execute(futureOne);
executor.execute(futureTwo);
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// interrupted
}
В чем преимущество этого подхода? На самом деле нет большой разницы, за исключением того, что таким образом вы останавливаете исполнителя, который принимает больше задач (вы можете сделать это и другим способом). Я склонен предпочесть эту идиому этому.
Кроме того, если get() генерирует исключение, вы можете оказаться в той части кода, которая предполагает выполнение обеих задач, что может быть плохо.
Вы можете использовать метод invokeall(Colelction....)
package concurrent.threadPool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class InvokeAll {
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(5);
List<Future<java.lang.String>> futureList = service.invokeAll(Arrays.asList(new Task1<String>(),new Task2<String>()));
System.out.println(futureList.get(1).get());
System.out.println(futureList.get(0).get());
}
private static class Task1<String> implements Callable<String>{
@Override
public String call() throws Exception {
Thread.sleep(1000 * 10);
return (String) "1000 * 5";
}
}
private static class Task2<String> implements Callable<String>{
@Override
public String call() throws Exception {
Thread.sleep(1000 * 2);
int i=3;
if(i==3)
throw new RuntimeException("Its Wrong");
return (String) "1000 * 2";
}
}
}
Вы можете использовать CyclicBarrier, если вы заинтересованы в одновременном запуске потоков или в ожидании их завершения, а затем выполните дополнительную обработку. См. Javadoc для получения дополнительной информации.
Если ваши будущие задачи более 2, пожалуйста, подумайте [ListenableFuture][1]
,
Когда несколько операций должны начаться, как только начинается другая операция - " разветвление " - ListenableFuture просто работает: он запускает все запрошенные обратные вызовы. Чуть больше работы, мы можем "развернуть" или запустить ListenableFuture, чтобы вычислить, как только закончится несколько других фьючерсов.