Идиоматический способ обработки нескольких одновременных потоков в Scala
У меня есть список потоков, которые при вызове их next()
будет спать случайное количество времени, а затем читать один символ из другого источника.
Я пытаюсь написать потребителя (ей), которые будут продолжать вызывать эти потоки до EOF
и создать общий словарь этих потоков во время выполнения.
Пока я использую ConcurrentHashMap
для словаря и просто создания нового потока для каждого из потоковых потребителей.
хотя мое решение работает, оно кажется очень наивным, и мне интересно, есть ли лучшее применение для потоковой библиотеки, такой как monix
или же fs2
1 ответ
Основываясь на описании вопроса и последующих комментариях, я предполагаю, что существует несколько Iterator[Char]
источники:
val allSources : Iterable[Iterator[Char]] = ???
И вопрос в том, как одновременно собирать String
значения из этих итераторов, чтобы сформировать отображение String для подсчета.
Потоковое решение
Сначала нам нужно преобразовать каждый из итераторов в итератор значений String на основе разделителя:
trait Word {
val data : String
}
object EmptyWord extends Word {
override val data = ""
}
case class PartialWord(val data : String) extends Word
case class WholeWord(val data : String) extends Word
val appendToWord : Char => (Word, Char) => Word =
(separator) => (originalWord, appendChar) => originalWord match {
case PartialWord(d) =>
if(appendChar == separator)
WholeWord(d)
else
PartialWord(d + appendChar)
case _ => PartialWord(appendChar.toString)
}
val isWholeWord : Word => Boolean = (_ : Word) match {
case _ : WholeWord => true
case _ => false
}
//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] =
(_ : Iterator[Char])
.scanLeft(EmptyWord)(appendToWord(' '))
.filter(isWholeWord)
.map(_.data)
Теперь мы можем преобразовать все итераторы для генерации строк и объединить все итераторы в один итератор:
val allWordSource : Iterator[String] =
allSources.map(convertCharIterator)
.reduceOption( _ ++ _)
.getOrElse(Iterator.empty[String])
Этот итератор теперь может быть источником потока akka, который вычислит ваш счет:
val addToCounter : (Map[String, Int], String) => Map[String, Int] =
(counter, word) =>
counter.updated(word, counter.getOrElse(word, 0) + 1)
val counter : Future[Map[String, Int]] =
Source
.fromIterator( () => allWordSource)
.runFold(Map.empty[String, Int])(addToCounter)