Java 9 Поведение потока Предлагаемый метод SubmissionPublisher

Я играл с Java Flow offer оператор, но после того, как прочитал документацию и сделал мой тест, я не понимаю.

Вот мой тест

@Test
public void offer() throws InterruptedException {
    //Create Publisher for expected items Strings
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.offer("item", (subscriber, value) -> false);
    Thread.sleep(500);
}

Оператор предложения получает элемент, подлежащий отправке, и функцию BiPredicate, и, насколько я понимаю, читая документацию, только в том случае, если функция предиката является истинной, элемент будет отправлен.

Но после прохождения теста результат

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback

Результат не изменится, если вместо false я верну true.

Кто-нибудь может объяснить мне этот оператор немного лучше, пожалуйста.

2 ответа

Решение

Нет, функция предиката используется, чтобы решить, следует ли повторить операцию публикации, как упомянуто в документах:

onDrop - если не ноль, обработчик вызывается при отбрасывании подписчику с аргументами подписчика и элемента; если он возвращает true, предложение повторяется (один раз)

Это не влияет на то, должен ли товар отправляться изначально.

РЕДАКТИРОВАТЬ: пример того, как капли могут возникнуть при использовании offer метод

Я придумал пример того, как могут происходить падения при вызове offer метод. Я не думаю, что вывод на 100% детерминирован, но есть четкая разница, когда он запускается несколько раз. Вы можете просто изменить обработчик, чтобы он возвращал true вместо false, чтобы увидеть, как повторная попытка уменьшает потери из-за насыщенных буферов. В этом примере удаление обычно происходит, потому что максимальная емкость буфера явно мала (передается в конструктор SubmissionPublisher). Но когда повторная попытка включена после небольшого периода сна, отбрасывания удаляются:

public class SubmissionPubliserDropTest {

    public static void main(String[] args) throws InterruptedException {
        // Create Publisher for expected items Strings
        // Note the small buffer max capacity to be able to cause drops
        SubmissionPublisher<String> publisher =
                               new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        // Register Subscriber
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        // publish 3 items for each subscriber
        for(int i = 0; i < 3; i++) {
            int result = publisher.offer("item" + i, (subscriber, value) -> {
                // sleep for a small period before deciding whether to retry or not
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return false;  // you can switch to true to see that drops are reduced
            });
            // show the number of dropped items
            if(result < 0) {
                System.err.println("dropped: " + result);
            }
        }
        Thread.sleep(3000);
        publisher.close();
    }
}

class CustomSubscriber<T> implements Flow.Subscriber<T> {

    private Subscription sub;

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable th) {
        th.printStackTrace();
        sub.cancel();
    }

    @Override
    public void onNext(T arg0) {
        System.out.println("Got : " + arg0 + " --> onNext() callback");
        sub.request(1);
    }

    @Override
    public void onSubscribe(Subscription sub) {
        System.out.println("Subscription done");
        this.sub = sub;
        sub.request(1);
    }

}

SubmissionPublisher.offer говорится, что

Элемент может быть отброшен одним или несколькими подписчиками, если превышены лимиты ресурсов, и в этом случае вызывается данный обработчик (если не ноль), и если он возвращает true повторил один раз.

Просто чтобы понять, в обоих ваших звонках

publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked

publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked

Но все же publisher публикует данный элемент каждому своему текущему подписчику. что происходит в вашем текущем сценарии.


Сценарий проверки того, что указанный вами обработчик вызывается или не вызывается при попытке воспроизведения, сложен с точки зрения ограничений ресурсов, как предполагает доктор:

Элемент может быть отброшен одним или несколькими подписчиками, если превышены пределы ресурса, и в этом случае вызывается данный обработчик (если он не равен нулю), и если он возвращает значение true, повторяется один раз.

Тем не менее, вы можете попробовать сбросить элементы с установленным минимальным временем ожидания, используя перегруженный метод для offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

timeout - сколько времени ждать ресурсов для любого абонента, прежде чем сдаться, в единицах измерения

unit - TimeUnit, определяющий, как интерпретировать параметр тайм-аута

Так как offer методы могут отбрасывать элементы (либо сразу, либо с ограниченным временем ожидания), что даст возможность вставить обработчик и затем повторить попытку.

Другие вопросы по тегам