Понимание баланса потребительской нагрузки monix

Я учусь monix 3,
Следующий код:

object Main extends TaskApp {
  override def runc = {
    Observable.fromIterable(1 to 10)
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Starting $i, delay = $delay")
        Thread.sleep(delay)     // Imitation of hard execution
        i
      }
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Continue $i, delay = $delay")
        Thread.sleep(delay)
        i
      }
      .consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))   //Compile error here
  }
}

приводит к ошибке компиляции:

отсутствует тип параметра
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))

Я не могу понять, что здесь не так и как заставить этот код компилироваться?

UPD
Второй вопрос, как повторить этот поток каждый n минут?

1 ответ

В качестве ответа на ваш первый вопрос вы должны явно указать параметр типа foreach:

Consumer.foreach[Int](i => println(s"End $i"))

Чтобы ответить на второй вопрос, используйте Observable.intervalAtFixedRate или же Observable.intervalAtFixedDelay,

Пожалуйста, обратитесь к Monix Scaladoc.

Надеюсь, это поможет.

Другие вопросы по тегам