Как манипулировать объектом, поступающим из Flux<Object>, со значением, исходящим от метода, излучающего Mono<Items> неблокирующим способом?

Я пытаюсь манипулировать моими объектами, полученными из Flux, с данными, полученными из Mono, где методы, излучающие Flux объекта и Mono элементов, являются разными вызовами API. Проблема в том, что я не контролирую потоки, а элементы, полученные из Mono, никогда не назначаются моему объекту, если я не намеренно блокирую () этот поток. Пожалуйста, предложите, если какой-либо неблокирующий способ возможен для этого сценария.

Я также посмотрел на планировщики, подписаться, опубликовать, но не смог выяснить конвейер.

public Flux<Object> test {

 method1().map(obj -> {
        if (obj.getTotalItems() > 20) {
            obj.setItems(method2(obj).block());
        }
        return obj;
  });
}

Здесь method1 испускает поток объектов, полученных от попадания API.

И method2 генерирует список элементов, извлеченных из другого попадания API.

Как я могу сделать весь этот поток неблокирующим?

1 ответ

Решение

Пытаться flatMap или же concatMap

с помощью flatMap оператор вы можете расплющить подпоток в неблокирующем публичном

Flux<Object> test {

 method1().flatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
  });
}

flatMap позволяет выравнивать несколько потоков одновременно, поэтому в случае длительных операций вы можете использовать более эффективные элементы процесса.

Недостатком flatMap является то, что он не сохраняет порядок элементов, поэтому, если у вас есть последовательность вышестоящих элементов, таких как [1, 2, 3, 4] с flatMap есть вероятность, что порядок изменится из-за асинхронного характера подпотоков.

Для сохранения порядка вы можете использовать concatMap которые сглаживают только один раз за раз, поэтому есть гарантии, что порядок элементов сглаживания будет сохранен:

Flux<Object> test {

 method1().concatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
  });
}

Заметка

Мутация объектов таким способом - не лучшая идея, и я бы предпочел использовать неизменяемый объектный шаблон объекта в реактивном программировании.

Другие вопросы по тегам