По какой-то причине ConnectableObservable останавливается после испускания 128 элементов
Я пытаюсь создать ConnectableFlowable, который испускает несколько объектов каждую секунду. Следующий код испускает 128 элементов, а затем просто останавливается. Никогда не отправлять данные снова. Я пытался удалить вызов flatMap(), на десять элементов выбрасывается бесконечно. Какие-либо предложения?
private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
return createIntervalPlcFlowable().onBackpressureLatest().publish();
}
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS)
.onBackpressureLatest()
.flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong ->
mDataPackageFlowable);
}
private Flowable<PlcDataPackage> createPlcFlowable() {
return Flwable.create(...,BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.single())
}
mDataPackageFlowable = createPlcFlowable();
mConnectableFlowable = createConnectablePlcFlowable();
mConnectableFlowable.connect();
Код подписки:
addDisposable(mGetPlcUpdatesChanelUseCase.execute()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(plcDto -> Timber.d(String.valueOf(++dataCount)), Timber::e));
Код использования:
public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {
private final PlcRepository mPlcRepository;
public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
mPlcRepository = plcRepository;
}
@Override
public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
return mPlcRepository.getUpdatesChannel();
}
@Override
public boolean isParamsRequired() {
return false;
}
}
вместилище
public final class PlcRepositoryImpl implements PlcRepository {
private final PlcCore mPlcCore;
private final PlcInfoTopPlcDtoTransformer mPlcInfoTopPlcDtoTransformer = new PlcInfoTopPlcDtoTransformer();
public PlcRepositoryImpl(PlcCore plcCore) {
mPlcCore = plcCore;
}
@Override
public Flowable<PlcDto> getUpdatesChannel() {
return mPlcCore.getPlcConnectableFlowable()
.map(mPlcInfoTopPlcDtoTransformer::transform);
}
@Override
public Flowable<Object> setDeviceProfileDto(DeviceProfileDto deviceProfileDto) {
return mPlcCore.setDeviceProfileDto(deviceProfileDto);
}
}
- [rxJava2 = '2.1.3' и rxAndroid = '2.0.1' ]