Scala потоковая передача живого / растущего файла
Мое приложение Scala запускает внешний процесс, который записывает файл на диск. В отдельном потоке я хочу прочитать этот файл и скопировать его содержимое в OutputStream
пока процесс не будет завершен и файл больше не будет расти.
Есть несколько крайних случаев, чтобы рассмотреть:
- Файл может еще не существовать, когда поток готов к запуску.
- Поток может копировать быстрее, чем процесс пишет. Другими словами, он может достигать конца файла, пока файл все еще растет.
Кстати, я могу передать нить processCompletionFuture
переменная, которая указывает, когда файл закончил расти.
Есть ли элегантный и эффективный способ сделать это? Возможно, используя Akka Streams или актеров? (Я пытался использовать Akka Stream из FileInputStream
, но кажется, что поток заканчивается, как только во входном потоке больше нет байтов, что происходит в случае № 2).
1 ответ
Alpakka, библиотека, построенная на Akka Streams, имеет FileTailSource
Утилита, которая имитирует tail -f
Unix команда. Например:
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._
val path: Path = ???
val maxLineSize = 10000
val tailSource: Source[ByteString, NotUsed] = FileTailSource(
path = path,
maxChunkSize = maxLineSize,
startingPosition = 0,
pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))
Выше tailSource
построчно читает весь файл и непрерывно читает только что добавленные данные каждые 500 миллисекунд. Чтобы скопировать содержимое потока в OutputStream
, подключите источник к StreamConverters.fromOutputStream
раковина:
val stream: Future[IOResult] =
tailSource
.runWith(StreamConverters.fromOutputStream(() => new OutputStream {
override def write(i: Int): Unit = ???
override def write(bytes: Array[Byte]): Unit = ???
}))
(Обратите внимание, что есть FileTailSource.lines
метод, который производит Source[String, NotUsed]
, но в этом случае более удачно работать с ByteString
вместо String
, Вот почему в примере используются FileTailSource.apply()
, который производит Source[ByteString, NotUsed]
.)
Поток потерпит неудачу, если файл не существует во время материализации. Поэтому вам нужно подтвердить существование файла перед запуском потока. Это может быть излишним, но одна идея состоит в том, чтобы использовать Alpakka DirectoryChangesSource
для этого.