Как преобразовать Reactor Flux<String> в InputStream

Учитывая, что у меня есть Flux<String> неизвестного размера, как я могу преобразовать его в InputStream что другая библиотека ожидает?

Например, с помощью WebClient я могу добиться этого, используя этот подход

WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }

но я не могу понять, как сделать то же самое, когда у меня есть Flux<String> как вход?

2 ответа

Решение

Вероятно, есть много способов сделать это. Одной из возможностей является использование PipedInputStream и PipedOutputStream.

Это работает так, что вы связываете выходной поток с входным потоком так, что все, что вы записываете в выходной поток, может быть прочитано из связанного входного потока, делая это, создавая канал между двумя из них.

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);

Существует одно предупреждение, хотя, согласно документации по потоковым каналам, процесс записи и процесс чтения должны происходить в отдельных потоках, в противном случае мы можем вызвать взаимоблокировку.

Итак, возвращаясь к нашему сценарию реактивного потока, мы можем создать конвейер (как упомянуто выше) и подписаться на Flux объект и данные, которые вы получаете от него, вы записываете его в поточный поток вывода. Все, что вы там напишите, будет доступно для чтения на другой стороне канала, в соответствующем входном потоке. Этот входной поток является тем, которым вы можете поделиться с нереактивным методом.

Мы просто должны быть очень осторожны, чтобы подписаться на Flux в отдельном потоке, например subscribeOn(Schedulers.elastic()),

Вот очень простая реализация такого подписчика:

class PipedStreamSubscriber extends BaseSubscriber<byte[]> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final PipedInputStream in;
    private PipedOutputStream out;

    PipedStreamSubscriber(PipedInputStream in) {
        Objects.requireNonNull(in, "The input stream must not be null");
        this.in = in;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        //change if you want to control back-pressure
        super.hookOnSubscribe(subscription);
        try {
            this.out = new PipedOutputStream(in);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnNext(byte[] payload) {
        try {
            out.write(payload);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnComplete() {
        close();
    }

    @Override
    protected void hookOnError(Throwable error) {
        //TODO handle the error or at least log it
        logger.error("Failure processing stream", error);
        close();
    }

    @Override
    protected void hookOnCancel() {
        close();
    }

    private void close() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException e) {
            //probably just ignore this one or simply  log it
        }
    }
}

И используя этого подписчика, я мог бы определить очень простой служебный метод, который превратил Flux<byte[] в InputStreamнесколько следующим образом:

static InputStream createInputStream(Flux<byte[]> flux) {

    PipedInputStream in = new PipedInputStream();
    flux.subscribeOn(Schedulers.elastic())
        .subscribe(new PipedStreamSubscriber(in));

    return in;
}

Обратите внимание, что я был очень осторожен, чтобы закрыть поток вывода, когда поток завершен, когда произошла ошибка или подписка отменена, в противном случае мы рискуем заблокировать на стороне чтения, ожидая получения дополнительных входных данных. Закрытие выходного потока - это то, что сигнализирует об окончании входного потока на другой стороне канала.

И теперь этот InputStream может использоваться как любой обычный поток, и поэтому вы можете передать его нереактивному методу, например

Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);

try (InputStream in = createInputStream(jedi)) {
    byte[] data = new byte[5];
    int size = 0;
    while ((size = in.read(data)) > 0) {
        System.out.printf("%s", new String(data, 0, size));
    }
} 

Код выше дает:

Luke
Obi-Wan
Yoda

Ответ Эдвина не помог мне, поскольку ошибки в восходящем потоке были проглочены подписчиком и не распространялись на потребителя InputStream. Тем не менее, вдохновленный ответом Эдвина, я нашел другое решение. Вот пример использования Flux<ByteArray> и передав его как InputStreamвниз по течению. Пример включает расшифровку, чтобы выделить возможность манипулирования OutputStream даже после Flux<ByteStream> был полностью израсходован, что в конечном итоге привело к ошибке, которая распространяется ниже по потоку.

fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
    val inputStream = PipedInputStream()
    val outputStream = PipedOutputStream(inputStream)
    val isStreamEmitted = AtomicBoolean(false)
    
    return flux.handle<InputStream> { byteArray, sink ->
        try {
            outputStream.write(cipher.update(byteArray))
            // emit the input stream as soon as we get the first chunk of bytes
            // make sure we do it only once
            if (!isStreamEmitted.getAndSet(true)) {
                sink.next(inputStream)
            }
        } catch (e: Exception) {
            // catch all errors to pass them to the sink
            sink.error(e)
        }
    }.doOnComplete { 
        // here we have a last chance to throw an error  
        outputStream.write(cipher.doFinal())
    }.doOnTerminate {
        // error thrown here won't get propagated downstream
        // since this callback is triggered after flux's completion 
        outputStream.flush()
        outputStream.close()
    }
}

