Каков рекомендуемый способ отложить buildSequence Kotlin?
Я пытаюсь опросить API-интерфейс с нумерацией страниц и предоставлять пользователю новые элементы по мере их появления.
fun connect(): Sequence<T> = buildSequence {
while (true) {
// result is a List<T>
val result = dataSource.getFirstPage()
yieldAll(/* the new data in `result` */)
// Block the thread for a little bit
}
}
Вот пример использования:
for (item in connect()) {
// do something as each item is made available
}
Моей первой мыслью было использовать delay
функция, но я получаю это сообщение:
Ограниченные приостановленные функции могут вызывать только функции приостановки члена или расширения в своей ограниченной области сопрограмм.
Это подпись для buildSequence
:
public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>
Я думаю, что это сообщение означает, что я могу использовать только suspend
функции в SequenceBuilder: yield
а также yieldAll
и что с помощью произвольного suspend
вызовы функций не допускаются.
Прямо сейчас я использую это, чтобы блокировать построение последовательности на одну секунду после каждого опроса API:
val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
// do nothing
}
Это работает, но на самом деле это не похоже на хорошее решение. Кто-нибудь сталкивался с этой проблемой раньше?
2 ответа
Почему это не работает? Некоторые исследования
Когда мы смотрим на buildSequence
мы можем видеть, что это занимает builderAction: suspend SequenceBuilder<T>.() -> Unit
в качестве аргумента. Как клиент этого метода, вы сможете передать suspend
лямбда, которая имеет SequenceBuilder
как его приемник (о лямбде с приемником читайте здесь). SequenceBuilder
Сам аннотируется RestrictSuspension
:
@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...
Аннотация определяется и комментируется следующим образом:
/**
* Classes and interfaces marked with this annotation are restricted
* when used as receivers for extension `suspend` functions.
* These `suspend` extensions can only invoke other member or extension
* `suspend` functions on this particular receiver only
* and are restricted from calling arbitrary suspension functions.
*/
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension
Как RestrictSuspension
документация говорит, в случае buildSequence
Вы можете пройти лямбда с SequenceBuilder
в качестве получателя, но с ограниченными возможностями, так как вы сможете звонить только "другой участник или добавочный номер" suspend
функции на этом конкретном приемнике ". Это означает, что блок передан buildSequence
может вызвать любой метод, определенный на SequenceBuilder
(лайк yield
, yieldAll
). Поскольку, с другой стороны, блок "ограничен от вызова произвольных функций приостановки", используя delay
не работает. Получающаяся ошибка компилятора подтверждает это:
Ограниченные приостановленные функции могут вызывать только функции приостановки члена или расширения в своей ограниченной области сопрограмм.
В конечном счете, вы должны знать, что buildSequence
создает сопрограмму, которая является примером синхронной сопрограммы. В вашем примере код последовательности будет выполнен в том же потоке, который использует последовательность, вызвав connect()
,
Как отложить последовательность?
Как мы узнали, buildSequence
создает синхронную последовательность. Здесь можно использовать обычную блокировку потоков:
fun connect(): Sequence<T> = buildSequence {
while (true) {
val result = dataSource.getFirstPage()
yieldAll(result)
Thread.sleep(1000)
}
}
Но вы действительно хотите, чтобы весь поток был заблокирован? В качестве альтернативы, вы можете реализовать асинхронные последовательности, как описано здесь. В результате, используя delay
и другие функции приостановки будут действительны.
Просто для альтернативного решения...
Если то, что вы действительно пытаетесь сделать, это асинхронное создание элементов, вы можете использовать потоки , которые в основном представляют собой асинхронные последовательности.
Вот краткая таблица:
Вы можете преобразовать свойSequence<T>
кFlow<T>
путем заменыsequence { ... }
строитель сflow { ... }
строитель, а затем заменитьyield/yieldAll
сemit/emitAll
:
fun example(): Flow<String> = flow {
(1..5).forEach { getString().let { emit(it) } }
}
suspend fun getString(): String = { ... }
Итак, для вашего примера:
fun connect(): Flow<T> = flow {
while (true) {
// Call suspend function to get data from dataSource
val result: List<T> = dataSource.getFirstPage()
emitAll(result)
// _Suspend_ for a little bit
delay(1000)
}
}