Рекурсия по реактивным потокам с Project Reactor

Моя цель - просмотреть граф каталогов и записать все их имена, используя реактивные потоки и Project Reactor.

Поскольку файловая система удаленная, вызовы к ней блокируются. Поэтому я хотел бы сохранить выполнение блокирующего вызова отдельно от остальной части моего неблокирующего асинхронного кода. Я делаю это, используя эту рекомендацию: http://projectreactor.io/docs/core/release/reference/.

Вот структура, которую мне нужно пройти:

/
  /jupiter
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

  /earth
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

  /mars
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

И вот код, который я придумал до сих пор:

public class ReactorEngine {

    private static Logger log = LoggerFactory.getLogger(ReactorEngine.class);

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Server server = new Server();

        Flux.fromIterable(server.getChildren("/"))
            .flatMap(parent -> Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()))
            .publishOn(Schedulers.elastic())
            .doOnTerminate(latch::countDown)
            .subscribe(ReactorEngine::handleResponse);

        latch.await();
    }

    private static void handleResponse(List<String> value) {
        log.info("Received: " + value);
    }

}

public class Server {

    public List<String> getChildren(final String path) {
        // Generate some I/O
        ...
    }
}

Поэтому я начинаю с каталогов верхнего уровня и асинхронно запрашиваю первый уровень вниз (своих детей). Все идет хорошо, и это вывод:

15:35:05.902 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:35:07.062 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:35:07.140 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:35:07.140 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:35:07.140 [elastic-5] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:35:08.140 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/earth/phase-1/, /earth/phase-2/, /earth/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/jupiter/phase-1/, /jupiter/phase-2/, /jupiter/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/mars/phase-1/, /mars/phase-2/, /mars/phase-3/]

Теперь мой вопрос заключается в том, как мне вернуть в поток элементы, которые пришли в качестве результатов, чтобы механизм рекурсивно вызывал server.getChildren (parent) до тех пор, пока не будет пройден весь граф каталогов?

Действительно ли рекурсия - это путь, или есть лучший "реактивный" способ сделать это, возможно, с помощью операторов?

Спасибо!

редактировать

expand(Function) Оператор, предложенный Саймоном, хорошо работает для обхода графа. Я изменил код на это:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Server server = new Server();

    Flux.fromIterable(server.getChildren("/"))
        .expand(p -> Flux.fromIterable(server.getChildren(p)).subscribeOn(Schedulers.elastic()))
        .publishOn(Schedulers.elastic())
        .doOnTerminate(latch::countDown)
        .subscribe(ReactorEngine::handleResponse);

    latch.await();
}

Тем не менее, я потерял асинхронный способ вызова блокировки server.getChildren(String) метод моего сервера. Как вы можете видеть в этих журналах, каждый подкаталог получается синхронно, раз в секунду:

15:57:55.398 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:57:56.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:57:56.593 [main] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:57:56.593 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/
15:57:57.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:57:57.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/
15:57:58.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:57:58.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/
15:57:59.599 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-1/...
15:57:59.599 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-1/
15:58:00.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-2/...
15:58:00.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-2/
15:58:01.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-3/...
15:58:01.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-3/
15:58:02.601 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-4/...
15:58:02.601 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-4/
15:58:03.602 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-1/...
15:58:03.603 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-1/
15:58:04.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-2/...
15:58:04.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-2/
15:58:05.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-3/...
15:58:05.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-3/
15:58:06.605 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-4/...
15:58:06.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-4/
15:58:07.605 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-1/...
15:58:07.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-1/
15:58:08.606 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-2/...
15:58:08.606 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-2/
15:58:09.607 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-3/...
15:58:09.607 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-3/
15:58:10.608 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-4/...
15:58:10.608 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-4/

Не могли бы вы дать подсказку о том, как позвонить Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()) вернуться в схему? Нет никаких Flux.fromCallable() что я могу позвонить, и, возможно, по уважительной причине.

Но так как я действительно новичок в реактивном программировании и в концепциях Project Reactor, мне довольно сложно обдумать этот способ асинхронного выполнения.

Спасибо.

1 ответ

Для этого есть оператор:) Посмотрите на expand а также expandDeep

Другие вопросы по тегам