Как реализовать таймер с соплинами Kotlin

Я хочу реализовать таймер с использованием сопрограмм Kotlin, что-то похожее на это реализовано в RxJava:

       Flowable.interval(0, 5, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .map { LocalDateTime.now() }
                    .distinctUntilChanged { old, new ->
                        old.minute == new.minute
                    }
                    .subscribe {
                        setDateTime(it)
                    }

Он будет излучать LocalDateTime каждую новую минуту.

13 ответов

Решение

Я считаю, что это все еще эксперимент, но вы можете использовать TickerChannel для получения значений каждые X миллисекунд:

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)

repeat(10) {
    tickerChannel.receive()
    val currentTime = LocalDateTime.now()
    println(currentTime)
}

Если вам нужно продолжать выполнять свою работу, пока ваша "подписка" что-то делает для каждого "тика", вы можете launch фоновая сопрограмма, которая будет читать с этого канала и делать то, что вы хотите:

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)

launch {
    for (event in tickerChannel) { // event is of type Unit, so we don't really care about it
        val currentTime = LocalDateTime.now()
        println(currentTime)
    }
}

// when you're done with the ticker and don't want more events
tickerChannel.cancel()

Очень прагматичный подход с Kotlin Flows может заключаться в следующем:

      // Create the timer flow
val timer = (0..Int.MAX_VALUE)
    .asSequence()
    .asFlow()
    .onEach { delay(1_000) } // specify delay

// Consume it
timer.collect { 
    println("bling: ${it}")
}

Другое возможное решение в виде многоразового расширения kotlin для CoroutineScope

fun CoroutineScope.launchPeriodicAsync(
    repeatMillis: Long,
    action: () -> Unit
) = this.async {
    if (repeatMillis > 0) {
        while (true) {
            action()
            delay(repeatMillis)
        }
    } else {
        action()
    }
}

а затем использовать как:

var job = CoroutineScope(Dispatchers.IO).launchPeriodicAsync(100) {
  //...
}

а затем прервать его:

job.cancel()

Вы можете создать такой таймер обратного отсчета

GlobalScope.launch(Dispatchers.Main) {
            val totalSeconds = TimeUnit.MINUTES.toSeconds(2)
            val tickSeconds = 1
            for (second in totalSeconds downTo tickSeconds) {
                val time = String.format("%02d:%02d",
                    TimeUnit.SECONDS.toMinutes(second),
                    second - TimeUnit.MINUTES.toSeconds(TimeUnit.SECONDS.toMinutes(second))
                )
                timerTextView?.text = time
                delay(1000)
            }
            timerTextView?.text = "Done!"
        }

Вот возможное решение с использованием Kotlin Flow

fun tickFlow(millis: Long) = callbackFlow<Int> {
    val timer = Timer()
    var time = 0
    timer.scheduleAtFixedRate(
        object : TimerTask() {
            override fun run() {
                try { offer(time) } catch (e: Exception) {}
                time += 1
            }
        },
        0,
        millis)
    awaitClose {
        timer.cancel()
    }
}

Применение

val job = CoroutineScope(Dispatchers.Main).launch {
   tickFlow(125L).collect {
      print(it)
   }
}

...

job.cancel()

Изменить: Джоффри отредактировал свое решение с лучшим подходом.

Старый:

Решение Джоффри работает для меня, но у меня возникла проблема с циклом for.

Мне нужно отменить тикер в цикле for следующим образом:

            val ticker = ticker(500, 0)
            for (event in ticker) {
                if (...) {
                    ticker.cancel()
                } else {
                    ...
                    }
                }
            }

Но ticker.cancel() выбрасывал cancellationException, потому что цикл for продолжался после этого.

Мне пришлось использовать цикл while, чтобы проверить, не был ли канал закрыт, чтобы не получить это исключение.

                val ticker = ticker(500, 0)
                while (!ticker.isClosedForReceive && ticker.iterator().hasNext()) {
                    if (...) {
                        ticker.cancel()
                    } else {
                        ...
                        }
                    }
                }

