Перезапустите реактивный обмен сообщениями, например, после перенастройки

Как я могу перезапустить или остановить / возобновить реактивный обмен сообщениями, например, после изменения интервала времени? Этот пример взят из руководства Quarkus: https://quarkus.io/guides/kafka-streams

@Outgoing("temperature-values")                             
public Flowable<KafkaRecord<Integer, String>> generate() {

    return Flowable.interval(500, TimeUnit.MILLISECONDS)    
            .onBackpressureDrop()
            .map(tick -> {
                WeatherStation station = stations.get(random.nextInt(stations.size()));
                double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
            });
}

1 ответ

Решение

Вы можете попробовать заменить Flowable с участием Subject, как вариант, и используйте Flowable вводить значения в Subjectсам. Затем, когда вы захотите заменить все, что вам нужно, вы сбросите текущийFlowable и создать новый, который будет кормить Subject

class YourClass {

    private Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
    private Disposable currentSubscription;

    public void setFlowable() {
        if(currentSubscription != null && !currentSubscription.isDisposed()) {
            currentSubscription.dispose();
        }
        currentSubscription = Flowable.interval(5, TimeUnit.SECONDS)
                .map(it -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
                }).subscribe(it -> {
                    temperatureSubject.onNext(it);
                });
    }

    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
    }
}
Другие вопросы по тегам