Java 9 Поведение потока Предлагаемый метод SubmissionPublisher
Я играл с Java Flow offer
оператор, но после того, как прочитал документацию и сделал мой тест, я не понимаю.
Вот мой тест
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
Оператор предложения получает элемент, подлежащий отправке, и функцию BiPredicate, и, насколько я понимаю, читая документацию, только в том случае, если функция предиката является истинной, элемент будет отправлен.
Но после прохождения теста результат
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
Результат не изменится, если вместо false я верну true.
Кто-нибудь может объяснить мне этот оператор немного лучше, пожалуйста.
2 ответа
Нет, функция предиката используется, чтобы решить, следует ли повторить операцию публикации, как упомянуто в документах:
onDrop
- если не ноль, обработчик вызывается при отбрасывании подписчику с аргументами подписчика и элемента; если он возвращает true, предложение повторяется (один раз)
Это не влияет на то, должен ли товар отправляться изначально.
РЕДАКТИРОВАТЬ: пример того, как капли могут возникнуть при использовании offer
метод
Я придумал пример того, как могут происходить падения при вызове offer
метод. Я не думаю, что вывод на 100% детерминирован, но есть четкая разница, когда он запускается несколько раз. Вы можете просто изменить обработчик, чтобы он возвращал true вместо false, чтобы увидеть, как повторная попытка уменьшает потери из-за насыщенных буферов. В этом примере удаление обычно происходит, потому что максимальная емкость буфера явно мала (передается в конструктор SubmissionPublisher
). Но когда повторная попытка включена после небольшого периода сна, отбрасывания удаляются:
public class SubmissionPubliserDropTest {
public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}
class CustomSubscriber<T> implements Flow.Subscriber<T> {
private Subscription sub;
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}
@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}
}
SubmissionPublisher.offer
говорится, что
Элемент может быть отброшен одним или несколькими подписчиками, если превышены лимиты ресурсов, и в этом случае вызывается данный обработчик (если не ноль), и если он возвращает
true
повторил один раз.
Просто чтобы понять, в обоих ваших звонках
publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked
publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked
Но все же publisher
публикует данный элемент каждому своему текущему подписчику. что происходит в вашем текущем сценарии.
Сценарий проверки того, что указанный вами обработчик вызывается или не вызывается при попытке воспроизведения, сложен с точки зрения ограничений ресурсов, как предполагает доктор:
Элемент может быть отброшен одним или несколькими подписчиками, если превышены пределы ресурса, и в этом случае вызывается данный обработчик (если он не равен нулю), и если он возвращает значение true, повторяется один раз.
Тем не менее, вы можете попробовать сбросить элементы с установленным минимальным временем ожидания, используя перегруженный метод для offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
timeout
- сколько времени ждать ресурсов для любого абонента, прежде чем сдаться, в единицах измерения
unit
- TimeUnit, определяющий, как интерпретировать параметр тайм-аута
Так как offer
методы могут отбрасывать элементы (либо сразу, либо с ограниченным временем ожидания), что даст возможность вставить обработчик и затем повторить попытку.