Обработка исключений из задач Java ExecutorService
Я пытаюсь использовать Java ThreadPoolExecutor
Класс для запуска большого количества тяжеловесных задач с фиксированным количеством потоков. У каждой из задач есть много мест, в которых она может потерпеть неудачу из-за исключений.
Я подкласс ThreadPoolExecutor
и я переопределил afterExecute
метод, который должен обеспечить любые необработанные исключения, встречающиеся при выполнении задачи. Тем не менее, я не могу заставить его работать.
Например:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
Вывод этой программы: "Все хорошо - ситуация нормальная!" даже если единственный Runnable, представленный в пул потоков, генерирует исключение. Любой ключ к тому, что здесь происходит?
Спасибо!
13 ответов
Из документов:
Примечание. Когда действия включены в задачи (например, FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу.,
Когда вы отправите Runnable, он будет помещен в будущее.
Ваш afterExecute должен быть примерно таким:
public final class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
}
ВНИМАНИЕ: Следует отметить, что это решение заблокирует вызывающий поток.
Если вы хотите обрабатывать исключения, сгенерированные задачей, то лучше использовать Callable
скорее, чем Runnable
,
Callable.call()
разрешено выдавать проверенные исключения, и они передаются обратно вызывающему потоку:
Callable task = ...
Future future = executor.submit(task);
try {
future.get();
} catch (ExecutionException ex) {
ex.getCause().printStackTrace();
}
Если Callable.call()
выдает исключение, это будет завернуто в ExecutionException
и брошенный Future.get()
,
Это, вероятно, будет намного предпочтительнее, чем создание подклассов ThreadPoolExecutor
, Это также дает вам возможность повторно отправить задачу, если исключение является восстанавливаемым.
Объяснение этого поведения прямо в javadoc для afterExecute:
Примечание. Когда действия включены в задачи (например, FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу.,
Я обошел его, обернув прилагаемый runnable, представленный исполнителю.
CompletableFuture.runAsync(
() -> {
try {
runnable.run();
} catch (Throwable e) {
Log.info(Concurrency.class, "runAsync", e);
}
},
executorService
);
Я использую VerboseRunnable
класс из jcabi-log, который проглатывает все исключения и регистрирует их. Очень удобно, например:
import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
new VerboseRunnable(
Runnable() {
public void run() {
// the code, which may throw
}
},
true // it means that all exceptions will be swallowed and logged
),
1, 1, TimeUnit.MILLISECONDS
);
Другим решением будет использование ManagedTask и ManagedTaskListener.
Вам нужен Callable или Runnable, который реализует интерфейс ManagedTask.
Метод getManagedTaskListener
возвращает экземпляр, который вы хотите.
public ManagedTaskListener getManagedTaskListener() {
И вы реализуете в ManagedTaskListener свой taskDone
метод:
@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
if (exception != null) {
LOGGER.log(Level.SEVERE, exception.getMessage());
}
}
Подробнее о жизненном цикле управляемой задачи и слушателе.
Это работает
- Он является производным от SingleThreadExecutor, но вы можете легко адаптировать его
- Java 8 код lamdas, но легко исправить
Это создаст Исполнителя с одним потоком, который может получить много задач; и будет ждать, пока текущий завершит выполнение, чтобы начать со следующего
В случае ошибки или исключения uncaughtExceptionHandler отловит его
открытый финальный класс SingleThreadExecutorWithExceptions { public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory factory = (Runnable runnable) -> {конечный поток newThread = new Thread(runnable, "SingleThreadExptions) newThread.setUncaughtExceptionHandler( (конечный поток caugthThread, последний Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); }); вернуть newThread; }; вернуть новый FinalizableDelegatedExecutorService (новый ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, новый LinkedBlockingQueue(), factory){защищенный void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable = throwable); null && runnable instanceof Future) { try { Future future = (Future) runnable; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (throwable!= null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), бросаемый);}}}); } закрытый статический класс FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } /** * Класс-оболочка, который предоставляет только методы ExecutorService * реализации ExecutorService. */ частный статический класс DelegatedExecutorService extends AbstractExecutorService {частный конечный ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(команда Runnable) { e.execute(команда); } public void shutdown() { e.shutdown(); } публичный список shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTeridity() {return e.isTeridity(); } public boolean awaitTermination (long timeout, TimeUnit unit) выдает InterruptedException { return e.awaitTermination (timeout, unit); } public Future submit (Runnable task) {return e.submit (task); } public Future submit (вызываемая задача) {return e.submit (task); } public Future submit (Runnable задача, T результат) {return e.submit (задача, результат); } public List> invokeAll (Collection> tasks) выдает InterruptedException { return e.invokeAll (tasks); } public List> invokeAll (Collection> tasks, long timeout, TimeUnit unit) выдает InterruptedException { return e.invokeAll (tasks, timeout, unit); } public T invokeAny(Collection> tasks) выдает InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) генерирует InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); }} private SingleThreadExecutorWithExceptions() {} }
Пример доктора не дал мне желаемых результатов.
Когда процесс Thread был заброшен (с явным
Также я хотел сохранить функциональность «System.exit», которую имеет обычный основной поток с типичным
Поэтому я изменил код, чтобы он соответствовал моим потребностям.
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
Future<?> future = (Future<?>) r;
boolean terminate = false;
try {
future.get();
} catch (ExecutionException e) {
terminate = true;
e.printStackTrace();
} catch (InterruptedException | CancellationException ie) {// ignore/reset
Thread.currentThread().interrupt();
} finally {
if (terminate) System.exit(0);
}
}
}
Однако будьте осторожны, этот код в основном преобразует ваши потоки в основной поток с точки зрения исключений, сохраняя при этом все свои параллельные свойства... Но давайте будем реальными, проектируя архитектуры в функции параллельного механизма системы (
Это похоже на решение ммм, но немного более понятно. Попросите ваши задачи расширить абстрактный класс, который является оболочкой для метода run().
public abstract Task implements Runnable {
public abstract void execute();
public void run() {
try {
execute();
} catch (Throwable t) {
// handle it
}
}
}
public MySampleTask extends Task {
public void execute() {
// heavy, error-prone code here
}
}
Если вы хотите контролировать выполнение задачи, вы можете вращать 1 или 2 потока (может быть, больше в зависимости от нагрузки) и использовать их для получения задач из оболочки ExecutionCompletionService.
Это из-за AbstractExecutorService :: submit
оборачивает ваш runnable
в RunnableFuture
(ничего кроме FutureTask
) как ниже
AbstractExecutorService.java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
execute(ftask);
return ftask;
}
затем execute
передам это Worker
а также Worker.run()
позвоню ниже.
ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /////////HERE////////
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
в заключение
task.run();
в приведенном выше коде вызовFutureTask.run()
, Вот код обработчика исключений, поэтому вы НЕ получаете ожидаемое исключение.
class FutureTask<V> implements RunnableFuture<V>
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) { /////////HERE////////
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
Если твой ExecutorService
исходит из внешнего источника (то есть невозможно подкласс ThreadPoolExecutor
и переопределить afterExecute()
), вы можете использовать динамический прокси для достижения желаемого поведения:
public static ExecutorService errorAware(final ExecutorService executor) {
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] {ExecutorService.class},
(proxy, method, args) -> {
if (method.getName().equals("submit")) {
final Object arg0 = args[0];
if (arg0 instanceof Runnable) {
args[0] = new Runnable() {
@Override
public void run() {
final Runnable task = (Runnable) arg0;
try {
task.run();
if (task instanceof Future<?>) {
final Future<?> future = (Future<?>) task;
if (future.isDone()) {
try {
future.get();
} catch (final CancellationException ce) {
// Your error-handling code here
ce.printStackTrace();
} catch (final ExecutionException ee) {
// Your error-handling code here
ee.getCause().printStackTrace();
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
} catch (final RuntimeException re) {
// Your error-handling code here
re.printStackTrace();
throw re;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
} else if (arg0 instanceof Callable<?>) {
args[0] = new Callable<Object>() {
@Override
public Object call() throws Exception {
final Callable<?> task = (Callable<?>) arg0;
try {
return task.call();
} catch (final Exception e) {
// Your error-handling code here
e.printStackTrace();
throw e;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
}
}
return method.invoke(executor, args);
});
}
Вместо того чтобы создавать подклассы ThreadPoolExecutor, я бы предоставил ему экземпляр ThreadFactory, который создает новые потоки и предоставляет им UncaughtExceptionHandler.