Как отменить подписку на элемент из потока
Рассмотрим следующий поток
FluxSink<String> sink;
Flux<String> flux1 = Flux
.<String>create(emitter -> {
sink = emitter;
},...)
.cache()
.publish()
.autoConnect();
Таким образом, чтобы добавить / подписать элемент, мы можем сделать sink.next(“4”);
flux1.subscribe(item -> log.info(“item: “+item);
Фильтруя flux1
скажем, из элемента "2" не удаляли этот элемент из потока. Я знаю Flux
издатель неизменен.
Если мы можем добавить к нему через раковину, как мы можем удалить элемент из flux1
?
1 ответ
Правильное мышление дает правильные ответы
Думайте о Flux как о неизменном потоке сообщений. Это как река, вы можете добавить к ней немного воды, но вы не можете откатить воду, которую вы уже дали потоку. Тем не менее, вы можете фильтровать эту воду вниз по течению.
Если вам нужно "удалить" недопустимые элементы из потока, вы можете отфильтровать их:
flux1.filter(e -> !e.equal("2"))
.subscribe(item -> log.info(“item: “+item);
Поток - это не структура данных, к которой мы привыкли, а поток данных, который вы не можете изменить в точке подачи данных, но можете манипулировать тем, как они идут в конечную точку