RxJava подписка на побочный эффект
У меня есть вопрос, это простой бизнес-логика потока:
проверьте, находится ли сотрудник в нескольких отделах, отдел и отношение сотрудника в кеше, сначала проверьте, существуют ли отношения в кеше, если они существуют, проверьте, принадлежит ли ему сотрудник, в случае отсутствия в кэше, получите его из базы данных и проверьте отношения о сотруднике, а затем сохранить информацию об отделе в кэш.
это код:
public Observable isEmployeeInDepartment(List<Long> departmentIds, long employeeId){
//this observable will resolve twice, and cause unnecessary cache access
Observable departmentInfoExsitInCache= checkDepartmentInfoFromCache(...).share();
Observable departInfoNotInCache = departmentInfoExsitInCache.filter(...);
//this observable will resolve twice, and cause unnecessary database access
Observable departmentInfoFromDb=departInfoNotInCache.flatMap(departmentIds->checkFromDb()).share();
Observable<Long> saveResult=departmentInfoFromDb.flatMap(departmentInfo->saveToCache());
Observable<Long> departInfoInCache = departmentInfoExsitInCache.filter(...);
return departInfoInCache.check(userId).merge( departmentInfoFromDb.check(userId)).doOnCompleted(saveResult.subscribe());
}
проблема заключается в том, что отдела InfoExsitInCache и saveResult будут разрешены дважды после подписки клиентского метода.
Я обнаружил, что после удаления кода сохранения подписки.doOnCompleted(saveResult.subscribe()) он станет нормальным и разрешится только один раз. Что-то не так с этим кодом?
1 ответ
Вы не правильно поделились здесь.
Проблема в том, что share()
не поможет вам в этом случае. share()
на самом деле publish().autoConnect()
предназначена для сохранения единой подписки на поток, таким образом, повторный вызов подписки не вызовет логику подписки снова, а просто соединит вас с существующим потоком.
Но после того, как все подписчики отписались от общего потока, Observable откажется от подписки, что означает, что при повторном вызове subscribe()
, вы будете вызывать логику подписки и снова вызывать DB/Cache.
Итак, вы снова подписываетесь на оператора shared после того, как он отписался. (в doOnCompleted()
), что вызовет departmentInfoFromDb
а также departmentInfoExsitInCache
снова подписаться и перейти в БД / Кэш.
Рассмотреть возможность использования cache()/reply()
оператор для сохранения извлеченного значения из DB/Cache между подписками.