Хорошая реализация / поддержка java.util.concurrent.Flow.Processor<T, R>
Недавно я нашел хорошую поддержку для Publisher с помощью projectreactor.io:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
Есть ли хорошая поддержка для Proccessor? Я имею в виду что-то вроде или симуляция:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
Если нет, как я могу реализовать свой собственный или почему я не могу это сделать?
Обновление 1:
После обсуждения (см. Комментарии) оказалось, что в моем случае использования мне нужно использовать flatMap
(см. ответ), мой вопрос был: Хорошая реализация процессора, под этим я подразумевал некоторую функциональность: если он не работает, я могу взять на себя управление и выдавать ошибку. Я думаю flatMap
даст вам достаточно функциональности. В моем случае я использовал:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();
3 ответа
Из того, что вы описали в вашем случае использования, я не ожидаю, что вам действительно Processor
, Скорее используйте flatMap
вызвать асинхронные выборки URL. flatMap
Как и все операторы Reactive Streams, по умолчанию остановится немедленно в случае ошибки.
Единственная часть, где вам может потребоваться процессор, это генерировать Flux<URL>
если вы не знаете URL-адреса заранее (в противном случае Flux.fromIterable
или же Flux.just(...)
будет просто отлично).
Если вам нужно отправить результат (ы) в несколько Subscriber
без повторного запуска запросов, посмотрите на publish().connect()
и / или cache()
,
Вы, вероятно, ищете SubmissionPublisher
что похоже на Flux
реализация в реакторе:
Flow.Publisher
что асинхронно выдает отправленные (ненулевые) элементы текущим подписчикам, пока они не будут закрыты. Каждый текущий подписчик получает вновь представленные элементы в том же порядке, если не встречаются исключения или исключения. ИспользуяSubmissionPublisher
позволяет генераторам элементов действовать как совместимые реактивные потоки. Издатели полагаются на обработку отбрасывания и / или блокировку для управления потоком.
Примечание: пользовательский образец Flow.Processor
является общим в ссылке, которая может быть дополнительно настроена для обработки onError
а также consume
Реализация метода, как требуется для вашего варианта использования.
Это действительно зависит от того, что вы хотите сделать.
Большинство методов на Flux
создать такие процессоры и просто вернуть их как Flux
убедившись, что они подписаны в правильном пути к восходящей Flux
,
Так что если ваш Processor
должен просто генерировать событие для каждого, которое он получает, но другое map
это ваш простой способ создать свой Processor
, Если это создает многократное (или нет) событие для каждого полученного события, используйте flatMap
и так далее.
Вы можете создавать еще более сложные, объединяя эти методы. Я ожидаю, что 99% вариантов использования будут обработаны таким образом, просто отлично.
Если этого недостаточно, рассмотрите различные перегрузки subscribe
где вы можете использовать Consumer
обрабатывать элементы Flux
а также изменения состояния, такие как ошибка, завершение и подписка. И вы можете объединить их, среди прочего, с Flux.create(fluxSink -> ...)
построить довольно гибкий Processors
,