Таймер с функциями СТАРТ, ПАУЗА и СТОП.

Применение:

      val timer = Timer(millisInFuture = 10_000L, runAtStart = false)
timer.start()

Timerучебный класс:

      import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow

enum class PlayerMode {
    PLAYING,
    PAUSED,
    STOPPED
}

class Timer(
    val millisInFuture: Long,
    val countDownInterval: Long = 1000L,
    runAtStart: Boolean = false,
    val onFinish: (() -> Unit)? = null,
    val onTick: ((Long) -> Unit)? = null
) {
    private var job: Job = Job()
    private val _tick = MutableStateFlow(0L)
    val tick = _tick.asStateFlow()
    private val _playerMode = MutableStateFlow(PlayerMode.STOPPED)
    val playerMode = _playerMode.asStateFlow()

    private val scope = CoroutineScope(Dispatchers.Default)

    init {
        if (runAtStart) start()
    }

    fun start() {
        if (_tick.value == 0L) _tick.value = millisInFuture
        job.cancel()
        job = scope.launch(Dispatchers.IO) {
            _playerMode.value = PlayerMode.PLAYING
            while (isActive) {
                if (_tick.value <= 0) {
                    job.cancel()
                    onFinish?.invoke()
                    _playerMode.value = PlayerMode.STOPPED
                    return@launch
                }
                delay(timeMillis = countDownInterval)
                _tick.value -= countDownInterval
                onTick?.invoke(this@Timer._tick.value)
            }
        }
    }

    fun pause() {
        job.cancel()
        _playerMode.value = PlayerMode.PAUSED
    }

    fun stop() {
        job.cancel()
        _tick.value = 0
        _playerMode.value = PlayerMode.STOPPED
    }
}

Я черпал вдохновение отсюда .

Вот Flow версия Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) на основе ответа Джоффри:

      fun tickerFlow(start: Long,
               count: Long,
               initialDelayMs: Long,
               periodMs: Long) = flow<Long> {
    delay(initialDelayMs)

    var counter = start
    while (counter <= count) {
        emit(counter)
        counter += 1

        delay(periodMs)
    }
}

//...

tickerFlow(1, 5, 0, 1_000L)

Вы могли бы сделать что-то вроде этого...

      class JitterTimer {
    fun scheduleAtRandom(
        initialDelay : Duration = 2.seconds,
        base: Duration = 3.seconds,
        interval: Duration = 0.5.seconds
    ) = flow {
        delay(initialDelay)
        while (true) {
            emit(Unit)
            val jitter = randomizeJitter(
                baseDelay = base.inWholeMilliseconds,
                interval = interval.inWholeMilliseconds
            )
            delay(jitter)
        }
    }.cancellable()
}

//Использование

      timerJob?.cancel()
JitterTimer()
 .scheduleAtRandom(10.seconds, 3.seconds)
 .onEach {
    //Do some work
  }
 .flowOn(dispatcher)
 .catch { it.remoteLog("Recovery", "Attempts") }
 .launchIn(scope)
 .apply { timerJob = this }

Недавно использовал это для фрагментации значений на основе таймера и максимального размера буфера.

      private object Tick

@Suppress("UNCHECKED_CAST")
fun <T : Any> Flow<T>.chunked(size: Int, initialDelay: Long, delay: Long): Flow<List<T>> = flow {
    if (size <= 0) throw IllegalArgumentException("invalid chunk size $size - expected > 0")
    val chunkedList = mutableListOf<T>()
    if (delay > 0L) {
        merge(this@chunked, timerFlow(initialDelay, delay, Tick))
    } else {
        this@chunked
    }
        .collect {
            when (it) {
                is Tick -> {
                    if (chunkedList.isNotEmpty()) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                    }
                }
                else -> {
                    chunkedList.add(it as T)
                    if (chunkedList.size >= size) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                    }
                }
            }
        }
    if (chunkedList.isNotEmpty()) {
        emit(chunkedList.toList())
    }
}

