Преобразуйте Scala Future в Reactor Flux
Я вызываю API, который возвращает Future of Scala
, и я хочу превратиться в Reactor Flux
очевидно без блокировки и ждем ответа будущего. Я пытаюсь использовать EmitterProcessor, и он отлично работает, когда будущее приносит успех, но, к сожалению, это не так, когда у будущего есть тайм-аут.
Вот код
private Flux<D> transformFutureToFlux(Future<T> future) {
EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
return Flux.from(emitterProcessor);
}
private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
return new OnComplete() {
@Override
public void onComplete(Throwable t, Object result) {
if (t != null) {
processError(t);
} else {
processSucceed(result);
}
}
private void processSucceed(Object result) {
if (result instanceof ConnectorErrorVO) {
publishConnectorErrorException((ConnectorErrorVO) result);
} else {
publishGenericResponse(result);
}
}
private void publishGenericResponse(Object result) {
if (result instanceof List<?>) {
Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
} else {
Flux.just((D) result).subscribe(emitterProcessor);
}
}
private void publishConnectorErrorException(ConnectorErrorVO result) {
ConnectorErrorVO connectorErrorVO = result;
Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
error.subscribe(emitterProcessor);
}
private void processError(Throwable t) {
ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
if (t instanceof AskTimeoutException) {
Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
} else {
Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
}
}
};
То, что я пытаюсь сделать, правильно? Есть ли лучший способ сделать это?
С уважением
2 ответа
Если вы вернете Future
, это означает, что есть только 1 возврат, а не серия / поток возвращаемого значения. Вы можете конвертировать вFlux
если хотите, но почему бы вместо этого не использовать Mono
?
Кроме того, если вы используете scala, попробуйте использовать модуль response-scala-extensions SMono.fromFuture
Посмотрите пример SMonoTest.
import scala.concurrent.ExecutionContext.Implicits.global
StepVerifier.create(SMono.fromFuture(Future[Long] {
randomValue
}))
.expectNext(randomValue)
.verifyComplete()
Ты пробовала Flux.create()
? Вместо использования процессора вы регистрируете будущий обратный вызов, который вызывает предоставленныйSink
методы. Например. дляList
если вы перебираете список и выдаете каждое значение через sink.next(T)
метод.