Обработка исключений из задач Java ExecutorService

Я пытаюсь использовать Java ThreadPoolExecutor Класс для запуска большого количества тяжеловесных задач с фиксированным количеством потоков. У каждой из задач есть много мест, в которых она может потерпеть неудачу из-за исключений.

Я подкласс ThreadPoolExecutor и я переопределил afterExecute метод, который должен обеспечить любые необработанные исключения, встречающиеся при выполнении задачи. Тем не менее, я не могу заставить его работать.

Например:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

Вывод этой программы: "Все хорошо - ситуация нормальная!" даже если единственный Runnable, представленный в пул потоков, генерирует исключение. Любой ключ к тому, что здесь происходит?

Спасибо!

13 ответов

Решение

Из документов:

Примечание. Когда действия включены в задачи (например, FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу.,

Когда вы отправите Runnable, он будет помещен в будущее.

Ваш afterExecute должен быть примерно таким:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

ВНИМАНИЕ: Следует отметить, что это решение заблокирует вызывающий поток.


Если вы хотите обрабатывать исключения, сгенерированные задачей, то лучше использовать Callable скорее, чем Runnable,

Callable.call() разрешено выдавать проверенные исключения, и они передаются обратно вызывающему потоку:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Если Callable.call() выдает исключение, это будет завернуто в ExecutionException и брошенный Future.get(),

Это, вероятно, будет намного предпочтительнее, чем создание подклассов ThreadPoolExecutor, Это также дает вам возможность повторно отправить задачу, если исключение является восстанавливаемым.

Объяснение этого поведения прямо в javadoc для afterExecute:

Примечание. Когда действия включены в задачи (например, FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу.,

Я обошел его, обернув прилагаемый runnable, представленный исполнителю.

CompletableFuture.runAsync(

        () -> {
                try {
                        runnable.run();
                } catch (Throwable e) {
                        Log.info(Concurrency.class, "runAsync", e);
                }
        },

        executorService
);

Я использую VerboseRunnable класс из jcabi-log, который проглатывает все исключения и регистрирует их. Очень удобно, например:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

Другим решением будет использование ManagedTask и ManagedTaskListener.

Вам нужен Callable или Runnable, который реализует интерфейс ManagedTask.

Метод getManagedTaskListener возвращает экземпляр, который вы хотите.

public ManagedTaskListener getManagedTaskListener() {

И вы реализуете в ManagedTaskListener свой taskDone метод:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Подробнее о жизненном цикле управляемой задачи и слушателе.

Это работает

  • Он является производным от SingleThreadExecutor, но вы можете легко адаптировать его
  • Java 8 код lamdas, но легко исправить

Это создаст Исполнителя с одним потоком, который может получить много задач; и будет ждать, пока текущий завершит выполнение, чтобы начать со следующего

В случае ошибки или исключения uncaughtExceptionHandler отловит его

открытый финальный класс SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable)  -> {конечный поток newThread = new Thread(runnable, "SingleThreadExptions) newThread.setUncaughtExceptionHandler( (конечный поток caugthThread, последний Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
            }); вернуть newThread; }; вернуть новый FinalizableDelegatedExecutorService
                (новый ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS, новый LinkedBlockingQueue(),
                        factory){защищенный void afterExecute(Runnable runnable, Throwable throwable) {
                        super.afterExecute(runnable, throwable = throwable); null && runnable instanceof Future) {
                            try {
                                Future future = (Future) runnable;
                                if (future.isDone()) {
                                    future.get();
                                }
                            } catch (CancellationException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt(); // ignore/reset
                            }
                        }
                        if (throwable!= null) {
                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), бросаемый);}}}); } закрытый статический класс FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    /**
     * Класс-оболочка, который предоставляет только методы ExecutorService * реализации ExecutorService.
     */ частный статический класс DelegatedExecutorService extends AbstractExecutorService {частный конечный ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(команда Runnable) { e.execute(команда); }
        public void shutdown() { e.shutdown(); } публичный список shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTeridity() {return e.isTeridity(); } public boolean awaitTermination (long timeout, TimeUnit unit) выдает InterruptedException {
            return e.awaitTermination (timeout, unit);
        } public Future submit (Runnable task) {return e.submit (task); } public Future submit (вызываемая задача) {return e.submit (task); } public Future submit (Runnable задача, T результат) {return e.submit (задача, результат); } public List> invokeAll (Collection> tasks) выдает InterruptedException {
            return e.invokeAll (tasks); } public List> invokeAll (Collection> tasks, long timeout, TimeUnit unit) выдает InterruptedException {
            return e.invokeAll (tasks, timeout, unit);
        } public T invokeAny(Collection> tasks) выдает InterruptedException, ExecutionException {
            return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) генерирует InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }}



    private SingleThreadExecutorWithExceptions() {}
} 

Пример доктора не дал мне желаемых результатов.

Когда процесс Thread был заброшен (с явнымs) Появлялись исключения.

Также я хотел сохранить функциональность «System.exit», которую имеет обычный основной поток с типичным, я хотел, чтобы программист не был вынужден работать над кодом, беспокоясь о его контексте (... потоке). Если появляется какая-либо ошибка, это должна быть либо ошибка программирования, либо дело должно быть решено в место с ручной защелкой... на самом деле нет необходимости в чрезмерных сложностях.

Поэтому я изменил код, чтобы он соответствовал моим потребностям.

          @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
        super.afterExecute(r, t); 
        if (t == null && r instanceof Future<?>) { 
            Future<?> future = (Future<?>) r; 
            boolean terminate = false; 
                try { 
                    future.get(); 
                } catch (ExecutionException e) { 
                    terminate = true; 
                    e.printStackTrace(); 
                } catch (InterruptedException | CancellationException ie) {// ignore/reset 
                    Thread.currentThread().interrupt(); 
                } finally { 
                    if (terminate) System.exit(0); 
                } 
        } 
    }

Однако будьте осторожны, этот код в основном преобразует ваши потоки в основной поток с точки зрения исключений, сохраняя при этом все свои параллельные свойства... Но давайте будем реальными, проектируя архитектуры в функции параллельного механизма системы () является неправильным подходом IMHO... если строго не требуется дизайн, управляемый событиями.... но тогда... если это требование, возникает вопрос: нужен ли ExecutorService вообще в этом случае?... может быть, нет.

Это похоже на решение ммм, но немного более понятно. Попросите ваши задачи расширить абстрактный класс, который является оболочкой для метода run().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}

Если вы хотите контролировать выполнение задачи, вы можете вращать 1 или 2 потока (может быть, больше в зависимости от нагрузки) и использовать их для получения задач из оболочки ExecutionCompletionService.

Это из-за AbstractExecutorService :: submit оборачивает ваш runnable в RunnableFuture (ничего кроме FutureTask) как ниже

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

затем execute передам это Worker а также Worker.run() позвоню ниже.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

в заключение task.run(); в приведенном выше коде вызов FutureTask.run(), Вот код обработчика исключений, поэтому вы НЕ получаете ожидаемое исключение.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Если твой ExecutorService исходит из внешнего источника (то есть невозможно подкласс ThreadPoolExecutor и переопределить afterExecute()), вы можете использовать динамический прокси для достижения желаемого поведения:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}

Вместо того чтобы создавать подклассы ThreadPoolExecutor, я бы предоставил ему экземпляр ThreadFactory, который создает новые потоки и предоставляет им UncaughtExceptionHandler.

Другие вопросы по тегам