Понимание баланса потребительской нагрузки monix
Я учусь monix 3
,
Следующий код:
object Main extends TaskApp {
override def runc = {
Observable.fromIterable(1 to 10)
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Starting $i, delay = $delay")
Thread.sleep(delay) // Imitation of hard execution
i
}
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Continue $i, delay = $delay")
Thread.sleep(delay)
i
}
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i")))) //Compile error here
}
}
приводит к ошибке компиляции:
отсутствует тип параметра
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))
Я не могу понять, что здесь не так и как заставить этот код компилироваться?
UPD
Второй вопрос, как повторить этот поток каждый n
минут?
1 ответ
В качестве ответа на ваш первый вопрос вы должны явно указать параметр типа foreach
:
Consumer.foreach[Int](i => println(s"End $i"))
Чтобы ответить на второй вопрос, используйте Observable.intervalAtFixedRate
или же Observable.intervalAtFixedDelay
,
Пожалуйста, обратитесь к Monix Scaladoc.
Надеюсь, это поможет.