Сбрасываемый паттерн Single Rx
У меня есть следующий дизайн, который я хотел бы создать, но я не уверен, какой шаблон Rx ему подходит. Цель более или менее похожа на одну, но с условной проверкой.
- Есть один
Observable<String>
и возможность любого количества наблюдателей. - Если сначала выполняется запрос, наблюдаемое выполнит какой-либо сетевой запрос, принимая строку, а затем сгенерирует обратный вызов (во многом как завершаемый / одиночный)
- Любой последующий вызов с тем же ключом немедленно возвращает тот же результат
- Однако, если прошло 5 минут и сделан тот же вызов, мы будем повторно получать данные, которые, возможно, истекли, а затем отправлять их любым слушателям. Этот результат будет сохранен еще 5 минут, и цикл повторяется.
- Все данные хранятся на основе отправленного ключа, так же, как и шаблон flyweight. Истечение срока действия основано на времени последнего запроса конкретного ключа.
Моей первоначальной мыслью было создать собственный класс с одновременными хэш-картами. Тем не менее, это будет означать, что мне придется самостоятельно обрабатывать множество механизмов потоков. Я чувствую, что RxJava будет отличным решением для этого, но я не уверен, существуют ли такие шаблоны. У кого-нибудь есть идея?
Я понимаю, что цель Single<T>
предназначен для получения только одного ответа, поэтому мои условия могут быть неверными.
Следующее - моя попытка, которую я буду обновлять по мере продвижения
/**
* Created by Allan Wang on 07/01/18.
*
* Reactive flyweight to help deal with prolonged executions
* Each call will output a [Single], which may be new if none exist or the old one is invalidated,
* or reused if an old one is still valid
*
* Types:
* T input argument for caller
* C condition condition to check against for validity
* R response response within reactive output
*/
abstract class RxFlyweight<in T : Any, C : Any, R : Any> {
/**
* Given an input emit the desired response
* This will be executed in a separate thread
*/
protected abstract fun call(input: T): R
/**
* Given an input and condition, check if
* we may used cache data or if we need to make a new request
* Return [true] to use cache, [false] otherwise
*/
protected abstract fun validate(input: T, cond: C): Boolean
/**
* Given an input, create a new condition to be used
* for future requests
*/
protected abstract fun cache(input: T): C
private val conditionals = mutableMapOf<T, C>()
private val sources = mutableMapOf<T, Single<R>>()
private val lock = Any()
/**
* Entry point to give an input a receive a [Single]
* Note that the observer is not bound to any particular thread,
* as it is dependent on [createNewSource]
*/
operator fun invoke(input: T): Single<R> {
synchronized(lock) {
val source = sources[input]
// update condition and retrieve old one
val condition = conditionals.put(input, cache(input))
// check to reuse observable
if (source != null && condition != null && validate(input, condition))
return source
val newSource = createNewSource(input).cache()
sources.put(input, newSource)
return newSource
}
}
/**
* Open source creator
* Result will then be created with [Single.cache]
* If you don't have a need for cache,
* you likely won't have a need for flyweights
*/
open protected fun createNewSource(input: T): Single<R> =
Single.fromCallable { call(input) }
.timeout(20, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
fun reset() {
synchronized(lock) {
sources.clear()
conditionals.clear()
}
}
}