Преобразуйте Scala Future в Reactor Flux

Я вызываю API, который возвращает Future of Scala, и я хочу превратиться в Reactor Fluxочевидно без блокировки и ждем ответа будущего. Я пытаюсь использовать EmitterProcessor, и он отлично работает, когда будущее приносит успех, но, к сожалению, это не так, когда у будущего есть тайм-аут.

Вот код

private Flux<D> transformFutureToFlux(Future<T> future) {
    EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
    future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
    return Flux.from(emitterProcessor);
}

private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
    return new OnComplete() {
        @Override
        public void onComplete(Throwable t, Object result) {
            if (t != null) {
                processError(t);
            } else {
                processSucceed(result);
            }
        }

        private void processSucceed(Object result) {
            if (result instanceof ConnectorErrorVO) {
                publishConnectorErrorException((ConnectorErrorVO) result);
            } else {
                publishGenericResponse(result);
            }
        }

        private void publishGenericResponse(Object result) {
            if (result instanceof List<?>) {
                Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
            } else {
                Flux.just((D) result).subscribe(emitterProcessor);
            }
        }

        private void publishConnectorErrorException(ConnectorErrorVO result) {
            ConnectorErrorVO connectorErrorVO = result;
            Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
            error.subscribe(emitterProcessor);
        }

        private void processError(Throwable t) {
            ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
            if (t instanceof AskTimeoutException) {
                Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
            } else {
                Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
            }
        }
    };

То, что я пытаюсь сделать, правильно? Есть ли лучший способ сделать это?

С уважением

2 ответа

Если вы вернете Future, это означает, что есть только 1 возврат, а не серия / поток возвращаемого значения. Вы можете конвертировать вFlux если хотите, но почему бы вместо этого не использовать Mono?

Кроме того, если вы используете scala, попробуйте использовать модуль response-scala-extensions SMono.fromFuture

Посмотрите пример SMonoTest.

    import scala.concurrent.ExecutionContext.Implicits.global
    StepVerifier.create(SMono.fromFuture(Future[Long] {
      randomValue
    }))
      .expectNext(randomValue)
      .verifyComplete()

Ты пробовала Flux.create()? Вместо использования процессора вы регистрируете будущий обратный вызов, который вызывает предоставленныйSinkметоды. Например. дляList если вы перебираете список и выдаете каждое значение через sink.next(T) метод.

Другие вопросы по тегам