RxJava подписка на побочный эффект

У меня есть вопрос, это простой бизнес-логика потока:

проверьте, находится ли сотрудник в нескольких отделах, отдел и отношение сотрудника в кеше, сначала проверьте, существуют ли отношения в кеше, если они существуют, проверьте, принадлежит ли ему сотрудник, в случае отсутствия в кэше, получите его из базы данных и проверьте отношения о сотруднике, а затем сохранить информацию об отделе в кэш.

это код:

public Observable  isEmployeeInDepartment(List<Long> departmentIds, long employeeId){

     //this observable will resolve twice, and cause unnecessary cache access
     Observable  departmentInfoExsitInCache= checkDepartmentInfoFromCache(...).share();   

     Observable  departInfoNotInCache = departmentInfoExsitInCache.filter(...);

     //this observable will resolve twice, and cause unnecessary database access
     Observable  departmentInfoFromDb=departInfoNotInCache.flatMap(departmentIds->checkFromDb()).share(); 

     Observable<Long> saveResult=departmentInfoFromDb.flatMap(departmentInfo->saveToCache());

     Observable<Long> departInfoInCache = departmentInfoExsitInCache.filter(...);

     return departInfoInCache.check(userId).merge( departmentInfoFromDb.check(userId)).doOnCompleted(saveResult.subscribe());
}

проблема заключается в том, что отдела InfoExsitInCache и saveResult будут разрешены дважды после подписки клиентского метода.

Я обнаружил, что после удаления кода сохранения подписки.doOnCompleted(saveResult.subscribe()) он станет нормальным и разрешится только один раз. Что-то не так с этим кодом?

1 ответ

Вы не правильно поделились здесь.
Проблема в том, что share() не поможет вам в этом случае. share() на самом деле publish().autoConnect() предназначена для сохранения единой подписки на поток, таким образом, повторный вызов подписки не вызовет логику подписки снова, а просто соединит вас с существующим потоком.
Но после того, как все подписчики отписались от общего потока, Observable откажется от подписки, что означает, что при повторном вызове subscribe(), вы будете вызывать логику подписки и снова вызывать DB/Cache.

Итак, вы снова подписываетесь на оператора shared после того, как он отписался. (в doOnCompleted()), что вызовет departmentInfoFromDb а также departmentInfoExsitInCache снова подписаться и перейти в БД / Кэш.

Рассмотреть возможность использования cache()/reply() оператор для сохранения извлеченного значения из DB/Cache между подписками.

Другие вопросы по тегам