Мойка для построчного файла IO с противодавлением
У меня есть задание по обработке файлов, которое в настоящее время использует акторы akka с управляемым вручную противодавлением для обработки конвейера обработки, но мне никогда не удавалось успешно справиться с противодавлением на этапе чтения входного файла.
Это задание принимает входной файл и группирует строки по идентификационному номеру, присутствующему в начале каждой строки, а затем, как только он попадает в строку с новым идентификационным номером, он отправляет сгруппированные строки актору обработки через сообщение, а затем продолжает с новый идентификационный номер, вплоть до конца файла.
Похоже, это было бы хорошим вариантом использования Akka Streams, использующего Файл в качестве приемника, но я все еще не уверен в трех вещах:
1) Как я могу прочитать файл построчно?
2) Как сгруппировать по идентификатору, присутствующему в каждой строке? В настоящее время я использую очень императивную обработку для этого, и я не думаю, что у меня будет такая же способность в потоковом конвейере.
3) Как я могу применить противодавление, чтобы я не держал строки в памяти быстрее, чем могу обработать данные в нисходящем направлении?
2 ответа
Акка ручьи groupBy
это один подход. Но у группы есть maxSubstreams
параметр, который потребовал бы, чтобы вы знали, что максимальное число идентификаторов находится в начале. Итак: решение ниже использует scan
идентифицировать блоки с одинаковыми идентификаторами и splitWhen
разделить на подпотоки:
object Main extends App {
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
def extractId(s: String) = {
val a = s.split(",")
a(0) -> a(1)
}
val file = new File("/tmp/example.csv")
private val lineByLineSource = FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
val future: Future[Done] = lineByLineSource
.map(extractId)
.scan( (false,"","") )( (l,r) => (l._2 != r._1, r._1, r._2) )
.drop(1)
.splitWhen(_._1)
.fold( ("",Seq[String]()) )( (l,r) => (r._2, l._2 ++ Seq(r._3) ))
.concatSubstreams
.runForeach(println)
private val reply = Await.result(future, 10 seconds)
println(s"Received $reply")
Await.ready(system.terminate(), 10 seconds)
}
extractId
разбивает строки на id -> кортежи данных. scan
prepends id -> кортежи данных с флагом начала ID диапазона. drop
бросает элемент грунтовки в scan
, splitwhen
запускает новый подпоток для каждого начала диапазона. fold
объединяет подпотоки в списки и удаляет логическое значение диапазона начала идентификатора, так что каждый подпоток создает один элемент. Вместо сгиба вы, вероятно, хотите кастом SubFlow
который обрабатывает потоки строк для одного идентификатора и генерирует некоторый результат для диапазона идентификаторов. concatSubstreams
объединяет подпотоки для диапазона идентификаторов, создаваемые splitWhen, обратно в один поток, который печатается runForEach
,
Бежать с:
$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof
Выход:
(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
Похоже, что самый простой способ добавить "обратное давление" в вашу систему без внесения огромных изменений - это просто изменить тип почтового ящика групп ввода, потребляющих Actor, на BoundedMailbox.
Измените тип актера, который потребляет ваши линии на BoundedMailbox с высоким
mailbox-push-timeout-time
:bounded-mailbox { mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" mailbox-capacity = 1 mailbox-push-timeout-time = 1h } val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox"))
Создайте итератор из вашего файла, создайте сгруппированный (по идентификатору) итератор из этого итератора. Затем просто перебирайте данные, отправляя группы потребителю Actor. Обратите внимание, что отправка будет блокироваться в этом случае, когда почтовый ящик Актера заполнится.
def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = { def rec(s: Stream[A]): Stream[Seq[A]] = if (s.isEmpty) Stream.empty else { s.span(keyFun(s.head) == keyFun(_)) match { case (prefix, suffix) => prefix.toList #:: rec(suffix) } } rec(iter.toStream).toIterator } val lines = Source.fromFile("input.file").getLines() iterGroupBy(lines){l => l.headOption}.foreach { lines:Seq[String] => actor.tell(lines, ActorRef.noSender) }
Это оно! Возможно, вы захотите переместить материал для чтения файлов в отдельный поток, так как он собирается блокироваться. Также, регулируя mailbox-capacity
Вы можете регулировать количество потребляемой памяти. Но если чтение пакета из файла всегда быстрее, чем обработка, кажется разумным сохранить небольшую емкость, например 1 или 2.
обн iterGroupBy
реализовано с Stream
проверено не производить Stackru
,