Хорошая реализация / поддержка java.util.concurrent.Flow.Processor<T, R>

Недавно я нашел хорошую поддержку для Publisher с помощью projectreactor.io:

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)
            fluxSink.next(i);
           fluxSink.complete();
        })
                .map(...)
                .subscribe(...);

Есть ли хорошая поддержка для Proccessor? Я имею в виду что-то вроде или симуляция:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)
       output.error();
    else
       output.next(inputValue);
});

publisher.subscribe(process);  
process.subscribe(...);

Если нет, как я могу реализовать свой собственный или почему я не могу это сделать?

Обновление 1:

После обсуждения (см. Комментарии) оказалось, что в моем случае использования мне нужно использовать flatMap (см. ответ), мой вопрос был: Хорошая реализация процессора, под этим я подразумевал некоторую функциональность: если он не работает, я могу взять на себя управление и выдавать ошибку. Я думаю flatMap даст вам достаточно функциональности. В моем случае я использовал:

        import org.jsoup.Jsoup;

        Flux.just("url")
            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);
                }
            })
            .subscribe();

3 ответа

Решение

Из того, что вы описали в вашем случае использования, я не ожидаю, что вам действительно Processor, Скорее используйте flatMap вызвать асинхронные выборки URL. flatMapКак и все операторы Reactive Streams, по умолчанию остановится немедленно в случае ошибки.

Единственная часть, где вам может потребоваться процессор, это генерировать Flux<URL> если вы не знаете URL-адреса заранее (в противном случае Flux.fromIterable или же Flux.just(...) будет просто отлично).

Если вам нужно отправить результат (ы) в несколько Subscriber без повторного запуска запросов, посмотрите на publish().connect() и / или cache(),

Вы, вероятно, ищете SubmissionPublisher что похоже на Flux реализация в реакторе:

Flow.Publisher что асинхронно выдает отправленные (ненулевые) элементы текущим подписчикам, пока они не будут закрыты. Каждый текущий подписчик получает вновь представленные элементы в том же порядке, если не встречаются исключения или исключения. Используя SubmissionPublisher позволяет генераторам элементов действовать как совместимые реактивные потоки. Издатели полагаются на обработку отбрасывания и / или блокировку для управления потоком.

Примечание: пользовательский образец Flow.Processor является общим в ссылке, которая может быть дополнительно настроена для обработки onError а также consume Реализация метода, как требуется для вашего варианта использования.

Это действительно зависит от того, что вы хотите сделать.

Большинство методов на Flux создать такие процессоры и просто вернуть их как Flux убедившись, что они подписаны в правильном пути к восходящей Flux,

Так что если ваш Processor должен просто генерировать событие для каждого, которое он получает, но другое map это ваш простой способ создать свой Processor, Если это создает многократное (или нет) событие для каждого полученного события, используйте flatMap и так далее.

Вы можете создавать еще более сложные, объединяя эти методы. Я ожидаю, что 99% вариантов использования будут обработаны таким образом, просто отлично.

Если этого недостаточно, рассмотрите различные перегрузки subscribe где вы можете использовать Consumer обрабатывать элементы Flux а также изменения состояния, такие как ошибка, завершение и подписка. И вы можете объединить их, среди прочего, с Flux.create(fluxSink -> ...) построить довольно гибкий Processors,

Другие вопросы по тегам