Скалярный поток: параллельная мультипоточка
Я только начал использовать библиотеку scalaz-stream и хотел бы реализовать следующий сценарий. У меня есть компонент, который читает некоторые события из исходного потока (очередь скаляр), изменяет его состояние и отправляет некоторые сообщения внешним компонентам. Компонент должен иметь возможность отправлять исходящие сообщения параллельно, поэтому он не должен ждать, пока компонент A получит сообщение, прежде чем он сможет начать отправку другого сообщения. Ниже вы найдете мой код, он работает, но я скептически отношусь к этому и чувствую, что должен быть какой-то стандартный способ делать такие вещи, более идиоматический подход с использованием скаляров.
object Component {
val queue = async.boundedQueue[Event](10)
// updates the internal state and creates new event
def process(event: Event) : Event = _
val componentA: Sink[Task, Event] = _
val componentB: Sink[Task, Event] = _
val componentC: Sink[Task, Event] = _
val processorAsync: Channel[Task, Event, Event] = {
val pf: Event => Task[Event] = { inEvent =>
Task.now(process(inEvent)).flatMap { outEvent =>
val sinks = merge.mergeN(10)(Process(
send(outEvent, componentA),
send(outEvent, componentB),
send(outEvent, componentC)
)).run
sinks.map(_ => inEvent)
}
}
channel.lift(pf)
}
def send(event: Event, out: Sink[Task, Event]): Process[Task, Unit] = {
Process.eval(Task.now(event)) to out
}
def run(): Process[Task, Unit] = {
queue.dequeue through processorAsync to emptySink // toSource isn't defined for Process, only for Process0
}
val emptySink: Sink[Task, Event] = sink.lift[Task, Event](event => Task.now(event))
trait Event
}
PS: возможно, он заслуживает отдельного вопроса в SO, но ответить на него довольно просто: безопасно ли обновлять локальное состояние в скалярном канале без какой-либо синхронизации (блокировки, атомарные и т. Д.)?