Java 9 Flow определяет подписчика с лямбдами

Я начинаю играть с Java 9 Flow API, и первое, что я нашел и не люблю, похоже, что мы не можем использовать лямбда-выражения, когда мы передаем реализацию подписчика в издателя, как мы можем сделать с RxJava

Поэтому я должен определить и реализовать свой собственный класс подписчика

public class CustomSubscriber<T> implements Flow.Subscriber<T> {

        protected Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Subscription done:");
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            System.out.println("Got : " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    }

А потом просто передайте это моему издателю

   SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new CustomSubscriber<>());

Это действительно многословно, и, насколько я понимаю, это потому, что нам нужно установить подписку в onSubscribe Перезвоните

protected Flow.Subscription subscription;     

Позже будет использоваться в onNext продолжить выбросы subscription.request(1);

Я до сих пор не понимаю, зачем нужен этот механизм, но он избегает использования Lambdas, как в примере с RxJava

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
        e -> System.out.println("do something in the onError"),
        () -> System.out.println("Do something in the onComplete"));

Я думаю, что это невозможно, и я не пропускаю ничего здесь, верно?

2 ответа

Решение

Я до сих пор не понимаю, зачем нужен этот механизм

Подписка позволяет общаться от подписчика к издателю. request Метод позволяет абоненту применять противодавление, сообщая вышестоящим компонентам, что оно перегружено и "нуждается в разрыве". Чтобы сделать это, подписчик должен удержать экземпляр подписки и должен иногда вызывать request чтобы получить больше предметов.

Никакого давления Subscriber

Если у вас есть сценарий использования, в котором вам не нужно применять противодавление и вы хотите воспользоваться уменьшенной сложностью, вы можете реализовать LaidBackSubscriber, который:

  • инвентарь onSubscribe сохраняя подписку и немедленно звоня request в теме
  • инвентарь onNext выполняя лямбду, данную во время строительства, а затем вызывая subscription.request(1)
  • инвентарь onError а также onComplete выполняя лямбду, данную во время строительства

Это должно получить вам то, что вы хотели.

Общий совет

Java 9 Flow API был создан как точка интеграции для существующих асинхронных библиотек, а не как приглашение для реализации реактивных компонентов в режиме ad-hoc. С этим приятно поэкспериментировать, но если вы действительно хотите создать реактивную систему, существующие библиотеки, вероятно, хорошо подойдут.

Java 9 Flow API представляет собой набор из 4 интерфейсов и 1 класс моста от нереактивного до реактивного мира. Нет операторов, нет удобных лямбда-версий, больше ничего.

Теоретически, это было введено, чтобы позволить самому JDK создавать внутренние компоненты, основанные на принципах реагирования, но нет никаких обнадеживающих признаков того, что это происходит.

Таким образом, пользователи несут ответственность за создание компонентов на этом API, который сложен, утомителен и подвержен ошибкам. Вам лучше подождать, пока основные библиотеки выпустят совместимые версии, или просто придерживайтесь более доступных библиотек на основе Reactive-Streams.Org, таких как RxJava 2 и Reactor 3.

Если вы все еще заинтересованы в создании поверх API-интерфейса Flow вручную, вы можете взглянуть на мою библиотеку исследований / прототипов Reactive4JavaFlow, в которой реализована требуемая лямбда- перегрузка.

Другие вопросы по тегам