Как сделать поток источника Акка FIFO очереди?
Я разрабатываю систему торговли акциями с использованием Акки. Я предлагаю книги заказов в TradeQueue, как показано ниже:
val tradeQueue = Source.queue[TradeTask](1, OverflowStrategy.backpressure)
.map(task=>{
println("TradeTask Start:"+task)
task
})
.via(ProcessA)
.via(ProcessC)
.via(ProcessC)
.toMat(Sink.foreach(task => {
log.info("TradeTask finish:"+task)
}))(Keep.left).run()
for (item <- 1 to 100) {
val task = TradeTask(item)
tradeQueue.offer(task)
}
Но последовательность неупорядочена.
как это:
TradeTask Начало : TradeTask (1)
TradeTask Начало :TradeTask(2)
TradeTask закончить : TradeTask (1)
TradeTask закончить :TradeTask(2)
Но я хочу FIFO и элемент enqueue до предыдущего финиша , вот так
TradeTask Начало : TradeTask (1)
TradeTask закончить : TradeTask (1)
TradeTask Начало :TradeTask(2)
TradeTask закончить :TradeTask(2)
Как это сделать? Спасибо
1 ответ
Уже ФИФО
Ваш вопрос уже доказывает, что очередь "(F)irst (I)n (F)irst (O)ut". Как показано на выходе первый элемент для ввода потока, TradeTask(1)
был первым элементом, который должен быть обработан Sink
:
TradeTask Start:TradeTask(1) // <-- FIRST IN
TradeTask Start:TradeTask(2)
TradeTask finish:TradeTask(1) // <-- FIRST OUT
TradeTask finish:TradeTask(2)
Косвенный ответ
Вопрос, который вы задаете, полностью противоположен цели / использованию akka-stream. Вы спрашиваете, как создать поток akka, который выполняет всю обработку последовательно, а не асинхронно.
Все дело в akka - асинхронная обработка и связь:
Добро пожаловать в Akka, набор библиотек с открытым исходным кодом для разработки масштабируемых, отказоустойчивых систем, охватывающих процессорные ядра и сети.
Если вы хотите, чтобы обработка выполнялась поочередно, тогда зачем в первую очередь использовать akka? Синхронная обработка Iterable
элементы легко выполняются, без акки, используя коллекции scala и для понимания:
val functionA : Task => Task = ???
val functionB : Task => Task = ???
val functionC : Task => Task = ???
val logTask : Task => Unit =
(task) => log.info("TradeTask finish:" + task)
for {
item <- 1 to 100
task <- TradeTask(item)
aResult <- functionA(task)
bResult <- functionB(aResult)
cResult <- functionC(bResult)
} {
logTask(cResult)
}
Точно так же вы можете использовать композицию функций и упрощенную итерацию:
val compositeFunction : Int => Unit =
TradeTask.apply andThen functionA andThen functionB andThen functionC andThen logTask
(1 to 100) foreach compositeFunction