Перехват исключений потока из Java ExecutorService

Я работаю над структурой разработки программного обеспечения для параллельных вычислений http://javaseis.org/. Мне нужен надежный механизм для сообщения об исключениях потоков. Во время разработки знание того, откуда происходят исключения, имеет большое значение, поэтому я хотел бы ошибиться в части завышенной отчетности. Я также хотел бы иметь возможность обрабатывать тестирование Junit4 в потоках. Разумный подход ниже или есть лучший способ?

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestThreadFailure {

  public static void main(String[] args) {
    int size = 1;
    ExecutorService exec = Executors.newFixedThreadPool(size);
    ThreadFailTask worker = new ThreadFailTask();
    Future<Integer> result = exec.submit(worker);
    try {
      Integer value = result.get();
      System.out.println("Result: " + value);
    } catch (Throwable t) {
      System.out.println("Caught failure: " + t.toString());
      exec.shutdownNow();
      System.out.println("Stack Trace:");
      t.printStackTrace();
      return;
    }
    throw new RuntimeException("Did not catch failure !!");
  }

  public static class ThreadFailTask implements Callable<Integer> {
    @Override
    public Integer call() {
      int nbuf = 65536;
      double[][] buf = new double[nbuf][nbuf];
      return new Integer((int) buf[0][0]);
    }
  }
}

6 ответов

Решение

Я не верю, что есть стандартные "крючки", чтобы добраться до этих исключений при использовании submit(), Однако, если вам нужно поддержать submit() (что звучит разумно, учитывая, что вы используете Callable), вы всегда можете обернуть Callables и Runnables:

ExecutorService executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()) {
    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        Callable<T> wrappedTask = new Callable<T>() {
            @Override
            public T call() throws Exception {
                try {
                    return task.call();
                }
                catch (Exception e) {
                    System.out.println("Oh boy, something broke!");
                    e.printStackTrace();
                    throw e;
                }
            }
        };

        return super.submit(wrappedTask);
    }
};

Конечно, этот метод работает, только если вы один ExecutorService на первом месте. Кроме того, не забудьте переопределить все три submit() варианты.

Рассмотреть вопрос о звонке execute() вместо submit() на ExecutorService, Thread вызывается с execute() вызовет Thread.UncaughtExceptionHandler когда это не удается.

Просто сделай ThreadFactory который устанавливает Thread.UncaughtExceptionHandler на все Threads а затем вызвать вашу работу с execute() на ExecutorService вместо submit(),

Посмотрите на этот связанный вопрос переполнения стека.

Как объясняется в этом потоке В чем разница между методом submit и execute с ThreadPoolExecutor, использование execute будет работать только в том случае, если вы реализуете Runnable, а не Callable, поскольку execute не может вернуть Future.

Я думаю, что в вашем сценарии вы должны построить будущий объект, чтобы он также мог приспособиться к исключениям. Таким образом, в случае исключения вы создаете объект сообщения об ошибке.

Мой первоначальный вопрос спрашивал, как реализовать "надежную" обработку исключений потоков с помощью Java ExecutorService. Спасибо Анджело и Грегу за указатели на то, как обработка исключений работает с ExecutorService.submit () и Future.get (). Мой исправленный фрагмент кода показан ниже. Ключевой момент, который я здесь узнал, заключается в том, что Future.get () перехватывает все исключения. Если поток был прерван или отменен, вы получите соответствующее исключение, в противном случае исключение будет упаковано и повторно выброшено как ExecutionException.

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
импорт java.util.concurrent.Executors;
import java.util.concurrent.Future;

открытый класс TestThreadFailure {

  public static void main(String[] args) {
    int size = 1;
    ExecutorService exec = Executors.newFixedThreadPool(size);
    ThreadFailTask ​​worker = новый ThreadFailTask ​​();
    Будущий результат = exec.submit(работник);
    пытаться {
      Целочисленное значение = result.get();
      System.out.println("Результат: " + значение);
    } catch (ExecutionException ex) {
      System.out.println("Пойманный сбой: " + ex.toString());
      exec.shutdownNow();
      вернуть;
    } catch (InterruptedException iex) {
      System.out.println("Поток прерван: " + iex.toString());
    } catch (CancellationException cex) {
      System.out.println("Поток отменен: " + cex.toString());
    }
    exec.shutdownNow();
    бросить новое RuntimeException("Не перехватил сбой!!");
  }

  открытый статический класс ThreadFailTask ​​реализует Callable {
    @Override
    public Integer call() {
      int nbuf = 65536;
      double[][] buf = new double[nbuf][nbuf];
      вернуть новое целое число ((int) buf[0][0]);
    }
  }
}

Мне не очень повезло с другими ответами, потому что мне нужен был сам экземпляр исключительной ситуации, а не просто отпечаток стека. Для меня принят принятый ответ с участием ThreadPoolExecutor#afterExecute() на вопрос " Почему UncaughtExceptionHandler не вызывается ExecutorService?".

Смотрите следующий пример кода:

List<Runnable> tasks = new LinkedList<>();
for (int i = 0; i < numThreads; ++i) {
    Runnable task = new Runnable() {
        @Override
        public void run() {
            throw new RuntimeException();
        }
    };

    tasks.add(task);
}

Optional<Throwable> opEmpty = Optional.empty();
/*
 * Use AtomicReference as a means of capturing the first thrown exception, since a
 * spawned thread can't "throw" an exception to the parent thread.
 */
final AtomicReference<Optional<Throwable>> firstThrownException =
        new AtomicReference<>(opEmpty);

/*
 * Use new ThreadPoolExecutor instead of Executors.newFixedThreadPool() so
 * that I can override afterExecute() for the purposes of throwing an
 * exception from the test thread if a child thread fails.
 */
ExecutorService execSvc = new ThreadPoolExecutor(numThreads, numThreads,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
    @Override
    public void afterExecute(Runnable task, Throwable failureCause) {
        if(failureCause == null) {
            // The Runnable completed successfully.
            return;
        }
        // only sets the first exception because it will only be empty on the first call.
        firstThrownException.compareAndSet(Optional.<Throwable>empty(), Optional.of(failureCause));
    }
};

for (Runnable task : tasks) {
    execSvc.execute(task);
}
execSvc.shutdown();
execSvc.awaitTermination(1, TimeUnit.HOURS);

assertEquals(firstThrownException.get(), Optional.empty());

Для обработки исключений в ExecutorService вы должны использовать преимущества Callable и Future.

Пожалуйста, смотрите видео ниже для более подробной информации. Надеюсь, это поможет вам.

ВИДЕО: вызываемое и будущее (11 минут)

Другие вопросы по тегам