Идиоматический способ обработки нескольких одновременных потоков в Scala

У меня есть список потоков, которые при вызове их next() будет спать случайное количество времени, а затем читать один символ из другого источника.

Я пытаюсь написать потребителя (ей), которые будут продолжать вызывать эти потоки до EOF и создать общий словарь этих потоков во время выполнения.

Пока я использую ConcurrentHashMap для словаря и просто создания нового потока для каждого из потоковых потребителей.

хотя мое решение работает, оно кажется очень наивным, и мне интересно, есть ли лучшее применение для потоковой библиотеки, такой как monix или же fs2

1 ответ

Решение

Основываясь на описании вопроса и последующих комментариях, я предполагаю, что существует несколько Iterator[Char] источники:

val allSources : Iterable[Iterator[Char]] = ???

И вопрос в том, как одновременно собирать String значения из этих итераторов, чтобы сформировать отображение String для подсчета.

Потоковое решение

Сначала нам нужно преобразовать каждый из итераторов в итератор значений String на основе разделителя:

trait Word {
  val data : String
}

object EmptyWord extends Word {
  override val data = ""
}

case class PartialWord(val data : String) extends Word

case class WholeWord(val data : String) extends Word

val appendToWord : Char => (Word, Char) => Word = 
  (separator) => (originalWord, appendChar) => originalWord match {
    case PartialWord(d) => 
      if(appendChar == separator)
        WholeWord(d)
      else
        PartialWord(d + appendChar)
    case _ => PartialWord(appendChar.toString)
  }

val isWholeWord : Word => Boolean = (_ : Word) match {
  case _ : WholeWord => true
  case _             => false
}

//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] = 
  (_ : Iterator[Char])
    .scanLeft(EmptyWord)(appendToWord(' '))
    .filter(isWholeWord)
    .map(_.data)

Теперь мы можем преобразовать все итераторы для генерации строк и объединить все итераторы в один итератор:

val allWordSource : Iterator[String] = 
  allSources.map(convertCharIterator)
            .reduceOption( _ ++ _)
            .getOrElse(Iterator.empty[String])

Этот итератор теперь может быть источником потока akka, который вычислит ваш счет:

val addToCounter : (Map[String, Int], String) => Map[String, Int] = 
  (counter, word) => 
    counter.updated(word, counter.getOrElse(word, 0) + 1)

val counter : Future[Map[String, Int]] = 
  Source
    .fromIterator( () => allWordSource)
    .runFold(Map.empty[String, Int])(addToCounter)
Другие вопросы по тегам