Каков рекомендуемый способ отложить buildSequence Kotlin?

Я пытаюсь опросить API-интерфейс с нумерацией страниц и предоставлять пользователю новые элементы по мере их появления.

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        // result is a List<T>
        val result = dataSource.getFirstPage()
        yieldAll(/* the new data in `result` */)

        // Block the thread for a little bit
    }
}

Вот пример использования:

for (item in connect()) {
    // do something as each item is made available
}

Моей первой мыслью было использовать delay функция, но я получаю это сообщение:

Ограниченные приостановленные функции могут вызывать только функции приостановки члена или расширения в своей ограниченной области сопрограмм.

Это подпись для buildSequence:

public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>

Я думаю, что это сообщение означает, что я могу использовать только suspend функции в SequenceBuilder: yield а также yieldAll и что с помощью произвольного suspend вызовы функций не допускаются.

Прямо сейчас я использую это, чтобы блокировать построение последовательности на одну секунду после каждого опроса API:

val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
    // do nothing
}

Это работает, но на самом деле это не похоже на хорошее решение. Кто-нибудь сталкивался с этой проблемой раньше?

2 ответа

Решение

Почему это не работает? Некоторые исследования

Когда мы смотрим на buildSequenceмы можем видеть, что это занимает builderAction: suspend SequenceBuilder<T>.() -> Unit в качестве аргумента. Как клиент этого метода, вы сможете передать suspend лямбда, которая имеет SequenceBuilder как его приемник (о лямбде с приемником читайте здесь).
SequenceBuilder Сам аннотируется RestrictSuspension:

@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...

Аннотация определяется и комментируется следующим образом:

/**
 * Classes and interfaces marked with this annotation are restricted
 * when used as receivers for extension `suspend` functions. 
 * These `suspend` extensions can only invoke other member or extension     
 * `suspend` functions on this particular receiver only 
 * and are restricted from calling arbitrary suspension functions.
 */
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension

Как RestrictSuspension документация говорит, в случае buildSequenceВы можете пройти лямбда с SequenceBuilder в качестве получателя, но с ограниченными возможностями, так как вы сможете звонить только "другой участник или добавочный номер" suspend функции на этом конкретном приемнике ". Это означает, что блок передан buildSequence может вызвать любой метод, определенный на SequenceBuilder (лайк yield, yieldAll). Поскольку, с другой стороны, блок "ограничен от вызова произвольных функций приостановки", используя delay не работает. Получающаяся ошибка компилятора подтверждает это:

Ограниченные приостановленные функции могут вызывать только функции приостановки члена или расширения в своей ограниченной области сопрограмм.

В конечном счете, вы должны знать, что buildSequence создает сопрограмму, которая является примером синхронной сопрограммы. В вашем примере код последовательности будет выполнен в том же потоке, который использует последовательность, вызвав connect(),

Как отложить последовательность?

Как мы узнали, buildSequence создает синхронную последовательность. Здесь можно использовать обычную блокировку потоков:

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        val result = dataSource.getFirstPage()
        yieldAll(result)
        Thread.sleep(1000)
    }
}

Но вы действительно хотите, чтобы весь поток был заблокирован? В качестве альтернативы, вы можете реализовать асинхронные последовательности, как описано здесь. В результате, используя delay и другие функции приостановки будут действительны.

Просто для альтернативного решения...

Если то, что вы действительно пытаетесь сделать, это асинхронное создание элементов, вы можете использовать потоки , которые в основном представляют собой асинхронные последовательности.

Вот краткая таблица:

Вы можете преобразовать свойSequence<T>кFlow<T>путем заменыsequence { ... }строитель сflow { ... }строитель, а затем заменитьyield/yieldAllсemit/emitAll:

      fun example(): Flow<String> = flow {
    (1..5).forEach { getString().let { emit(it) } }
}

suspend fun getString(): String = { ... }

Итак, для вашего примера:

      fun connect(): Flow<T> = flow {
    while (true) {

        // Call suspend function to get data from dataSource
        val result: List<T> = dataSource.getFirstPage()
        emitAll(result)

        // _Suspend_ for a little bit
        delay(1000)
    }
}
Другие вопросы по тегам