Перебирать строки в файле параллельно (Scala)?

Я знаю о параллельных коллекциях в Scala. Они удобны! Тем не менее, я хотел бы перебирать строки файла, который слишком велик для памяти, параллельно. Я мог бы, например, создать потоки и установить блокировку для сканера, но было бы здорово, если бы я мог запустить такой код, как:

Source.fromFile(path).getLines.par foreach { line =>

К сожалению, однако

error: value par is not a member of Iterator[String]

Какой самый простой способ добиться некоторого параллелизма здесь? Сейчас я буду читать некоторые строки и обрабатывать их параллельно.

6 ответов

Решение

Вы можете использовать группировку, чтобы легко разделить итератор на куски, которые можно загрузить в память, а затем обработать параллельно.

val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines => 
    lines.par.foreach { line => process(line) }
}

На мой взгляд, что-то вроде этого - самый простой способ сделать это.

Я поставлю это как отдельный ответ, так как он принципиально отличается от моего последнего (и он действительно работает)

Вот схема решения с использованием актеров, что в основном и описывает комментарий Ким Стебель. Существует два класса акторов, один актер FileReader, который читает отдельные строки из файла по требованию, и несколько актеров Worker. Все рабочие отправляют запросы на строки в считыватель и обрабатывают строки параллельно, когда они считываются из файла.

Я использую актеров Akka здесь, но использование другой реализации в основном та же идея.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  //reads a single line from the file or returns None if EOF
  def getLine:Option[String] = ...

  def receive = {
    case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
  }
}

class Worker(reader: ActorRef) extends Actor {

  def process(line:String) ...

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start    
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...

Таким образом, в памяти одновременно может храниться не более 4 (или сколько у вас рабочих) необработанных строк.

Ниже помог мне добиться

source.getLines.toStream.par.foreach( line => println(line))

Я понимаю, что это старый вопрос, но вы можете найти ParIterator реализация в библиотеке итератов, чтобы быть полезной реализацией этого, не требующей сборки:

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

Комментарии к ответу Дэна Саймона заставили меня задуматься. Почему бы нам не попробовать обернуть источник в поток:

def src(source: Source) = Stream[String] = {
  if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString)
  else Stream.empty
}

Тогда вы можете использовать его параллельно так:

src(Source.fromFile(path)).par foreach process

Я попробовал это, и он компилируется и работает в любом случае. Я не совсем уверен, загружает ли он весь файл в память или нет, но я не думаю, что это так.

В итоге мы создали нестандартное решение в нашей компании, чтобы точно понимать параллелизм.

Другие вопросы по тегам