Как я могу сделать исключение без проверки, выброшенное из потока для распространения в основной поток?

Я не могу найти способ (ни через SO, ни в отладке моего кода) разрешить распространение исключения, созданного из протектора, в основной поток. Я уже пытался использовать Thread.setUncaughtExceptionHandler() и используя CompletableFuture: .exceptionally(), .handle(),...

Дело в том, что с этими механизмами (как я отлаживаю) фактическая обработка выполняется в рабочем потоке, а не в основном потоке, и мне не удается передать его в основной поток.

Суть в том, что я пишу тест, и если в рабочем потоке возникает исключение, оно никогда не попадает в основной поток, в котором выполняется тест, заставляя тест пройти, даже если что-то пошло не так.

Мне нужно, чтобы это исключение вызывало асинхронность; Я не могу дождаться завершения будущего, так как мне нужно немедленно вернуть поток (PipedStream) без блокировки из основного шага.

Единственный совет, который я получаю, это ошибка консоли (и только когда я использую традиционный Thread + uncaughtExceptionHandler подход, нет журнала вообще, когда я пытаюсь с CompletableFuture):

Exception: com.example.exception.MyException thrown from the UncaughtExceptionHandler in thread "Thread-5"

или это если я не определяю обработчик исключений:

Exception in thread "Thread-5" com.example.MyException: Exception message

Я предоставляю некоторый код:

try {
    @SuppressWarnings("squid:S2095") final PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos);

    CompletableFuture.runAsync(pipeDecryptorRunnable(inputStream, pos));

    return pis;

} catch (IOException e) {
    throw new CryptographyException(e.getMessage(), e);
}

внутри pipeDecryptorRunnable является CipherStream, который расшифровывает данные Исключение там брошено. Но я не могу поймать его в главном потоке и становится невидимым. pisпоток, возвращаемый из этого метода, используется для чтения, а рабочий поток дешифрует данные на лету в процессе чтения. pis,

РЕДАКТИРОВАТЬ:UncaughtExceptionHandler поскольку ответы в похожих вопросах показывают, что это не работает для моего сценария, так как код обработчика вызывается рабочим потоком, а не основным.

1 ответ

Решение

Благодаря подсказке @RalfKleberhoff я пришел с решением, которое искал. Дело в том, что для достижения желаемого поведения необходим механизм связи между потоками. И учитывая, что я уже работал с PipedStreams, я могу использовать его для достижения цели - я также подумал о каком-то решении, включающем в себя шину событий для передачи сигналов или обмена данными из одного потока в другой (основной поток / рабочий поток), и я думаю, что некоторые библиотеки шины событий могут достичь этого как Что ж.

Итак, вернемся к потоковым каналам у меня был pis читать в основной ветке и связать ее pos написать в рабочем потоке. Так что, когда в работнике возникло исключение, мне нужен главный поток, чтобы это заметить.

Для достижения этого вы можете продлить PipedOutputStream класс, добавив метод для оповещения подключенного канала, когда происходит исключение. Таким же образом, вам нужно расширить PipedInputStream чтобы получить сигнал об исключении, сохраните исключение и переопределите методы чтения, чтобы проверить, произошло ли исключение первым, и, в этом случае, выбросить исключение, заключенное в IOException метода чтения.

Вот код:

/**
 * This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
 * thread to reach the reading thread.
 *
 * @author Gerard on 10/11/2017.
 */
public class ExceptionAwarePipedOutputStream extends PipedOutputStream {

    private final ExceptionAwarePipedInputStream sink;

    public ExceptionAwarePipedOutputStream(ExceptionAwarePipedInputStream sink) throws IOException {
        super(sink);
        this.sink = sink;
    }

    /**
     * Signals connected {@link ExceptionAwarePipedInputStream} that an exception ocurred allowing to propagate it
     * across respective threads. This works as inter thread communication mechanism. So it allows to the reading thread
     * notice that an exception was thrown in the writing thread.
     *
     * @param exception The exception to propagate.
     */
    public void signalException(Throwable exception) {
        sink.signalException(exception);
    }
}

·

/**
 * This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
 * thread to reach the reading thread.
 *
 * @author Gerard on 10/11/2017.
 */
public class ExceptionAwarePipedInputStream extends PipedInputStream {

    private volatile Throwable exception;

    void signalException(Throwable exception) {
        this.exception = exception;
    }

    @Override
    public int read(byte[] b) throws IOException {
        final int read = super.read(b);
        checkException();
        return read;
    }

    @Override
    public synchronized int read() throws IOException {
        final int read = super.read();
        checkException();
        return read;
    }

    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        final int read = super.read(b, off, len);
        checkException();
        return read;
    }

    private void checkException() throws IOException {
        if (exception != null) {
            throw new IOException(exception.getMessage(), exception);
        }
    }
}

Код клиента:

public InputStream decrypt(InputStream inputStream) {

    assert supportedStreamModes.contains(mode) : "Unsupported cipher mode for stream decryption " + mode;

    @SuppressWarnings("squid:S2095") final ExceptionAwarePipedInputStream pis = new ExceptionAwarePipedInputStream();
    final ExceptionAwarePipedOutputStream pos = newConnectedPipedOutputStream(pis);
    final Cipher decryptor = newDecryptorInitialized(inputStream);

    CompletableFuture.runAsync(
        pipeDecryptorRunnable(inputStream, pos, decryptor));

    return pis;
}

private ExceptionAwarePipedOutputStream newConnectedPipedOutputStream(ExceptionAwarePipedInputStream pis) {
    try {
        return new ExceptionAwarePipedOutputStream(pis);
    } catch (IOException e) {
        throw new CryptographyException(e.getMessage(), e);
    }
}

И часть обработки исключений (обратите внимание на сигнализацию потока):

private Runnable pipeDecryptorRunnable(InputStream inputStream, ExceptionAwarePipedOutputStream pos, Cipher decryptor) {
    return () -> {
        try {

            // do stuff... and write to pos

        } catch (Exception e) {
            // Signaling any (checked or unchecked) exception
            pos.signalException(new CryptographyException(e.getMessage(), e));
        } finally {
            closePipedStream(pos);
        }
    };
}
Другие вопросы по тегам