Поток данных RxJava с SqlBrite и Retrofit

В моем приложении для Android я использую интерфейс репозитория на уровне домена, который поддерживается локальной БД, реализованной с использованием SqlBrite и сетевого API с наблюдаемыми модификациями Retrofit. Итак, у меня есть метод getDomains(): Observable<List<Domain>> в репозитории и два соответствующих метода в моем Retrofit и SqlBrite. Я не хочу объединять или объединять эти две наблюдаемые. Я хочу, чтобы мой репозиторий получал данные только из SqlBrite, и, поскольку SqlBrite возвращает QueryObservable, что вызывает onNext() Каждый раз, когда меняются базовые данные, я могу независимо запускать свой сетевой запрос и сохранять результаты в SqlBrite, а мой Observable обновляется, извлекается из сети и сохраняется в данных БД. Поэтому я попытался реализовать мой репозиторий getDomains() метод следующим образом:

fun getDomains(): Observable<List<Domain>> {
    return db.getDomains()
                   .doOnSubscribe {
                       networkClient.getDomains()
                                    .doOnNext { db.putDomains(it) }
                                    .onErrorReturn{ emptyList() }
                                    .subscribe()
                   }
}

Но в этом случае каждый раз, когда клиент должен подписаться, каждый раз, когда он будет делать сетевые запросы, это не так хорошо. Я думал о другом do... операторы для перемещения запросов туда, но doOnCompleted() в случае QueryObservable никогда не будет вызываться, пока я не позвоню toBlocking() где-то, чего я не буду, doOnEach() также не хорошо, так как он делает запросы каждый раз, когда извлекается элемент из БД. Я также пытался использовать replay() оператор, но хотя Observable кешируется в этом случае, подписка происходит и приводит к сетевым запросам. Итак, как можно объединить эти два наблюдаемых желаемым образом?

2 ответа

Решение

Хорошо, это зависит от конкретного варианта использования, который у вас есть: т. Е. Предполагается, что вы хотите отобразить последние данные из вашей локальной базы данных и время от времени обновлять базу данных, выполняя сетевой запрос в фоновом режиме.

Может быть, есть лучший способ, но, возможно, вы могли бы сделать что-то вроде этого

 fun <T> createDataAwareObservable(databaseQuery: Observable<T>): Observable<T> =
      stateDeterminer.getState().flatMap {
        when (it) {
          State.UP_TO_DATE -> databaseQuery // Nothing to do, data is up to date so observable can be returned directly

          State.NO_DATA ->
            networkClient.getDomains() // no data so first do the network call
                .flatMap { db.save(it) } // save network call result in database
                .flatMap { databaseQuery } // continue with original observable

          State.SYNC_IN_BACKGROUND -> {
            // Execute sync in background
            networkClient.getDomains()
                .flatMap { db.save(it) }
                .observeOn(backgroundSyncScheduler)
                .subscribeOn(backgroundSyncScheduler)
                .subscribe({}, { Timber.e(it, "Error when starting background sync") }, {})

            // Continue with original observable in parallel, network call will then update database and thanks to sqlbrite databaseQuery will be update automatically
            databaseQuery
          }
        }
      }

Итак, в конце вы создаете свой SQLBrite Observable (QueryObservable) и передаете его в createDataAwareObservable() функция. Затем он будет гарантировать, что он загружает данные из сети, если здесь нет данных, в противном случае он проверит, следует ли обновлять данные в фоновом режиме (сохранит их в базе данных, которая затем автоматически обновит SQLBrite QueryObservable), или если данные до настоящего времени.

В основном вы можете использовать это так:

createDataAwareObservable( db.getAllDomains() ).subscribe(...)

Так что для вас, как пользователя этого createDataAwareObservable() вы всегда получаете один и тот же тип Observable<T> назад, как вы передаете в качестве параметра. По сути, кажется, что вы всегда подписывались на db.getAllDomains()...

Если ваша проблема заключается в том, что вы должны подписывать своего наблюдателя каждый раз, когда вы хотите получить данные, вы можете использовать relay, который никогда не отписывает наблюдателей, потому что не реализует onComplete

   /**
 * Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
 * It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
 * and all the next items passed to the pipeline.
 */
@Test
public void testRelay() throws InterruptedException {
    BehaviorRelay<String> relay = BehaviorRelay.create("default");
    relay.subscribe(result -> System.out.println("Observer1:" + result));
    relay.call("1");
    relay.call("2");
    relay.call("3");
    relay.subscribe(result -> System.out.println("Observer2:" + result));
    relay.call("4");
    relay.call("5");
}

Другие примеры здесь https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

Другие вопросы по тегам