Уловка здесь в том, чтобы использовать handle оператор для создания Fluxкоторый излучает не более одного элемента. в отличие Mono в Fluxне будет прекращено сразу после первой эмиссии. Хотя он больше не будет излучать элементы, он остается "открытым", чтобы выдать возможную ошибку, которая возникает после первого выброса.

Ниже приводится пример использования Flux<InputStream> и превращая его в Mono.

fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
    decryptAndGetInputStream(flux, cipher)
        // the following operator gets called at most once
        .flatMap { inputStream ->
            // wrap the blocking operation into mono
            // subscribed on another thread to avoid deadlocks
            Mono.fromCallable { 
                processInputStream(inputStream)
            }.subscribeOn(Schedulers.elastic())
        // to get mono out of flux we implement reduce operator
        // although it gets never called
        }.reduce { t, _ -> t }

Еще одним преимуществом здесь является то, что поток, потребляющий InputStream, не будет блокироваться, пока не станет доступен первый фрагмент данных.

Я просто написал свой собственный служебный класс для этой цели.

Смотрите JinahyaResponseSpecUtils

Класс предоставляет в основном два способа потребления тела эффективным способом памяти. Один использует файлы, а другой использует каналы.

Вот два метода испытаний для использования.

@MethodSource({"sourceResponseSpecBodyToFluxOfDataBuffers"})
@ParameterizedTest
public void testPipeBodyToStreamAndApply(final WebClient.ResponseSpec responseSpec) throws IOException {
    final int pipeSize = 65536;
    final long sum = JinahyaResponseSpecUtils.pipeBodyToStreamAndApply(
            pipeSize,
            responseSpec,
            newFixedThreadPool(1),
            () -> null,
            (s, u) -> {
                final LongAdder adder = new LongAdder();
                final byte[] b = new byte[pipeSize];
                try {
                    for (int r; (r = s.read(b)) != -1; adder.add(r)) ;
                    return adder.sum();
                } catch (final IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
    );
    assertEquals(DATA_BUFFER_CAPACITY * DATA_BUFFER_COUNT, sum);
}


@MethodSource({"sourceResponseSpecBodyToFluxOfDataBuffers"})
@ParameterizedTest
public void testPipeBodyToChannelAndApply(final WebClient.ResponseSpec responseSpec) throws IOException {
    final long sum = JinahyaResponseSpecUtils.pipeBodyToChannelAndApply(
            responseSpec,
            newFixedThreadPool(1),
            () -> null,
            (c, u) -> {
                final LongAdder adder = new LongAdder();
                try {
                    for (final ByteBuffer b = allocate(65536); c.read(b) != -1; b.clear()) {
                        adder.add(b.position());
                    }
                } catch (final IOException ioe) {
                    throw new RuntimeException(ioe);
                }
                return adder.sum();
            }
    );
    assertEquals(DATA_BUFFER_CAPACITY * DATA_BUFFER_COUNT, sum);
}

Вы можете конвертировать Flux<String> известного размера в Mono<byte[]> который в свою очередь может быть использован для формирования InputStream, Проверьте это (на Java):

Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
                   (baos, str) -> {
                       try {
                           baos.write(str.getBytes());
                       } catch (IOException e) {
                           // do nothing
                       }
                   })
          .map(baos -> new ByteArrayInputStream(baos.toByteArray()))
          .map(inputStream -> ... // call other library);

Это требует простуды Flux<T> как collect() будет запущен, когда Flux выполнен. Для Flux<T> неизвестного размера (и при условии, что каждый String это отдельный объект), это становится еще проще:

Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
          .map(inputStream -> ... // call other library);

Вы можете уменьшить Flux<DataBuffer> в Mono<DataBuffer>, а затем перейти к InputStream,

Пример кода о загрузке файла в GridFs в WebFlux:

    private GridFsTemplate gridFsTemplate;

    public Mono<String> storeFile(FilePart filePart) {
        HttpHeaders headers = filePart.headers();
        String contentType = Objects.requireNonNull(headers.getContentType()).toString();

        return filePart.content()
                .reduce(DataBuffer::write).map(DataBuffer::asInputStream)
                .map(input -> gridFsTemplate.store(input, filePart.filename(), contentType))
                .map(ObjectId::toHexString);
    }
Другие вопросы по тегам