Скалярный поток: параллельная мультипоточка

Я только начал использовать библиотеку scalaz-stream и хотел бы реализовать следующий сценарий. У меня есть компонент, который читает некоторые события из исходного потока (очередь скаляр), изменяет его состояние и отправляет некоторые сообщения внешним компонентам. Компонент должен иметь возможность отправлять исходящие сообщения параллельно, поэтому он не должен ждать, пока компонент A получит сообщение, прежде чем он сможет начать отправку другого сообщения. Ниже вы найдете мой код, он работает, но я скептически отношусь к этому и чувствую, что должен быть какой-то стандартный способ делать такие вещи, более идиоматический подход с использованием скаляров.

  object Component {

    val queue = async.boundedQueue[Event](10)

    // updates the internal state and creates new event
    def process(event: Event) : Event = _

    val componentA: Sink[Task, Event] = _
    val componentB: Sink[Task, Event] = _
    val componentC: Sink[Task, Event] = _

    val processorAsync: Channel[Task, Event, Event] = {
      val pf: Event => Task[Event] = { inEvent =>
        Task.now(process(inEvent)).flatMap { outEvent =>
          val sinks = merge.mergeN(10)(Process(
            send(outEvent, componentA),
            send(outEvent, componentB),
            send(outEvent, componentC)
          )).run
          sinks.map(_ => inEvent)
        }
      }
      channel.lift(pf)
    }

    def send(event: Event, out: Sink[Task, Event]): Process[Task, Unit] = {
      Process.eval(Task.now(event)) to out
    }

    def run(): Process[Task, Unit] = {
      queue.dequeue through processorAsync to emptySink // toSource isn't defined for Process, only for Process0
    }

    val emptySink: Sink[Task, Event] = sink.lift[Task, Event](event => Task.now(event))

    trait Event

  }

PS: возможно, он заслуживает отдельного вопроса в SO, но ответить на него довольно просто: безопасно ли обновлять локальное состояние в скалярном канале без какой-либо синхронизации (блокировки, атомарные и т. Д.)?

0 ответов

Другие вопросы по тегам