Scala потоковая передача живого / растущего файла

Мое приложение Scala запускает внешний процесс, который записывает файл на диск. В отдельном потоке я хочу прочитать этот файл и скопировать его содержимое в OutputStream пока процесс не будет завершен и файл больше не будет расти.

Есть несколько крайних случаев, чтобы рассмотреть:

  1. Файл может еще не существовать, когда поток готов к запуску.
  2. Поток может копировать быстрее, чем процесс пишет. Другими словами, он может достигать конца файла, пока файл все еще растет.

Кстати, я могу передать нить processCompletionFuture переменная, которая указывает, когда файл закончил расти.

Есть ли элегантный и эффективный способ сделать это? Возможно, используя Akka Streams или актеров? (Я пытался использовать Akka Stream из FileInputStream, но кажется, что поток заканчивается, как только во входном потоке больше нет байтов, что происходит в случае № 2).

1 ответ

Решение

Alpakka, библиотека, построенная на Akka Streams, имеет FileTailSource Утилита, которая имитирует tail -f Unix команда. Например:

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._

val path: Path = ???

val maxLineSize = 10000

val tailSource: Source[ByteString, NotUsed] = FileTailSource(
  path = path,
  maxChunkSize = maxLineSize,
  startingPosition = 0,
  pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))

Выше tailSource построчно читает весь файл и непрерывно читает только что добавленные данные каждые 500 миллисекунд. Чтобы скопировать содержимое потока в OutputStream, подключите источник к StreamConverters.fromOutputStream раковина:

val stream: Future[IOResult] =
  tailSource
    .runWith(StreamConverters.fromOutputStream(() => new OutputStream {
      override def write(i: Int): Unit = ???
      override def write(bytes: Array[Byte]): Unit = ???
    }))

(Обратите внимание, что есть FileTailSource.lines метод, который производит Source[String, NotUsed], но в этом случае более удачно работать с ByteString вместо String, Вот почему в примере используются FileTailSource.apply(), который производит Source[ByteString, NotUsed].)

Поток потерпит неудачу, если файл не существует во время материализации. Поэтому вам нужно подтвердить существование файла перед запуском потока. Это может быть излишним, но одна идея состоит в том, чтобы использовать Alpakka DirectoryChangesSource для этого.

Другие вопросы по тегам