Как сделать поток источника Акка FIFO очереди?

Я разрабатываю систему торговли акциями с использованием Акки. Я предлагаю книги заказов в TradeQueue, как показано ниже:

val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
  println("TradeTask Start:"+task)
  task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
     log.info("TradeTask finish:"+task)
 }))(Keep.left).run()


 for (item <- 1 to 100) {
    val task = TradeTask(item)
    tradeQueue.offer(task)
 }   

Но последовательность неупорядочена.

как это:

TradeTask Начало : TradeTask (1)

TradeTask Начало :TradeTask(2)

TradeTask закончить : TradeTask (1)

TradeTask закончить :TradeTask(2)

Но я хочу FIFO и элемент enqueue до предыдущего финиша , вот так

TradeTask Начало : TradeTask (1)

TradeTask закончить : TradeTask (1)

TradeTask Начало :TradeTask(2)

TradeTask закончить :TradeTask(2)

Как это сделать? Спасибо

1 ответ

Решение

Уже ФИФО

Ваш вопрос уже доказывает, что очередь "(F)irst (I)n (F)irst (O)ut". Как показано на выходе первый элемент для ввода потока, TradeTask(1)был первым элементом, который должен быть обработан Sink:

TradeTask Start:TradeTask(1)    // <-- FIRST IN

TradeTask Start:TradeTask(2)

TradeTask finish:TradeTask(1)   // <-- FIRST OUT

TradeTask finish:TradeTask(2)

Косвенный ответ

Вопрос, который вы задаете, полностью противоположен цели / использованию akka-stream. Вы спрашиваете, как создать поток akka, который выполняет всю обработку последовательно, а не асинхронно.

Все дело в akka - асинхронная обработка и связь:

Добро пожаловать в Akka, набор библиотек с открытым исходным кодом для разработки масштабируемых, отказоустойчивых систем, охватывающих процессорные ядра и сети.

Если вы хотите, чтобы обработка выполнялась поочередно, тогда зачем в первую очередь использовать akka? Синхронная обработка Iterable элементы легко выполняются, без акки, используя коллекции scala и для понимания:

val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???

val logTask : Task => Unit = 
  (task) => log.info("TradeTask finish:" + task)

for {
  item    <- 1 to 100
  task    <- TradeTask(item)
  aResult <- functionA(task)
  bResult <- functionB(aResult)
  cResult <- functionC(bResult)
} { 
  logTask(cResult)
}

Точно так же вы можете использовать композицию функций и упрощенную итерацию:

val compositeFunction : Int => Unit = 
  TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask

(1 to 100) foreach compositeFunction
Другие вопросы по тегам