Почему блокируется ConnectableFlux.connect()?
Я новичок в Spring Reactor. Я пытался понять, как работает класс ConnectableFlux. Я читал документы и видел примеры, размещенные в Интернете, но застрял в проблеме.
Может кто подскажет, почему блокируется метод connect()? Я не вижу ничего в документации, где говорится, что он должен блокироваться… тем более, что он возвращает Disposable для дальнейшего использования. Учитывая мой пример кода ниже, я никогда не обхожу метод connect().
Я пытаюсь имитировать старую парадигму интерфейса Listener, которую я использовал много раз в прошлом. Я хочу узнать, как воссоздать архитектуру класса обслуживания и прослушивателя с помощью реактивных потоков. Там, где у меня есть простой класс Service, и у него есть метод, называемый "addUpdateListener(Listener l)", а затем, когда мой метод класса обслуживания "doStuff()" запускает некоторые события, которые должны быть переданы любым слушателям.
Я должен сказать, что буду писать API для использования другими, поэтому, когда я говорю Service class, я не имею в виду @Service в терминах Spring. Это будет простой одноэлементный класс java.
Я просто использую Spring Reactor для реактивных потоков. Я также смотрел на RxJava... но хотел посмотреть, будет ли работать Spring Reactor Core.
Я начал с тестового класса ниже, чтобы понять синтаксис библиотеки, а затем застрял на проблеме блокировки.
Я думаю, что то, что я ищу, описано здесь: Несколько подписчиков
ОБНОВЛЕНИЕ: запуск моего кода через отладчик, код внутри метода подключения ConnectableFlux никогда не возвращается. Он зависает во внутреннем методе подключения и никогда не возвращается из этого метода.
reactor.core.publisher.ConnectableFlux
public final Disposable connect() {
Disposable[] out = new Disposable[]{null};
this.connect((r) -> {
out[0] = r;
});
return out[0];
}
Любая помощь была бы замечательной!
Вот и мой maven pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SpringReactorTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>log4j:log4j:jar:</exclude>
</excludes>
</artifactSet>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofSeconds;
/**
* Testing ConnectableFlux
*/
public class Main {
private final static Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws InterruptedException {
Main m = new Main();
// Get the connectable
ConnectableFlux<Object> flux = m.fluxPrintTime();
// Subscribe some listeners
// Tried using a new thread for the subscribers, but the connect call still blocks
LOG.info("Subscribing");
Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));
LOG.info("Connecting...");
Disposable connect = flux.connect();// WHY does this block??
LOG.info("Connected..");
// Sleep 5 seconds
TimeUnit.SECONDS.sleep(5);
// Cleanup - Remove listeners
LOG.info("Disposing");
connect.dispose();
disposable.dispose();
disposable2.dispose();
LOG.info("Disposed called");
}
// Just create a test flux
public ConnectableFlux<Object> fluxPrintTime() {
return Flux.create(fluxSink -> {
while (true) {
fluxSink.next(System.currentTimeMillis());
}
}).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
.sample(ofSeconds(2))
.publish();
}
}
Выполнение приведенного выше кода дает следующий результат... он просто печатает время в миллисекундах, пока я не нажму Ctrl-C процесс..
09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...
1 ответ
Я получил ответ от команды Spring Reactor и просто отправляю его здесь на случай, если кто-то еще столкнется с этим...
Суть проблемы в том, что вы входите в бесконечный цикл в Flux.create. В тот момент, когда поток подписывается, он войдет в цикл и никогда не выйдет из него, производя данные так быстро, как это может ЦП. С Flux.create вы, по крайней мере, должны в какой-то момент вызвать функцию ink.complete().
Я предлагаю поэкспериментировать, например. Flux.interval в качестве источника ваших регулярных тиков избавит вас от этой посторонней сложности Flux.create, которая возлагает на вас ответственность за концепции более низкого уровня реактивных потоков (сигналы onNext/onComplete/onError, которые вам понадобятся узнать, но, может быть, не прямо сейчас).
В качестве побочного примечания я бы принял во внимание, что эмуляция API на основе прослушивателя с помощью Reactor (или RxJava) не отражает того, что может делать реактивное программирование. Это ограниченный вариант использования, который, вероятно, отвлечет ваше внимание и ожидания от реальных преимуществ реактивного программирования.
С более высокой точки зрения:
Общая идея ConnectableFlux#connect() состоит в том, что у вас есть "временный" источник, который вы хотите разделить между несколькими подписчиками, но он срабатывает в тот момент, когда кто-то на него подписывается. Итак, чтобы не пропустить ни одного события, вы превращаете источник в ConnectableFlux, выполняете некоторую настройку (подписываете нескольких подписчиков) и вручную запускаете источник (вызывая connect()). Он не блокирует и возвращает Disposable`, который представляет восходящее соединение (в случае, если вы также хотите вручную отменить / удалить всю подписку).
PS: Висмут явно устарел, предпочитайте использовать последнюю версию выпуска Диспрозия.