Kotlin Coroutine Flow: ограничение количества сборщиков
Есть ли способ ограничить количество сборщиков в функции, которая возвращает поток, используя построитель потока?
У меня есть этот общедоступный метод в ViewModel
fun fetchAssets(limit: String) {
viewModelScope.launch {
withContext(Dispatchers.IO){
getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).onEach {
when (it) {
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
Этот метод вызывается в модели ViewModel.
init
block, но также может быть вызван вручную в пользовательском интерфейсе.
Этот поток испускает значение каждые 10 секунд.
Репозиторий
override fun fetchAssets(
query: String,
limit: String
) = flow {
while (true) {
try {
interceptor.baseUrl = AppConfigs.ASSET_BASE_URL
emit(RequestStatus.Loading())
val domainModel = mapper.mapToDomainModel(service.getAssetItems(query, limit))
emit(RequestStatus.Success(domainModel))
} catch (e: HttpException) {
emit(RequestStatus.Failed(e))
} catch (e: IOException) {
emit(RequestStatus.Failed(e))
}
delay(10_000)
}
}
К сожалению каждый раз
fetch()
был вызван из пользовательского интерфейса, я заметил, что он создает другие сборщики, поэтому может получиться множество сборщиков, что действительно плохо и неправильно.
Идея состоит в том, чтобы иметь поток, который выдает значение каждые 10 секунд, но его также можно вызывать вручную через пользовательский интерфейс для немедленного обновления данных без использования нескольких коллекторов.
1 ответ
В итоге я переместил таймер в ViewModel, поскольку я могу запрашивать выборку по запросу, а также не иметь нескольких сборщиков, работающих одновременно.
private var job: Job? = null
private val _assetState = defaultMutableSharedFlow<AssetState>()
fun getAssetState() = _assetState.asSharedFlow()
init {
job = viewModelScope.launch {
while(true) {
if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
fetchAssets()
delay(10_000)
}
}
}
fun fetchAssets() {
viewModelScope.launch {
withContext(Dispatchers.IO) {
getAssetsUseCase(
AppConfigs.ASSET_BASE_URL,
AppConfigs.ASSET_PARAMS,
AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
).onEach {
when(it){
is RequestStatus.Loading -> {
_assetState.tryEmit(AssetState.FetchLoading)
}
is RequestStatus.Success -> {
_assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
}
is RequestStatus.Failed -> {
_assetState.tryEmit(AssetState.FetchFailed(it.message))
}
}
}.collect()
}
}
}
override fun onCleared() {
job?.cancel()
super.onCleared()
}
Пожалуйста, поправьте меня, если это запах кода.