Почему блокируется ConnectableFlux.connect()?

Я новичок в Spring Reactor. Я пытался понять, как работает класс ConnectableFlux. Я читал документы и видел примеры, размещенные в Интернете, но застрял в проблеме.

Может кто подскажет, почему блокируется метод connect()? Я не вижу ничего в документации, где говорится, что он должен блокироваться… тем более, что он возвращает Disposable для дальнейшего использования. Учитывая мой пример кода ниже, я никогда не обхожу метод connect().

Я пытаюсь имитировать старую парадигму интерфейса Listener, которую я использовал много раз в прошлом. Я хочу узнать, как воссоздать архитектуру класса обслуживания и прослушивателя с помощью реактивных потоков. Там, где у меня есть простой класс Service, и у него есть метод, называемый "addUpdateListener(Listener l)", а затем, когда мой метод класса обслуживания "doStuff()" запускает некоторые события, которые должны быть переданы любым слушателям.

Я должен сказать, что буду писать API для использования другими, поэтому, когда я говорю Service class, я не имею в виду @Service в терминах Spring. Это будет простой одноэлементный класс java.

Я просто использую Spring Reactor для реактивных потоков. Я также смотрел на RxJava... но хотел посмотреть, будет ли работать Spring Reactor Core.

Я начал с тестового класса ниже, чтобы понять синтаксис библиотеки, а затем застрял на проблеме блокировки.

Я думаю, что то, что я ищу, описано здесь: Несколько подписчиков

ОБНОВЛЕНИЕ: запуск моего кода через отладчик, код внутри метода подключения ConnectableFlux никогда не возвращается. Он зависает во внутреннем методе подключения и никогда не возвращается из этого метода.

reactor.core.publisher.ConnectableFlux

public final Disposable connect() {
        Disposable[] out = new Disposable[]{null};
        this.connect((r) -> {
            out[0] = r;
        });
        return out[0];
    }

Любая помощь была бы замечательной!

Вот и мой maven pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringReactorTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                    <exclude>*:xml-apis</exclude>
                                    <exclude>org.apache.maven:lib:tests</exclude>
                                    <exclude>log4j:log4j:jar:</exclude>
                                </excludes>
                            </artifactSet>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofSeconds;

/**
 * Testing ConnectableFlux
 */
public class Main {

    private final static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {
        Main m = new Main();

        // Get the connectable
        ConnectableFlux<Object> flux = m.fluxPrintTime();

        // Subscribe some listeners
        // Tried using a new thread for the subscribers, but the connect call still blocks
        LOG.info("Subscribing");
        Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
        Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));

        LOG.info("Connecting...");
        Disposable connect = flux.connect();// WHY does this block??
        LOG.info("Connected..");

        // Sleep 5 seconds
        TimeUnit.SECONDS.sleep(5);

        // Cleanup - Remove listeners
        LOG.info("Disposing");
        connect.dispose();
        disposable.dispose();
        disposable2.dispose();
        LOG.info("Disposed called");
    }

    // Just create a test flux
    public ConnectableFlux<Object> fluxPrintTime() {
        return Flux.create(fluxSink -> {
            while (true) {
                fluxSink.next(System.currentTimeMillis());
            }
        }).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
                .sample(ofSeconds(2))
                .publish();
    }
}

Выполнение приведенного выше кода дает следующий результат... он просто печатает время в миллисекундах, пока я не нажму Ctrl-C процесс..

09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...

1 ответ

Решение

Я получил ответ от команды Spring Reactor и просто отправляю его здесь на случай, если кто-то еще столкнется с этим...

Суть проблемы в том, что вы входите в бесконечный цикл в Flux.create. В тот момент, когда поток подписывается, он войдет в цикл и никогда не выйдет из него, производя данные так быстро, как это может ЦП. С Flux.create вы, по крайней мере, должны в какой-то момент вызвать функцию ink.complete().

Я предлагаю поэкспериментировать, например. Flux.interval в качестве источника ваших регулярных тиков избавит вас от этой посторонней сложности Flux.create, которая возлагает на вас ответственность за концепции более низкого уровня реактивных потоков (сигналы onNext/onComplete/onError, которые вам понадобятся узнать, но, может быть, не прямо сейчас).

В качестве побочного примечания я бы принял во внимание, что эмуляция API на основе прослушивателя с помощью Reactor (или RxJava) не отражает того, что может делать реактивное программирование. Это ограниченный вариант использования, который, вероятно, отвлечет ваше внимание и ожидания от реальных преимуществ реактивного программирования.

С более высокой точки зрения:

Общая идея ConnectableFlux#connect() состоит в том, что у вас есть "временный" источник, который вы хотите разделить между несколькими подписчиками, но он срабатывает в тот момент, когда кто-то на него подписывается. Итак, чтобы не пропустить ни одного события, вы превращаете источник в ConnectableFlux, выполняете некоторую настройку (подписываете нескольких подписчиков) и вручную запускаете источник (вызывая connect()). Он не блокирует и возвращает Disposable`, который представляет восходящее соединение (в случае, если вы также хотите вручную отменить / удалить всю подписку).

PS: Висмут явно устарел, предпочитайте использовать последнюю версию выпуска Диспрозия.

Другие вопросы по тегам