Как преобразовать Reactor Flux<String> в InputStream
Учитывая, что у меня есть Flux<String>
неизвестного размера, как я могу преобразовать его в InputStream
что другая библиотека ожидает?
Например, с помощью WebClient я могу добиться этого, используя этот подход
WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }
но я не могу понять, как сделать то же самое, когда у меня есть Flux<String>
как вход?
2 ответа
Вероятно, есть много способов сделать это. Одной из возможностей является использование PipedInputStream и PipedOutputStream.
Это работает так, что вы связываете выходной поток с входным потоком так, что все, что вы записываете в выходной поток, может быть прочитано из связанного входного потока, делая это, создавая канал между двумя из них.
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
Существует одно предупреждение, хотя, согласно документации по потоковым каналам, процесс записи и процесс чтения должны происходить в отдельных потоках, в противном случае мы можем вызвать взаимоблокировку.
Итак, возвращаясь к нашему сценарию реактивного потока, мы можем создать конвейер (как упомянуто выше) и подписаться на Flux
объект и данные, которые вы получаете от него, вы записываете его в поточный поток вывода. Все, что вы там напишите, будет доступно для чтения на другой стороне канала, в соответствующем входном потоке. Этот входной поток является тем, которым вы можете поделиться с нереактивным методом.
Мы просто должны быть очень осторожны, чтобы подписаться на Flux в отдельном потоке, например subscribeOn(Schedulers.elastic())
,
Вот очень простая реализация такого подписчика:
class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PipedInputStream in;
private PipedOutputStream out;
PipedStreamSubscriber(PipedInputStream in) {
Objects.requireNonNull(in, "The input stream must not be null");
this.in = in;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
//change if you want to control back-pressure
super.hookOnSubscribe(subscription);
try {
this.out = new PipedOutputStream(in);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnNext(byte[] payload) {
try {
out.write(payload);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnComplete() {
close();
}
@Override
protected void hookOnError(Throwable error) {
//TODO handle the error or at least log it
logger.error("Failure processing stream", error);
close();
}
@Override
protected void hookOnCancel() {
close();
}
private void close() {
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
//probably just ignore this one or simply log it
}
}
}
И используя этого подписчика, я мог бы определить очень простой служебный метод, который превратил Flux<byte[]
в InputStream
несколько следующим образом:
static InputStream createInputStream(Flux<byte[]> flux) {
PipedInputStream in = new PipedInputStream();
flux.subscribeOn(Schedulers.elastic())
.subscribe(new PipedStreamSubscriber(in));
return in;
}
Обратите внимание, что я был очень осторожен, чтобы закрыть поток вывода, когда поток завершен, когда произошла ошибка или подписка отменена, в противном случае мы рискуем заблокировать на стороне чтения, ожидая получения дополнительных входных данных. Закрытие выходного потока - это то, что сигнализирует об окончании входного потока на другой стороне канала.
И теперь этот InputStream может использоваться как любой обычный поток, и поэтому вы можете передать его нереактивному методу, например
Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);
try (InputStream in = createInputStream(jedi)) {
byte[] data = new byte[5];
int size = 0;
while ((size = in.read(data)) > 0) {
System.out.printf("%s", new String(data, 0, size));
}
}
Код выше дает:
Luke
Obi-Wan
Yoda
Ответ Эдвина не помог мне, поскольку ошибки в восходящем потоке были проглочены подписчиком и не распространялись на потребителя
InputStream
. Тем не менее, вдохновленный ответом Эдвина, я нашел другое решение. Вот пример использования
Flux<ByteArray>
и передав его как
InputStream
вниз по течению. Пример включает расшифровку, чтобы выделить возможность манипулирования
OutputStream
даже после
Flux<ByteStream>
был полностью израсходован, что в конечном итоге привело к ошибке, которая распространяется ниже по потоку.
fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
val inputStream = PipedInputStream()
val outputStream = PipedOutputStream(inputStream)
val isStreamEmitted = AtomicBoolean(false)
return flux.handle<InputStream> { byteArray, sink ->
try {
outputStream.write(cipher.update(byteArray))
// emit the input stream as soon as we get the first chunk of bytes
// make sure we do it only once
if (!isStreamEmitted.getAndSet(true)) {
sink.next(inputStream)
}
} catch (e: Exception) {
// catch all errors to pass them to the sink
sink.error(e)
}
}.doOnComplete {
// here we have a last chance to throw an error
outputStream.write(cipher.doFinal())
}.doOnTerminate {
// error thrown here won't get propagated downstream
// since this callback is triggered after flux's completion
outputStream.flush()
outputStream.close()
}
}
Уловка здесь в том, чтобы использовать
handle
оператор для создания
Flux
который излучает не более одного элемента. в отличие
Mono
в
Flux
не будет прекращено сразу после первой эмиссии. Хотя он больше не будет излучать элементы, он остается "открытым", чтобы выдать возможную ошибку, которая возникает после первого выброса.
Ниже приводится пример использования
Flux<InputStream>
и превращая его в
Mono
.
fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
decryptAndGetInputStream(flux, cipher)
// the following operator gets called at most once
.flatMap { inputStream ->
// wrap the blocking operation into mono
// subscribed on another thread to avoid deadlocks
Mono.fromCallable {
processInputStream(inputStream)
}.subscribeOn(Schedulers.elastic())
// to get mono out of flux we implement reduce operator
// although it gets never called
}.reduce { t, _ -> t }
Еще одним преимуществом здесь является то, что поток, потребляющий InputStream, не будет блокироваться, пока не станет доступен первый фрагмент данных.
Я просто написал свой собственный служебный класс для этой цели.
Смотрите JinahyaResponseSpecUtils
Класс предоставляет в основном два способа потребления тела эффективным способом памяти. Один использует файлы, а другой использует каналы.
Вот два метода испытаний для использования.
@MethodSource({"sourceResponseSpecBodyToFluxOfDataBuffers"})
@ParameterizedTest
public void testPipeBodyToStreamAndApply(final WebClient.ResponseSpec responseSpec) throws IOException {
final int pipeSize = 65536;
final long sum = JinahyaResponseSpecUtils.pipeBodyToStreamAndApply(
pipeSize,
responseSpec,
newFixedThreadPool(1),
() -> null,
(s, u) -> {
final LongAdder adder = new LongAdder();
final byte[] b = new byte[pipeSize];
try {
for (int r; (r = s.read(b)) != -1; adder.add(r)) ;
return adder.sum();
} catch (final IOException ioe) {
throw new RuntimeException(ioe);
}
}
);
assertEquals(DATA_BUFFER_CAPACITY * DATA_BUFFER_COUNT, sum);
}
@MethodSource({"sourceResponseSpecBodyToFluxOfDataBuffers"})
@ParameterizedTest
public void testPipeBodyToChannelAndApply(final WebClient.ResponseSpec responseSpec) throws IOException {
final long sum = JinahyaResponseSpecUtils.pipeBodyToChannelAndApply(
responseSpec,
newFixedThreadPool(1),
() -> null,
(c, u) -> {
final LongAdder adder = new LongAdder();
try {
for (final ByteBuffer b = allocate(65536); c.read(b) != -1; b.clear()) {
adder.add(b.position());
}
} catch (final IOException ioe) {
throw new RuntimeException(ioe);
}
return adder.sum();
}
);
assertEquals(DATA_BUFFER_CAPACITY * DATA_BUFFER_COUNT, sum);
}
Вы можете конвертировать Flux<String>
известного размера в Mono<byte[]>
который в свою очередь может быть использован для формирования InputStream
, Проверьте это (на Java):
Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
(baos, str) -> {
try {
baos.write(str.getBytes());
} catch (IOException e) {
// do nothing
}
})
.map(baos -> new ByteArrayInputStream(baos.toByteArray()))
.map(inputStream -> ... // call other library);
Это требует простуды Flux<T>
как collect()
будет запущен, когда Flux
выполнен. Для Flux<T>
неизвестного размера (и при условии, что каждый String
это отдельный объект), это становится еще проще:
Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
.map(inputStream -> ... // call other library);
Вы можете уменьшить Flux<DataBuffer>
в Mono<DataBuffer>
, а затем перейти к InputStream
,
Пример кода о загрузке файла в GridFs в WebFlux:
private GridFsTemplate gridFsTemplate;
public Mono<String> storeFile(FilePart filePart) {
HttpHeaders headers = filePart.headers();
String contentType = Objects.requireNonNull(headers.getContentType()).toString();
return filePart.content()
.reduce(DataBuffer::write).map(DataBuffer::asInputStream)
.map(input -> gridFsTemplate.store(input, filePart.filename(), contentType))
.map(ObjectId::toHexString);
}