Перезапустите реактивный обмен сообщениями, например, после перенастройки
Как я могу перезапустить или остановить / возобновить реактивный обмен сообщениями, например, после изменения интервала времени? Этот пример взят из руководства Quarkus: https://quarkus.io/guides/kafka-streams
@Outgoing("temperature-values")
public Flowable<KafkaRecord<Integer, String>> generate() {
return Flowable.interval(500, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.map(tick -> {
WeatherStation station = stations.get(random.nextInt(stations.size()));
double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
.setScale(1, RoundingMode.HALF_UP)
.doubleValue();
LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
});
}
1 ответ
Решение
Вы можете попробовать заменить Flowable
с участием Subject
, как вариант, и используйте Flowable
вводить значения в Subject
сам. Затем, когда вы захотите заменить все, что вам нужно, вы сбросите текущийFlowable
и создать новый, который будет кормить Subject
class YourClass {
private Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
private Disposable currentSubscription;
public void setFlowable() {
if(currentSubscription != null && !currentSubscription.isDisposed()) {
currentSubscription.dispose();
}
currentSubscription = Flowable.interval(5, TimeUnit.SECONDS)
.map(it -> {
WeatherStation station = stations.get(random.nextInt(stations.size()));
double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
.setScale(1, RoundingMode.HALF_UP)
.doubleValue();
LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
}).subscribe(it -> {
temperatureSubject.onNext(it);
});
}
@Outgoing("temperature-values")
public Flowable<KafkaRecord<Integer, String>> generate() {
return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
}
}