По какой-то причине ConnectableObservable останавливается после испускания 128 элементов

Я пытаюсь создать ConnectableFlowable, который испускает несколько объектов каждую секунду. Следующий код испускает 128 элементов, а затем просто останавливается. Никогда не отправлять данные снова. Я пытался удалить вызов flatMap(), на десять элементов выбрасывается бесконечно. Какие-либо предложения?

 private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
        return createIntervalPlcFlowable().onBackpressureLatest().publish();
    }

    private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
        return Flowable.interval(1, TimeUnit.SECONDS)
                .onBackpressureLatest()
                .flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> 
                     mDataPackageFlowable);
    }

    private Flowable<PlcDataPackage> createPlcFlowable() {
       return Flwable.create(...,BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.single())
    }

 mDataPackageFlowable = createPlcFlowable();
 mConnectableFlowable = createConnectablePlcFlowable();
 mConnectableFlowable.connect();

Код подписки:

 addDisposable(mGetPlcUpdatesChanelUseCase.execute()
                              .observeOn(AndroidSchedulers.mainThread())
                              .subscribe(plcDto -> Timber.d(String.valueOf(++dataCount)), Timber::e));

Код использования:

public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {

    private final PlcRepository mPlcRepository;

    public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
        mPlcRepository = plcRepository;
    }

    @Override
    public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
        return mPlcRepository.getUpdatesChannel();
    }

    @Override
    public boolean isParamsRequired() {
        return false;
    }
}

вместилище

public final class PlcRepositoryImpl implements PlcRepository {

    private final PlcCore mPlcCore;
    private final PlcInfoTopPlcDtoTransformer mPlcInfoTopPlcDtoTransformer = new PlcInfoTopPlcDtoTransformer();

    public PlcRepositoryImpl(PlcCore plcCore) {
        mPlcCore = plcCore;
    }

    @Override
    public Flowable<PlcDto> getUpdatesChannel() {
        return mPlcCore.getPlcConnectableFlowable()
                .map(mPlcInfoTopPlcDtoTransformer::transform);
    }

    @Override
    public Flowable<Object> setDeviceProfileDto(DeviceProfileDto deviceProfileDto) {
        return mPlcCore.setDeviceProfileDto(deviceProfileDto);
    }
}
  • [rxJava2 = '2.1.3' и rxAndroid = '2.0.1' ]

0 ответов

Другие вопросы по тегам