Java 9 Flow определяет подписчика с лямбдами
Я начинаю играть с Java 9 Flow API, и первое, что я нашел и не люблю, похоже, что мы не можем использовать лямбда-выражения, когда мы передаем реализацию подписчика в издателя, как мы можем сделать с RxJava
Поэтому я должен определить и реализовать свой собственный класс подписчика
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
А потом просто передайте это моему издателю
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());
Это действительно многословно, и, насколько я понимаю, это потому, что нам нужно установить подписку в onSubscribe
Перезвоните
protected Flow.Subscription subscription;
Позже будет использоваться в onNext
продолжить выбросы subscription.request(1);
Я до сих пор не понимаю, зачем нужен этот механизм, но он избегает использования Lambdas, как в примере с RxJava
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));
Я думаю, что это невозможно, и я не пропускаю ничего здесь, верно?
2 ответа
Я до сих пор не понимаю, зачем нужен этот механизм
Подписка позволяет общаться от подписчика к издателю. request
Метод позволяет абоненту применять противодавление, сообщая вышестоящим компонентам, что оно перегружено и "нуждается в разрыве". Чтобы сделать это, подписчик должен удержать экземпляр подписки и должен иногда вызывать request
чтобы получить больше предметов.
Никакого давления Subscriber
Если у вас есть сценарий использования, в котором вам не нужно применять противодавление и вы хотите воспользоваться уменьшенной сложностью, вы можете реализовать LaidBackSubscriber
, который:
- инвентарь
onSubscribe
сохраняя подписку и немедленно звоняrequest
в теме - инвентарь
onNext
выполняя лямбду, данную во время строительства, а затем вызываяsubscription.request(1)
- инвентарь
onError
а такжеonComplete
выполняя лямбду, данную во время строительства
Это должно получить вам то, что вы хотели.
Общий совет
Java 9 Flow API был создан как точка интеграции для существующих асинхронных библиотек, а не как приглашение для реализации реактивных компонентов в режиме ad-hoc. С этим приятно поэкспериментировать, но если вы действительно хотите создать реактивную систему, существующие библиотеки, вероятно, хорошо подойдут.
Java 9 Flow API представляет собой набор из 4 интерфейсов и 1 класс моста от нереактивного до реактивного мира. Нет операторов, нет удобных лямбда-версий, больше ничего.
Теоретически, это было введено, чтобы позволить самому JDK создавать внутренние компоненты, основанные на принципах реагирования, но нет никаких обнадеживающих признаков того, что это происходит.
Таким образом, пользователи несут ответственность за создание компонентов на этом API, который сложен, утомителен и подвержен ошибкам. Вам лучше подождать, пока основные библиотеки выпустят совместимые версии, или просто придерживайтесь более доступных библиотек на основе Reactive-Streams.Org, таких как RxJava 2 и Reactor 3.
Если вы все еще заинтересованы в создании поверх API-интерфейса Flow вручную, вы можете взглянуть на мою библиотеку исследований / прототипов Reactive4JavaFlow, в которой реализована требуемая лямбда- перегрузка.