fun <T> timerFlow(initialDelay: Long, delay: Long, o: T) = flow {
    if (delay <= 0) throw IllegalArgumentException("invalid delay $delay - expected > 0")
    if (initialDelay > 0) delay(initialDelay)
    while (currentCoroutineContext().isActive) {
        emit(o)
        delay(delay)
    }
}

Сделал копию Observable.intervalRange(0, 90, 0, 1, TimeUnit.SECONDS)(испускает предмет через 90 секунд каждую 1 секунду):

      fun intervalRange(start: Long, count: Long, initialDelay: Long = 0, period: Long, unit: TimeUnit): Flow<Long> {
        return flow<Long> {
            require(count >= 0) { "count >= 0 required but it was $count" }
            require(initialDelay >= 0) { "initialDelay >= 0 required but it was $initialDelay" }
            require(period > 0) { "period > 0 required but it was $period" }

            val end = start + (count - 1)
            require(!(start > 0 && end < 0)) { "Overflow! start + count is bigger than Long.MAX_VALUE" }

            if (initialDelay > 0) {
                delay(unit.toMillis(initialDelay))
            }

            var counter = start
            while (counter <= count) {
                emit(counter)
                counter += 1

                delay(unit.toMillis(period))
            }
        }
    }

Применение:

      lifecycleScope.launch {
intervalRange(0, 90, 0, 1, TimeUnit.SECONDS)
                .onEach {
                    Log.d(TAG, "intervalRange: ${90 - it}")
                }
                .lastOrNull()
}

Он не использует сопрограммы Kotlin, но если ваш вариант использования достаточно прост, вы всегда можете просто использовать что-то вроде fixedRateTimer или timer(документы здесь), которые разрешают собственный JVM Timer.

Я использовал RxJava intervalдля относительно простого сценария, и когда я переключился на использование таймеров, я увидел значительные улучшения производительности и памяти.

Вы также можете запустить свой код в основном потоке на Android, используя View.post() или это множество вариантов.

Единственное реальное раздражение заключается в том, что вам нужно самостоятельно отслеживать состояние старого времени, а не полагаться на RxJava, который сделает это за вас.

Но это всегда будет намного быстрее (важно, если вы делаете критические для производительности вещи, такие как анимация пользовательского интерфейса и т. Д.) И не будет иметь накладных расходов на память, как у Flowables RxJava.

Вот код вопроса с использованием fixedRateTimer:


var currentTime: LocalDateTime = LocalDateTime.now()

fixedRateTimer(period = 5000L) {
    val newTime = LocalDateTime.now()
    if (currentTime.minute != newTime.minute) {
        post { // post the below code to the UI thread to update UI stuff
            setDateTime(newTime)
        }
        currentTime = newTime
    }
}

введите описание изображения здесь

      enter code here
private val updateLiveShowTicker = flow {
    while (true) {
        emit(Unit)
        delay(1000L * UPDATE_PROGRAM_INFO_INTERVAL_SECONDS)
    }
}

private val updateShowProgressTicker = flow {
    while (true) {
        emit(Unit)
        delay(1000L * UPDATE_SHOW_PROGRESS_INTERVAL_SECONDS)
    }
}

private val liveShow = updateLiveShowTicker
    .combine(channelId) { _, channelId -> programInfoRepository.getShow(channelId) }
    .catch { emit(LiveShow(application.getString(R.string.activity_channel_detail_info_error))) }
    .shareIn(viewModelScope, SharingStarted.WhileSubscribed(), replay = 1)
    .distinctUntilChanged()

Мое решение. Теперь вы можете использовать Flow API для создания собственного потока тикеров:

Другие вопросы по тегам