Как мы можем буферизовать элементы в каждую миллисекунду и передавать каждый элемент через постоянный интервал времени
Метод onNext publishSubject вызывается непрерывно (в неравное время, приблизительно за 1 миллисекунду). Требуется, чтобы эти элементы передавались каждую 1 секунду, и данные не должны быть потеряны, значит, каждый элемент должен излучать.
publishSubject.onNext("Data1");
publishSubject.onNext("Data2");
publishSubject.onNext("Data3");
publishSubject.onNext("Data4");
publishSubject.onNext("Data5");
publishSubject.onNext("Data6");
publishSubject.onNext("Data7");
и так далее... См. Структуру кода для справки:
var publishSubject = PublishSubject.create<String>()
publishSubject.onNext(stateObject) // Executing at every milliseconds...
publishSubject
/* Business Logic Required Here ?? */
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Should execute at every 1 second
}
Пожалуйста, помогите, спасибо заранее,
2 ответа
Как насчет хранения предметов в Deque. Затем используйте сопрограмму, которая запускает функцию приостановки, чтобы получить первый элемент deque раз в секунду?
Вот быстрый и грязный код, чтобы убедиться, что он работает. Вы можете запустить этот код онлайн на веб-сайте kotlin. Пожалуйста, имейте в виду, что я очень новичок в Kotlin.
val deque: Deque<String> = ArrayDeque()
var refMillisAdd: Long = 0
var refMillisTake: Long = 0
fun main() {
println(" Delay(ms) -> Action")
println("---------------------")
kotlinx.coroutines.runBlocking {
launch {
refMillisAdd = currentTimeMillis()
refMillisTake = currentTimeMillis()
for(i in 0..20){
oncePer10ms(i.toString())
refMillisAdd = currentTimeMillis()
}
for(i in 0..6){
oncePerSecond()
refMillisTake = currentTimeMillis()
}
}
}
}
suspend fun oncePerSecond(){
kotlinx.coroutines.delay(1_000L)
println(" ${currentTimeMillis() - refMillisTake} -> TAKE ${deque.pop()}")
}
suspend fun oncePer10ms(item: String){
kotlinx.coroutines.delay(10L)
deque.add(item)
println(" ${currentTimeMillis() - refMillisAdd} -> ADD $item")
}
Код выше печатает:
Delay(ms) -> Action
---------------------
17 -> ADD 0
11 -> ADD 1
10 -> ADD 2
10 -> ADD 3
10 -> ADD 4
10 -> ADD 5
10 -> ADD 6
10 -> ADD 7
10 -> ADD 8
11 -> ADD 9
10 -> ADD 10
10 -> ADD 11
10 -> ADD 12
11 -> ADD 13
10 -> ADD 14
10 -> ADD 15
11 -> ADD 16
10 -> ADD 17
10 -> ADD 18
10 -> ADD 19
11 -> ADD 20
1223 -> TAKE 0
1000 -> TAKE 1
1000 -> TAKE 2
1001 -> TAKE 3
1000 -> TAKE 4
1000 -> TAKE 5
1000 -> TAKE 6
Это расширение функции для Observable
класс именно то, что вам нужно:
fun <T> Observable<T>.delayBetweenItems(timeout: Long, unit: TimeUnit): Observable<T> =
Observable.zip(this, Observable.interval(timeout, unit), BiFunction<T, Long, T> { item, _ -> item })
Вы можете объявить это в некотором служебном классе в вашем проекте, а затем применить его так же, как и другие операторы RxJava:
publishSubject
.delayBetweenItems(1000, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Should execute at every 1 second
}